# Kafka Source
Consume messages from Apache Kafka topics with configurable consumer groups, offsets, and authentication.
## Overview
The Kafka source subscribes to one or more Kafka topics and consumes messages as they arrive. Each message is passed through the channel pipeline for validation and transformation before being forwarded to configured destinations.
This source integrates with existing Kafka infrastructure commonly found in healthcare data platforms, enabling intu to participate in event-driven architectures where clinical events, ADT feeds, or lab results are published to Kafka topics by upstream systems.
If `brokers` is not specified in the listener config, intu falls back to the root-level `kafka` configuration in your project settings.
## Configuration

```yaml
listener:
  type: kafka
  kafka:
    brokers:
      - kafka-1.internal:9092
      - kafka-2.internal:9092
    topic: hl7-inbound
    group_id: intu-adt-consumer
    offset: earliest
    auth:
      mechanism: SASL_SSL
      username: ${KAFKA_USER}
      password: ${KAFKA_PASS}
    tls:
      ca_file: /etc/intu/certs/kafka-ca.pem
```
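When `brokers` is omitted from the listener block, intu falls back to the root-level `kafka` configuration in the project settings. A sketch of that fallback (the root-level field names are assumed to mirror the listener's `kafka` block; the exact project-settings shape may differ):

```yaml
# Project settings (root level) — shape assumed, not the documented schema
kafka:
  brokers:
    - kafka-1.internal:9092
    - kafka-2.internal:9092

# Listener config that omits brokers; the root-level list above is used instead
listener:
  type: kafka
  kafka:
    topic: hl7-inbound
    group_id: intu-adt-consumer
```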
## Properties

| Property | Description |
| --- | --- |
| `kafka.brokers` | List of broker addresses. Falls back to the root-level `kafka.brokers` configuration if omitted. |
| `kafka.topic` | Topic to consume messages from. |
| `kafka.group_id` | Consumer group ID used when subscribing. |
| `kafka.offset` | `earliest` reads from the beginning of the topic; `latest` reads only new messages. Defaults to `latest`. |
| `kafka.auth.mechanism` | Supported mechanisms: `PLAIN`, `SCRAM-SHA-256`, `SCRAM-SHA-512`, and `SASL_SSL`. |
| `kafka.tls` | TLS settings: `ca_file`, `cert_file`, and `key_file`. |

## Complete Example
Consume HL7 messages from a Kafka topic using SASL authentication and transform them for downstream processing.
```yaml
id: kafka-hl7-consumer
enabled: true
listener:
  type: kafka
  kafka:
    brokers:
      - kafka-1.internal:9092
      - kafka-2.internal:9092
      - kafka-3.internal:9092
    topic: hl7-inbound
    group_id: intu-adt-consumer
    offset: earliest
    auth:
      mechanism: SASL_SSL
      username: ${KAFKA_USER}
      password: ${KAFKA_PASS}
validator:
  runtime: node
  entrypoint: validator.ts
transformer:
  runtime: node
  entrypoint: transformer.ts
destinations:
  - fhir-server
  - audit-log
```
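The example above references a `validator.ts` that is not shown. A minimal sketch of what such a validator might look like (the `ValidateResult` shape and the specific field checks are assumptions for illustration, not the documented `@intu/sdk` contract; the types are defined inline so the sketch stands alone):

```typescript
// Assumed shapes — in a real pipeline these would come from @intu/sdk.
interface Message {
  body: string | Uint8Array;
}

interface ValidateResult {
  valid: boolean;
  error?: string;
}

// Reject messages that are not JSON or are missing fields the
// downstream transformer relies on.
export function validate(msg: Message): ValidateResult {
  let event: any;
  try {
    event = JSON.parse(
      typeof msg.body === "string" ? msg.body : new TextDecoder().decode(msg.body)
    );
  } catch {
    return { valid: false, error: "Invalid JSON payload" };
  }
  for (const field of ["patientId", "encounterId", "admitDate"]) {
    if (typeof event[field] !== "string" || event[field].length === 0) {
      return { valid: false, error: `Missing required field: ${field}` };
    }
  }
  return { valid: true };
}
```

Messages that fail validation are rejected before the transformer runs, so the transformer can assume the required fields are present.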
## TypeScript Transformer Example
This transformer processes a JSON-encoded clinical event consumed from Kafka and converts it into a FHIR Encounter resource.
```ts
import { Message, TransformResult } from "@intu/sdk";

interface ClinicalEvent {
  eventType: string;
  patientId: string;
  encounterId: string;
  facilityCode: string;
  admitDate: string;
  dischargeDate?: string;
  attendingPhysician: {
    npi: string;
    name: string;
  };
  diagnosis: {
    code: string;
    system: string;
    description: string;
  }[];
}

export default function transform(msg: Message): TransformResult {
  let event: ClinicalEvent;
  try {
    event = JSON.parse(msg.body as string);
  } catch {
    return { success: false, error: "Invalid JSON payload from Kafka" };
  }

  const encounter = {
    resourceType: "Encounter",
    id: event.encounterId,
    // A discharge date means the encounter has completed.
    status: event.dischargeDate ? "finished" : "in-progress",
    class: {
      system: "http://terminology.hl7.org/CodeSystem/v3-ActCode",
      code: "IMP",
      display: "inpatient encounter",
    },
    subject: {
      reference: `Patient/${event.patientId}`,
    },
    participant: [
      {
        individual: {
          reference: `Practitioner/${event.attendingPhysician.npi}`,
          display: event.attendingPhysician.name,
        },
      },
    ],
    period: {
      start: event.admitDate,
      end: event.dischargeDate || undefined,
    },
    // Map each diagnosis entry to a FHIR CodeableConcept.
    reasonCode: event.diagnosis.map((dx) => ({
      coding: [
        {
          system: dx.system,
          code: dx.code,
          display: dx.description,
        },
      ],
    })),
    serviceProvider: {
      identifier: {
        // NPI identifier namespace OID.
        system: "urn:oid:2.16.840.1.113883.4.6",
        value: event.facilityCode,
      },
    },
  };

  return {
    success: true,
    body: JSON.stringify(encounter),
    contentType: "application/fhir+json",
  };
}
```
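For illustration, here is a sample `ClinicalEvent` payload (all values hypothetical) of the shape this transformer accepts:

```json
{
  "eventType": "discharge",
  "patientId": "12345",
  "encounterId": "enc-98765",
  "facilityCode": "1234567890",
  "admitDate": "2024-03-01T08:30:00Z",
  "dischargeDate": "2024-03-05T14:00:00Z",
  "attendingPhysician": {
    "npi": "1093817465",
    "name": "Jane Smith"
  },
  "diagnosis": [
    {
      "code": "I21.3",
      "system": "http://hl7.org/fhir/sid/icd-10-cm",
      "description": "ST elevation myocardial infarction"
    }
  ]
}
```

Because `dischargeDate` is present, the resulting Encounter has `status: "finished"` and a populated `period.end`; omit `dischargeDate` and the transformer emits `status: "in-progress"` instead.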