ATS JetStream) for event delivery. Enforce schema validation via Avro/Protobuf with a registry to prevent breaking changes.
- Implement Idempotent Consumers: Every consumer must deduplicate messages using a composite key (event ID scoped by partition) and keep its state transitions idempotent.
- Add Resilience Controls: Implement exponential backoff retries, dead letter queues (DLQ) for poison messages, and circuit breakers on synchronous fallback paths.
- Instrument End-to-End Observability: Propagate trace IDs across publish/consume boundaries. Correlate events with business transactions using correlation IDs.
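The resilience controls above can be sketched without any broker dependency. The following is a minimal illustration, assuming a handler that throws on failure; the names `RetryOptions`, `backoffDelay`, `processWithRetry`, and the `sendToDlq` callback are hypothetical and not part of KafkaJS or any other library. Jitter and circuit breaking are left out to keep the sketch short.

```typescript
// Hypothetical helper names for illustration only.
interface RetryOptions {
  maxAttempts: number;    // total attempts before the message is dead-lettered
  initialDelayMs: number; // wait before the second attempt
  multiplier: number;     // exponential growth factor between attempts
}

// Delay to wait after failed attempt number `attempt` (1-based), without jitter.
function backoffDelay(attempt: number, opts: RetryOptions): number {
  return opts.initialDelayMs * Math.pow(opts.multiplier, attempt - 1);
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Runs `handler`; once maxAttempts have failed, hands the message to `sendToDlq`.
async function processWithRetry<T>(
  message: T,
  handler: (message: T) => Promise<void>,
  sendToDlq: (message: T, error: unknown) => Promise<void>,
  opts: RetryOptions
): Promise<'processed' | 'dead-lettered'> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= opts.maxAttempts; attempt++) {
    try {
      await handler(message);
      return 'processed';
    } catch (err) {
      lastError = err;
      // Back off only if another attempt will follow.
      if (attempt < opts.maxAttempts) {
        await sleep(backoffDelay(attempt, opts));
      }
    }
  }
  await sendToDlq(message, lastError);
  return 'dead-lettered';
}
```

In a Kafka consumer, `sendToDlq` would typically publish the original value and headers to a separate topic, after which the offset can be committed so the partition is not blocked.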
TypeScript Implementation
// event-contract.ts
import { z } from 'zod';

export const OrderCreatedEvent = z.object({
  eventId: z.string().uuid(),
  aggregateId: z.string(),
  timestamp: z.string().datetime(),
  payload: z.object({
    customerId: z.string(),
    items: z.array(z.object({ sku: z.string(), quantity: z.number() })),
    totalCents: z.number().int().nonnegative()
  })
});

export type OrderCreatedEvent = z.infer<typeof OrderCreatedEvent>;
// event-publisher.ts
import { CompressionTypes, Kafka, Producer } from 'kafkajs';
import { OrderCreatedEvent } from './event-contract';

export class EventPublisher {
  private producer: Producer;

  constructor(brokers: string[]) {
    const kafka = new Kafka({ clientId: 'order-service', brokers });
    this.producer = kafka.producer({ allowAutoTopicCreation: false });
  }

  async connect() {
    await this.producer.connect();
  }

  async publish(event: OrderCreatedEvent, topic: string) {
    // Reject malformed events before they reach the broker.
    const validated = OrderCreatedEvent.parse(event);
    await this.producer.send({
      topic,
      messages: [
        {
          // Keying by aggregate keeps all events for one entity on one partition.
          key: validated.aggregateId,
          value: JSON.stringify(validated.payload),
          headers: {
            'event-id': validated.eventId,
            'event-type': 'order.created',
            // The first event in a flow uses its own ID as the correlation ID.
            'correlation-id': validated.eventId
          }
        }
      ],
      acks: -1, // wait for all in-sync replicas
      compression: CompressionTypes.GZIP // KafkaJS expects the enum, not the string 'gzip'
    });
  }

  async disconnect() {
    await this.producer.disconnect();
  }
}
// event-consumer.ts
import { Consumer, EachMessagePayload } from 'kafkajs';
import { EventEmitter } from 'events';

export class IdempotentConsumer extends EventEmitter {
  private processedIds = new Map<string, number>();
  private readonly TTL_MS = 30 * 60 * 1000; // 30 min dedup window

  // The consumer group is fixed when the Consumer is created: kafka.consumer({ groupId }).
  constructor(private consumer: Consumer) {
    super();
  }

  async start(topic: string) {
    await this.consumer.connect();
    await this.consumer.subscribe({ topic, fromBeginning: false });
    await this.consumer.run({
      autoCommit: false, // offsets are committed manually below
      eachMessage: async ({ partition, message }: EachMessagePayload) => {
        // The offset lives on the message. A committed offset names the NEXT
        // message to read, so commit offset + 1.
        const next = { topic, partition, offset: (Number(message.offset) + 1).toString() };
        const rawId = message.headers?.['event-id'];
        if (!rawId) return; // no event ID, so the message cannot be deduplicated
        const dedupKey = `${partition}:${rawId.toString()}`;
        if (this.processedIds.has(dedupKey)) {
          await this.consumer.commitOffsets([next]);
          return;
        }
        try {
          const payload = JSON.parse(message.value?.toString() || '{}');
          this.emit('order.created', payload);
          this.processedIds.set(dedupKey, Date.now());
          await this.consumer.commitOffsets([next]);
        } catch (err) {
          // Route to DLQ or trigger alert; do not commit offset
          console.error(`Poison message at ${partition}:${message.offset}`, err);
          throw err;
        }
      }
    });
  }

  // Periodic cleanup to prevent memory leaks
  startCleanup() {
    setInterval(() => {
      const now = Date.now();
      for (const [key, ts] of this.processedIds.entries()) {
        if (now - ts > this.TTL_MS) this.processedIds.delete(key);
      }
    }, 5 * 60 * 1000);
  }
}
Architecture Decisions & Rationale
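- Partition Key Strategy: Using aggregateId as the Kafka message key guarantees ordering per business entity while allowing parallel processing across aggregates.
- Acks=-1: Ensures durability by waiting for acknowledgment from all in-sync replicas (ISR), trading slightly higher publish latency for protection against data loss on leader failure. The guarantee depends on the topic's min.insync.replicas setting; with a value of 1, the leader alone can satisfy the acknowledgment.
- In-Memory Deduplication: Suitable for single-instance consumers. For scaled deployments, replace with a Redis-backed idempotency store using SET with the NX and PX options (the SETNX behavior plus a TTL in one atomic command).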
- Header-Based Metadata: Keeps payload schema stable while carrying routing, tracing, and contract versioning data. Schema registries validate the value payload independently.
- Separation of Reads/Writes: CQRS naturally emerges. Synchronous gRPC handles real-time queries against materialized views; async events drive state mutations. This eliminates read/write contention and allows independent scaling.
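The shared idempotency store mentioned above can be sketched behind a small interface. This is a minimal illustration, not a definitive implementation: `IdempotencyStore`, `InMemoryIdempotencyStore`, and `handleOnce` are hypothetical names, and the in-memory class only stands in for Redis so the example runs without a server. With Redis, `claim` would correspond to `SET <key> 1 NX PX <ttlMs>` and `release` to `DEL <key>`.

```typescript
// Hypothetical interface; with Redis, claim maps to SET key 1 NX PX ttlMs.
interface IdempotencyStore {
  claim(key: string, ttlMs: number): Promise<boolean>; // true only for the first caller
  release(key: string): Promise<void>;                 // undo a claim after a failure
}

// In-memory stand-in with the same semantics, for tests and local runs only.
class InMemoryIdempotencyStore implements IdempotencyStore {
  private expiries = new Map<string, number>();
  private now: () => number;

  constructor(now: () => number = Date.now) {
    this.now = now;
  }

  async claim(key: string, ttlMs: number): Promise<boolean> {
    const current = this.now();
    const expiresAt = this.expiries.get(key);
    if (expiresAt !== undefined && expiresAt > current) return false; // still claimed
    this.expiries.set(key, current + ttlMs);
    return true;
  }

  async release(key: string): Promise<void> {
    this.expiries.delete(key);
  }
}

// Runs `handler` only for the first delivery of `eventId` within the TTL.
// Returns false when the event was recognized as a duplicate.
async function handleOnce(
  store: IdempotencyStore,
  eventId: string,
  ttlMs: number,
  handler: () => Promise<void>
): Promise<boolean> {
  if (!(await store.claim(eventId, ttlMs))) return false;
  try {
    await handler();
    return true;
  } catch (err) {
    await store.release(eventId); // let a redelivery retry the event
    throw err;
  }
}
```

Claiming before processing keeps two instances from handling the same event concurrently. The trade-off is that a crash between the claim and the handler finishing leaves the key set until the TTL expires, so the TTL should be chosen with redelivery timing in mind.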
Pitfall Guide
- Treating Message Brokers as Synchronous RPC Channels: Request-reply patterns over queues introduce hidden coupling and timeout management overhead. If you need immediate responses, use gRPC. If you need eventual consistency, use events. Mixing semantics creates unpredictable failure modes.
- Missing Idempotency Guarantees: At-least-once delivery is the default for most production brokers. Without deduplication, consumers process duplicate events, causing double charges, inventory corruption, or state machine violations. Always implement idempotency keys or use exactly-once semantics where supported.
- Ignoring Backpressure & Consumer Lag: Producers can outpace consumers during traffic spikes. Unmonitored lag causes offset expiration, message loss, or consumer group rebalancing storms. Implement consumer-side rate limiting, dynamic partition scaling, and lag-based alerting.
- Schema Evolution Without Compatibility Rules: Adding required fields or renaming types breaks downstream consumers. Enforce backward/forward compatibility via schema registry policies. Use nullable fields, default values, and explicit versioning in event types.
- No Dead Letter Queue Strategy: Poison messages block consumer progress and trigger infinite retry loops. Route malformed or unprocessable events to a DLQ after N retries. Implement automated parsing, alerting, and replay pipelines for DLQ contents.
- Over-Indexing on Low Latency: Optimizing for sub-50ms publish times often sacrifices durability (acks=0/1) and partitioning efficiency. Latency-sensitive paths should remain synchronous. Async paths should optimize for throughput and consistency guarantees.
- Missing Correlation & Distributed Tracing: Events propagate state changes across services. Without correlation IDs and trace context propagation, debugging becomes impossible. Inject trace headers at publish time and extract them in consumers. Wire into OpenTelemetry spans.
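For the correlation pitfall, the following sketch shows one way headers could be carried from a consumed event to the events it causes. It reuses the `event-id`, `event-type`, and `correlation-id` header names from the publisher; the `causation-id` header and the helper names `newFlowHeaders` and `childHeaders` are assumptions introduced here for illustration. A real deployment would likely add OpenTelemetry trace context on top of these headers.

```typescript
import { randomUUID } from 'node:crypto';

// Plain string headers; 'causation-id' is an assumed convention, not from the source.
type EventHeaders = Record<string, string>;

// Headers for the first event in a business transaction:
// the event's own ID doubles as the correlation ID.
function newFlowHeaders(eventType: string): EventHeaders {
  const eventId = randomUUID();
  return { 'event-id': eventId, 'event-type': eventType, 'correlation-id': eventId };
}

// Headers for an event published while handling `incoming`:
// a fresh event ID, the same correlation ID, and a pointer to the causing event.
function childHeaders(incoming: EventHeaders, eventType: string): EventHeaders {
  return {
    'event-id': randomUUID(),
    'event-type': eventType,
    'correlation-id': incoming['correlation-id'] ?? incoming['event-id'],
    'causation-id': incoming['event-id']
  };
}
```

Searching logs by `correlation-id` then returns every event in one business transaction, while `causation-id` reconstructs the order in which they triggered each other.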
Production Best Practices
- Contract-test events using provider-consumer verification pipelines.
- Version event types explicitly (order.created.v1).
- Monitor consumer lag, DLQ depth, and schema compatibility violations.
- Implement graceful shutdown hooks to flush in-flight messages and commit final offsets.
- Use consumer groups with sticky partition assignment to reduce rebalancing overhead.
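Lag monitoring from the list above reduces to simple arithmetic once the offsets are available. The sketch below assumes the log-end and committed offsets have already been fetched (KafkaJS's admin client offers `fetchTopicOffsets` and `fetchOffsets` for this, though their response shapes should be checked against its documentation). The `PartitionOffsets` shape and the function names are hypothetical.

```typescript
// Assumed input shape; a Kafka admin client would supply these numbers.
interface PartitionOffsets {
  partition: number;
  logEndOffset: number;    // offset the next produced message will receive
  committedOffset: number; // next offset the consumer group will read
}

// Lag for one partition: messages written but not yet committed by the group.
function partitionLag(p: PartitionOffsets): number {
  return Math.max(0, p.logEndOffset - p.committedOffset);
}

// Total lag across all partitions of a topic.
function totalLag(parts: PartitionOffsets[]): number {
  return parts.reduce((sum, p) => sum + partitionLag(p), 0);
}

// Partitions whose lag exceeds the alert threshold.
function laggingPartitions(parts: PartitionOffsets[], threshold: number): number[] {
  return parts.filter((p) => partitionLag(p) > threshold).map((p) => p.partition);
}
```

Alerting on per-partition lag rather than only the total helps surface a single hot or stuck partition that an aggregate number would hide.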
Production Bundle
Action Checklist
Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| Real-time inventory check during checkout | Synchronous gRPC | Requires immediate consistency and low latency | Low infra, high coupling risk |
| Order fulfillment notifications | Asynchronous event streaming | Decouples services, tolerates eventual consistency | Medium infra, high resilience |
| Cross-service reporting dashboard | Hybrid (Sync reads + Async materialization) | Reads need speed; writes need durability | Medium infra, balanced cost |
| Legacy system integration | Synchronous with async adapter | Bridge incompatible contracts without blocking core flow | High adapter maintenance |
| High-frequency telemetry ingestion | Asynchronous with batch consumers | Throughput optimization, tolerance for delayed processing | Low per-event cost, high batch efficiency |
Configuration Template
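# docker-compose.infra.yml
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports: ["2181:2181"]
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on: [zookeeper]
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Two listeners: kafka:29092 for other containers, localhost:9092 for the host.
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      # Required on a single broker when a transactional producer is used.
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
    ports: ["9092:9092"]
  schema-registry:
    image: confluentinc/cp-schema-registry:7.5.0
    depends_on: [kafka]
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092
    ports: ["8081:8081"]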
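// infrastructure/kafka-config.ts
export const KAFKA_CONFIG = {
  clientId: 'microservice-bridge',
  // KAFKA_BROKERS may hold a comma-separated list, e.g. "broker1:9092,broker2:9092".
  brokers: (process.env.KAFKA_BROKERS || 'localhost:9092').split(','),
  ssl: process.env.NODE_ENV === 'production',
  sasl: process.env.NODE_ENV === 'production'
    ? { mechanism: 'scram-sha-256', username: process.env.KAFKA_USER, password: process.env.KAFKA_PASS }
    : undefined
};

export const PRODUCER_CONFIG = {
  transactionalId: 'order-service-prod-1',
  idempotent: true,
  // The KafkaJS docs pair idempotent/transactional producers with one in-flight request.
  maxInFlightRequests: 1,
  // In KafkaJS, `multiplier` is the exponential factor; `factor` is the randomization (jitter) factor.
  retry: { retries: 5, initialRetryTime: 100, multiplier: 2 }
};

// Options for kafka.consumer(...)
export const CONSUMER_CONFIG = {
  groupId: 'inventory-service-group',
  sessionTimeout: 30000,
  rebalanceTimeout: 60000,
  heartbeatInterval: 3000,
  maxBytesPerPartition: 1048576
};

// Options for consumer.run(...). autoCommitInterval is omitted because auto-commit is disabled.
export const CONSUMER_RUN_CONFIG = {
  autoCommit: false
};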
Quick Start Guide
- Spin up infrastructure: Run docker-compose -f docker-compose.infra.yml up -d to start Kafka, ZooKeeper, and Schema Registry locally.
- Initialize TypeScript project: Run npm init -y && npm i kafkajs zod @opentelemetry/api and create src/ with publisher/consumer modules from the Core Solution.
- Configure environment: Export KAFKA_BROKERS=localhost:9092 and NODE_ENV=development. Create topics via CLI: kafka-topics --create --topic order.created --partitions 3 --replication-factor 1 --bootstrap-server localhost:9092.
- Start producer & consumer: Instantiate EventPublisher and IdempotentConsumer, connect both, and publish a test event. Verify consumer logs show deduplication and offset commits.
- Validate resilience: Kill the consumer process mid-flight, restart it, and confirm offset recovery. Inject a malformed payload and verify DLQ routing or error logging. Adjust partition counts and consumer group size to match expected load.