Kafka Source

Consume messages from Apache Kafka topics with configurable consumer groups, offsets, and authentication.

Overview

The Kafka source subscribes to one or more Kafka topics and consumes messages as they arrive. Each message is passed through the channel pipeline for validation and transformation before being forwarded to configured destinations.

This source integrates with existing Kafka infrastructure commonly found in healthcare data platforms, enabling intu to participate in event-driven architectures where clinical events, ADT feeds, or lab results are published to Kafka topics by upstream systems.

Note If brokers is not specified in the listener config, intu falls back to the root-level kafka configuration in your project settings.

Configuration

yaml
listener:
  type: kafka
  kafka:
    brokers:
      - kafka-1.internal:9092
      - kafka-2.internal:9092
    topic: hl7-inbound
    group_id: intu-adt-consumer
    offset: earliest
    auth:
      mechanism: SASL_SSL
      username: ${KAFKA_USER}
      password: ${KAFKA_PASS}
    tls:
      ca_file: /etc/intu/certs/kafka-ca.pem

Properties

brokers string[] optional
List of Kafka broker addresses. Falls back to the root-level kafka.brokers configuration if omitted.
topic string required
The Kafka topic to consume from.
group_id string optional
Consumer group ID. When multiple intu instances share a group ID, Kafka distributes partitions among them for horizontal scaling.
offset string optional
Starting offset for new consumer groups. earliest reads from the beginning; latest reads only new messages. Defaults to latest.
auth object optional
Authentication configuration. Supports SASL mechanisms including PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, and SASL_SSL.
tls object optional
TLS configuration for encrypted connections. Contains ca_file, cert_file, and key_file.

Complete Example

Consume HL7 messages from a Kafka topic using SASL authentication and transform them for downstream processing.

yaml
id: kafka-hl7-consumer
enabled: true

listener:
  type: kafka
  kafka:
    brokers:
      - kafka-1.internal:9092
      - kafka-2.internal:9092
      - kafka-3.internal:9092
    topic: hl7-inbound
    group_id: intu-adt-consumer
    offset: earliest
    auth:
      mechanism: SASL_SSL
      username: ${KAFKA_USER}
      password: ${KAFKA_PASS}

validator:
  runtime: node
  entrypoint: validator.ts

transformer:
  runtime: node
  entrypoint: transformer.ts

destinations:
  - fhir-server
  - audit-log

TypeScript Transformer Example

This transformer processes a JSON-encoded clinical event consumed from Kafka and converts it into a FHIR Encounter resource.

typescript
import { Message, TransformResult } from "@intu/sdk";

interface ClinicalEvent {
  eventType: string;
  patientId: string;
  encounterId: string;
  facilityCode: string;
  admitDate: string;
  dischargeDate?: string;
  attendingPhysician: {
    npi: string;
    name: string;
  };
  diagnosis: {
    code: string;
    system: string;
    description: string;
  }[];
}

export default function transform(msg: Message): TransformResult {
  let event: ClinicalEvent;
  try {
    event = JSON.parse(msg.body as string);
  } catch {
    return { success: false, error: "Invalid JSON payload from Kafka" };
  }

  const encounter = {
    resourceType: "Encounter",
    id: event.encounterId,
    status: event.dischargeDate ? "finished" : "in-progress",
    class: {
      system: "http://terminology.hl7.org/CodeSystem/v3-ActCode",
      code: "IMP",
      display: "inpatient encounter",
    },
    subject: {
      reference: `Patient/${event.patientId}`,
    },
    participant: [
      {
        individual: {
          reference: `Practitioner/${event.attendingPhysician.npi}`,
          display: event.attendingPhysician.name,
        },
      },
    ],
    period: {
      start: event.admitDate,
      end: event.dischargeDate || undefined,
    },
    reasonCode: event.diagnosis.map((dx) => ({
      coding: [
        {
          system: dx.system,
          code: dx.code,
          display: dx.description,
        },
      ],
    })),
    serviceProvider: {
      identifier: {
        system: "urn:oid:2.16.840.1.113883.4.6",
        value: event.facilityCode,
      },
    },
  };

  return {
    success: true,
    body: JSON.stringify(encounter),
    contentType: "application/fhir+json",
  };
}