# File Source
Poll a directory for new files across local filesystem, FTP, S3, and SMB storage backends.
## Overview
The File source watches a directory for new or modified files at a configurable poll interval. When a matching file is detected, its contents are read and passed into the channel pipeline. After successful processing, files can be moved to an archive directory; on failure, they are routed to an error directory.
This source supports four storage schemes: `local` for filesystem directories, `ftp` for FTP/FTPS servers, `s3` for Amazon S3 buckets, and `smb` for Windows/Samba network shares. Each scheme has its own authentication and connection sub-properties.
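The detect → read → process → archive/error flow described above can be sketched as follows. This is an illustrative TypeScript sketch of one poll cycle against a local directory; the helper names (`pollOnce` and the `process` callback) are hypothetical, not the engine's actual implementation:

```typescript
import * as fs from "node:fs";
import * as path from "node:path";

// One poll cycle: read each matching file, hand its contents to the
// pipeline, then move it to the archive directory on success or to the
// error directory on failure. (Hypothetical sketch, not the real source.)
function pollOnce(
  dir: string,
  matches: (name: string) => boolean,
  process: (contents: string) => void,
  archiveDir: string,
  errorDir: string,
): void {
  for (const name of fs.readdirSync(dir).filter(matches)) {
    const src = path.join(dir, name);
    try {
      process(fs.readFileSync(src, "utf8")); // pass contents into the pipeline
      fs.renameSync(src, path.join(archiveDir, name)); // success: archive
    } catch {
      fs.renameSync(src, path.join(errorDir, name)); // failure: error dir
    }
  }
}
```

In the real source this loop runs once per `poll_interval`; non-matching files are left in place for later inspection.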
## Configuration
```yaml
listener:
  type: file
  file:
    scheme: local
    directory: /data/incoming
    file_pattern: "*.hl7"
    poll_interval: 30s
    move_to: /data/archive
    error_dir: /data/errors
    sort_by: date
```
## Properties

| Property | Type | Description |
|---|---|---|
| `scheme` | string | Storage backend: `local`, `ftp`, `s3`, or `smb`. |
| `directory` | string | Directory to poll. For `local`, this is an absolute filesystem path. For remote schemes, this is the remote directory path. |
| `file_pattern` | string | Glob pattern for matching filenames, e.g. `*.hl7`, `*.csv`, `LAB_*.txt`. Defaults to `*` (all files). |
| `poll_interval` | duration | How often to check the directory, e.g. `10s`, `1m`, `5m`. Defaults to `30s`. |
| `move_to` | string | Directory that successfully processed files are moved to. |
| `error_dir` | string | Directory that files are routed to when processing fails. |
| `sort_by` | string | Order in which matched files are processed: `name`, `date`, or `size`. Defaults to `name`. |
| `ftp` | object | Required when `scheme` is `ftp`. Contains `host`, `port`, `username`, `password`, and `tls`. |
| `s3` | object | Required when `scheme` is `s3`. Contains `bucket`, `region`, `prefix`, and `auth`. |
| `smb` | object | Required when `scheme` is `smb`. Contains `host`, `share`, `username`, `password`, and `domain`. |

## S3 Sub-Properties
| Property | Type | Description |
|---|---|---|
| `bucket` | string | S3 bucket name. |
| `region` | string | AWS region (e.g. `us-east-1`). |
| `prefix` | string | Key prefix to filter objects within the bucket. |
| `auth` | object | Contains `access_key_id` and `secret_access_key`; when omitted, the IAM role is used. |
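Putting the sub-properties together, an S3-backed listener might look like the following. The bucket, prefix, and credential values are illustrative only:

```yaml
listener:
  type: file
  file:
    scheme: s3
    file_pattern: "*.hl7"
    poll_interval: 1m
    s3:
      bucket: lab-inbound          # illustrative bucket name
      region: us-east-1
      prefix: hl7/incoming/
      auth:                        # omit this block to use the IAM role instead
        access_key_id: "${AWS_ACCESS_KEY_ID}"
        secret_access_key: "${AWS_SECRET_ACCESS_KEY}"
```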
## Complete Example
Poll a local directory for HL7 files every 30 seconds, archive processed files, and route failures to an error directory.
```yaml
id: lab-file-import
enabled: true
listener:
  type: file
  file:
    scheme: local
    directory: /data/incoming
    file_pattern: "*.hl7"
    poll_interval: 30s
    move_to: /data/archive
    error_dir: /data/errors
    sort_by: date
validator:
  runtime: node
  entrypoint: validator.ts
transformer:
  runtime: node
  entrypoint: transformer.ts
destinations:
  - lab-results-db
```
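The example above references a `validator.ts` entrypoint that is not shown. A minimal sketch might look like the following; note that the result shape (`{ valid, reason }`) and the structural `FileMessage` type are assumptions for illustration, not the SDK's documented validator contract:

```typescript
// Hypothetical validator.ts sketch. The real entrypoint presumably receives
// the SDK's Message type; a minimal structural type is used here so the
// example stays self-contained.
interface FileMessage {
  body: unknown;
}

export default function validate(msg: FileMessage): { valid: boolean; reason?: string } {
  const body = String(msg.body ?? "");
  // A plausible batch-file check: require at least one MSH header segment
  // so the downstream transformer has something to parse.
  if (!body.includes("MSH|")) {
    return { valid: false, reason: "no MSH segment found" };
  }
  return { valid: true };
}
```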
## TypeScript Transformer Example
This transformer processes a batch file containing multiple HL7v2 messages separated by blank lines. Each message is parsed and its observations are combined into a FHIR Bundle.
```typescript
import { Message, TransformResult } from "@intu/sdk";

interface FHIRBundleEntry {
  resource: {
    resourceType: string;
    code: { coding: { system: string; code: string; display: string }[] };
    valueString?: string;
    subject: { reference: string };
    effectiveDateTime: string;
  };
}

export default function transform(msg: Message): TransformResult {
  const fileContent = msg.body as string;
  // Messages within the batch file are separated by blank lines.
  const messages = fileContent.split("\n\n").filter(Boolean);
  const entries: FHIRBundleEntry[] = [];

  for (const raw of messages) {
    // HL7v2 segments are delimited by carriage returns.
    const segments = raw.split("\r").filter(Boolean);
    const obxSegments = segments.filter((s) => s.startsWith("OBX|"));
    const pid = segments.find((s) => s.startsWith("PID|"));
    if (!pid) continue; // skip messages without a patient identification segment

    const pidFields = pid.split("|");
    const patientId = pidFields[3]?.split("^")[0] || "unknown"; // PID-3, first component

    for (const obx of obxSegments) {
      const fields = obx.split("|");
      entries.push({
        resource: {
          resourceType: "Observation",
          code: {
            coding: [
              {
                system: "http://loinc.org",
                code: fields[3]?.split("^")[0] || "", // OBX-3 identifier
                display: fields[3]?.split("^")[1] || "", // OBX-3 text
              },
            ],
          },
          valueString: fields[5] || "", // OBX-5 observation value
          subject: { reference: `Patient/${patientId}` },
          effectiveDateTime: formatDateTime(fields[14]), // OBX-14 observation date/time
        },
      });
    }
  }

  const bundle = {
    resourceType: "Bundle",
    type: "batch",
    entry: entries.map((e) => ({ resource: e.resource })),
  };

  return {
    success: true,
    body: JSON.stringify(bundle),
    contentType: "application/fhir+json",
  };
}

// Convert an HL7 timestamp (YYYYMMDD, optionally followed by HHMM...) to ISO 8601.
function formatDateTime(dt: string | undefined): string {
  if (!dt || dt.length < 8) return new Date().toISOString();
  const date = `${dt.slice(0, 4)}-${dt.slice(4, 6)}-${dt.slice(6, 8)}`;
  if (dt.length >= 12) {
    return `${date}T${dt.slice(8, 10)}:${dt.slice(10, 12)}:00Z`;
  }
  return `${date}T00:00:00Z`;
}
```