Kafka Destination

Publish messages to Apache Kafka topics for event-driven healthcare data pipelines.

Overview

The Kafka destination publishes transformed messages to an Apache Kafka topic. Kafka is widely used in healthcare architectures to decouple producers from consumers, buffer high-throughput message streams, and enable real-time analytics. This destination supports SASL authentication, TLS-encrypted broker connections (with optional mutual TLS), key-based partitioning, and message compression.

Configuration

Define a Kafka destination in your root intu.yaml under the destinations key:

yaml
# intu.yaml
destinations:
  hl7-events:
    type: kafka
    kafka:
      brokers:
        - kafka-1.hospital.internal:9092
        - kafka-2.hospital.internal:9092
        - kafka-3.hospital.internal:9092
      topic: healthcare.hl7.adt-events
      client_id: intu-adt-producer
      auth:
        type: sasl_plain
        username: ${KAFKA_USER}
        password: ${KAFKA_PASS}
      tls:
        enabled: true
        ca_cert: /etc/ssl/certs/kafka-ca.pem

Properties

brokers string[] required
List of Kafka broker addresses in host:port format. At least one broker is required; multiple brokers provide failover.
topic string required
The Kafka topic to publish messages to. Supports environment variable interpolation.
client_id string optional
An identifier for this producer, used in Kafka broker logs and monitoring. Defaults to intu-producer.
key string optional
Expression to extract the message key for partitioning. Supports ${field} interpolation from message headers or body. Messages with the same key are routed to the same partition.
compression string optional
Compression codec for produced messages. One of: none, gzip, snappy, lz4, zstd. Defaults to none.
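
To make the key property concrete, the following is a minimal sketch of how a ${field} template could be resolved against a message. It is not intu's implementation: the SketchMessage shape, the resolveKey helper, the headers-before-body lookup order, and the decision to throw on an unresolved placeholder are all assumptions made for illustration.

```typescript
// Hypothetical message shape for this sketch; the real @intu/sdk types may differ.
interface SketchMessage {
  headers: Record<string, string | undefined>;
  body: Record<string, unknown>;
}

// Replace each ${name} placeholder with a header value, falling back to a
// top-level body field. Assumption: headers take precedence over the body.
function resolveKey(template: string, msg: SketchMessage): string {
  return template.replace(/\$\{([^}]+)\}/g, (_match: string, name: string) => {
    const value = msg.headers[name] ?? msg.body[name];
    if (value === undefined || value === null || value === "") {
      // Assumption: an unresolved placeholder is treated as an error rather
      // than silently producing an empty key.
      throw new Error(`Cannot resolve key placeholder: ${name}`);
    }
    return String(value);
  });
}

const sample: SketchMessage = {
  headers: { patientId: "12345" },
  body: { eventType: "A01" },
};

const key = resolveKey("${patientId}", sample); // "12345"
const compositeKey = resolveKey("${eventType}-${patientId}", sample); // "A01-12345"
```

Failing loudly on a missing field is a design choice for the sketch: an empty key would otherwise send every affected message to the same partition.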

Authentication

auth.type string optional
Authentication mechanism. One of: sasl_plain, sasl_scram_256, sasl_scram_512. Omit for unauthenticated connections.
auth.username string optional
SASL username.
auth.password string optional
SASL password. Use ${VAR} to reference environment variables.

TLS

tls.enabled bool optional
Enable TLS for broker connections. Defaults to false.
tls.ca_cert string optional
Path to a PEM-encoded CA certificate for verifying broker certificates.
tls.client_cert string optional
Path to a client certificate for mutual TLS.
tls.client_key string optional
Path to the client private key for mutual TLS.
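
The auth and tls properties interact in ways that are easy to get wrong, so a pre-flight check can be useful. The sketch below is a hypothetical helper, not part of intu: the KafkaSecurityConfig type and checkSecurity function are illustrative names, and the rules it applies (credentials required when auth.type is set, client certificate and key supplied together) are reasonable assumptions rather than documented intu behavior.

```typescript
// Hypothetical mirror of the auth and tls properties described above.
interface KafkaSecurityConfig {
  auth?: { type?: string; username?: string; password?: string };
  tls?: { enabled?: boolean; ca_cert?: string; client_cert?: string; client_key?: string };
}

const SASL_TYPES = ["sasl_plain", "sasl_scram_256", "sasl_scram_512"];

// Return a list of human-readable problems; an empty list means no issues found.
function checkSecurity(cfg: KafkaSecurityConfig): string[] {
  const problems: string[] = [];
  const auth = cfg.auth;
  const tls = cfg.tls;

  if (auth && auth.type !== undefined) {
    if (!SASL_TYPES.includes(auth.type)) {
      problems.push(`unknown auth.type: ${auth.type}`);
    }
    if (!auth.username || !auth.password) {
      problems.push("auth.type is set but username or password is missing");
    }
  }

  if (tls) {
    const hasCert = Boolean(tls.client_cert);
    const hasKey = Boolean(tls.client_key);
    // Mutual TLS needs both halves of the client identity.
    if (hasCert !== hasKey) {
      problems.push("tls.client_cert and tls.client_key must be set together");
    }
    if (!tls.enabled && (tls.ca_cert || hasCert || hasKey)) {
      problems.push("TLS certificate paths are set but tls.enabled is not true");
    }
  }

  // SASL/PLAIN transmits the password unencrypted unless the connection uses TLS.
  if (auth && auth.type === "sasl_plain" && !(tls && tls.enabled)) {
    problems.push("sasl_plain without TLS sends credentials in clear text");
  }
  return problems;
}

const problems = checkSecurity({
  auth: { type: "sasl_plain", username: "svc-intu", password: "example" },
});
```

Here problems contains a single entry warning about sasl_plain without TLS, which is the one rule in the sketch that follows from how SASL/PLAIN works rather than from an assumption about intu.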

Full Example

Receive HL7v2 ADT messages via MLLP, transform them to a normalized JSON event format, and publish to a Kafka topic partitioned by patient ID.

Root Configuration

yaml
# intu.yaml
destinations:
  adt-event-stream:
    type: kafka
    kafka:
      brokers:
        - kafka-1.hospital.internal:9092
        - kafka-2.hospital.internal:9092
      topic: healthcare.events.adt
      client_id: intu-adt-publisher
      key: ${patientId}
      compression: snappy
      auth:
        type: sasl_scram_256
        username: ${KAFKA_USER}
        password: ${KAFKA_PASS}
      tls:
        enabled: true
        ca_cert: /etc/ssl/certs/kafka-ca.pem

Channel Configuration

yaml
# channels/adt-events/channel.yaml
name: adt-events
description: Normalize HL7v2 ADT messages and publish to Kafka

source:
  type: tcp
  tcp:
    port: 6661
    mode: mllp

destinations:
  - adt-event-stream

Destination Transformer

typescript
// channels/adt-events/transformer.ts
import { Message, Context } from "@intu/sdk";

interface AdtEvent {
  eventType: string;
  patientId: string;
  mrn: string;
  timestamp: string;
  patient: {
    familyName: string;
    givenName: string;
    dateOfBirth: string;
    gender: string;
  };
  visit?: {
    visitNumber: string;
    admitDate: string;
    department: string;
  };
}

export default function transform(msg: Message, ctx: Context): Message {
  const hl7 = msg.body;
  const msh = hl7.segments?.find((s: any) => s.name === "MSH");
  const pid = hl7.segments?.find((s: any) => s.name === "PID");
  const pv1 = hl7.segments?.find((s: any) => s.name === "PV1");

  if (!msh || !pid) {
    throw new Error("Missing required MSH or PID segment");
  }

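  // The indexes here line up with HL7 field numbers (fields[5] is PID-5).
  // MSH is offset by one because MSH-1 is the field separator itself, so
  // fields[8] is MSH-9.
  const event: AdtEvent = {
    eventType: msh.fields[8]?.components[1] ?? "A01", // MSH-9.2 trigger event
    patientId: pid.fields[3]?.components[0] ?? "", // PID-3 patient identifier list
    mrn: pid.fields[2]?.components[0] ?? "", // PID-2 patient ID (legacy field)
    timestamp: new Date().toISOString(), // processing time, not the MSH-7 message time
    patient: {
      familyName: pid.fields[5]?.components[0] ?? "", // PID-5.1
      givenName: pid.fields[5]?.components[1] ?? "", // PID-5.2
      dateOfBirth: pid.fields[7]?.components[0] ?? "", // PID-7
      gender: pid.fields[8]?.components[0] ?? "", // PID-8
    },
  };

  // PV1 is optional; only attach visit details when the segment is present.
  if (pv1) {
    event.visit = {
      visitNumber: pv1.fields[19]?.components[0] ?? "", // PV1-19
      admitDate: pv1.fields[44]?.components[0] ?? "", // PV1-44
      department: pv1.fields[3]?.components[0] ?? "", // PV1-3.1 point of care
    };
  }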

  return {
    ...msg,
    body: event,
    headers: {
      ...msg.headers,
      patientId: event.patientId,
    },
  };
}
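
The transformer above reads values as segment.fields[n].components[m]. This page does not show how intu builds that structure, so the following is only a sketch of one parsing convention that is consistent with the indexes used: split each segment on | and each field on ^. The parseSegment helper, the SketchSegment types, and the sample HL7 lines are hypothetical.

```typescript
// Hypothetical shapes matching what the transformer reads from msg.body.segments.
interface SketchField {
  components: string[];
}
interface SketchSegment {
  name: string;
  fields: SketchField[];
}

// Split one pipe-delimited HL7v2 segment into fields and ^-separated components.
// fields[0] holds the segment name, so fields[n] lines up with PID-n, PV1-n, etc.
// For MSH the field separator itself counts as MSH-1, so fields[n] is MSH-(n+1).
// Note: this naive split also breaks MSH-2 (the encoding characters) apart,
// which is harmless here because the transformer never reads it.
function parseSegment(line: string): SketchSegment {
  const parts = line.split("|");
  return {
    name: parts[0] ?? "",
    fields: parts.map((part) => ({ components: part.split("^") })),
  };
}

const mshSample = parseSegment(
  "MSH|^~\\&|ADT|HOSP|INTU|HOSP|20240101120000||ADT^A04|MSG0001|P|2.5",
);
const pidSample = parseSegment("PID|1||12345^^^HOSP^MR||DOE^JANE||19800101|F");

const triggerEvent = mshSample.fields[8]?.components[1]; // "A04" (MSH-9.2)
const patientId = pidSample.fields[3]?.components[0]; // "12345" (PID-3.1)
const givenName = pidSample.fields[5]?.components[1]; // "JANE" (PID-5.2)
```

A production parser would also handle repetitions (~), subcomponents (&), and escape sequences, which this sketch ignores.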

Note: Messages published to Kafka are serialized as JSON by default. The key property determines partition assignment: messages with the same key land on the same partition, which preserves ordering per patient as long as the topic's partition count stays the same.
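
The relationship between key and partition can be illustrated with a small sketch. Kafka clients typically hash the key bytes and take the result modulo the number of partitions; the exact hash depends on the client library. The code below uses FNV-1a over UTF-16 code units purely as a stand-in, so the partition numbers it produces will not match a real cluster. The fnv1a and partitionFor names are hypothetical.

```typescript
// FNV-1a 32-bit hash over UTF-16 code units. This is a stand-in for
// illustration only, not the hash that Kafka clients actually use.
function fnv1a(text: string): number {
  let hash = 0x811c9dc5;
  for (let i = 0; i < text.length; i++) {
    hash ^= text.charCodeAt(i);
    // Multiply by the FNV prime with 32-bit wraparound, then force unsigned.
    hash = Math.imul(hash, 0x01000193) >>> 0;
  }
  return hash;
}

// Map a key onto one of partitionCount partitions.
function partitionFor(key: string, partitionCount: number): number {
  return fnv1a(key) % partitionCount;
}

// The same patient ID maps to the same partition every time, so events for
// that patient stay in publish order within the partition.
const firstEvent = partitionFor("12345", 6);
const secondEvent = partitionFor("12345", 6);
const sameKeySamePartition = firstEvent === secondEvent; // true
```

Because the partition is derived from the hash modulo the partition count, adding partitions to a topic can move a key to a different partition, which is why per-key ordering is only guaranteed while the count is stable.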