ql.conf` and creating a replication user.
```
# postgresql.conf (changing these settings requires a server restart)
wal_level = logical
max_replication_slots = 4
max_wal_senders = 4
```
Create a dedicated user with minimal privileges:
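```sql
CREATE USER cdc_user WITH REPLICATION LOGIN PASSWORD 'secure_password';
GRANT CONNECT ON DATABASE mydb TO cdc_user;
GRANT USAGE ON SCHEMA public TO cdc_user;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO cdc_user;
-- pgoutput streams from a publication. Create it up front (as a superuser or the
-- table owner) so the low-privilege connector user does not have to create it.
-- dbz_publication is Debezium's default publication.name.
CREATE PUBLICATION dbz_publication FOR TABLE public.users, public.orders;
```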
Step 2: Deploy Debezium Connector
Configure the connector via the Kafka Connect REST API. The configuration defines the source, topic naming, and schema handling.
```json
{
  "name": "postgres-cdc-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres-host",
    "database.port": "5432",
    "database.user": "cdc_user",
    "database.password": "secure_password",
    "database.dbname": "mydb",
    "topic.prefix": "server1",
    "table.include.list": "public.users,public.orders",
    "plugin.name": "pgoutput",
    "snapshot.mode": "initial",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.add.fields": "op,ts_ms,source.table,source.ts_ms"
  }
}
```
In Debezium 2.x, topic.prefix replaces the removed database.server.name property and becomes the first segment of every topic name (for example, server1.public.users).
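Rationale: The ExtractNewRecordState SMT flattens the Debezium envelope into the row's new state, reducing payload size; add.fields appends the requested metadata to each value as __op, __ts_ms, __source_table, and __source_ts_ms. Setting drop.tombstones to false preserves tombstones for delete propagation. With the SMT's default delete handling, the delete event itself is dropped, so the tombstone is the delete signal consumers receive. Avro serialization, backed by the Schema Registry, enforces compatibility rules as table schemas evolve.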
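To make the rationale concrete, here is a minimal sketch of the flattening the SMT performs, written as a plain TypeScript function. It is not Debezium's implementation; it assumes the add.fields list above and the default delete handling (delete event dropped, tombstone kept). The names DebeziumEnvelope, Flattened, and flatten are hypothetical.

```typescript
// Hypothetical model of a Debezium change event value (the envelope),
// trimmed to the fields used here.
interface DebeziumEnvelope<T> {
  before: T | null;
  after: T | null;
  source: { table: string; ts_ms: number };
  op: 'c' | 'u' | 'd' | 'r';
  ts_ms: number;
}

// Flattened value: the row plus the add.fields metadata, prefixed with "__".
type Flattened<T> = T & {
  __op: 'c' | 'u' | 'd' | 'r';
  __ts_ms: number;
  __source_table: string;
  __source_ts_ms: number;
};

// Mimics the SMT for a single event. Returns null when the event is dropped:
// under the default delete handling a delete event is removed, and the
// separate tombstone record (null value) that follows it signals the delete.
function flatten<T extends object>(event: DebeziumEnvelope<T>): Flattened<T> | null {
  if (event.op === 'd' || event.after === null) return null;
  return {
    ...event.after,
    __op: event.op,
    __ts_ms: event.ts_ms,
    __source_table: event.source.table,
    __source_ts_ms: event.source.ts_ms,
  };
}

// Usage: an insert into public.users becomes a flat row with metadata.
const created = flatten({
  before: null,
  after: { id: 1, email: 'a@example.com' },
  source: { table: 'users', ts_ms: 1700000000000 },
  op: 'c',
  ts_ms: 1700000000123,
});
// created?.__op === 'c' and created?.id === 1
```

The flat shape is what the consumer decodes, which is why it reads `__op` from the value instead of an `op` field inside an envelope.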
Step 3: TypeScript Consumer Implementation
Consumers must decode the flattened records produced by the SMT, treat tombstones as deletes, manage offsets, and ensure idempotency.
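```typescript
import { Kafka, logLevel } from 'kafkajs';
import { SchemaRegistry } from '@kafkajs/confluent-schema-registry';

const kafka = new Kafka({
  clientId: 'cdc-consumer',
  brokers: ['kafka-broker:9092'],
  logLevel: logLevel.WARN,
});

const registry = new SchemaRegistry({ host: 'http://schema-registry:8081' });
const consumer = kafka.consumer({ groupId: 'cdc-group-1' });

// Row shape of public.users; only the primary key column is assumed here.
interface UserRow {
  id: number;
  [column: string]: unknown;
}

// Record value after ExtractNewRecordState: the row's columns plus the
// metadata requested via add.fields, each prefixed with "__".
type FlattenedRecord<T> = T & {
  __op: 'c' | 'u' | 'd' | 'r'; // create, update, delete, read (snapshot)
  __ts_ms: number;
  __source_table: string;
  __source_ts_ms: number;
};

async function run(): Promise<void> {
  await consumer.connect();
  // fromBeginning only applies when the group has no committed offsets;
  // it lets a new group read the initial snapshot events.
  await consumer.subscribe({ topic: 'server1.public.users', fromBeginning: true });

  await consumer.run({
    // Resolve offsets manually: with auto-resolve, a batch interrupted by a
    // rebalance or shutdown would mark unprocessed messages as consumed.
    eachBatchAutoResolve: false,
    eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning, isStale }) => {
      for (const message of batch.messages) {
        if (!isRunning() || isStale()) break;

        if (message.value) {
          // Decode the Avro value using the schema ID embedded in the message
          const record = (await registry.decode(message.value)) as FlattenedRecord<UserRow>;
          await processChange(record);
        } else {
          // Tombstone: a null value means the row with this key was deleted
          await handleDelete(message.key);
        }

        // Mark the message as processed only after its side effect succeeded
        resolveOffset(message.offset);
        await heartbeat();
      }
    },
  });
}

async function processChange(record: FlattenedRecord<UserRow>): Promise<void> {
  switch (record.__op) {
    case 'c': // Create
    case 'u': // Update
    case 'r': // Read (snapshot)
      await upsertRecord(record);
      break;
    case 'd': // Delete: only seen if the SMT is configured to rewrite deletes instead of dropping them
      await softDeleteRecord(record.id);
      break;
  }
}

async function upsertRecord(record: UserRow): Promise<void> {
  // Implement idempotent upsert logic, e.g. INSERT ... ON CONFLICT (id) DO UPDATE
  console.log('Upserting:', record);
}

async function softDeleteRecord(id: number): Promise<void> {
  // Implement an idempotent soft delete, e.g. UPDATE ... SET deleted_at = now()
  console.log('Soft-deleting id:', id);
}

async function handleDelete(key: Buffer | null): Promise<void> {
  if (!key) return;
  // The key is Avro-encoded too (key.converter) and carries the primary key
  const { id } = (await registry.decode(key)) as Pick<UserRow, 'id'>;
  await softDeleteRecord(id);
}

run().catch(console.error);
```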
Key Implementation Details:
- Avro Decoding: The consumer decodes messages through the Schema Registry, which resolves the writer schema from the ID embedded in each message, so decoding keeps working as schemas evolve within the configured compatibility rules.
- Tombstone Handling: In a log-compacted topic, Kafka eventually retains only the latest message for each key. A null value (a tombstone) signals a delete and allows compaction to remove the key entirely. The consumer must handle tombstones explicitly.
- Offset Management: Offsets are resolved per message, only after processing succeeds. This yields at-least-once semantics, not exactly-once: a crash or rebalance can redeliver messages that were already applied, but none are skipped.
- Idempotency: The processChange function uses upsert logic. Because CDC events may be replayed during rebalancing or failures, the downstream store must be idempotent.
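A minimal sketch of what an idempotent sink can look like, using an in-memory Map as a stand-in for the downstream store (a hypothetical choice; a real sink would use something like INSERT ... ON CONFLICT). Each upsert carries the __source_ts_ms value the SMT adds, and writes older than the last applied change are ignored, so at-least-once replays converge on the same state. IdempotentStore and StoredRow are hypothetical names; a monotonic position such as the source LSN would be a stricter version guard than a millisecond timestamp.

```typescript
// Hypothetical downstream row: business columns plus bookkeeping fields.
interface StoredRow {
  data: Record<string, unknown>;
  version: number; // __source_ts_ms of the last applied change
  deleted: boolean;
}

class IdempotentStore {
  private readonly rows = new Map<number, StoredRow>();

  // Upsert that ignores changes older than the one already applied.
  // Returns true if the write was applied.
  upsert(id: number, data: Record<string, unknown>, sourceTsMs: number): boolean {
    const existing = this.rows.get(id);
    if (existing && sourceTsMs < existing.version) return false; // stale replay
    this.rows.set(id, { data, version: sourceTsMs, deleted: false });
    return true;
  }

  // Soft delete triggered by a tombstone. The row and its version are kept,
  // so a replayed upsert older than the last applied change is still ignored.
  softDelete(id: number): void {
    const existing = this.rows.get(id);
    if (existing) existing.deleted = true;
  }

  get(id: number): StoredRow | undefined {
    return this.rows.get(id);
  }
}

// Usage: a replayed, older update does not overwrite newer data.
const store = new IdempotentStore();
store.upsert(1, { email: 'new@example.com' }, 200);
const applied = store.upsert(1, { email: 'old@example.com' }, 100);
// applied === false; store.get(1)?.data.email === 'new@example.com'
```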
Pitfall Guide
1. Ignoring Partition Ordering
- Mistake: Producing events without keying by primary key.
- Consequence: Updates arrive out of order, causing data corruption.
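- Fix: Keep Debezium's default of using the table's primary key as the Kafka message key (for tables without a primary key, set message.key.columns). Kafka preserves order only within a partition, so process each partition sequentially and never handle two events for the same key concurrently.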
2. Schema Drift Breaking Consumers
- Mistake: Modifying source tables without updating consumer schemas or registry compatibility settings.
- Consequence: Deserialization failures halt the pipeline.
- Fix: Use Avro/Protobuf with a Schema Registry. Enforce BACKWARD or FORWARD compatibility rules. Implement fallback deserialization strategies for unknown fields.
3. Missing Tombstone Propagation
- Mistake: Filtering out null values in the consumer.
- Consequence: Deletes are never propagated to downstream systems, leading to data divergence.
- Fix: Explicitly check for message.value === null and trigger delete logic based on the (decoded) message key.
4. Snapshot vs. Streaming Transition Gaps
- Mistake: Assuming the snapshot captures a consistent point-in-time without overlap.
- Consequence: Events occurring during the snapshot phase may be duplicated or missed.
- Fix: Debezium handles this by capturing the LSN before snapshotting and resuming streaming after. Ensure consumers handle duplicates during the transition via idempotent writes.
5. Replication Slot Lag and Disk Exhaustion
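- Mistake: Failing to monitor replication slot lag.
- Consequence: PostgreSQL retains WAL until the slot's consumer (the Debezium connector) confirms it, so a stalled or removed connector can fill the disk and crash the instance.
- Fix: Monitor pg_replication_slots and alert on lag thresholds. Drop slots that are no longer used, and on PostgreSQL 13+ cap retained WAL with max_slot_wal_keep_size (a slot that exceeds the cap is invalidated, and the connector must re-snapshot). Pausing a connector does not help, because a paused connector stops advancing its slot.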
6. PII Leakage in Logs
- Mistake: CDC capturing sensitive data without redaction.
- Consequence: Compliance violations (GDPR, HIPAA) as PII flows to all consumers.
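- Fix: Drop columns downstream systems do not need with column.exclude.list, and mask or hash the remaining sensitive columns with Debezium's column.mask.with.<length>.chars or column.mask.hash.<algorithm>.with.salt.<salt> options (or a custom Single Message Transform) so the raw values never reach Kafka.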
7. Backpressure and Consumer Lag
- Mistake: Consumers processing events slower than the production rate.
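- Consequence: Lag grows, increasing end-to-end latency; if it outlasts the topic's retention period, events are deleted before consumers read them.
- Fix: Scale consumers horizontally (parallelism is capped by the topic's partition count). Monitor lag metrics. Route poison pills to a dead-letter queue so one bad message does not block its partition.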
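For the replication slot pitfall, a minimal sketch of a lag check. The query uses pg_current_wal_lsn, pg_wal_lsn_diff, and the confirmed_flush_lsn column of pg_replication_slots (PostgreSQL 10+); executing it is left to whichever client you use (node-postgres, for example, returns the numeric diff as a string, which is why the row type allows strings). SLOT_LAG_SQL, SlotLagRow, findLaggingSlots, and the 1 GB threshold in the usage line are assumptions for illustration.

```typescript
// Bytes of WAL each logical slot holds back, measured from the position the
// slot's consumer (here, the Debezium connector) last confirmed.
const SLOT_LAG_SQL = `
  SELECT slot_name,
         active,
         pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS lag_bytes
  FROM pg_replication_slots
  WHERE slot_type = 'logical'`;

// Row shape returned by the query; numeric values may arrive as strings.
interface SlotLagRow {
  slot_name: string;
  active: boolean;
  lag_bytes: string | number | null;
}

// Returns alert messages for slots that are inactive or retain more WAL
// than the threshold.
function findLaggingSlots(rows: SlotLagRow[], thresholdBytes: number): string[] {
  const alerts: string[] = [];
  for (const row of rows) {
    const lag = row.lag_bytes === null ? 0 : Number(row.lag_bytes);
    if (!row.active) {
      alerts.push(`${row.slot_name}: slot inactive, retaining ${lag} bytes of WAL`);
    } else if (lag > thresholdBytes) {
      alerts.push(`${row.slot_name}: lag ${lag} bytes exceeds ${thresholdBytes}`);
    }
  }
  return alerts;
}

// Usage: feed the rows returned by SLOT_LAG_SQL into the check, e.g. every minute.
const sample = findLaggingSlots([{ slot_name: 'debezium', active: true, lag_bytes: '10' }], 1e9);
// sample is empty: 10 bytes of lag is below the 1 GB threshold
```

Inactive slots are flagged regardless of size because nothing will advance them until a connector reattaches.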
Production Bundle
Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| High-throughput OLTP, Real-time needs | Log-based CDC (Debezium + Kafka) | Minimal DB overhead, scalable, handles schema evolution | High infra cost, low dev maintenance |
| Low volume, Simple sync | Trigger-based CDC | No external infrastructure, immediate implementation | Low infra cost, high DB load, brittle |
| Legacy DB, No Log Access | Timestamp Polling | Only viable option for read-only or unsupported engines | High latency, high I/O cost, complex logic |
| Multi-cloud Replication | Log-based CDC with Cloud Storage Sink | Decoupled transport, durable storage, cross-region sync | Moderate infra cost, high reliability |
Configuration Template
Docker Compose for Local CDC Stack
```yaml
version: '3.8'
services:
  postgres:
    image: debezium/postgres:15
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: inventory
    ports:
      - "5432:5432"
    command: postgres -c wal_level=logical
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Other containers reach the broker at kafka:29092; the host uses localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  schema-registry:
    image: confluentinc/cp-schema-registry:7.5.0
    depends_on:
      - kafka
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092
  kafka-connect:
    image: debezium/connect:2.5
    depends_on:
      - kafka
      - schema-registry
    ports:
      - "8083:8083"
    environment:
      BOOTSTRAP_SERVERS: kafka:29092
      GROUP_ID: connect-cluster
      CONFIG_STORAGE_TOPIC: connect-configs
      OFFSET_STORAGE_TOPIC: connect-offsets
      STATUS_STORAGE_TOPIC: connect-status
      # Debezium 2.x images do not bundle the Confluent Avro converter;
      # add its JARs to the plugin path (e.g. in a custom image) first.
      KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      # Arbitrary worker properties need the CONNECT_ prefix in this image
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
```
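Quick Start Guide
- Initialize Stack: Run docker-compose up -d to start PostgreSQL, Kafka, Schema Registry, and Kafka Connect.
- Register Connector: POST the Debezium configuration JSON to http://localhost:8083/connectors. First adapt it to the local stack (database.hostname postgres, database.dbname inventory, and the postgres/postgres credentials from the Compose file) so it captures changes from the inventory database.
- Generate Traffic: Insert, update, and delete records in the public schema of PostgreSQL.
- Verify Topics: Use kafka-topics --list --bootstrap-server localhost:9092 to confirm topics like server1.public.<table> are created.
- Consume Events: Run the TypeScript consumer or use kafka-avro-console-consumer (shipped in the Schema Registry image) to inspect the change events. Because of the ExtractNewRecordState SMT, each value is the flattened row plus the __op, __ts_ms, __source_table, and __source_ts_ms fields rather than a before/after envelope, and deletes arrive as tombstones.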
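The Register Connector step can also be scripted. This is a minimal sketch assuming Node.js 18+ (global fetch) and the local Compose stack; it uses Kafka Connect's PUT /connectors/{name}/config endpoint, which creates the connector if it does not exist and replaces its configuration otherwise, so re-running it is safe. The hostname, credentials, and database reflect the Compose file; buildConnectorConfig and registerConnector are hypothetical names.

```typescript
// Connector settings for the local stack: the postgres service, the default
// postgres/postgres credentials, and the inventory database from the Compose file.
function buildConnectorConfig(): Record<string, string> {
  return {
    'connector.class': 'io.debezium.connector.postgresql.PostgresConnector',
    'database.hostname': 'postgres',
    'database.port': '5432',
    'database.user': 'postgres',
    'database.password': 'postgres',
    'database.dbname': 'inventory',
    'topic.prefix': 'server1',
    'plugin.name': 'pgoutput',
    'snapshot.mode': 'initial',
    'transforms': 'unwrap',
    'transforms.unwrap.type': 'io.debezium.transforms.ExtractNewRecordState',
    'transforms.unwrap.drop.tombstones': 'false',
    'transforms.unwrap.add.fields': 'op,ts_ms,source.table,source.ts_ms',
  };
}

// PUT /connectors/{name}/config takes the bare config map (no "name"/"config"
// wrapper) and creates or updates the connector.
async function registerConnector(
  connectUrl: string,
  name: string,
  config: Record<string, string>,
): Promise<void> {
  const response = await fetch(`${connectUrl}/connectors/${encodeURIComponent(name)}/config`, {
    method: 'PUT',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(config),
  });
  if (!response.ok) {
    throw new Error(`Connector registration failed: ${response.status} ${await response.text()}`);
  }
}

// Usage against the running stack:
// await registerConnector('http://localhost:8083', 'postgres-cdc-connector', buildConnectorConfig());
```

Converters are left to the worker-level settings in the Compose file, so the connector config does not repeat them.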
Change Data Capture transforms static databases into dynamic event sources. By leveraging log-based architectures, engineering teams achieve real-time data synchronization with minimal overhead, enabling responsive applications and robust data pipelines. Implementation requires careful attention to ordering, schema evolution, and operational monitoring, but the resulting architecture provides the scalability and reliability demanded by modern data systems.