# Transformers

Transform messages with pure TypeScript functions.
## Overview

Transformers are the core of intu's processing pipeline. They are pure TypeScript functions that receive a message and a context object, then return the transformed message. The contract is simple: JSON in, JSON out.

Because transformers are plain functions with no side-effect requirements, they are easy to unit test, compose, and reason about. intu compiles them with `tsc`, so you get full type safety during development.
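Because nothing in the contract depends on intu's runtime, a transformer can be exercised directly without a running channel. A minimal sketch (the channel name and IDs below are made up for illustration):

```typescript
// A trivial transformer: stamp the channel name onto each message.
export function transform(
  msg: unknown,
  ctx: { channelId: string; correlationId: string }
): unknown {
  return { ...(msg as object), source: ctx.channelId };
}

// Pure function, so no framework or running channel is needed to check it:
const out = transform(
  { event: "admit" },
  { channelId: "adt-intake", correlationId: "abc-123" }
) as { event: string; source: string };

console.assert(out.source === "adt-intake");
console.assert(out.event === "admit");
```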
## Configuration

Transformers are declared in a channel's `channel.yaml`. There are two equivalent ways to wire a transformer into the pipeline.

### Explicit transformer block

```yaml
transformer:
  runtime: node
  entrypoint: transformer.ts
```

### Pipeline shorthand

```yaml
pipeline:
  transformer: transformer.ts
```
## Function Signature

A transformer is a named export called `transform`. It receives the parsed message and a context object, and must return the transformed message.

```typescript
export function transform(
  msg: unknown,
  ctx: {
    channelId: string;
    correlationId: string;
    messageId: string;
    timestamp: string;
    inboundDataType?: string;
    outboundDataType?: string;
    sourceType?: string;
  }
): unknown {
  // Return the transformed message
}
```
## Context Object

The second argument passed to every transformer is a context object containing metadata about the current message and channel. Use it for tracing, routing, and conditional logic.

| Field | Type | Description |
|---|---|---|
| `channelId` | `string` | The name of the channel processing this message |
| `correlationId` | `string` | Unique correlation ID for distributed tracing across systems |
| `messageId` | `string` | Unique identifier for this specific message |
| `timestamp` | `string` | ISO 8601 timestamp of when the message was received |
| `inboundDataType` | `string?` | Set when `data_types.inbound` is configured in the channel YAML |
| `outboundDataType` | `string?` | Set when `data_types.outbound` is configured in the channel YAML |
| `sourceType` | `string?` | The listener type that received the message (e.g. `http`, `tcp`, `kafka`) |
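A sketch of a transformer that reads several context fields (the `traceId`, `receivedAt`, and `transport` output field names are illustrative, not intu conventions):

```typescript
interface TransformContext {
  channelId: string;
  correlationId: string;
  messageId: string;
  timestamp: string;
  inboundDataType?: string;
  outboundDataType?: string;
  sourceType?: string;
}

export function transform(msg: unknown, ctx: TransformContext): unknown {
  return {
    ...(msg as object),
    // Carry the correlation ID forward so downstream systems can trace it.
    traceId: ctx.correlationId,
    receivedAt: ctx.timestamp,
    // Record which listener type delivered the message, defaulting when unset.
    transport: ctx.sourceType ?? "unknown",
  };
}
```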
## Examples

### Basic Transform

Add processing metadata to every message:

```typescript
export function transform(msg: unknown, ctx: { channelId: string; correlationId: string }): unknown {
  return {
    ...(msg as object),
    processedAt: new Date().toISOString(),
    source: ctx.channelId,
  };
}
```
### HL7v2 to FHIR R4

Convert an HL7v2 ADT message into a FHIR R4 Patient resource:

```typescript
export function transform(msg: unknown): unknown {
  const hl7 = msg as Record<string, any>;
  return {
    resourceType: "Patient",
    id: hl7.PID?.["3.1"],
    identifier: [{
      system: "urn:oid:2.16.840.1.113883.19.5",
      value: hl7.PID?.["3.1"]
    }],
    name: [{
      family: hl7.PID?.["5.1"],
      given: [hl7.PID?.["5.2"]]
    }],
    gender: mapGender(hl7.PID?.["8"]),
    birthDate: formatDate(hl7.PID?.["7.1"]),
    address: [{
      line: [hl7.PID?.["11.1"]],
      city: hl7.PID?.["11.3"],
      state: hl7.PID?.["11.4"],
      postalCode: hl7.PID?.["11.5"]
    }]
  };
}

function mapGender(code: string): string {
  const map: Record<string, string> = { M: "male", F: "female", O: "other", U: "unknown" };
  return map[code] || "unknown";
}

function formatDate(hl7Date: string): string {
  if (!hl7Date || hl7Date.length < 8) return "";
  return `${hl7Date.slice(0, 4)}-${hl7Date.slice(4, 6)}-${hl7Date.slice(6, 8)}`;
}
```
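Both helpers are pure, so their edge cases are easy to pin down in a quick check (they are repeated here so the snippet runs standalone):

```typescript
function mapGender(code: string): string {
  const map: Record<string, string> = { M: "male", F: "female", O: "other", U: "unknown" };
  return map[code] || "unknown";
}

function formatDate(hl7Date: string): string {
  if (!hl7Date || hl7Date.length < 8) return "";
  return `${hl7Date.slice(0, 4)}-${hl7Date.slice(4, 6)}-${hl7Date.slice(6, 8)}`;
}

console.assert(mapGender("F") === "female");
console.assert(mapGender("") === "unknown");                 // missing PID-8 falls back
console.assert(formatDate("19850607") === "1985-06-07");
console.assert(formatDate("1985") === "");                   // too short to be a full date
console.assert(formatDate("198506071230") === "1985-06-07"); // time component ignored
```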
### CSV Lab Results to JSON

Map parsed CSV rows into a FHIR Observation bundle:

```typescript
export function transform(msg: unknown): unknown {
  const rows = msg as Array<Record<string, string>>;
  return {
    bundle: rows.map(row => ({
      resourceType: "Observation",
      status: "final",
      code: {
        coding: [{ system: "http://loinc.org", code: row.loincCode, display: row.testName }]
      },
      subject: { reference: `Patient/${row.patientId}` },
      valueQuantity: {
        value: parseFloat(row.value),
        unit: row.unit,
        system: "http://unitsofmeasure.org"
      },
      effectiveDateTime: row.collectionDate
    }))
  };
}
```
### Message Routing

Use `ctx._routeTo` to dynamically route messages to specific destinations based on message content:

```typescript
export function transform(msg: unknown, ctx: any): unknown {
  const m = msg as Record<string, any>;
  const transformed = {
    ...m,
    processedAt: new Date().toISOString(),
  };
  // Route critical results to both the main output and alerting
  if (m.priority === "STAT" || m.abnormalFlag === "H") {
    ctx._routeTo = ["kafka-output", "alert-http"];
  } else {
    ctx._routeTo = ["kafka-output"];
  }
  return transformed;
}
```

The `ctx._routeTo` array overrides the default destination list for this message only. Destination names must match the `name` field in the channel's `destinations` configuration.
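For the routing example above to work, the channel needs destinations whose `name` values match the strings placed in `_routeTo`. A sketch of such a configuration (the `ref` values are placeholders):

```yaml
destinations:
  - name: kafka-output
    ref: lab-results-topic
  - name: alert-http
    ref: alert-endpoint
```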
### Using Shared Libraries

Import reusable utilities from the project's `lib/` directory to keep transformers focused and DRY:

```typescript
import { formatTimestamp, generateId } from "../../lib/index";

export function transform(msg: unknown, ctx: { channelId: string; correlationId: string }): unknown {
  return {
    ...(msg as object),
    processedAt: formatTimestamp(new Date()),
    internalId: generateId(),
    source: ctx.channelId,
  };
}
```
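The imported helpers live in your project, not in intu. One possible `lib/index.ts` (a sketch — it assumes an ISO 8601 timestamp and a random, time-prefixed ID are what your downstream systems expect):

```typescript
// lib/index.ts — shared utilities used by multiple transformers (hypothetical).

// Format a Date as an ISO 8601 string.
export function formatTimestamp(d: Date): string {
  return d.toISOString();
}

// Generate a reasonably unique internal identifier:
// millisecond timestamp in base 36, plus a random suffix.
export function generateId(): string {
  return `${Date.now().toString(36)}-${Math.random().toString(36).slice(2, 10)}`;
}
```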
## Pipeline

The transformer is one stage in a larger processing pipeline. Understanding the full order helps you decide where to place logic and how to structure your TypeScript files.
| Order | Stage | Description |
|---|---|---|
| 1 | Preprocessor | Runs on the raw payload before any parsing occurs |
| 2 | Parse | Automatic parsing based on data_types.inbound |
| 3 | Validator | Validates message structure; rejects invalid messages |
| 4 | Source Filter | Optionally drops messages before transformation |
| 5 | Transformer | Main transformation logic |
| 6 | Destination Filters | Per-destination message filtering |
| 7 | Destination Transformers | Per-destination message transformation |
| 8 | Send | Deliver the message to each destination |
| 9 | Postprocessor | Runs after all destinations have been processed |
### Pipeline YAML

Declare all pipeline stages together in the channel's `channel.yaml`:

```yaml
pipeline:
  preprocessor: preprocess.ts
  validator: validator.ts
  source_filter: filter.ts
  transformer: transformer.ts
  postprocessor: postprocess.ts
```
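As an example of an earlier stage, a preprocessor operates on the raw payload before any parsing. The sketch below strips MLLP framing from an HL7v2 payload; note that the `preprocess` export name and string-in/string-out shape are assumptions by analogy with `transform`, so check intu's preprocessor contract before relying on them:

```typescript
// Hypothetical preprocessor: strip MLLP framing bytes and normalize
// segment separators before the HL7v2 parser runs. The export name and
// signature are assumed, not confirmed by intu's documentation.
export function preprocess(raw: string): string {
  return raw
    .replace(/^\x0b/, "")        // leading vertical-tab start-block byte
    .replace(/\x1c\r?$/, "")     // trailing file-separator end-block bytes
    .replace(/\r(?!\n)/g, "\n"); // HL7 segment CRs -> newlines
}
```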
### Filter Functions

Source filters run after validation and before transformation. They decide whether a message should continue through the pipeline or be silently dropped. Return `true` to keep the message, `false` to drop it.

```typescript
export function filter(msg: unknown, ctx: { channelId: string }): boolean {
  const m = msg as Record<string, any>;
  // Return true to keep, false to drop
  return m.messageType === "ADT^A01";
}
```
### Destination-level Transformers

When a channel has multiple destinations that require different output formats, you can configure per-destination filters and transformers. These run after the main transformer and allow each destination to receive a tailored payload.

```yaml
destinations:
  - name: kafka-output
    ref: kafka-output
    filter: dest-filter.ts
    transformer: dest-transformer.ts
  - name: alert-http
    ref: alert-endpoint
    transformer: alert-transformer.ts
```

The destination filter receives the output of the main transformer and can drop the message for that specific destination without affecting other destinations. The destination transformer receives the (possibly filtered) output and returns a destination-specific payload.
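A sketch of what `alert-transformer.ts` could look like, reducing the full message to a compact alert payload. It assumes destination transformers follow the same `transform` export convention as the main transformer, and the input/output field names are illustrative:

```typescript
// alert-transformer.ts — shape the message for the alerting endpoint only.
// Field names and the export convention are assumptions, not intu guarantees.
export function transform(msg: unknown, ctx: { channelId: string }): unknown {
  const m = msg as Record<string, any>;
  return {
    severity: m.priority === "STAT" ? "critical" : "warning",
    summary: `${m.testName ?? "result"} flagged on ${ctx.channelId}`,
    patientId: m.patientId,
  };
}
```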
## Compilation

Transformers, validators, filters, and all other TypeScript pipeline files must be compiled before the project can be deployed. The `intu build` command handles this:

```bash
intu build --dir .
```

Under the hood, `intu build` runs `npm run build`, which invokes `tsc` using the project's `tsconfig.json`. Compiled JavaScript files are written to the `dist/` directory.
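A `tsconfig.json` compatible with that flow might look like the following. This is a sketch: the options intu actually scaffolds may differ, and the `include` globs assume transformers live under per-channel directories alongside a shared `lib/`:

```json
{
  "compilerOptions": {
    "target": "ES2020",
    "module": "commonjs",
    "outDir": "dist",
    "strict": true,
    "esModuleInterop": true,
    "skipLibCheck": true
  },
  "include": ["channels/**/*.ts", "lib/**/*.ts"]
}
```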
Run `intu validate --dir .` before building to catch configuration errors early. The `validate` command checks YAML structure and file references without compiling TypeScript.