Batch processing has its place, but when you need sub-second reaction times, you need a streaming pipeline. I’ve been building event-driven systems with TypeScript and Kafka that handle millions of events per day, and the combination works better than most people expect. TypeScript gives you type safety across your event schemas, and Kafka gives you the durability and throughput. This post covers the implementation patterns I use.
Why TypeScript for Kafka pipelines
The conventional wisdom is that Kafka consumers should be written in Java or Go. I disagree for most use cases. TypeScript with KafkaJS gives you:
- Type-safe event schemas — your producer and consumer agree on the shape of every event at compile time.
- Shared types — if your API layer is already TypeScript, you reuse the same event types end-to-end.
- Fast iteration — the feedback loop from change to running consumer is seconds, not minutes.
KafkaJS is mature, well-maintained, and handles the Kafka protocol correctly. I’ve run it at tens of thousands of messages per second per consumer without issues. If you need millions per second per consumer, use Java. For everything else, TypeScript is the right call.
Project setup
mkdir kafka-pipeline && cd kafka-pipeline
npm init -y
npm install kafkajs ajv ajv-formats secure-json-parse uuid prom-client
npm install -D typescript @types/node
npx tsc --init
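npx tsc --init generates a permissive default config. A stricter baseline I'd start from instead (a sketch, not gospel — adjust target and module to your Node version and module system):

```json
{
  "compilerOptions": {
    "target": "ES2022",
    "module": "commonjs",
    "strict": true,
    "esModuleInterop": true,
    "skipLibCheck": true,
    "rootDir": "src",
    "outDir": "dist"
  },
  "include": ["src/**/*"]
}
```

The one non-negotiable is "strict": true; the type-safe event schemas below lose most of their value if strict null checks are off.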
Type-safe event schemas
The foundation of a reliable pipeline is strict event schemas. I define every event as a TypeScript type and validate at the boundary:
// src/events.ts
export interface BaseEvent {
id: string;
timestamp: string;
type: string;
}
export interface OrderCreated extends BaseEvent {
type: "ORDER_CREATED";
data: {
orderId: string;
userId: string;
products: Array<{ productId: string; quantity: number; price: number }>;
totalAmount: number;
};
}
export interface PaymentCompleted extends BaseEvent {
type: "PAYMENT_COMPLETED";
data: {
paymentId: string;
orderId: string;
amount: number;
};
}
export type DomainEvent = OrderCreated | PaymentCompleted;
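The union pays off at dispatch time: a switch over event.type narrows the payload, and a never-typed guard turns a forgotten variant into a compile error. A minimal sketch (the types are trimmed copies of the ones above, and summarize/assertNever are illustrative helpers, not part of the pipeline):

```typescript
interface BaseEvent { id: string; timestamp: string; type: string; }

interface OrderCreated extends BaseEvent {
  type: "ORDER_CREATED";
  data: { orderId: string; userId: string; totalAmount: number };
}

interface PaymentCompleted extends BaseEvent {
  type: "PAYMENT_COMPLETED";
  data: { paymentId: string; orderId: string; amount: number };
}

type DomainEvent = OrderCreated | PaymentCompleted;

// If a new variant is added to DomainEvent and the switch isn't updated,
// the argument here stops being `never` and compilation fails.
function assertNever(x: never): never {
  throw new Error(`unhandled event type: ${JSON.stringify(x)}`);
}

function summarize(event: DomainEvent): string {
  switch (event.type) {
    case "ORDER_CREATED":
      // event is narrowed to OrderCreated here, so data.totalAmount typechecks.
      return `order ${event.data.orderId}: ${event.data.totalAmount}`;
    case "PAYMENT_COMPLETED":
      return `payment ${event.data.paymentId}: ${event.data.amount}`;
    default:
      return assertNever(event);
  }
}
```

This is the compile-time half of the contract; the Ajv validation below is the runtime half.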
For runtime validation, I use Ajv. The schemas mirror the TypeScript types:
// src/validation.ts
import Ajv, { JSONSchemaType } from "ajv";
import addFormats from "ajv-formats";
import type { OrderCreated } from "./events";
// addFormats wires up "date-time", "uuid", "email", etc. Without it, Ajv
// silently accepts any string where the schema says format: "date-time".
const ajv = addFormats(new Ajv({ allErrors: true }));
const orderCreatedSchema: JSONSchemaType<OrderCreated> = {
type: "object",
properties: {
id: { type: "string" },
timestamp: { type: "string", format: "date-time" },
type: { type: "string", const: "ORDER_CREATED" },
data: {
type: "object",
properties: {
orderId: { type: "string" },
userId: { type: "string" },
products: {
type: "array",
items: {
type: "object",
properties: {
productId: { type: "string" },
quantity: { type: "number", minimum: 1 },
price: { type: "number", minimum: 0 },
},
required: ["productId", "quantity", "price"],
additionalProperties: false,
},
},
totalAmount: { type: "number", minimum: 0 },
},
required: ["orderId", "userId", "products", "totalAmount"],
additionalProperties: false,
},
},
required: ["id", "timestamp", "type", "data"],
additionalProperties: false,
};
export const validateOrderCreated = ajv.compile(orderCreatedSchema);
This is more work upfront than just JSON.parse and hoping for the best. It’s worth it. A single malformed event in a pipeline can cascade into data corruption across downstream consumers. Validate at the boundary, trust the types internally.
Securing the Kafka client
Before any producer or consumer code, one thing the quickstarts skip: a production Kafka client without TLS and SASL is a breach waiting to happen. A plain brokers: ["localhost:9092"] connection sends message bytes and auth credentials in cleartext. That’s fine for a local dev loop; it is not fine the moment the broker is reachable from anything outside your laptop. Configure ssl and sasl from day one — don’t bolt it on during a security review.
// src/kafka-client.ts
import { Kafka, logLevel } from "kafkajs";
// Required env: KAFKA_BROKERS, KAFKA_USERNAME, KAFKA_PASSWORD.
// Fail fast if they're missing -- don't silently fall back to plaintext.
function requireEnv(name: string): string {
const v = process.env[name];
if (!v) throw new Error(`${name} is required`);
return v;
}
export const kafka = new Kafka({
clientId: "order-service",
brokers: requireEnv("KAFKA_BROKERS").split(","),
ssl: true,
sasl: {
mechanism: "scram-sha-512",
username: requireEnv("KAFKA_USERNAME"),
password: requireEnv("KAFKA_PASSWORD"),
},
logLevel: logLevel.WARN,
});
SCRAM-SHA-512 is the right default for username/password auth against modern brokers (MSK, Confluent Cloud, Aiven, Redpanda all support it). If you’re on mTLS, drop the sasl block and pass ssl: { rejectUnauthorized: true, ca, cert, key } instead — where ca is the broker’s issuing CA (so the client can verify the server), and cert/key are the client’s certificate and private key (so the broker can verify the client). Mixing those up — passing the client CA as ca, or the server cert as cert — is the most common mTLS misconfiguration I see. Never set ssl: { rejectUnauthorized: false } outside a debugging shell.
The producer
// src/producer.ts
import { Producer } from "kafkajs";
import { v4 as uuidv4 } from "uuid";
import { kafka } from "./kafka-client";
import type { DomainEvent } from "./events";
export class EventProducer {
private producer: Producer;
private connected = false;
constructor() {
// idempotent: true enables producer-side dedupe (PID + sequence numbers)
// so retries don't create duplicates. Note that KafkaJS, unlike the Java
// client, requires maxInFlightRequests <= 1 when idempotence is enabled
// and throws at construction otherwise; the dedupe costs you send pipelining.
this.producer = kafka.producer({ idempotent: true, maxInFlightRequests: 1 });
}
async connect(): Promise<void> {
if (!this.connected) {
await this.producer.connect();
this.connected = true;
}
}
async publish(topic: string, event: DomainEvent): Promise<void> {
if (!this.connected) await this.connect();
await this.producer.send({
topic,
messages: [
{
key: event.id,
value: JSON.stringify(event),
headers: {
"event-type": event.type,
"content-type": "application/json",
},
},
],
});
}
async disconnect(): Promise<void> {
await this.producer.disconnect();
this.connected = false;
}
}
// Usage
async function main() {
const producer = new EventProducer();
await producer.connect();
await producer.publish("order-events", {
id: uuidv4(),
timestamp: new Date().toISOString(),
type: "ORDER_CREATED",
data: {
orderId: uuidv4(),
userId: "user-123",
products: [{ productId: "prod-1", quantity: 2, price: 29.99 }],
totalAmount: 59.98,
},
});
await producer.disconnect();
}
The event ID is the Kafka message key. This means all events for the same logical entity land on the same partition, which gives you ordering guarantees within that entity. If you need ordering by order ID instead, use event.data.orderId as the key.
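To see why the key controls ordering: Kafka's default partitioner hashes the key and takes it modulo the partition count, so equal keys always land on the same partition. A toy illustration of that property — the hash below is a stand-in FNV-1a, not the murmur2 KafkaJS actually uses; only the determinism matters:

```typescript
// Stand-in FNV-1a hash, purely for illustration. The property the pipeline
// relies on is determinism: the same key always maps to the same bucket.
function toyHash(key: string): number {
  let h = 0x811c9dc5;
  for (let i = 0; i < key.length; i++) {
    h ^= key.charCodeAt(i);
    h = Math.imul(h, 0x01000193) >>> 0;
  }
  return h >>> 0;
}

function partitionFor(key: string, numPartitions: number): number {
  return toyHash(key) % numPartitions;
}
```

Because partitionFor("order-42", 12) is stable across calls and producers, every event keyed by that order ID preserves its relative order; events with different keys may interleave freely. Note the corollary: changing the partition count reshuffles key-to-partition assignments, which is why you size partitions up front.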
The consumer with dead letter queue
Here’s where most tutorials stop at “subscribe and log.” A production consumer needs schema validation at the boundary, size limits, idempotency, a dead letter queue with correlation metadata, and deliberate offset-commit semantics. Each of those is a lesson learned the expensive way.
Before the code, a few decisions the consumer is making:
- Manual offset commits, not auto-commit. Auto-commit acknowledges messages on a timer regardless of whether your handler succeeded — that’s commit-before-process, and it silently drops data when the handler crashes between commit and completion. Manual commit after successful processing gives you at-least-once semantics.
- Validate the payload before handing it to a handler. The types are a compile-time fiction; a malformed producer or an attacker on an internal topic will send you whatever they want. Validate at the trust boundary.
- Size-cap the payload before JSON.parse, and use a hardened parser. A 50 MB JSON blob will pin a Node event loop for seconds and can OOM a small pod — that's the first line of defense. The second is the parser itself: JSON.parse happily accepts deeply nested structures (algorithmic DoS under the byte cap) and keys like __proto__/constructor that pollute object prototypes. Use a depth-limited parser like secure-json-parse with protoAction: 'remove' and constructorAction: 'remove' to neutralize both.
- Dedupe by event.id with a bounded set, where event.id MUST be cryptographically unique per event (UUIDv4, ULID, KSUID) — never a business key like orderId. At-least-once means your handler will see duplicates during rebalances and retries; handlers must be idempotent, and a first-line dedupe set catches the common case cheaply. If you key dedupe on a business ID, legitimate events that happen to share that ID get silently dropped.
- DLQ messages carry correlation headers (source-topic, source-partition, source-offset, source-event-id, retry-count) so an operator replaying from the DLQ can trace back to the originating position and cap replay loops.
// src/consumer.ts
import { Consumer, Producer, EachMessagePayload, IHeaders } from "kafkajs";
import sjson from "secure-json-parse";
import { Counter } from "prom-client";
import { kafka } from "./kafka-client";
import type { DomainEvent } from "./events";
import { validateEvent } from "./validation";
// Dedicated metric so a spike in records that exhausted MAX_RETRY_COUNT is
// visible in Grafana, not buried in log output.
const poisonRecordsTotal = new Counter({
name: "kafka_poison_records_total",
help: "Records that exceeded MAX_RETRY_COUNT and were quarantined",
labelNames: ["topic", "event_type"] as const,
});
type EventHandler = (event: DomainEvent) => Promise<void>;
// Hard cap on individual message size we're willing to parse. Kafka brokers
// default message.max.bytes to 1 MB; pick a consumer-side limit that matches
// your contract, not whatever the broker happens to allow today.
// This is the FIRST line of defense only — `secure-json-parse` below handles
// prototype pollution, and you should still consider a depth-limited parser
// or streaming parser if your schema permits deeply nested structures.
const MAX_MESSAGE_BYTES = 1_000_000;
// Bounded dedupe set. event.id MUST be cryptographically unique per event
// (UUIDv4 / ULID / KSUID) — never a business key like orderId, or colliding
// legitimate events get silently dropped.
// The set is only safe with partitionsConsumedConcurrently: 1 (KafkaJS default)
// and a single consumer replica. For higher concurrency or multi-replica
// deploys, replace this with Redis `SET key NX PX <ttl>` as the atomic
// primitive — the in-memory has→add sequence is a race otherwise.
const DEDUPE_LIMIT = 100_000;
// Cap DLQ replay loops: a message that has already bounced N times is broken
// and should be human-triaged, not re-ingested.
const MAX_RETRY_COUNT = 5;
export class EventConsumer {
private consumer: Consumer;
private dlqProducer: Producer;
private handlers = new Map<string, EventHandler[]>();
private recentIds = new Set<string>();
constructor(groupId: string, private dlqTopic: string) {
this.consumer = kafka.consumer({
groupId,
maxWaitTimeInMs: 500,
// sessionTimeout must exceed worst-case handler latency + poll interval,
// or the broker will consider this consumer dead and trigger a rebalance
// mid-message. heartbeatInterval should be ~1/3 of sessionTimeout.
// Defaults (30s / 3s) assume sub-10s handlers; tune up for slower work.
sessionTimeout: 30_000,
heartbeatInterval: 3_000,
// retry controls KafkaJS's own reconnect backoff, not handler retries.
retry: { retries: 5 },
});
this.dlqProducer = kafka.producer({ idempotent: true });
}
on(eventType: string, handler: EventHandler): void {
const existing = this.handlers.get(eventType) ?? [];
existing.push(handler);
this.handlers.set(eventType, existing);
}
async start(topics: string[]): Promise<void> {
await this.consumer.connect();
await this.dlqProducer.connect();
for (const topic of topics) {
// fromBeginning: false means a brand-new consumer group starts at the
// current log-end offset and SKIPS any backlog. That's the right default
// for a new streaming service joining a live topic; it's the WRONG
// default for a replay job or a service that needs historical state.
// Flip to true (or use a tool like kafka-consumer-groups --reset-offsets)
// when you actually need the backlog.
await this.consumer.subscribe({ topic, fromBeginning: false });
}
await this.consumer.run({
// Disable auto-commit; we commit explicitly after the handler succeeds.
autoCommit: false,
eachMessage: async (payload) => this.processMessage(payload),
});
}
private async processMessage(payload: EachMessagePayload): Promise<void> {
const { topic, partition, message } = payload;
// 1. Size + presence guard BEFORE toString/parse.
// An empty message.value is a tombstone or a producer bug. Either way we
// commit rather than bare-`return`, because a return without a commit
// permanently stalls this partition at the first tombstone. (sendToDLQ
// itself skips null values, so tombstones are committed without a DLQ
// record; oversized payloads do get one.)
if (!message.value || message.value.length > MAX_MESSAGE_BYTES) {
await this.sendToDLQ(payload, "oversized or empty payload", 0);
await this.commit(payload);
return;
}
// 2. Parse with a hardened parser. secure-json-parse strips __proto__
// and constructor keys so a malicious producer can't pollute prototypes
// on this consumer. It still won't enforce a depth limit — pair with
// a schema validator (step 3) that rejects unexpected shapes.
let parsed: unknown;
try {
parsed = sjson.parse(message.value.toString("utf8"), {
protoAction: "remove",
constructorAction: "remove",
});
} catch {
await this.sendToDLQ(payload, "invalid JSON", 0);
await this.commit(payload);
return;
}
// 3. Validate at the boundary. The types are a lie until Ajv says so.
if (!validateEvent(parsed)) {
const errs = validateEvent.errors?.map((e) => e.message).join("; ");
await this.sendToDLQ(payload, `schema violation: ${errs ?? "unknown"}`, 0);
await this.commit(payload);
return;
}
const event = parsed as DomainEvent;
// 4. Dedupe. At-least-once means we WILL see the same event.id twice.
// Real LRU: on a hit, refresh recency by delete+re-add before committing,
// so a hot id doesn't get evicted while it's still actively seen.
if (this.recentIds.has(event.id)) {
this.recentIds.delete(event.id);
this.recentIds.add(event.id);
await this.commit(payload);
return;
}
// 5. Dispatch. If any handler throws, DLQ the message and move on --
// blocking the partition on a single bad record is how backlogs are born.
const handlers = this.handlers.get(event.type) ?? [];
for (const handler of handlers) {
try {
await handler(event);
} catch (err) {
const msg = err instanceof Error ? err.message : "unknown error";
console.error(`Handler failed for ${event.type} ${event.id}: ${msg}`);
const retryCount = readRetryCount(message.headers);
if (retryCount >= MAX_RETRY_COUNT) {
// Escalate to a dedicated poison topic AND emit a metric so an
// operator actually notices. Silently committing past a poisoned
// record is how data loss hides.
poisonRecordsTotal.inc({ topic, event_type: event.type });
await this.sendToPoison(payload, msg, retryCount);
await this.commit(payload);
return;
}
await this.sendToDLQ(payload, msg, retryCount + 1);
await this.commit(payload);
return;
}
}
// 6. Remember we saw this id, evict oldest (true LRU) once we hit the cap.
this.recentIds.add(event.id);
if (this.recentIds.size > DEDUPE_LIMIT) {
const oldest = this.recentIds.values().next().value as string | undefined;
if (oldest) this.recentIds.delete(oldest);
}
// 7. Commit only after successful processing. This is the at-least-once
// guarantee: on crash between handler success and commit, we reprocess --
// which is why step 4 and idempotent handlers matter.
await this.commit(payload);
}
private async commit({ topic, partition, message }: EachMessagePayload): Promise<void> {
// Guard against a malformed offset string. BigInt() throws SyntaxError on
// non-numeric input, and a bare throw here means the DLQ write already
// went through but the commit didn't — infinite redelivery of a poison
// pill. Log, signal the operator, and let the run loop exit so a process
// supervisor can restart (or the operator intervenes).
let nextOffset: string;
try {
nextOffset = (BigInt(message.offset) + 1n).toString();
} catch (err) {
console.error(
`FATAL: malformed offset ${message.offset} on ${topic}:${partition}`,
err
);
// Disconnect so the consumer group rebalances away from this instance
// rather than spinning on a record it can never commit.
await this.consumer.disconnect();
throw err;
}
try {
// Commit offset+1 -- Kafka commits the NEXT offset to read.
await this.consumer.commitOffsets([{ topic, partition, offset: nextOffset }]);
} catch (err) {
// Commit failures are usually transient (rebalance in progress, broker
// unreachable). KafkaJS will retry on the next message, but we log so
// the ops dashboard sees it.
console.error(`commitOffsets failed for ${topic}:${partition}`, err);
throw err;
}
}
private async sendToPoison(
payload: EachMessagePayload,
error: string,
retryCount: number
): Promise<void> {
// Same shape as DLQ, different topic. Operators triage this topic
// manually — it should be low-volume and high-signal.
await this.dlqProducer.send({
topic: `${this.dlqTopic}-poison`,
messages: [
{
key: payload.message.key ?? undefined,
value: payload.message.value!,
headers: {
"source-topic": payload.topic,
"source-partition": String(payload.partition),
"source-offset": payload.message.offset,
"retry-count": String(retryCount),
"error-message": error.slice(0, 1024),
"poison-timestamp": new Date().toISOString(),
},
},
],
});
}
private async sendToDLQ(
{ topic, partition, message }: EachMessagePayload,
error: string,
retryCount: number
): Promise<void> {
if (!message.value) return;
// NOTE: this is an idempotent, non-transactional producer. Between this
// send() succeeding and the source-topic commit() that follows, a crash
// will replay the source record on restart and produce a second DLQ
// entry. DLQ consumers MUST dedupe on (source-topic, source-partition,
// source-offset) — those three headers together uniquely identify the
// originating record. For a tighter guarantee, wrap the DLQ send and
// the offset commit in a transactional producer with sendOffsets() (same
// pattern as the EOS section below).
await this.dlqProducer.send({
topic: this.dlqTopic,
messages: [
{
key: message.key ?? undefined,
value: message.value, // preserve original bytes for replay
headers: {
"source-topic": topic,
"source-partition": String(partition),
"source-offset": message.offset,
// message.key is Buffer | null; toString("utf8") assumes UTF-8
// string keys. If your producers use binary keys (protobuf
// composite, hash digest), base64-encode instead:
// message.key ? message.key.toString("base64") : ""
"source-event-id": message.key?.toString("utf8") ?? "",
"retry-count": String(retryCount),
// Truncate: a 10 KB stack trace in every DLQ header trips the
// broker's 1 MB message cap during an error storm. Full trace
// belongs in structured logs, keyed by source-offset.
"error-message": error.slice(0, 1024),
"dlq-timestamp": new Date().toISOString(),
},
},
],
});
}
async stop(): Promise<void> {
await this.consumer.disconnect();
await this.dlqProducer.disconnect();
}
}
function readRetryCount(headers: IHeaders | undefined): number {
const raw = headers?.["retry-count"];
if (!raw) return 0;
const str = Buffer.isBuffer(raw) ? raw.toString("utf8") : String(raw);
const n = Number.parseInt(str, 10);
return Number.isFinite(n) && n >= 0 ? n : 0;
}
And the validateEvent function the consumer calls — a discriminated-union validator built on the per-type Ajv compilations:
// src/validation.ts (continued)
import type { DomainEvent, PaymentCompleted } from "./events";
const paymentCompletedSchema: JSONSchemaType<PaymentCompleted> = {
type: "object",
properties: {
id: { type: "string" },
timestamp: { type: "string", format: "date-time" },
type: { type: "string", const: "PAYMENT_COMPLETED" },
data: {
type: "object",
properties: {
paymentId: { type: "string" },
orderId: { type: "string" },
amount: { type: "number", minimum: 0 },
},
required: ["paymentId", "orderId", "amount"],
additionalProperties: false,
},
},
required: ["id", "timestamp", "type", "data"],
additionalProperties: false,
};
const validatePaymentCompleted = ajv.compile(paymentCompletedSchema);
// We attach `errors` on the guard itself so callers have a single place to
// read validation failures, regardless of which concrete validator ran.
export const validateEvent = Object.assign(
(input: unknown): input is DomainEvent => {
validateEvent.errors = null;
if (typeof input !== "object" || input === null) {
validateEvent.errors = [{ message: "not an object" } as never];
return false;
}
const t = (input as { type?: unknown }).type;
if (t === "ORDER_CREATED") {
const ok = validateOrderCreated(input);
if (!ok) validateEvent.errors = validateOrderCreated.errors ?? null;
return ok;
}
if (t === "PAYMENT_COMPLETED") {
const ok = validatePaymentCompleted(input);
if (!ok) validateEvent.errors = validatePaymentCompleted.errors ?? null;
return ok;
}
validateEvent.errors = [
{ message: `unknown event type: ${String(t)}` } as never,
];
return false;
},
{ errors: null as typeof validateOrderCreated.errors }
);
The dead letter queue pattern is non-negotiable. Without it, a poison message causes infinite retries and blocks the partition. With it, bad messages are sidelined, the original bytes and source coordinates are preserved, and you can replay — with the retry-count header as the loop breaker.
One nuance on DLQ design that’s worth naming explicitly: attackers and bugs can poison the DLQ itself. If you feed DLQ contents back into the source topic on a retry job without re-checking retry-count, you’ve built a resubmission loop that will eventually overwhelm the broker. The retry-count header in the handler above is the circuit breaker: a record that has bounced MAX_RETRY_COUNT times stops being re-DLQ’d and gets committed away, forcing human triage rather than infinite bouncing.
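A replay job built on that contract can make the decision with a pure guard over the DLQ headers. The sketch below is mine (shouldReplay is a hypothetical helper), but the header names match the consumer above; anything at or past the retry cap, or missing its correlation headers, is refused and left for human triage:

```typescript
const MAX_RETRY_COUNT = 5;

type RawHeaders = Record<string, Buffer | string | undefined>;

function headerString(h: RawHeaders, name: string): string | undefined {
  const raw = h[name];
  if (raw === undefined) return undefined;
  return Buffer.isBuffer(raw) ? raw.toString("utf8") : String(raw);
}

// Decide whether a DLQ record is eligible for resubmission to its source
// topic. This is the circuit breaker: refusing capped or header-less records
// is what prevents a DLQ-to-source resubmission loop.
function shouldReplay(headers: RawHeaders): { replay: boolean; reason: string } {
  const sourceTopic = headerString(headers, "source-topic");
  if (!sourceTopic) return { replay: false, reason: "missing source-topic" };
  const retries = Number.parseInt(headerString(headers, "retry-count") ?? "", 10);
  if (!Number.isFinite(retries) || retries < 0) {
    return { replay: false, reason: "unreadable retry-count" };
  }
  if (retries >= MAX_RETRY_COUNT) {
    return { replay: false, reason: `retry cap reached (${retries})` };
  }
  return { replay: true, reason: "ok" };
}
```

The replay loop then produces eligible records back to their source-topic, keeping the original bytes and incrementing retry-count, so a record that keeps failing converges on the poison path instead of bouncing forever.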
Stream joins: combining order and payment events
Real pipelines don’t just consume single event streams. They join them. Here’s an analytics processor that correlates orders with payments:
// src/order-analytics.ts
import type { DomainEvent, OrderCreated, PaymentCompleted } from "./events";
interface OrderState {
orderId: string;
userId: string;
totalAmount: number;
productCount: number;
paymentStatus: "pending" | "completed";
createdAt: string;
}
export class OrderAnalyticsProcessor {
private orders = new Map<string, OrderState>();
handleEvent(event: DomainEvent): void {
switch (event.type) {
case "ORDER_CREATED":
this.handleOrderCreated(event);
break;
case "PAYMENT_COMPLETED":
this.handlePaymentCompleted(event);
break;
}
}
private handleOrderCreated(event: OrderCreated): void {
this.orders.set(event.data.orderId, {
orderId: event.data.orderId,
userId: event.data.userId,
totalAmount: event.data.totalAmount,
productCount: event.data.products.reduce((sum, p) => sum + p.quantity, 0),
paymentStatus: "pending",
createdAt: event.timestamp,
});
}
private handlePaymentCompleted(event: PaymentCompleted): void {
const order = this.orders.get(event.data.orderId);
if (!order) {
// payment arrived before the order -- silently dropping is data loss.
// In production: buffer to a pending-payments store keyed by orderId,
// then drain on the next matching ORDER_CREATED. Or requeue the payment
// with a short delay and a retry cap. See the out-of-order discussion below.
return;
}
order.paymentStatus = "completed";
}
getTotalRevenue(): number {
return Array.from(this.orders.values())
.filter((o) => o.paymentStatus === "completed")
.reduce((sum, o) => sum + o.totalAmount, 0);
}
}
In production, I wouldn’t hold state in a Map — that in-memory state evaporates on pod restart, which is fine for a demo and catastrophic for a revenue dashboard. Worse, the unbounded Map is a straight memory leak: there’s no eviction, so every order ever seen stays resident until the pod OOMs. Use Redis or Postgres as the state store, with a TTL that matches your join window. The TTL is the important part: a stream join needs a bounded window (“match any payment within 24 hours of the order”) because holding every order ever created in hot storage is both expensive and wrong. If a payment arrives outside the window, it’s a late event and belongs in a reconciliation job, not the real-time join.
Note also what handleOrderCreated does on a duplicate ORDER_CREATED (same orderId, which will happen under at-least-once delivery during rebalances or producer retries): it blindly overwrites the existing entry, resetting paymentStatus back to "pending" even if the payment already completed. That’s a silent revenue-loss bug. The consumer-level dedupe cache in the EventConsumer catches most of these, but the processor should also be defensive: check this.orders.has(orderId) before overwriting, or treat ORDER_CREATED as upsert-if-absent.
The naive if (!order) return in handlePaymentCompleted above is data loss. When a payment event arrives before its order — which happens more often than you’d think, because separate producers write to separate topics with independent partition leaders — dropping it silently means your revenue number is permanently wrong. The fix is a buffer-and-retry pattern: store the unmatched payment under its orderId in a short-TTL Redis key, and when handleOrderCreated fires, check that key and complete the join. Cap the buffer window (15 minutes is usually enough) and move anything that expires to a DLQ for investigation. Kafka Streams calls this a “stream-stream join with grace period”; you’re building the same thing by hand.
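The buffer-and-retry pattern by hand, with an in-memory stand-in for the short-TTL Redis key (PendingPayments and its method names are mine; a real deployment would use Redis SET with PX so the buffer survives restarts):

```typescript
interface PendingPayment { orderId: string; amount: number; bufferedAt: number; }

// In-memory stand-in for the short-TTL payment buffer: unmatched payments
// wait here until the matching ORDER_CREATED arrives or the window expires.
class PendingPayments {
  private byOrderId = new Map<string, PendingPayment>();
  // `now` is injectable so the TTL logic is testable without wall-clock time.
  constructor(private windowMs: number, private now: () => number = Date.now) {}

  buffer(p: { orderId: string; amount: number }): void {
    this.byOrderId.set(p.orderId, { ...p, bufferedAt: this.now() });
  }

  // Called from handleOrderCreated: complete the join if a payment was
  // waiting and is still inside the window. An expired entry returns
  // undefined — that's the "move to DLQ for investigation" path.
  take(orderId: string): PendingPayment | undefined {
    const p = this.byOrderId.get(orderId);
    if (!p) return undefined;
    this.byOrderId.delete(orderId);
    if (this.now() - p.bufferedAt > this.windowMs) return undefined;
    return p;
  }
}
```

handlePaymentCompleted calls buffer() instead of bare-returning when the order is missing, and handleOrderCreated calls take(orderId) after inserting the order state to complete any join that arrived out of order.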
"Exactly-once" processing: what it actually means
This is the section where most articles get it wrong, so read it carefully. KafkaJS supports transactional producers, but wrapping a producer send in a transaction does not give you exactly-once consumption on its own. What it gives you is atomic multi-topic writes: either all messages in the transaction land, or none do.
True consume-transform-produce exactly-once semantics (EOS) requires three things, in one atomic unit:
- A transactional, idempotent producer with a stable, unique transactionalId per consumer instance (not a hardcoded string shared across pods — two pods with the same transactionalId will fence each other and crash).
- transaction.sendOffsets({ consumerGroupId, topics }) inside the transaction, so the consumer's offsets are committed as part of the same transaction as the downstream writes.
- Downstream consumers reading the output topic with isolation.level=read_committed so they skip aborted transactions. KafkaJS consumers default to this (the option is readUncommitted: false); other-language consumers must set it explicitly or they'll see aborted transactional writes.
If you skip step 2, you have a transactional producer behind an at-least-once consumer — which is fine and usually what you want, but it is not exactly-once. Calling it exactly-once when it isn’t is the kind of mistake that ships duplicate payment charges.
Here’s the pattern done correctly:
// src/enrichment.ts
import { EachMessagePayload } from "kafkajs";
import sjson from "secure-json-parse";
import { kafka } from "./kafka-client";
const consumerGroupId = "enrichment-service";
// Transactional ID MUST be unique per producer instance. Derive from a stable
// per-pod identity -- hostname, StatefulSet ordinal, or the partition set
// assigned to this consumer. A hardcoded literal (or a silent fallback like
// `?? "local"`) means two pods fence each other and one crashes on startup.
// Fail fast if HOSTNAME isn't set — a deterministic ID is a correctness
// requirement, not a nice-to-have.
function requireEnv(name: string): string {
const v = process.env[name];
if (!v) throw new Error(`${name} is required for transactional producer ID`);
return v;
}
const transactionalId = `enrichment-${requireEnv("HOSTNAME")}`;
const consumer = kafka.consumer({ groupId: consumerGroupId });
const producer = kafka.producer({
transactionalId,
// The Java client allows up to 5 in-flight requests with idempotence
// (ordering preserved via sequence numbers), but KafkaJS rejects
// idempotent producers with maxInFlightRequests > 1 at construction,
// so sends from this producer are serialized.
maxInFlightRequests: 1,
idempotent: true,
});
async function start(inputTopic: string, outputTopic: string): Promise<void> {
await consumer.connect();
await producer.connect();
await consumer.subscribe({ topic: inputTopic, fromBeginning: false });
await consumer.run({
autoCommit: false,
eachMessage: async (payload) => processAndForward(payload, outputTopic),
});
}
async function processAndForward(
{ topic, partition, message }: EachMessagePayload,
outputTopic: string
): Promise<void> {
// Guard malformed offsets BEFORE entering the transaction — BigInt throws
// on non-numeric strings, and failing inside the tx just aborts it without
// moving past the poison record.
let nextOffset: string;
try {
nextOffset = (BigInt(message.offset) + 1n).toString();
} catch (err) {
console.error(`malformed offset ${message.offset} on ${topic}:${partition}`);
throw err;
}
const transaction = await producer.transaction();
try {
// Even a null/empty value needs its offset advanced, or we stall the
// partition on the first tombstone. Commit a no-op offsets-only
// transaction for the empty case rather than bare-returning.
if (message.value) {
const enriched = JSON.stringify({
...sjson.parse(message.value.toString("utf8"), {
protoAction: "remove",
constructorAction: "remove",
}),
enrichedAt: new Date().toISOString(),
});
await transaction.send({
topic: outputTopic,
messages: [{ key: message.key ?? undefined, value: enriched }],
});
}
// This is the line that makes it exactly-once: the consumer's offset
// commit is part of the SAME transaction as the downstream write.
await transaction.sendOffsets({
consumerGroupId,
topics: [
{ topic, partitions: [{ partition, offset: nextOffset }] },
],
});
await transaction.commit();
} catch (err) {
await transaction.abort();
throw err;
}
}
A few things worth naming about the “exactly-once” label itself. EOS is exactly-once within the Kafka boundary — consume from Kafka, write back to Kafka. The moment your handler writes to an external system (Postgres, Redis, an HTTP API), you’re back to at-least-once at that boundary unless that system supports its own idempotency (unique constraint, upsert-by-key, idempotency key). Don’t promise exactly-once to downstream consumers of your database just because your Kafka pipeline is transactional; the two are separate guarantees.
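At that external boundary the standard tool is an idempotency key: make the write conditional on event.id not having been seen, atomically, in the target store. In Postgres that's an INSERT ... ON CONFLICT DO NOTHING against a processed-events table with a unique constraint; the sketch below shows the same shape with an in-memory stand-in (ProcessedStore and applyOnce are illustrative names, not a library API):

```typescript
// Stand-in for a table with a UNIQUE constraint on event_id. In the real
// store, markIfNew must be a single atomic operation (INSERT ... ON CONFLICT
// DO NOTHING, or Redis SET key NX) — a check-then-write as two round trips
// reintroduces the duplicate race under concurrent consumers.
class ProcessedStore {
  private seen = new Set<string>();
  markIfNew(eventId: string): boolean {
    if (this.seen.has(eventId)) return false;
    this.seen.add(eventId);
    return true;
  }
}

// Apply a side effect at-most-once per event.id, even though the pipeline
// delivers the event at-least-once. Returns whether the effect ran.
function applyOnce(store: ProcessedStore, eventId: string, effect: () => void): boolean {
  if (!store.markIfNew(eventId)) return false; // duplicate delivery: skip
  effect();
  return true;
}
```

Combined with at-least-once delivery, at-most-once application at the sink gives you effectively-once end to end, which is the guarantee people usually mean when they say exactly-once.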
Monitoring: what to watch
I expose Prometheus metrics from every consumer:
import { Counter, Histogram, register } from "prom-client";
import express from "express";
const eventsProcessed = new Counter({
name: "kafka_events_processed_total",
help: "Total events processed",
labelNames: ["topic", "event_type", "status"] as const,
});
const processingDuration = new Histogram({
name: "kafka_event_processing_seconds",
help: "Event processing duration",
labelNames: ["topic", "event_type"] as const,
buckets: [0.01, 0.05, 0.1, 0.5, 1, 5],
});
// Expose /metrics endpoint
const app = express();
app.get("/metrics", async (_req, res) => {
res.set("Content-Type", register.contentType);
res.end(await register.metrics());
});
app.listen(9090);
The metrics I alert on:
- Consumer lag (via Kafka’s built-in metrics or Burrow) — if lag is growing, your consumer can’t keep up.
- DLQ message rate — a spike means something is broken in your handler or schema.
- Processing duration p99 — if this is climbing, you’re approaching capacity.
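Lag itself is just log-end offset minus committed offset, summed over partitions. If you want a consumer to self-report it, the KafkaJS admin client exposes both sides (admin.fetchTopicOffsets for log-end, admin.fetchOffsets for the group's committed positions); the arithmetic is the sketch below, with offsets kept as strings and summed as bigint because Kafka offsets can exceed Number's safe range:

```typescript
interface PartitionOffset { partition: number; offset: string; }

// Sum (logEnd - committed) across partitions. Kafka reports "-1" as the
// committed offset when the group has never committed on a partition; we
// count the full backlog in that case (assuming the log starts at 0).
function totalLag(logEnd: PartitionOffset[], committed: PartitionOffset[]): bigint {
  const committedByPartition = new Map(
    committed.map((p) => [p.partition, BigInt(p.offset)] as [number, bigint])
  );
  let lag = 0n;
  for (const p of logEnd) {
    const end = BigInt(p.offset);
    const done = committedByPartition.get(p.partition) ?? -1n;
    lag += done < 0n ? end : end - done;
  }
  return lag;
}
```

Export the result as a Gauge per topic and alert on its derivative: steady lag is a sized backlog, growing lag is a consumer that can't keep up.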
PII, schemas, and the things I left out
Two topics this post deliberately didn’t deep-dive into, both of which you need before production.
PII in event payloads and headers. Order events carry userId and amounts; payment events carry payment IDs. Kafka topics are log-structured and long-retained — by default, a message written today is readable tomorrow, next week, and sometimes years later. Don’t put raw email addresses, names, or card data in event payloads unless the topic has an explicit retention policy, the broker supports at-rest encryption (MSK, Confluent Cloud do; self-managed requires disk encryption + TLS), and your compliance team has signed off on the retention. For GDPR-exposed fields, the defensible pattern is tokenization: write a stable opaque ID in the event, keep the PII in a separate store you can delete from. Headers are the same deal — don’t stuff PII into headers thinking they’re metadata.
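Tokenization in its simplest form: derive a stable opaque token from the PII with a keyed hash (an HMAC, so tokens can't be reversed by hashing a dictionary of common emails), publish the token in the event, and keep the token-to-PII mapping in a separate store you can delete from. A sketch, assuming an in-memory Map standing in for that store and PiiVault as an illustrative name:

```typescript
import { createHmac } from "node:crypto";

// Keyed hash -> stable opaque token. The key belongs in a KMS, not in code;
// an unkeyed SHA-256 of an email is trivially reversible by dictionary attack.
function tokenize(pii: string, key: string): string {
  return createHmac("sha256", key).update(pii).digest("hex");
}

// The only place raw PII lives. A GDPR erasure request deletes the vault
// entry; every retained event carrying the token becomes permanently
// anonymous without rewriting Kafka log segments.
class PiiVault {
  private byToken = new Map<string, string>();
  store(pii: string, key: string): string {
    const token = tokenize(pii, key);
    this.byToken.set(token, pii);
    return token;
  }
  resolve(token: string): string | undefined {
    return this.byToken.get(token);
  }
  erase(token: string): void {
    this.byToken.delete(token);
  }
}
```

Because the token is deterministic for a given key, joins and dedupe on the tokenized field keep working downstream; only consumers with vault access can recover the raw value.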
Schema registry for multi-team pipelines. The Ajv-in-your-repo approach above works fine when one team owns every producer and every consumer. It falls apart the moment another team publishes to your topic, because there’s no enforceable contract. Confluent Schema Registry (or Apicurio, or Redpanda’s version) gives you versioned schemas per subject, compatibility checks (backward, forward, full) on every new schema, and a shared source of truth that producers and consumers both validate against. If you’re running a shared platform, adopt a registry from day one; retrofitting it later is painful because producers have already shipped drift.
What I’d choose today
For a new event-driven system with TypeScript:
- KafkaJS as the client. It's the right library for TypeScript. node-rdkafka is faster (it wraps librdkafka) but it's a native C dependency that's painful to build in CI.
- TLS + SASL (SCRAM-SHA-512) from the start. Never ship a brokers: ["host:9092"] config without ssl and sasl.
- Ajv schema validation at the consumer boundary, called on every message — not defined and forgotten.
- Manual offset commits after successful processing. Auto-commit is data loss with a convenient API.
- Dead letter queues with correlation headers and a retry-count cap. No exceptions.
- Transactional producers with sendOffsets for any consume-transform-produce loop that actually needs exactly-once semantics — and a unique transactionalId per pod.
- Idempotent handlers with a dedupe cache keyed on event.id. At-least-once is the default, so handlers must tolerate duplicates.
- Partition by entity ID (orderId, userId) for ordering guarantees.
- Prometheus metrics from day one.
- Schema registry the moment a second team starts producing to your topics.
Skip Kafka Streams (it’s Java-only) and skip writing your own consumer group management. KafkaJS handles rebalancing correctly. Focus your time on the business logic in your event handlers, not on reinventing infrastructure.