Email Source

Poll an email inbox for messages via IMAP or POP3 with attachment extraction and filtering.

Overview

The Email source connects to a mail server using IMAP or POP3 and periodically checks for new messages. When new emails arrive that match the configured filter criteria, their body content and optionally their attachments are extracted and fed into the channel pipeline.

Email-based integrations still exist in many healthcare organizations, particularly for receiving lab results, clinical reports, or referral documents from external providers who lack modern API or file-based integration capabilities.

Warning Enable TLS for all email connections to protect PHI. Use application-specific passwords or OAuth tokens instead of primary account credentials when possible.

Configuration

yaml
listener:
  type: email
  email:
    protocol: imap
    host: mail.clinic.example.com
    port: 993
    poll_interval: 2m
    tls:
      enabled: true
    auth:
      username: ${EMAIL_USER}
      password: ${EMAIL_PASS}
    folder: INBOX
    filter: "UNSEEN SUBJECT \"HL7 Report\""
    read_attachments: true
    delete_after_read: false

Properties

protocol string required
Email protocol. One of imap or pop3.
host string required
Hostname or IP address of the mail server.
port int optional
Mail server port. Defaults to 993 for IMAP with TLS or 995 for POP3 with TLS.
poll_interval string optional
How often to check for new messages. Accepts Go duration strings. Defaults to 1m.
tls object optional
TLS configuration. Contains enabled (bool), ca_file, and skip_verify options.
auth object required
Authentication credentials. Contains username and password.
folder string optional
Mail folder to monitor. Defaults to INBOX. IMAP only.
filter string optional
IMAP search filter string to select specific messages. Examples: UNSEEN, SUBJECT "Lab Results", FROM "lab@example.com".
read_attachments bool optional
When true, extract and process email attachments instead of the email body. Each attachment is processed as a separate message. Defaults to false.
delete_after_read bool optional
When true, delete emails after successful processing. Defaults to false (emails are marked as read instead).

Complete Example

Poll an IMAP inbox every 2 minutes for unread emails with HL7 attachments from a specific sender.

yaml
id: email-hl7-import
enabled: true

listener:
  type: email
  email:
    protocol: imap
    host: mail.clinic.example.com
    port: 993
    poll_interval: 2m
    tls:
      enabled: true
    auth:
      username: ${EMAIL_USER}
      password: ${EMAIL_PASS}
    folder: INBOX
    filter: "UNSEEN FROM \"results@labcorp.example.com\""
    read_attachments: true
    delete_after_read: false

validator:
  runtime: node
  entrypoint: validator.ts

transformer:
  runtime: node
  entrypoint: transformer.ts

destinations:
  - results-db
  - audit-log

TypeScript Transformer Example

This transformer extracts HL7 content from an email attachment (provided as a base64-encoded string in the message metadata) and maps lab observation segments to FHIR Observation resources.

typescript
import { Message, TransformResult } from "@intu/sdk";

interface Observation {
  resourceType: string;
  status: string;
  code: {
    coding: { system: string; code: string; display: string }[];
  };
  valueQuantity?: {
    value: number;
    unit: string;
    system: string;
    code: string;
  };
  valueString?: string;
  subject: { reference: string };
  effectiveDateTime: string;
  referenceRange?: { text: string }[];
}

export default function transform(msg: Message): TransformResult {
  const content = msg.body as string;
  const segments = content.split("\r").filter(Boolean);

  const pid = segments.find((s) => s.startsWith("PID|"));
  if (!pid) {
    return { success: false, error: "No PID segment found in attachment" };
  }

  const pidFields = pid.split("|");
  const patientId = pidFields[3]?.split("^")[0] || "unknown";

  const obxSegments = segments.filter((s) => s.startsWith("OBX|"));
  const observations: Observation[] = obxSegments.map((obx) => {
    const fields = obx.split("|");
    const valueType = fields[2] || "ST";
    const testId = fields[3]?.split("^") || [];
    const rawValue = fields[5] || "";
    const units = fields[6] || "";
    const refRange = fields[7] || "";
    const timestamp = fields[14] || "";

    const obs: Observation = {
      resourceType: "Observation",
      status: "final",
      code: {
        coding: [
          {
            system: "http://loinc.org",
            code: testId[0] || "",
            display: testId[1] || "",
          },
        ],
      },
      subject: { reference: `Patient/${patientId}` },
      effectiveDateTime: formatTimestamp(timestamp),
    };

    if (valueType === "NM" && !isNaN(parseFloat(rawValue))) {
      obs.valueQuantity = {
        value: parseFloat(rawValue),
        unit: units,
        system: "http://unitsofmeasure.org",
        code: units,
      };
    } else {
      obs.valueString = rawValue;
    }

    if (refRange) {
      obs.referenceRange = [{ text: refRange }];
    }

    return obs;
  });

  const bundle = {
    resourceType: "Bundle",
    type: "transaction",
    entry: observations.map((obs) => ({
      resource: obs,
      request: { method: "POST", url: "Observation" },
    })),
  };

  return {
    success: true,
    body: JSON.stringify(bundle),
    contentType: "application/fhir+json",
  };
}

function formatTimestamp(ts: string): string {
  if (!ts || ts.length < 8) return new Date().toISOString();
  const d = `${ts.slice(0, 4)}-${ts.slice(4, 6)}-${ts.slice(6, 8)}`;
  if (ts.length >= 12) return `${d}T${ts.slice(8, 10)}:${ts.slice(10, 12)}:00Z`;
  return `${d}T00:00:00Z`;
}