Database Source
Poll a relational database for new or updated records using SQL queries on a configurable interval.
Overview
The Database source periodically executes a SQL query against a relational database and feeds each result row into the channel pipeline. This pattern is common in healthcare systems where clinical applications write data to databases but do not support event-based notifications.
After successfully reading and processing rows, an optional post_process_statement can be executed to mark records as processed, preventing duplicate reads on the next poll cycle.
Include a filter in the query (for example, WHERE processed = false) and use the post_process_statement to mark rows as processed. Without this, rows will be re-read on every poll cycle.
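The read-then-mark cycle described above can be sketched in a few lines. This is an illustration only, not the source's implementation: FakeTable, pollOnce, and the handle callback are hypothetical names, the table is held in memory instead of a real database, and marking only the rows whose handler succeeded is an assumption about the behavior.

```typescript
// Hypothetical row shape for illustration; not the product's real API.
type Row = { id: number; processed: boolean; payload: string };

// In-memory stand-in for a table with a `processed` flag.
class FakeTable {
  private rows: Row[];

  constructor(rows: Row[]) {
    this.rows = rows;
  }

  // Mirrors: SELECT ... WHERE processed = false ORDER BY id ASC LIMIT n
  selectUnprocessed(limit: number): Row[] {
    return this.rows
      .filter((r) => !r.processed)
      .sort((a, b) => a.id - b.id)
      .slice(0, limit)
      .map((r) => ({ ...r }));
  }

  // Mirrors: UPDATE ... SET processed = true WHERE id = ANY($1)
  markProcessed(ids: number[]): number {
    const wanted = new Set(ids);
    let updated = 0;
    for (const r of this.rows) {
      if (wanted.has(r.id) && !r.processed) {
        r.processed = true;
        updated++;
      }
    }
    return updated;
  }
}

// One poll cycle: read a batch, hand each row to the pipeline,
// then mark only the rows that were handled successfully (assumed behavior).
function pollOnce(
  table: FakeTable,
  limit: number,
  handle: (row: Row) => boolean
): number[] {
  const delivered: number[] = [];
  for (const row of table.selectUnprocessed(limit)) {
    if (handle(row)) delivered.push(row.id);
  }
  if (delivered.length > 0) table.markProcessed(delivered);
  return delivered;
}
```

Calling pollOnce repeatedly on the same table returns each row once. If markProcessed were skipped, every call would return the same first batch, which is the duplicate-read problem the filter and post_process_statement are meant to prevent.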
Configuration
listener:
  type: database
  database:
    driver: postgres
    dsn: postgres://${DB_USER}:${DB_PASS}@db.internal:5432/clinical?sslmode=require
    poll_interval: 60s
    query: |
      SELECT id, patient_mrn, order_code, order_date, status
      FROM orders
      WHERE processed = false
      ORDER BY order_date ASC
      LIMIT 100
    post_process_statement: |
      UPDATE orders SET processed = true WHERE id = ANY($1)
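The poll_interval value in the configuration above is a short duration string. As a rough sketch of how such a value could be turned into milliseconds, the helper below accepts a whole number followed by s, m, or h and falls back to the documented 60s default when the value is missing. The function names and the accepted grammar (including the h unit) are assumptions for illustration; the actual parser may accept other forms.

```typescript
// Documented default when poll_interval is omitted: 60s.
const DEFAULT_POLL_MS = 60000;

// Convert a single-letter unit to milliseconds (assumed unit set: s, m, h).
function unitToMs(unit: string): number {
  switch (unit) {
    case "s":
      return 1000;
    case "m":
      return 60000;
    case "h":
      return 3600000;
    default:
      throw new Error(`Unsupported duration unit: ${unit}`);
  }
}

// Parse values such as "30s", "1m", or "5m" into milliseconds.
function parsePollInterval(value?: string): number {
  if (value === undefined || value.trim() === "") return DEFAULT_POLL_MS;
  const match = /^(\d+)(s|m|h)$/.exec(value.trim());
  if (match === null) throw new Error(`Invalid poll_interval: ${value}`);
  const [, digits = "", unit = ""] = match;
  const amount = Number(digits);
  if (amount <= 0) throw new Error("poll_interval must be greater than zero");
  return amount * unitToMs(unit);
}
```

Rejecting malformed or zero intervals at load time is a deliberate choice in this sketch: a silently ignored typo could otherwise leave a channel polling far more or less often than intended.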
Properties
driver: One of postgres, mysql, mssql, or oracle.
poll_interval: How often the query runs, for example 30s, 1m, 5m. Defaults to 60s.
TLS settings: ca_file, cert_file, and key_file.
Supported Drivers
| Driver | DSN Format | Notes |
|---|---|---|
| postgres | postgres://user:pass@host:5432/db | Supports sslmode parameter for TLS. |
| mysql | user:pass@tcp(host:3306)/db | Append ?tls=true for encrypted connections. |
| mssql | sqlserver://user:pass@host:1433?database=db | Common in on-premises healthcare environments. |
| oracle | oracle://user:pass@host:1521/service | Requires Oracle client libraries at runtime. |
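To make the four DSN shapes in the table concrete, here is a small sketch that assembles each one from its parts. buildDsn and DsnParts are hypothetical helpers, not part of the product. Percent-encoding the credentials in the URL-style DSNs is an assumption about how those drivers parse them; the mysql form is not a URL, so the sketch inserts its values as-is.

```typescript
type Driver = "postgres" | "mysql" | "mssql" | "oracle";

interface DsnParts {
  user: string;
  pass: string;
  host: string;
  port: number;
  database: string; // for oracle, the service name
}

// Build a DSN in the format listed in the Supported Drivers table.
function buildDsn(driver: Driver, p: DsnParts): string {
  // URL-style DSNs: encode characters such as "@" or ":" in credentials (assumption).
  const user = encodeURIComponent(p.user);
  const pass = encodeURIComponent(p.pass);
  switch (driver) {
    case "postgres":
      return `postgres://${user}:${pass}@${p.host}:${p.port}/${p.database}`;
    case "mysql":
      // Not a URL, so values are inserted without encoding.
      return `${p.user}:${p.pass}@tcp(${p.host}:${p.port})/${p.database}`;
    case "mssql":
      return `sqlserver://${user}:${pass}@${p.host}:${p.port}?database=${encodeURIComponent(p.database)}`;
    case "oracle":
      return `oracle://${user}:${pass}@${p.host}:${p.port}/${p.database}`;
  }
}
```

In a real configuration the credentials would normally come from environment variables, as the ${DB_USER} and ${DB_PASS} placeholders in the examples suggest, so that no secret is written into the channel file.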
Complete Example
Poll a PostgreSQL database for unprocessed pharmacy orders every 60 seconds and forward them as FHIR MedicationRequest resources.
id: pharmacy-order-poll
enabled: true
listener:
  type: database
  database:
    driver: postgres
    dsn: postgres://${DB_USER}:${DB_PASS}@db.internal:5432/pharmacy?sslmode=require
    poll_interval: 60s
    query: |
      SELECT id, patient_mrn, drug_code, drug_name, dosage,
             frequency, prescriber_npi, order_date
      FROM medication_orders
      WHERE processed = false
      ORDER BY order_date ASC
      LIMIT 100
    post_process_statement: |
      UPDATE medication_orders SET processed = true WHERE id = ANY($1)
validator:
  runtime: node
  entrypoint: validator.ts
transformer:
  runtime: node
  entrypoint: transformer.ts
destinations:
  - fhir-pharmacy-api
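The configuration references validator.ts, which this page does not show. The sketch below suggests what row-level checks might look like before a row reaches the transformer. It deliberately defines its own ValidationResult type and validateRow function instead of using the SDK, because the SDK's validator interface is not documented here; both names are hypothetical, and the checks (required columns, a 10-digit NPI, a parseable order date) are illustrative choices.

```typescript
// Hypothetical result shape; the real SDK type may differ.
interface ValidationResult {
  valid: boolean;
  errors: string[];
}

// Columns selected by the polling query in the example above.
const REQUIRED_FIELDS = [
  "id", "patient_mrn", "drug_code", "drug_name",
  "dosage", "frequency", "prescriber_npi", "order_date",
] as const;

function validateRow(body: string): ValidationResult {
  let row: Record<string, unknown>;
  try {
    const parsed: unknown = JSON.parse(body);
    if (typeof parsed !== "object" || parsed === null || Array.isArray(parsed)) {
      return { valid: false, errors: ["row is not a JSON object"] };
    }
    row = parsed as Record<string, unknown>;
  } catch {
    return { valid: false, errors: ["body is not valid JSON"] };
  }

  const errors: string[] = [];
  for (const field of REQUIRED_FIELDS) {
    const value = row[field];
    if (value === undefined || value === null || value === "") {
      errors.push(`missing ${field}`);
    }
  }

  // NPIs are 10-digit numbers; this checks the format only, not the check digit.
  const npi = row["prescriber_npi"];
  if (typeof npi === "string" && npi !== "" && !/^\d{10}$/.test(npi)) {
    errors.push("prescriber_npi must be 10 digits");
  }

  const orderDate = row["order_date"];
  if (typeof orderDate === "string" && orderDate !== "" && Number.isNaN(Date.parse(orderDate))) {
    errors.push("order_date is not a parseable date");
  }

  return { valid: errors.length === 0, errors };
}
```

Collecting every problem in one pass, instead of stopping at the first, tends to make rejected rows easier to diagnose in the source database.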
TypeScript Transformer Example
This transformer maps a database row representing a medication order into a FHIR MedicationRequest resource.
import { Message, TransformResult } from "@intu/sdk";

// Shape of one row returned by the polling query, parsed from msg.body.
interface MedicationOrderRow {
  id: number;
  patient_mrn: string;
  drug_code: string;
  drug_name: string;
  dosage: string;
  frequency: string;
  prescriber_npi: string;
  order_date: string;
}

export default function transform(msg: Message): TransformResult {
  let row: MedicationOrderRow;
  try {
    row = JSON.parse(msg.body as string);
  } catch {
    return { success: false, error: "Failed to parse database row" };
  }

  const medicationRequest = {
    resourceType: "MedicationRequest",
    identifier: [
      {
        system: "urn:oid:1.2.3.4.5.6.7",
        value: String(row.id),
      },
    ],
    status: "active",
    intent: "order",
    medicationCodeableConcept: {
      coding: [
        {
          system: "http://www.nlm.nih.gov/research/umls/rxnorm",
          code: row.drug_code,
          display: row.drug_name,
        },
      ],
    },
    subject: {
      identifier: {
        system: "urn:oid:2.16.840.1.113883.19.5",
        value: row.patient_mrn,
      },
    },
    authoredOn: row.order_date,
    requester: {
      identifier: {
        system: "http://hl7.org/fhir/sid/us-npi",
        value: row.prescriber_npi,
      },
    },
    dosageInstruction: [
      {
        text: `${row.dosage} ${row.frequency}`,
        timing: {
          code: {
            text: row.frequency,
          },
        },
        doseAndRate: [
          {
            doseQuantity: {
              // parseFloat reads a leading number ("500mg" -> 500);
              // a dosage with no leading number falls back to 0.
              value: parseFloat(row.dosage) || 0,
              // The unit is hard-coded to mg in this example.
              unit: "mg",
              system: "http://unitsofmeasure.org",
              code: "mg",
            },
          },
        ],
      },
    ],
  };

  return {
    success: true,
    body: JSON.stringify(medicationRequest),
    contentType: "application/fhir+json",
  };
}
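The transformer above assumes every dosage is in milligrams and reports a value of 0 when the dosage string has no leading number. One possible refinement, sketched here under the assumption that the dosage column holds strings such as "500 mg" or "2.5 mL", is to parse the number and unit together and emit a dose quantity only when both are recognized. parseDosage, DoseQuantity, and the short allow-list of UCUM unit codes are hypothetical and would need to match the units actually stored in the source table.

```typescript
interface DoseQuantity {
  value: number;
  unit: string;
  system: string;
  code: string;
}

// Small allow-list of UCUM unit codes, chosen for illustration.
const KNOWN_UCUM_UNITS = new Set(["mg", "g", "ug", "mL", "L"]);

// Parse strings like "500 mg" or "2.5mL"; return undefined when the
// value or unit cannot be recognized, so callers never emit a dose of 0.
function parseDosage(dosage: string): DoseQuantity | undefined {
  const match = /^\s*(\d+(?:\.\d+)?)\s*([A-Za-z]+)\s*$/.exec(dosage);
  if (match === null) return undefined;
  const [, rawValue = "", unit = ""] = match;
  if (!KNOWN_UCUM_UNITS.has(unit)) return undefined;
  return {
    value: Number(rawValue),
    unit,
    system: "http://unitsofmeasure.org",
    code: unit,
  };
}
```

When parseDosage returns undefined, a transformer could leave doseAndRate out and rely on the free-text dosageInstruction.text, which still carries the original dosage and frequency.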