File Source

Poll a directory for new files across local filesystem, FTP, S3, and SMB storage backends.

Overview

The File source watches a directory for new or modified files at a configurable poll interval. When a matching file is detected, its contents are read and passed into the channel pipeline. After successful processing, files can be moved to an archive directory; on failure, they are routed to an error directory.

This source supports four storage schemes: local for filesystem directories, ftp for FTP/FTPS servers, s3 for Amazon S3 buckets, and smb for Windows/Samba network shares. Each scheme has its own authentication and connection sub-properties.

Note: File-based integrations are common in batch-processing scenarios such as nightly lab-result imports, insurance claim file drops, and radiology report exports.

Configuration

```yaml
listener:
  type: file
  file:
    scheme: local
    directory: /data/incoming
    file_pattern: "*.hl7"
    poll_interval: 30s
    move_to: /data/archive
    error_dir: /data/errors
    sort_by: date
```

Properties

scheme string required
Storage backend. One of local, ftp, s3, or smb.
directory string required
Path to the directory to watch. For local, this is an absolute filesystem path. For remote schemes, this is the remote directory path.
file_pattern string optional
Glob pattern to filter files. Examples: *.hl7, *.csv, LAB_*.txt. Defaults to * (all files).
poll_interval string optional
How often to check for new files. Accepts Go duration strings such as 10s, 1m, 5m. Defaults to 30s.
move_to string optional
Directory to move files into after successful processing. If not set, processed files remain in the source directory.
error_dir string optional
Directory to move files into when processing fails. Useful for manual review and reprocessing workflows.
sort_by string optional
Determines the order files are processed. One of name, date, or size. Defaults to name.
ftp object optional
FTP connection settings. Required when scheme is ftp. Contains host, port, username, password, and tls.
s3 object optional
S3 connection settings. Required when scheme is s3. Contains bucket, region, prefix, and auth.
smb object optional
SMB/CIFS connection settings. Required when scheme is smb. Contains host, share, username, password, and domain.
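For reference, file_pattern globs of the kind shown above can be approximated with a small matcher. This sketch is purely illustrative (supporting only `*` and `?`) and is not the engine's actual implementation:

```typescript
// Convert a simple glob (supporting * and ?) into an anchored RegExp.
// Illustrative only -- the real matcher may differ in edge cases.
function globToRegExp(pattern: string): RegExp {
  // Escape regex metacharacters, then translate glob wildcards.
  const escaped = pattern.replace(/[.+^${}()|[\]\\]/g, "\\$&");
  const translated = escaped.replace(/\*/g, ".*").replace(/\?/g, ".");
  return new RegExp(`^${translated}$`);
}

const matchesHl7 = globToRegExp("*.hl7");
console.log(matchesHl7.test("ORU_20240101.hl7")); // true
console.log(matchesHl7.test("ORU_20240101.csv")); // false

const matchesLab = globToRegExp("LAB_*.txt");
console.log(matchesLab.test("LAB_results.txt")); // true
```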

S3 Sub-Properties

| Property | Type | Description |
| --- | --- | --- |
| bucket | string | S3 bucket name. |
| region | string | AWS region (e.g. us-east-1). |
| prefix | string | Key prefix to filter objects within the bucket. |
| auth | object | Contains access_key_id and secret_access_key, or uses the IAM role when omitted. |
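As an illustration, an S3-backed listener might combine these sub-properties as follows. The bucket, prefix, and credential values are placeholders, not working credentials:

```yaml
listener:
  type: file
  file:
    scheme: s3
    directory: /              # remote directory path within the bucket
    file_pattern: "*.csv"
    poll_interval: 1m
    s3:
      bucket: example-lab-drops
      region: us-east-1
      prefix: incoming/
      auth:
        access_key_id: AKIAEXAMPLE
        secret_access_key: "${S3_SECRET}"
```

Omitting the auth block entirely falls back to the IAM role, which is generally preferable when running inside AWS.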

Complete Example

Poll a local directory for HL7 files every 30 seconds, archive processed files, and route failures to an error directory.

```yaml
id: lab-file-import
enabled: true

listener:
  type: file
  file:
    scheme: local
    directory: /data/incoming
    file_pattern: "*.hl7"
    poll_interval: 30s
    move_to: /data/archive
    error_dir: /data/errors
    sort_by: date

validator:
  runtime: node
  entrypoint: validator.ts

transformer:
  runtime: node
  entrypoint: transformer.ts

destinations:
  - lab-results-db
```
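The example config references a validator.ts entrypoint. A minimal sketch might simply check that every message in the batch file starts with an MSH header segment. The Message and ValidationResult shapes here are assumptions for illustration; align them with your SDK's actual validator contract:

```typescript
// Assumed minimal shapes -- check your SDK's actual types.
interface Message {
  body: unknown;
}

interface ValidationResult {
  valid: boolean;
  errors: string[];
}

export default function validate(msg: Message): ValidationResult {
  const fileContent = msg.body as string;
  // Messages in the batch file are separated by blank lines.
  const messages = fileContent.split("\n\n").filter(Boolean);
  const errors: string[] = [];

  messages.forEach((raw, i) => {
    // Every HL7v2 message must begin with an MSH header segment.
    if (!raw.startsWith("MSH|")) {
      errors.push(`message ${i + 1}: missing MSH header`);
    }
  });

  return { valid: errors.length === 0, errors };
}
```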

TypeScript Transformer Example

This transformer processes a batch file containing multiple HL7v2 messages separated by blank lines. Each message is parsed and its OBX results are combined into a single FHIR Bundle.

```typescript
import { Message, TransformResult } from "@intu/sdk";

interface FHIRBundleEntry {
  resource: {
    resourceType: string;
    code: { coding: { system: string; code: string; display: string }[] };
    valueString?: string;
    subject: { reference: string };
    effectiveDateTime: string;
  };
}

export default function transform(msg: Message): TransformResult {
  const fileContent = msg.body as string;
  // Messages in the batch file are separated by blank lines.
  const messages = fileContent.split("\n\n").filter(Boolean);
  const entries: FHIRBundleEntry[] = [];

  for (const raw of messages) {
    // HL7v2 segments are delimited by carriage returns.
    const segments = raw.split("\r").filter(Boolean);
    const obxSegments = segments.filter((s) => s.startsWith("OBX|"));
    const pid = segments.find((s) => s.startsWith("PID|"));

    // Skip messages with no patient identification segment.
    if (!pid) continue;

    const pidFields = pid.split("|");
    // PID-3: patient identifier list; take the ID component.
    const patientId = pidFields[3]?.split("^")[0] || "unknown";

    for (const obx of obxSegments) {
      const fields = obx.split("|");
      entries.push({
        resource: {
          resourceType: "Observation",
          code: {
            coding: [
              {
                system: "http://loinc.org",
                // OBX-3: observation identifier (code^display).
                code: fields[3]?.split("^")[0] || "",
                display: fields[3]?.split("^")[1] || "",
              },
            ],
          },
          valueString: fields[5] || "", // OBX-5: observation value
          subject: { reference: `Patient/${patientId}` },
          effectiveDateTime: formatDateTime(fields[14]), // OBX-14: observation date/time
        },
      });
    }
  }

  const bundle = {
    resourceType: "Bundle",
    type: "batch",
    entry: entries.map((e) => ({ resource: e.resource })),
  };

  return {
    success: true,
    body: JSON.stringify(bundle),
    contentType: "application/fhir+json",
  };
}

// Convert an HL7v2 timestamp (YYYYMMDD[HHMM...]) to ISO 8601.
function formatDateTime(dt: string | undefined): string {
  if (!dt || dt.length < 8) return new Date().toISOString();
  const date = `${dt.slice(0, 4)}-${dt.slice(4, 6)}-${dt.slice(6, 8)}`;
  if (dt.length >= 12) {
    return `${date}T${dt.slice(8, 10)}:${dt.slice(10, 12)}:00Z`;
  }
  return `${date}T00:00:00Z`;
}
```
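The timestamp conversion is easy to get wrong with partial HL7v2 timestamps, so it helps to exercise the helper on its own. This standalone copy mirrors the formatDateTime function in the transformer:

```typescript
// Standalone copy of the HL7v2 timestamp converter for quick verification.
function formatDateTime(dt: string | undefined): string {
  if (!dt || dt.length < 8) return new Date().toISOString();
  const date = `${dt.slice(0, 4)}-${dt.slice(4, 6)}-${dt.slice(6, 8)}`;
  if (dt.length >= 12) {
    return `${date}T${dt.slice(8, 10)}:${dt.slice(10, 12)}:00Z`;
  }
  return `${date}T00:00:00Z`;
}

console.log(formatDateTime("202401151030")); // 2024-01-15T10:30:00Z
console.log(formatDateTime("20240115"));     // 2024-01-15T00:00:00Z
```

Missing or malformed timestamps fall back to the current time, so downstream consumers always receive a valid effectiveDateTime.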