Transformers

Transform messages with pure TypeScript functions

Overview

Transformers are the core of intu's processing pipeline. They are pure TypeScript functions that receive a message and a context object, then return the transformed message. The contract is simple: JSON in, JSON out.

Because transformers are plain functions that do not rely on side effects, they are easy to unit test, compose, and reason about. intu compiles them with tsc, so type errors are caught at build time rather than at runtime.
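
Since a transformer is an ordinary function, a test can call it directly with a hand-built message and context. The following is a minimal sketch, not an intu testing API: the sample message, the context values, and the receivedAt property are illustrative assumptions. It takes the timestamp from ctx.timestamp instead of the system clock so that the same input always yields the same output.

```typescript
// Context fields used by this sketch (a subset of the full context object).
interface Ctx {
  channelId: string;
  correlationId: string;
  timestamp: string;
}

// Hypothetical transformer: everything in the result is derived from the
// arguments, so repeated calls with the same input give the same output.
export function transform(msg: unknown, ctx: Ctx): unknown {
  return {
    ...(msg as object),
    receivedAt: ctx.timestamp,
    source: ctx.channelId,
  };
}

// A minimal test: call the function directly and compare the result.
const input = { patientId: "12345" };
const ctx: Ctx = {
  channelId: "adt-inbound",
  correlationId: "test-correlation-id",
  timestamp: "2024-01-15T10:30:00.000Z",
};

const output = transform(input, ctx);
const expected = {
  patientId: "12345",
  receivedAt: "2024-01-15T10:30:00.000Z",
  source: "adt-inbound",
};

if (JSON.stringify(output) !== JSON.stringify(expected)) {
  throw new Error(`unexpected output: ${JSON.stringify(output)}`);
}
```

In a real project the comparison would normally live in a test runner of your choice; the plain throw keeps the sketch free of dependencies.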

Configuration

Transformers are declared in a channel's channel.yaml. There are two equivalent ways to wire a transformer into the pipeline.

Explicit transformer block

yaml
transformer:
  runtime: node
  entrypoint: transformer.ts

Pipeline shorthand

yaml
pipeline:
  transformer: transformer.ts

Note: Both forms are functionally identical. The pipeline shorthand is more concise when you declare multiple pipeline stages together.

Function Signature

A transformer is a named export called transform. It receives the parsed message and a context object, and must return the transformed message.

typescript
export function transform(
  msg: unknown,
  ctx: {
    channelId: string;
    correlationId: string;
    messageId: string;
    timestamp: string;
    inboundDataType?: string;
    outboundDataType?: string;
    sourceType?: string;
  }
): unknown {
  // Return the transformed message; this skeleton passes it through unchanged
  return msg;
}

Context Object

The second argument passed to every transformer is a context object containing metadata about the current message and channel. Use it for tracing, routing, and conditional logic.

| Field | Type | Description |
| --- | --- | --- |
| channelId | string | The name of the channel processing this message |
| correlationId | string | Unique correlation ID for distributed tracing across systems |
| messageId | string | Unique identifier for this specific message |
| timestamp | string | ISO 8601 timestamp of when the message was received |
| inboundDataType | string? | Set when data_types.inbound is configured in the channel YAML |
| outboundDataType | string? | Set when data_types.outbound is configured in the channel YAML |
| sourceType | string? | The listener type that received the message (e.g. http, tcp, kafka) |
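
As a rough illustration of using these fields for tracing and conditional logic, the sketch below copies the IDs into an envelope on the outgoing message. The field names come from the table above; the TransformContext interface name and the meta envelope are assumptions made for this example, not something intu requires.

```typescript
// Shape of the context object, taken from the field table above.
interface TransformContext {
  channelId: string;
  correlationId: string;
  messageId: string;
  timestamp: string;
  inboundDataType?: string;
  outboundDataType?: string;
  sourceType?: string;
}

export function transform(msg: unknown, ctx: TransformContext): unknown {
  return {
    ...(msg as object),
    // Hypothetical envelope: carry the tracing IDs along with the payload.
    meta: {
      correlationId: ctx.correlationId,
      messageId: ctx.messageId,
      receivedAt: ctx.timestamp,
      // Optional fields may be undefined, so fall back explicitly.
      via: ctx.sourceType ?? "unknown",
    },
  };
}
```

Declaring the interface once and importing it from a shared module avoids repeating the inline type in every transformer.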

Examples

Basic Transform

Add processing metadata to every message:

typescript
export function transform(msg: unknown, ctx: { channelId: string; correlationId: string }): unknown {
  return {
    ...(msg as object),
    processedAt: new Date().toISOString(),
    source: ctx.channelId,
  };
}

HL7v2 to FHIR R4

Convert an HL7v2 ADT message into a FHIR R4 Patient resource:

typescript
export function transform(msg: unknown): unknown {
  const hl7 = msg as Record<string, any>;

  return {
    resourceType: "Patient",
    id: hl7.PID?.["3.1"],
    identifier: [{
      system: "urn:oid:2.16.840.1.113883.19.5",
      value: hl7.PID?.["3.1"]
    }],
    name: [{
      family: hl7.PID?.["5.1"],
      given: [hl7.PID?.["5.2"]]
    }],
    gender: mapGender(hl7.PID?.["8"]),
    birthDate: formatDate(hl7.PID?.["7.1"]),
    address: [{
      line: [hl7.PID?.["11.1"]],
      city: hl7.PID?.["11.3"],
      state: hl7.PID?.["11.4"],
      postalCode: hl7.PID?.["11.5"]
    }]
  };
}

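// Map an HL7v2 administrative sex code to a FHIR gender value.
function mapGender(code: string | undefined): string {
  const map: Record<string, string> = { M: "male", F: "female", O: "other", U: "unknown" };
  // Missing or unrecognized codes fall back to "unknown"
  return (code && map[code]) || "unknown";
}

// Convert an HL7v2 date (YYYYMMDD...) to a FHIR date (YYYY-MM-DD).
function formatDate(hl7Date: string | undefined): string | undefined {
  // Return undefined so birthDate is omitted from the JSON instead of being an empty string
  if (!hl7Date || hl7Date.length < 8) return undefined;
  return `${hl7Date.slice(0, 4)}-${hl7Date.slice(4, 6)}-${hl7Date.slice(6, 8)}`;
}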

CSV Lab Results to JSON

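Map parsed CSV rows into a list of FHIR Observation resources. The example wraps the list in a plain bundle property rather than building a FHIR Bundle resource: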

typescript
export function transform(msg: unknown): unknown {
  const rows = msg as Array<Record<string, string>>;

  return {
    bundle: rows.map(row => ({
      resourceType: "Observation",
      status: "final",
      code: {
        coding: [{ system: "http://loinc.org", code: row.loincCode, display: row.testName }]
      },
      subject: { reference: `Patient/${row.patientId}` },
      valueQuantity: {
        value: parseFloat(row.value),
        unit: row.unit,
        system: "http://unitsofmeasure.org"
      },
      effectiveDateTime: row.collectionDate
    }))
  };
}
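
One thing to watch in a mapping like this is a non-numeric result cell (for example "positive"): parseFloat returns NaN, which JSON serialization turns into null. The helper below is one possible way to handle that, assuming you want non-numeric results kept as text. The function name toObservationValue is hypothetical; valueString is the FHIR Observation field for string results.

```typescript
// Build the value[x] part of an Observation from a raw CSV cell.
// Numeric cells become valueQuantity; anything else is kept as valueString.
export function toObservationValue(raw: string, unit: string): Record<string, unknown> {
  const trimmed = raw.trim();
  const value = Number(trimmed);

  // Number("") is 0, so empty cells are checked explicitly.
  if (trimmed === "" || !Number.isFinite(value)) {
    return { valueString: raw };
  }

  return {
    valueQuantity: { value, unit, system: "http://unitsofmeasure.org" },
  };
}
```

Inside the row mapping, the result can be spread into the Observation in place of the fixed valueQuantity property, for example `...toObservationValue(row.value, row.unit)`.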

Message Routing

Set ctx._routeTo to route a message to specific destinations based on its content:

typescript
export function transform(msg: unknown, ctx: any): unknown {
  const m = msg as Record<string, any>;
  const transformed = {
    ...m,
    processedAt: new Date().toISOString(),
  };

  // Route critical results to both the main output and alerting
  if (m.priority === "STAT" || m.abnormalFlag === "H") {
    ctx._routeTo = ["kafka-output", "alert-http"];
  } else {
    ctx._routeTo = ["kafka-output"];
  }

  return transformed;
}

Tip: The ctx._routeTo array overrides the default destination list for this message only. Destination names must match the name field in the channel's destinations configuration.
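
The routing decision can also be pulled out into its own function, which makes it testable without a context object. This is a sketch under assumptions: chooseDestinations is a hypothetical helper name, and typing ctx as `{ _routeTo?: string[] }` is this example's choice rather than a type published by intu.

```typescript
// Routing decision as a standalone function: returns destination names.
// The names must match destinations declared in channel.yaml.
export function chooseDestinations(m: Record<string, unknown>): string[] {
  const critical = m.priority === "STAT" || m.abnormalFlag === "H";
  return critical ? ["kafka-output", "alert-http"] : ["kafka-output"];
}

export function transform(msg: unknown, ctx: { _routeTo?: string[] }): unknown {
  const m = msg as Record<string, unknown>;

  // Override the default destination list for this message only.
  ctx._routeTo = chooseDestinations(m);

  return { ...m, processedAt: new Date().toISOString() };
}
```

Keeping the decision separate from the mutation of ctx means the rules can grow without touching the transformer body.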

Using Shared Libraries

Import reusable utilities from the project's lib/ directory to keep transformers focused and DRY:

typescript
import { formatTimestamp, generateId } from "../../lib/index";

export function transform(msg: unknown, ctx: { channelId: string; correlationId: string }): unknown {
  return {
    ...(msg as object),
    processedAt: formatTimestamp(new Date()),
    internalId: generateId(),
    source: ctx.channelId,
  };
}
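
For orientation, here is what a lib/index.ts exporting those two helpers might look like. The function names match the import above, but the bodies are assumptions written for this example; your project's actual implementations may differ.

```typescript
// lib/index.ts (hypothetical contents)

// Format a Date as an ISO 8601 UTC string.
export function formatTimestamp(date: Date): string {
  return date.toISOString();
}

// Generate an identifier from the current time plus a random suffix.
// Not cryptographically secure and not guaranteed to be globally unique.
export function generateId(): string {
  const time = Date.now().toString(36);
  const random = Math.random().toString(36).slice(2, 10);
  return `${time}-${random}`;
}
```

Helpers like these are plain exports, so they can be unit tested on their own and reused across channels.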

Pipeline

The transformer is one stage in a larger processing pipeline. Understanding the full order helps you decide where to place logic and how to structure your TypeScript files.

| Order | Stage | Description |
| --- | --- | --- |
| 1 | Preprocessor | Runs on the raw payload before any parsing occurs |
| 2 | Parse | Automatic parsing based on data_types.inbound |
| 3 | Validator | Validates message structure; rejects invalid messages |
| 4 | Source Filter | Optionally drops messages before transformation |
| 5 | Transformer | Main transformation logic |
| 6 | Destination Filters | Per-destination message filtering |
| 7 | Destination Transformers | Per-destination message transformation |
| 8 | Send | Deliver the message to each destination |
| 9 | Postprocessor | Runs after all destinations have been processed |

Pipeline YAML

Declare all pipeline stages together in the channel's channel.yaml:

yaml
pipeline:
  preprocessor: preprocess.ts
  validator: validator.ts
  source_filter: filter.ts
  transformer: transformer.ts
  postprocessor: postprocess.ts

Filter Functions

Source filters run after validation and before transformation. They decide whether a message should continue through the pipeline or be silently dropped. Return true to keep the message, false to drop it.

typescript
export function filter(msg: unknown, ctx: { channelId: string }): boolean {
  const m = msg as Record<string, any>;
  // Return true to keep, false to drop
  return m.messageType === "ADT^A01";
}

Note: Dropped messages are not sent to the dead-letter queue. If you need to track rejected messages, use a validator instead of a filter.
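
When a channel should accept more than one message type, an allow-list keeps the filter readable. The sketch below assumes the parsed message exposes a messageType string as in the example above; the particular trigger events in the set are only examples.

```typescript
// Message types this channel accepts (illustrative values).
const ACCEPTED_TYPES = new Set(["ADT^A01", "ADT^A04", "ADT^A08"]);

export function filter(msg: unknown, _ctx: { channelId: string }): boolean {
  // Drop anything that is not an object, instead of throwing on property access.
  if (typeof msg !== "object" || msg === null) return false;

  const m = msg as Record<string, unknown>;
  return typeof m.messageType === "string" && ACCEPTED_TYPES.has(m.messageType);
}
```

Returning false for malformed input keeps the filter from raising an error on messages it was never meant to handle.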

Destination-level Transformers

When a channel has multiple destinations that require different output formats, you can configure per-destination filters and transformers. These run after the main transformer and allow each destination to receive a tailored payload.

yaml
destinations:
  - name: kafka-output
    ref: kafka-output
    filter: dest-filter.ts
    transformer: dest-transformer.ts

  - name: alert-http
    ref: alert-endpoint
    transformer: alert-transformer.ts

The destination filter receives the output of the main transformer and can drop the message for that specific destination without affecting other destinations. If the filter keeps the message, the destination transformer receives the same main-transformer output and returns a destination-specific payload.
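
As an illustration, the two functions below show what dest-filter.ts and alert-transformer.ts from the YAML above could contain. This assumes destination-level files export filter and transform with the same signatures as the source filter and main transformer, which this page does not state explicitly; the isTest, patientId, priority, and abnormalFlag fields are hypothetical message properties.

```typescript
// dest-filter.ts (hypothetical body): skip test messages for this destination only.
export function filter(msg: unknown): boolean {
  const m = msg as Record<string, unknown>;
  return m.isTest !== true;
}

// alert-transformer.ts (hypothetical body): reduce the message to a compact alert payload.
export function transform(msg: unknown, ctx: { correlationId: string }): unknown {
  const m = msg as Record<string, unknown>;
  return {
    alert: "critical-result",
    patientId: m.patientId,
    priority: m.priority,
    abnormalFlag: m.abnormalFlag,
    correlationId: ctx.correlationId,
  };
}
```

In a real project each function lives in its own file; they are shown together here only to keep the sketch in one block.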

Compilation

Transformers, validators, filters, and all other TypeScript pipeline files must be compiled before the project can be deployed. The intu build command handles this.

bash
intu build --dir .

Under the hood, intu build runs npm run build, which invokes tsc using the project's tsconfig.json. Compiled JavaScript files are written to the dist/ directory.

Tip: Run intu validate --dir . before building to catch configuration errors early. The validate command checks YAML structure and file references without compiling TypeScript.