ion-grade patterns.
Step 1: Define Immutable Event Contracts
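Events must be append-only, versioned, and schema-validated. Never mutate a published event; publish a new event or a new version instead. The TypeScript example uses Zod for runtime validation; for contracts shared across languages, use JSON Schema or Protocol Buffers.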
// events.ts
import { z } from 'zod';
export const UserCreatedEventSchema = z.object({
  eventId: z.string().uuid(),
  eventType: z.literal('user.created'),
  timestamp: z.string().datetime(),
  version: z.literal('1.0.0'),
  payload: z.object({
    userId: z.string().uuid(),
    email: z.string().email(),
    createdAt: z.string().datetime(),
  }),
});
export type UserCreatedEvent = z.infer<typeof UserCreatedEventSchema>;
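Published events are never mutated, so a contract change usually ships as a new version. Consumers then read the new version alongside the old one. The sketch below shows one common approach, upcasting at read time, with no dependencies. The v2 shape and its locale field are hypothetical and exist only for illustration.

```typescript
// upcast.ts: UserCreatedV2 and its `locale` field are hypothetical, for illustration only.
export interface UserCreatedV1 {
  eventId: string;
  eventType: 'user.created';
  timestamp: string;
  version: '1.0.0';
  payload: { userId: string; email: string; createdAt: string };
}

export interface UserCreatedV2 {
  eventId: string;
  eventType: 'user.created';
  timestamp: string;
  version: '2.0.0';
  // Hypothetical new field; v1 events receive a default when upcast
  payload: { userId: string; email: string; createdAt: string; locale: string };
}

export type AnyUserCreated = UserCreatedV1 | UserCreatedV2;

// Converts any stored version to the latest shape, returning a NEW object
// so the original (published) event is never modified.
export function upcast(event: AnyUserCreated): UserCreatedV2 {
  switch (event.version) {
    case '1.0.0':
      return {
        ...event,
        version: '2.0.0',
        payload: { ...event.payload, locale: 'en' }, // hypothetical default
      };
    case '2.0.0':
      return event;
    default: {
      // Compile-time exhaustiveness check: adding a version without a case fails to build
      const unknownVersion: never = event;
      throw new Error(`Unsupported version: ${JSON.stringify(unknownVersion)}`);
    }
  }
}
```

The design keeps the log immutable. Only the in-memory view that consumers see is migrated, so old and new producers can coexist during a rollout.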
Step 2: Implement an Idempotent Producer
Producers must attach idempotency keys and enforce schema validation before publishing. Broker acknowledgments should be awaited, but consumers must still handle duplicates.
// producer.ts
import { Kafka, Producer, ProducerRecord } from 'kafkajs';
import { UserCreatedEvent, UserCreatedEventSchema } from './events';
export class EventProducer {
  private producer: Producer;
  constructor(brokerUrl: string) {
    const kafka = new Kafka({ clientId: 'event-producer', brokers: [brokerUrl] });
    // Idempotent producer: the broker drops duplicates created by the producer's own retries
    this.producer = kafka.producer({ idempotent: true });
  }
  async init() {
    await this.producer.connect();
  }
  async publish(event: UserCreatedEvent): Promise<void> {
    // Reject malformed events before they reach the broker (parse() throws on failure)
    const validated = UserCreatedEventSchema.parse(event);
    const record: ProducerRecord = {
      topic: 'user-events',
      messages: [
        {
          // Same key -> same partition, so each user's events stay in order
          key: validated.payload.userId,
          value: JSON.stringify(validated),
          headers: {
            'idempotency-key': validated.eventId,
            'event-type': validated.eventType,
            'schema-version': validated.version,
          },
        },
      ],
    };
    // Resolves once the broker acknowledges the write
    await this.producer.send(record);
  }
  async disconnect() {
    await this.producer.disconnect();
  }
}
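The idempotent producer only deduplicates its own internal retries. Suppose the application retries a publish after a timeout. The broker may already hold the first copy, so every attempt must carry the same eventId for consumers to deduplicate. The sketch below illustrates this. SendFn is a hypothetical stand-in for a call such as EventProducer.publish, and the attempt count is illustrative.

```typescript
// retry-publish.ts: sketch; SendFn is a hypothetical stand-in for the real publish call.
import { randomUUID } from 'node:crypto';

type SendFn = (eventId: string, payload: string) => Promise<void>;

// The idempotency key is generated ONCE per logical event, outside the retry loop,
// so every attempt carries the same eventId and consumers can discard duplicates.
export async function publishWithStableKey(
  payload: string,
  send: SendFn,
  maxAttempts = 3,
): Promise<string> {
  const eventId = randomUUID();
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      await send(eventId, payload);
      return eventId;
    } catch (error) {
      // e.g. a timeout where the broker may already have stored the record
      lastError = error;
    }
  }
  throw lastError;
}
```

Generating the key inside the loop would give each retry a fresh identity. A consumer could then no longer recognize the copies as the same event.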
Step 3: Consumer Group with Offset Management
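Consumers must apply each event exactly once per logical unit of work. The broker delivers at least once, so combine manual offset commits with idempotent handling. Use consumer groups for parallelism, and rely on partition-level ordering when state depends on sequence: KafkaJS processes the messages of each partition sequentially by default.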
// consumer.ts
import { Kafka, Consumer, EachMessagePayload } from 'kafkajs';
import { UserCreatedEvent, UserCreatedEventSchema } from './events';
export class EventConsumer {
  private consumer: Consumer;
  // Demo-only guard: in-memory, unbounded, and lost on restart or rebalance.
  // Production handlers should dedupe against a durable store (e.g., a unique index on eventId).
  private processedIds: Set<string> = new Set();
  constructor(brokerUrl: string, groupId: string) {
    const kafka = new Kafka({ clientId: 'event-consumer', brokers: [brokerUrl] });
    // KafkaJS sets isolation via readUncommitted; false (the default) reads only committed records
    this.consumer = kafka.consumer({ groupId, readUncommitted: false });
  }
  async init() {
    await this.consumer.connect();
    await this.consumer.subscribe({ topic: 'user-events', fromBeginning: false });
  }
  async run(handler: (event: UserCreatedEvent) => Promise<void>) {
    await this.consumer.run({
      // Disable auto-commit so offsets advance only where commitOffsets is called
      autoCommit: false,
      eachMessage: async ({ topic, partition, message }: EachMessagePayload) => {
        // The committed offset is the NEXT offset to read, hence +1
        const commit = () =>
          this.consumer.commitOffsets([
            { topic, partition, offset: (Number(message.offset) + 1).toString() },
          ]);
        if (!message.value) {
          await commit();
          return;
        }
        let validated: UserCreatedEvent;
        try {
          validated = UserCreatedEventSchema.parse(JSON.parse(message.value.toString()));
        } catch (error) {
          // Poison message (bad JSON or schema violation): skip it instead of crash-looping
          console.error(`Invalid event at ${topic}[${partition}]@${message.offset}:`, error);
          await commit();
          return;
        }
        // Idempotency guard: skip events this instance has already handled
        if (this.processedIds.has(validated.eventId)) {
          await commit();
          return;
        }
        try {
          await handler(validated);
          this.processedIds.add(validated.eventId);
        } catch (error) {
          // Simplified: log and skip to prevent a consumer stall. In production, publish the
          // record to a DLQ topic before committing; otherwise this event is lost.
          console.error(`Processing failed for ${validated.eventId}:`, error);
        }
        await commit();
      },
    });
  }
  async disconnect() {
    await this.consumer.disconnect();
  }
}
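The in-memory processedIds set in the consumer forgets everything on restart and is not shared across group members. A durable alternative can be sketched as a wrapper around the handler. DedupStore and withDedup are hypothetical names. InMemoryDedupStore only stands in for a database table with a unique index on eventId. In a real store, write the marker in the same transaction as the handler's state change, because the separate has/mark calls here are not atomic.

```typescript
// dedupe.ts: sketch; DedupStore is a hypothetical interface, InMemoryDedupStore a test double.
export interface DedupStore {
  has(eventId: string): Promise<boolean>;
  mark(eventId: string): Promise<void>;
}

// Stand-in for a table with a unique index on event_id.
export class InMemoryDedupStore implements DedupStore {
  private readonly ids = new Set<string>();
  async has(eventId: string): Promise<boolean> {
    return this.ids.has(eventId);
  }
  async mark(eventId: string): Promise<void> {
    this.ids.add(eventId);
  }
}

// Wraps a handler so redelivered events are skipped.
// Resolves to true if the handler ran, false if the event was a duplicate.
export function withDedup<E extends { eventId: string }>(
  store: DedupStore,
  handler: (event: E) => Promise<void>,
): (event: E) => Promise<boolean> {
  return async (event: E): Promise<boolean> => {
    if (await store.has(event.eventId)) return false; // duplicate: no side effects
    await handler(event); // a throw here leaves the event unmarked, so redelivery retries it
    await store.mark(event.eventId);
    return true;
  };
}
```

The marker is written only after the handler succeeds. A failed event is therefore retried on redelivery rather than silently skipped, which matches the at-least-once contract described above.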
Step 4: Architecture Decisions & Rationale
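- Idempotent Producer: Kafka’s idempotent producer attaches a producer ID and per-partition sequence numbers to each batch. The broker uses them to discard duplicates created by producer retries, for example after a lost acknowledgment. Enable it: without it, a retry during a network partition can write the same record twice and break exactly-once guarantees on the write path. It does not deduplicate an event that the application itself publishes twice.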
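- Manual Offset Commit: Auto-commit runs on an interval, so after a crash or rebalance every message processed since the last commit is redelivered. Committing right after the handler finishes narrows that window to a single message and ties each commit to a processing outcome. The result is at-least-once delivery, and application-level deduplication makes the remaining redeliveries harmless.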
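- Partition Key Strategy: Using userId as the partition key routes all events for a single user to the same partition, preserving per-user ordering. Events for different users are processed in parallel. The ordering guarantee holds only while the partition count stays fixed, because adding partitions remaps keys.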
- Schema Validation at Ingress: Parsing and validating before business logic prevents malformed events from corrupting state. Schema evolution must be backward-compatible; breaking changes require versioned topics.
- Dead Letter Queue (DLQ) Routing: The example logs failures and commits offsets to avoid stalling the consumer. Production systems must route failed messages to a DLQ topic so they can be replayed once a fix is deployed.
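The bounded-retry-then-DLQ policy can be sketched as a wrapper around the handler. sendToDlq is a hypothetical callback, for example a producer that writes the original record plus the error to a DLQ topic. The caller should commit the offset only after processWithDlq resolves, so a failed DLQ write leaves the offset uncommitted. Keep the backoff short: the delays run inside eachMessage, and long waits can exceed the consumer's session timeout.

```typescript
// retry-dlq.ts: sketch; sendToDlq is a hypothetical callback supplied by the caller.
export type Outcome =
  | { status: 'processed'; attempts: number }
  | { status: 'dead-lettered'; attempts: number; error: string };

export async function processWithDlq<E>(
  event: E,
  handler: (event: E) => Promise<void>,
  sendToDlq: (event: E, error: string) => Promise<void>,
  maxAttempts = 3,
  baseDelayMs = 100,
): Promise<Outcome> {
  let lastError = '';
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      await handler(event);
      return { status: 'processed', attempts: attempt };
    } catch (error) {
      lastError = error instanceof Error ? error.message : String(error);
      if (attempt < maxAttempts) {
        // Exponential backoff between attempts: base, 2*base, 4*base, ...
        await new Promise((resolve) => setTimeout(resolve, baseDelayMs * 2 ** (attempt - 1)));
      }
    }
  }
  // Retries exhausted: park the event in the DLQ. If this throws, the error propagates
  // and the caller must NOT commit the offset.
  await sendToDlq(event, lastError);
  return { status: 'dead-lettered', attempts: maxAttempts, error: lastError };
}
```

Transient failures recover within the retry budget. Persistent failures leave the hot path after a bounded delay instead of blocking the partition indefinitely.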
Pitfall Guide
- Treating Events as Commands: Events describe what happened; commands describe what should happen. Routing commands through event brokers creates tight coupling and breaks idempotency. Fix: Separate command channels (HTTP/gRPC) from event channels. Publish events only after the state mutation succeeds.
- Ignoring Idempotency: At-least-once delivery is the default in distributed brokers. Without idempotency keys or database-level unique constraints, duplicate processing corrupts aggregates, for example by applying the same balance change twice. Fix: Store eventId in a deduplication table or add a unique index on the event ID column.
- Synchronous Event Handling: Blocking the consumer on external API calls, file I/O, or long computations defeats asynchronous decoupling. If a single message takes longer than the session timeout, the group can also rebalance. Fix: Offload heavy work to worker pools, use background job queues, or split consumers into fast-acknowledgment and slow-processing pipelines.
- Missing Schema Registry: Unversioned payloads cause silent deserialization failures. Fix: Enforce schema validation at producer and consumer boundaries, and use a schema registry (Confluent Schema Registry, Apicurio Registry) to block incompatible changes during CI/CD.
- Poor Retry & DLQ Strategy: Infinite retries, even with exponential backoff, stall the partition behind a single failing message. Fix: Implement bounded retries (e.g., 3 attempts), then route the message to the DLQ. Monitor DLQ depth as a critical alert. Never commit the offset of a failed message until it has been handled or written to the DLQ.
- Over-Architecting Simple Workflows: EDA adds operational complexity. If Service A calls Service B once per request and needs an immediate response, REST/gRPC is the better fit. Fix: Reserve EDA for cross-service state propagation, audit trails, and fan-out notifications.
- Neglecting Consumer Lag Monitoring: Unmonitored lag causes stale data and delayed reactions. Fix: Track consumer lag (log-end offset minus committed offset) per partition and alert when it exceeds SLA thresholds. Scale consumer instances horizontally up to the partition count, since extra instances in a group sit idle, rather than increasing the partition count arbitrarily.
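Lag per partition is the log-end offset (high watermark) minus the group's committed offset. The sketch below shows that calculation plus a threshold check, with no dependencies. The input shape is an assumption; in practice the numbers could come from an admin API such as KafkaJS's admin.fetchTopicOffsets and admin.fetchOffsets. A partition with no committed offset ("-1") is reported with its full end offset as lag, a deliberately conservative choice.

```typescript
// lag.ts: sketch; PartitionOffsets is a simplified, assumed input shape.
export interface PartitionOffsets {
  partition: number;
  logEndOffset: string; // high watermark, decimal string as Kafka clients return offsets
  committedOffset: string; // group's committed offset; "-1" if nothing committed yet
}

export interface LagReport {
  partition: number;
  lag: number;
  breached: boolean;
}

// Number() is exact for offsets below 2^53, far beyond realistic partition sizes.
export function computeLag(offsets: PartitionOffsets[], threshold: number): LagReport[] {
  return offsets.map(({ partition, logEndOffset, committedOffset }) => {
    const end = Number(logEndOffset);
    const committed = Number(committedOffset);
    // No commit yet: count the whole log as lag (conservative)
    const lag = committed < 0 ? end : Math.max(0, end - committed);
    return { partition, lag, breached: lag > threshold };
  });
}
```

Alerting per partition rather than on the group total surfaces a single stuck partition, such as one blocked by a poison message, that a healthy aggregate would hide.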
Production Bundle
Action Checklist
Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| High throughput fan-out (notifications, analytics) | Event-Driven Pub/Sub | Decouples producers, scales consumers independently | Low infrastructure cost, high engineering discipline |
| Strict cross-service transaction | Orchestrated Saga or 2PC | Requires a coordinated outcome: atomic commit (2PC) or explicit compensating rollback (Saga) | Higher latency, complex compensation logic |
| Low-latency user-facing API | Synchronous REST/gRPC | Avoids async acknowledgment overhead | Minimal operational overhead |
| Audit trail & state reconstruction | Event Sourcing + CQRS | Immutable event log enables point-in-time recovery | High storage cost, complex query layer |
Configuration Template
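// broker.config.ts
// Application config; the producer and consumer sections use KafkaJS option names.
export const BROKER_CONFIG = {
  clientId: 'eda-service',
  // KAFKA_BROKERS may be a comma-separated list, e.g. "b1:9092,b2:9092"
  brokers: (process.env.KAFKA_BROKERS ?? 'localhost:9092').split(','),
  producer: {
    idempotent: true,
    // Only needed for producer.transaction(); must be unique per running producer instance
    transactionalId: 'eda-producer-txn-1',
    maxInFlightRequests: 5,
    // KafkaJS retry options: initialRetryTime (ms) and multiplier (growth per attempt);
    // KafkaJS's `factor` is a randomization factor, not the exponential base
    retry: {
      retries: 5,
      initialRetryTime: 100,
      multiplier: 2,
    },
  },
  consumer: {
    groupId: 'eda-consumer-group',
    // KafkaJS sets isolation via readUncommitted; false (the default) reads only committed records
    readUncommitted: false,
    sessionTimeout: 30000,
    rebalanceTimeout: 60000,
    heartbeatInterval: 3000,
    maxBytesPerPartition: 1048576, // 1 MiB
    retry: {
      retries: 3,
      initialRetryTime: 200,
      multiplier: 2,
    },
  },
  dlq: {
    topic: 'eda-dlq',
    maxRetries: 3,
    alertThreshold: 50,
  },
  schemaValidation: {
    enabled: true,
    registryUrl: process.env.SCHEMA_REGISTRY_URL,
    failOnMissingSchema: true,
  },
};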
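A simplified exponential model helps sanity-check retry settings like those in the template: delay n = initial × multiplier^n. This sketch ignores jitter (KafkaJS randomizes each delay) and any overall retry-time cap, so real timings will differ somewhat.

```typescript
// backoff.ts: simplified exponential backoff model; ignores jitter and overall time caps.
export function backoffSchedule(retries: number, initialMs: number, multiplier: number): number[] {
  return Array.from({ length: retries }, (_, i) => initialMs * multiplier ** i);
}

export function totalWait(schedule: number[]): number {
  return schedule.reduce((sum, ms) => sum + ms, 0);
}

// Producer: 5 retries, 100 ms initial delay, doubling each time
const producerSchedule = backoffSchedule(5, 100, 2); // [100, 200, 400, 800, 1600]
// Consumer: 3 retries, 200 ms initial delay, doubling each time
const consumerSchedule = backoffSchedule(3, 200, 2); // [200, 400, 800]
console.log(totalWait(producerSchedule), totalWait(consumerSchedule)); // 3100 1400
```

Under this model the consumer's worst-case retry wait is about 1.4 s plus handler time, well inside the 30 s sessionTimeout. That margin is what keeps retries from triggering a rebalance.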
Quick Start Guide
- Initialize Broker: Run a local Kafka instance via Docker Compose or use a managed service (Confluent Cloud, AWS MSK), then export the KAFKA_BROKERS environment variable.
- Install Dependencies: npm install kafkajs zod uuid
- Start Producer: Import EventProducer, call init(), and publish a validated UserCreatedEvent with a UUID eventId.
- Start Consumer: Import EventConsumer, call init(), and pass run() an async handler that writes to a database with a unique constraint on eventId.
- Verify: Check consumer lag with kafka-consumer-groups.sh --bootstrap-server <broker> --describe --group <groupId>. Confirm idempotency by publishing the same event (same eventId) twice: both records land in the topic, and the consumer should silently skip the second.
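The Quick Start calls for a UserCreatedEvent with a UUID eventId, which a small factory can produce. This sketch assumes Node 14.17 or later, so crypto.randomUUID can stand in for the uuid package. makeUserCreatedEvent is a hypothetical helper, and the interface repeats the contract's fields so the snippet compiles on its own.

```typescript
// make-event.ts: sketch; makeUserCreatedEvent is a hypothetical helper.
import { randomUUID } from 'node:crypto';

// Mirrors the fields of the UserCreatedEvent contract so this file is self-contained.
export interface UserCreatedEvent {
  eventId: string;
  eventType: 'user.created';
  timestamp: string;
  version: '1.0.0';
  payload: { userId: string; email: string; createdAt: string };
}

export function makeUserCreatedEvent(
  userId: string,
  email: string,
  now: Date = new Date(),
): Readonly<UserCreatedEvent> {
  const createdAt = now.toISOString(); // ISO 8601 in UTC, e.g. "1970-01-01T00:00:00.000Z"
  const event: UserCreatedEvent = {
    eventId: randomUUID(), // idempotency key: minted once per logical event
    eventType: 'user.created',
    timestamp: createdAt,
    version: '1.0.0',
    payload: { userId, email, createdAt },
  };
  // Freeze both levels so code holding the event cannot mutate it after publishing
  Object.freeze(event.payload);
  return Object.freeze(event);
}
```

For the idempotency check, publish the same returned object twice rather than calling the factory twice. Each call mints a new eventId, so two factory calls would produce two distinct events.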