Database Source

Poll a relational database for new or updated records using SQL queries on a configurable interval.

Overview

The Database source periodically executes a SQL query against a relational database and feeds each result row into the channel pipeline. This pattern is common in healthcare systems where clinical applications write data to databases but do not support event-based notifications.

After successfully reading and processing rows, an optional post_process_statement can be executed to mark records as processed, preventing duplicate reads on the next poll cycle.

Warning Always include a filter condition in your query (e.g. WHERE processed = false) and use the post_process_statement to mark rows as processed. Without this, rows will be re-read on every poll cycle.

Configuration

yaml
listener:
  type: database
  database:
    driver: postgres
    dsn: postgres://${DB_USER}:${DB_PASS}@db.internal:5432/clinical?sslmode=require
    poll_interval: 60s
    query: |
      SELECT id, patient_mrn, order_code, order_date, status
      FROM orders
      WHERE processed = false
      ORDER BY order_date ASC
      LIMIT 100
    post_process_statement: |
      UPDATE orders SET processed = true WHERE id = ANY($1)

Properties

driver string required
Database driver. One of postgres, mysql, mssql, or oracle.
dsn string required
Data Source Name (connection string). Supports environment variable interpolation for credentials.
poll_interval string optional
How often to execute the query. Accepts Go duration strings such as 30s, 1m, 5m. Defaults to 60s.
query string required
SQL SELECT query to execute on each poll cycle. Each row returned is passed as a separate message into the channel pipeline.
post_process_statement string optional
SQL statement executed after successful processing of a batch. Typically used to mark rows as processed. Receives the list of processed row IDs as a parameter.
tls object optional
TLS configuration for encrypted database connections. Contains ca_file, cert_file, and key_file.

Supported Drivers

Driver DSN Format Notes
postgres postgres://user:pass@host:5432/db Supports sslmode parameter for TLS.
mysql user:pass@tcp(host:3306)/db Append ?tls=true for encrypted connections.
mssql sqlserver://user:pass@host:1433?database=db Common in on-premises healthcare environments.
oracle oracle://user:pass@host:1521/service Requires Oracle client libraries at runtime.

Complete Example

Poll a PostgreSQL database for unprocessed pharmacy orders every 60 seconds and forward them as FHIR MedicationRequest resources.

yaml
id: pharmacy-order-poll
enabled: true

listener:
  type: database
  database:
    driver: postgres
    dsn: postgres://${DB_USER}:${DB_PASS}@db.internal:5432/pharmacy?sslmode=require
    poll_interval: 60s
    query: |
      SELECT id, patient_mrn, drug_code, drug_name, dosage,
             frequency, prescriber_npi, order_date
      FROM medication_orders
      WHERE processed = false
      ORDER BY order_date ASC
      LIMIT 100
    post_process_statement: |
      UPDATE medication_orders SET processed = true WHERE id = ANY($1)

validator:
  runtime: node
  entrypoint: validator.ts

transformer:
  runtime: node
  entrypoint: transformer.ts

destinations:
  - fhir-pharmacy-api

TypeScript Transformer Example

This transformer maps a database row representing a medication order into a FHIR MedicationRequest resource.

typescript
import { Message, TransformResult } from "@intu/sdk";

interface MedicationOrderRow {
  id: number;
  patient_mrn: string;
  drug_code: string;
  drug_name: string;
  dosage: string;
  frequency: string;
  prescriber_npi: string;
  order_date: string;
}

export default function transform(msg: Message): TransformResult {
  let row: MedicationOrderRow;
  try {
    row = JSON.parse(msg.body as string);
  } catch {
    return { success: false, error: "Failed to parse database row" };
  }

  const medicationRequest = {
    resourceType: "MedicationRequest",
    identifier: [
      {
        system: "urn:oid:1.2.3.4.5.6.7",
        value: String(row.id),
      },
    ],
    status: "active",
    intent: "order",
    medicationCodeableConcept: {
      coding: [
        {
          system: "http://www.nlm.nih.gov/research/umls/rxnorm",
          code: row.drug_code,
          display: row.drug_name,
        },
      ],
    },
    subject: {
      identifier: {
        system: "urn:oid:2.16.840.1.113883.19.5",
        value: row.patient_mrn,
      },
    },
    authoredOn: row.order_date,
    requester: {
      identifier: {
        system: "http://hl7.org/fhir/sid/us-npi",
        value: row.prescriber_npi,
      },
    },
    dosageInstruction: [
      {
        text: `${row.dosage} ${row.frequency}`,
        timing: {
          code: {
            text: row.frequency,
          },
        },
        doseAndRate: [
          {
            doseQuantity: {
              value: parseFloat(row.dosage) || 0,
              unit: "mg",
              system: "http://unitsofmeasure.org",
              code: "mg",
            },
          },
        ],
      },
    ],
  };

  return {
    success: true,
    body: JSON.stringify(medicationRequest),
    contentType: "application/fhir+json",
  };
}