# Kafka Destination
Publish messages to Apache Kafka topics for event-driven healthcare data pipelines.
## Overview
The Kafka destination publishes transformed messages to one or more Apache Kafka topics. Kafka is widely used in healthcare architectures to decouple producers from consumers, buffer high-throughput message streams, and enable real-time analytics. This destination supports SASL and TLS authentication, configurable partitioning, and message key extraction.
## Configuration
Define a Kafka destination in your root `intu.yaml` under the `destinations` key:
```yaml
# intu.yaml
destinations:
  hl7-events:
    type: kafka
    kafka:
      brokers:
        - kafka-1.hospital.internal:9092
        - kafka-2.hospital.internal:9092
        - kafka-3.hospital.internal:9092
      topic: healthcare.hl7.adt-events
      client_id: intu-adt-producer
      auth:
        type: sasl_plain
        username: ${KAFKA_USER}
        password: ${KAFKA_PASS}
      tls:
        enabled: true
        ca_cert: /etc/ssl/certs/kafka-ca.pem
```
## Properties
**`brokers`** (`string[]`, required)
List of Kafka broker addresses in `host:port` format. At least one broker is required; multiple brokers provide failover.
**`topic`** (`string`, required)
The Kafka topic to publish messages to. Supports environment variable interpolation.
**`client_id`** (`string`, optional)
An identifier for this producer, used in Kafka broker logs and monitoring. Defaults to `intu-producer`.
**`key`** (`string`, optional)
Expression to extract the message key for partitioning. Supports `${field}` interpolation from message headers or body. Messages with the same key are routed to the same partition.
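For example, a destination that partitions by patient might reference a header set by the channel's transformer (the `patientId` header name is illustrative; it must match whatever your transformer writes):

```yaml
destinations:
  patient-events:
    type: kafka
    kafka:
      brokers:
        - kafka-1.hospital.internal:9092
      topic: healthcare.events.patient
      # Assumes the transformer copies the patient ID into a
      # `patientId` message header; all events for one patient
      # then share a partition and stay ordered.
      key: ${patientId}
```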
**`compression`** (`string`, optional)
Compression codec for produced messages. One of: `none`, `gzip`, `snappy`, `lz4`, `zstd`. Defaults to `none`.

### Authentication
**`auth.type`** (`string`, optional)
Authentication mechanism. One of: `sasl_plain`, `sasl_scram_256`, `sasl_scram_512`. Omit for unauthenticated connections.
**`auth.username`** (`string`, optional)
SASL username.
**`auth.password`** (`string`, optional)
SASL password. Use `${VAR}` to reference environment variables.

### TLS
**`tls.enabled`** (`bool`, optional)
Enable TLS for broker connections. Defaults to `false`.
**`tls.ca_cert`** (`string`, optional)
Path to a PEM-encoded CA certificate for verifying broker certificates.
**`tls.client_cert`** (`string`, optional)
Path to a client certificate for mutual TLS.
**`tls.client_key`** (`string`, optional)
Path to the client private key for mutual TLS.
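Putting the authentication and TLS settings together, a broker that requires SCRAM plus mutual TLS might be configured as follows (destination name, certificate paths, and environment variable names are placeholders):

```yaml
destinations:
  secure-events:
    type: kafka
    kafka:
      brokers:
        - kafka-1.hospital.internal:9093
      topic: healthcare.events.secure
      auth:
        type: sasl_scram_512
        username: ${KAFKA_USER}
        password: ${KAFKA_PASS}
      tls:
        enabled: true
        ca_cert: /etc/ssl/certs/kafka-ca.pem
        # Client certificate and key enable mutual TLS.
        client_cert: /etc/ssl/certs/intu-client.pem
        client_key: /etc/ssl/private/intu-client.key
```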
## Full Example
Receive HL7v2 ADT messages via MLLP, transform them to a normalized JSON event format, and publish to a Kafka topic partitioned by patient ID.
### Root Configuration
```yaml
# intu.yaml
destinations:
  adt-event-stream:
    type: kafka
    kafka:
      brokers:
        - kafka-1.hospital.internal:9092
        - kafka-2.hospital.internal:9092
      topic: healthcare.events.adt
      client_id: intu-adt-publisher
      key: ${patientId}
      compression: snappy
      auth:
        type: sasl_scram_256
        username: ${KAFKA_USER}
        password: ${KAFKA_PASS}
      tls:
        enabled: true
        ca_cert: /etc/ssl/certs/kafka-ca.pem
```
### Channel Configuration
```yaml
# channels/adt-events/channel.yaml
name: adt-events
description: Normalize HL7v2 ADT messages and publish to Kafka
source:
  type: tcp
  tcp:
    port: 6661
    mode: mllp
destinations:
  - adt-event-stream
```
### Destination Transformer
```typescript
// channels/adt-events/transformer.ts
import { Message, Context } from "@intu/sdk";

interface AdtEvent {
  eventType: string;
  patientId: string;
  mrn: string;
  timestamp: string;
  patient: {
    familyName: string;
    givenName: string;
    dateOfBirth: string;
    gender: string;
  };
  visit?: {
    visitNumber: string;
    admitDate: string;
    department: string;
  };
}

export default function transform(msg: Message, ctx: Context): Message {
  const hl7 = msg.body;

  // Locate the segments needed to build the normalized event.
  const msh = hl7.segments?.find((s: any) => s.name === "MSH");
  const pid = hl7.segments?.find((s: any) => s.name === "PID");
  const pv1 = hl7.segments?.find((s: any) => s.name === "PV1");

  if (!msh || !pid) {
    throw new Error("Missing required MSH or PID segment");
  }

  const event: AdtEvent = {
    eventType: msh.fields[8]?.components[1] ?? "A01",
    patientId: pid.fields[3]?.components[0] ?? "",
    mrn: pid.fields[2]?.components[0] ?? "",
    timestamp: new Date().toISOString(),
    patient: {
      familyName: pid.fields[5]?.components[0] ?? "",
      givenName: pid.fields[5]?.components[1] ?? "",
      dateOfBirth: pid.fields[7]?.components[0] ?? "",
      gender: pid.fields[8]?.components[0] ?? "",
    },
  };

  // PV1 is optional; include visit details only when present.
  if (pv1) {
    event.visit = {
      visitNumber: pv1.fields[19]?.components[0] ?? "",
      admitDate: pv1.fields[44]?.components[0] ?? "",
      department: pv1.fields[3]?.components[0] ?? "",
    };
  }

  // Copy the patient ID into headers so the destination's
  // `key: ${patientId}` expression can use it for partitioning.
  return {
    ...msg,
    body: event,
    headers: {
      ...msg.headers,
      patientId: event.patientId,
    },
  };
}
```
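The segment/field access pattern in the transformer can be exercised on its own. A minimal sketch, assuming the parsed-HL7 shape the transformer relies on (`segments` → `fields` → `components`); the types and the hand-built PID segment here are illustrative, not the `@intu/sdk` definitions:

```typescript
// Illustrative types mimicking the parsed-HL7 shape used above.
interface Field { components: string[] }
interface Segment { name: string; fields: Field[] }

// Hand-built PID segment with sample values (indices mirror the
// transformer's pid.fields[3] and pid.fields[5] lookups).
const pid: Segment = { name: "PID", fields: [] };
pid.fields[3] = { components: ["12345"] };       // patient identifier
pid.fields[5] = { components: ["DOE", "JOHN"] }; // family^given name

const segments: Segment[] = [pid];
const found = segments.find((s) => s.name === "PID");

// Optional chaining plus ?? "" tolerates missing fields/components,
// exactly as the transformer does.
const patientId = found?.fields[3]?.components[0] ?? "";
const familyName = found?.fields[5]?.components[0] ?? "";
const givenName = found?.fields[5]?.components[1] ?? "";
const missing = found?.fields[7]?.components[0] ?? ""; // absent -> ""

console.log(patientId, familyName, givenName); // 12345 DOE JOHN
```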
> **Note:** Messages published to Kafka are serialized as JSON by default. The `key` property determines partition assignment: messages with the same key always land on the same partition, preserving per-patient ordering.
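The same-key, same-partition guarantee follows from how producers assign partitions: the key is hashed and reduced modulo the partition count, so the mapping is deterministic. A minimal sketch of that idea (the hash function is a simple stand-in, not Kafka's murmur2):

```typescript
// Deterministic key -> partition mapping, illustrating why messages
// with the same key always land on the same partition.
// The hash here is a stand-in, not Kafka's actual murmur2 algorithm.
function partitionFor(key: string, numPartitions: number): number {
  let hash = 0;
  for (const ch of key) {
    // Keep the accumulator in unsigned 32-bit range.
    hash = (hash * 31 + ch.charCodeAt(0)) >>> 0;
  }
  return hash % numPartitions;
}

// The same patient ID always maps to the same partition,
// so per-patient ordering is preserved.
const p1 = partitionFor("patient-12345", 12);
const p2 = partitionFor("patient-12345", 12);
console.log(p1 === p2); // true
```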