BaseEvent {
type: 'OrderItemAdded';
payload: {
productId: string;
quantity: number;
price: number;
};
}
/** Union of the order events handled in this module; the literal `type` field selects the payload shape. */
export type OrderEvent = OrderCreated | OrderItemAdded;
### Step 2: Implement the Aggregate Root
Aggregates enforce consistency boundaries. They load from events, apply new commands, and emit new events.
```typescript
// domain/order-aggregate.ts
import { OrderEvent, OrderCreated, OrderItemAdded } from '../events/order-events';
/**
 * Write-side aggregate root for a single order.
 *
 * State is only changed by replaying events (`loadFromHistory`). The command
 * handlers (`create`, `addItem`) validate their input and return the new
 * events to persist; they do not mutate the aggregate themselves.
 */
export class OrderAggregate {
  public id: string;
  public customerId: string = '';
  public items: Array<{ productId: string; quantity: number; price: number }> = [];
  public totalAmount: number = 0;
  /** Version of the last applied event; 0 until an event has been applied. */
  public version: number = 0;

  constructor(id: string) {
    this.id = id;
  }

  /**
   * Rebuilds state by applying `events` in the order given.
   *
   * @param events History of this aggregate, oldest first.
   */
  public loadFromHistory(events: OrderEvent[]): void {
    for (const event of events) {
      this.applyEvent(event);
    }
  }

  /**
   * Applies one event to the in-memory state and records its version.
   * Event types without a matching case leave the state untouched but still
   * advance `version`.
   */
  private applyEvent(event: OrderEvent): void {
    switch (event.type) {
      case 'OrderCreated':
        this.customerId = event.payload.customerId;
        // Copy every line so later changes to aggregate state can never
        // rewrite the (immutable) event that produced it.
        this.items = [];
        for (const item of event.payload.items) {
          this.items.push({ ...item });
        }
        this.totalAmount = event.payload.totalAmount;
        break;
      case 'OrderItemAdded':
        // Same reasoning: keep a copy, not the event's own payload object.
        this.items.push({ ...event.payload });
        this.totalAmount += event.payload.price * event.payload.quantity;
        break;
    }
    this.version = event.version;
  }

  /**
   * Rejects line values that would corrupt the order total once persisted.
   *
   * @throws Error when `quantity` is not a finite number greater than zero,
   *   or `price` is not a finite number greater than or equal to zero.
   */
  private static assertValidLine(quantity: number, price: number): void {
    if (!Number.isFinite(quantity) || quantity <= 0) {
      throw new Error(`Invalid quantity: ${quantity}`);
    }
    if (!Number.isFinite(price) || price < 0) {
      throw new Error(`Invalid price: ${price}`);
    }
  }

  /**
   * Command handler: creates an order.
   *
   * @param id Identifier of the new order aggregate.
   * @param customerId Customer placing the order.
   * @param items Initial order lines; may be empty.
   * @returns A single `OrderCreated` event at version 1.
   * @throws Error when any line has an invalid quantity or price.
   */
  public static create(id: string, customerId: string, items: Array<{ productId: string; quantity: number; price: number }>): OrderEvent[] {
    for (const item of items) {
      OrderAggregate.assertValidLine(item.quantity, item.price);
    }
    // Snapshot the lines so the caller mutating its array afterwards cannot
    // alter the event.
    const lines = items.map((item) => ({ ...item }));
    const total = lines.reduce((sum, line) => sum + line.price * line.quantity, 0);
    return [{
      eventId: crypto.randomUUID(),
      aggregateId: id,
      aggregateType: 'Order',
      timestamp: new Date(),
      version: 1,
      type: 'OrderCreated',
      payload: { customerId, items: lines, totalAmount: total }
    }];
  }

  /**
   * Command handler: adds one line to an existing order.
   *
   * @returns A single `OrderItemAdded` event at `version + 1`.
   * @throws Error when no event has been replayed into this aggregate yet, or
   *   when `quantity` / `price` are invalid.
   */
  public addItem(productId: string, quantity: number, price: number): OrderEvent[] {
    if (this.version === 0) throw new Error('Aggregate not initialized');
    OrderAggregate.assertValidLine(quantity, price);
    return [{
      eventId: crypto.randomUUID(),
      aggregateId: this.id,
      aggregateType: 'Order',
      timestamp: new Date(),
      version: this.version + 1,
      type: 'OrderItemAdded',
      payload: { productId, quantity, price }
    }];
  }
}
```

### Step 3: Event Store Abstraction
The event store must guarantee append-only semantics, strict ordering per aggregate, and optimistic concurrency control.
// infrastructure/event-store.ts
import { OrderEvent } from '../events/order-events';
/**
 * Persistence contract for per-aggregate event streams.
 * Implementations are expected to be append-only, keep strict ordering per
 * aggregate, and apply optimistic concurrency control.
 */
export interface IEventStore {
/**
 * Persists `events` for `aggregateId`.
 * `expectedVersion` is the stream version the caller believes is current
 * (callers in this module pass 0 for a new order, otherwise the loaded aggregate's version).
 */
append(aggregateId: string, expectedVersion: number, events: OrderEvent[]): Promise<void>;
/** Resolves with the events recorded for `aggregateId`. */
load(aggregateId: string): Promise<OrderEvent[]>;
}
// Example backed by a PostgreSQL `events` table with a JSONB payload column.
// Production systems would typically use a dedicated event store such as
// EventStoreDB, or a log such as Kafka.
/**
 * `IEventStore` implementation on top of a PostgreSQL connection pool.
 *
 * NOTE(review): the version check in `append` is only race-free if the table
 * also has a unique constraint on (aggregate_id, version); the schema is not
 * visible here — confirm.
 */
export class PostgresEventStore implements IEventStore {
  constructor(private pool: any) {}

  /**
   * Loads the stream of `aggregateId`, ordered by ascending version.
   *
   * @returns The stored events, each with `version` taken from the row.
   */
  async load(aggregateId: string): Promise<OrderEvent[]> {
    const res = await this.pool.query(
      'SELECT payload, version FROM events WHERE aggregate_id = $1 ORDER BY version ASC',
      [aggregateId]
    );
    return res.rows.map((row: any) => {
      // JSON/JSONB columns arrive from the driver already parsed; only TEXT
      // payloads still need JSON.parse. Parsing an object would throw.
      const stored = typeof row.payload === 'string' ? JSON.parse(row.payload) : row.payload;
      // NOTE(review): `timestamp` comes back as whatever JSON produced (a string),
      // not a Date — confirm whether consumers need it revived.
      return { ...stored, version: row.version };
    });
  }

  /**
   * Appends `events` to the stream of `aggregateId` in one transaction.
   *
   * @param expectedVersion Version the caller believes the stream is at
   *   (0 for a new stream).
   * @param events Events to append; versions must continue contiguously from
   *   `expectedVersion + 1`. An empty list is a no-op.
   * @throws Error "Concurrency conflict: ..." when the stored stream is not at
   *   `expectedVersion`, or when the event versions are not contiguous.
   */
  async append(aggregateId: string, expectedVersion: number, events: OrderEvent[]): Promise<void> {
    if (events.length === 0) return;

    const client = await this.pool.connect();
    try {
      await client.query('BEGIN');

      // Optimistic concurrency: compare against what is actually stored, not
      // just against the version numbers the caller put on the events.
      const head = await client.query(
        'SELECT COALESCE(MAX(version), 0) AS current_version FROM events WHERE aggregate_id = $1',
        [aggregateId]
      );
      const currentVersion = Number(head?.rows?.[0]?.current_version ?? 0);
      if (currentVersion !== expectedVersion) {
        throw new Error(`Concurrency conflict: expected version ${expectedVersion}, got ${currentVersion}`);
      }

      // Track progress in a local instead of reassigning the parameter.
      let lastVersion = expectedVersion;
      for (const event of events) {
        if (event.version !== lastVersion + 1) {
          throw new Error(`Concurrency conflict: expected version ${lastVersion + 1}, got ${event.version}`);
        }
        await client.query(
          'INSERT INTO events (aggregate_id, version, payload, created_at) VALUES ($1, $2, $3, NOW())',
          [aggregateId, event.version, JSON.stringify(event)]
        );
        lastVersion = event.version;
      }

      await client.query('COMMIT');
    } catch (e) {
      try {
        await client.query('ROLLBACK');
      } catch {
        // A failed rollback must not hide the error that caused it.
      }
      throw e;
    } finally {
      client.release();
    }
  }
}
### Step 4: Query Side Projection
Projections consume events asynchronously and maintain denormalized read models optimized for specific queries.
// projections/order-read-model.ts
import { OrderEvent } from '../events/order-events';
/**
 * Maintains the `order_read` table from order events.
 *
 * NOTE(review): redelivered events are not deduplicated here; a repeated
 * `OrderCreated` will attempt a second insert and a repeated `OrderItemAdded`
 * will be added twice — confirm where idempotency is enforced.
 */
export class OrderReadModelProjection {
  constructor(private db: any) {}

  /**
   * Applies one event to the read model. Event types without a matching case
   * are ignored.
   *
   * @param event Event to project.
   */
  async handle(event: OrderEvent): Promise<void> {
    switch (event.type) {
      case 'OrderCreated':
        // item_count starts at the number of lines in the order so it stays
        // consistent with the +1 applied per OrderItemAdded below.
        // created_at uses the event's own timestamp so that rebuilding the
        // read model by replay reproduces the same rows; it only falls back
        // to the current time when the event carries none.
        await this.db.query(
          `INSERT INTO order_read (order_id, customer_id, total_amount, item_count, status, created_at)
           VALUES ($1, $2, $3, $4, 'CREATED', $5)`,
          [
            event.aggregateId,
            event.payload.customerId,
            event.payload.totalAmount,
            event.payload.items.length,
            event.timestamp ?? new Date()
          ]
        );
        break;
      case 'OrderItemAdded':
        // COALESCE keeps rows that were written without an item_count usable.
        await this.db.query(
          `UPDATE order_read
           SET total_amount = total_amount + $1, item_count = COALESCE(item_count, 0) + 1
           WHERE order_id = $2`,
          [event.payload.price * event.payload.quantity, event.aggregateId]
        );
        break;
    }
  }
}
### Step 5: Command Handler & Message Routing
Commands enter through the command side, mutate aggregates, persist events, and publish them to a message broker for projection consumption.
// application/order-command-handler.ts
import { OrderAggregate } from '../domain/order-aggregate';
import { IEventStore } from '../infrastructure/event-store';
import { EventBus } from '../infrastructure/event-bus';
/**
 * Application-layer entry point for order commands.
 * Each handler obtains new events from `OrderAggregate`, appends them to the
 * event store, and then publishes the same events on the event bus.
 */
export class OrderCommandHandler {
  constructor(
    private eventStore: IEventStore,
    private eventBus: EventBus
  ) {}

  /**
   * Creates a new order stream.
   * The append uses expected version 0, i.e. it targets a stream with no
   * events yet.
   */
  async handleCreateOrder(command: { orderId: string; customerId: string; items: any[] }): Promise<void> {
    const { orderId, customerId, items } = command;
    const createdEvents = OrderAggregate.create(orderId, customerId, items);

    await this.eventStore.append(orderId, 0, createdEvents);
    await this.eventBus.publish(createdEvents);
  }

  /**
   * Adds an item to an existing order.
   * The aggregate is rehydrated from its stored history first, and the
   * version it reached is passed to the store as the expected version.
   */
  async handleAddItem(command: { orderId: string; productId: string; quantity: number; price: number }): Promise<void> {
    const order = new OrderAggregate(command.orderId);
    order.loadFromHistory(await this.eventStore.load(command.orderId));

    const pendingEvents = order.addItem(command.productId, command.quantity, command.price);

    await this.eventStore.append(command.orderId, order.version, pendingEvents);
    await this.eventBus.publish(pendingEvents);
  }
}
### Architecture Decisions & Rationale
- Append-only event store: Guarantees immutability. Enables temporal queries, debugging, and replay without snapshot corruption.
- Optimistic concurrency via versioning: Prevents lost updates in distributed environments. Fails fast on version mismatch rather than overwriting state.
- Asynchronous projections: Decouples write latency from read model complexity. Allows independent scaling and technology selection (e.g., Elasticsearch for search, Redis for caching, PostgreSQL for relational queries).
- Aggregate as consistency boundary: Commands validate against loaded state. Events are emitted only after successful validation. This eliminates partial updates and enforces domain invariants.
- Idempotency at command/event level: Critical for exactly-once processing in distributed systems. Handled via unique command IDs or event deduplication tables.
## Pitfall Guide
- **Treating events as database rows.** Events represent business facts, not column updates. Storing `UPDATE orders SET status = 'SHIPPED'` as an event violates the pattern. Events should be domain-language statements such as `OrderShipped` or `PaymentReceived`. Database deltas couple the event log tightly to the schema and break replay when the schema changes.
- **Ignoring event schema evolution.** Events are immutable, but business requirements change. Teams that freeze event schemas cannot adapt; teams that mutate stored events break replay. Solution: give every event type an explicit schema version — a field separate from the per-aggregate sequence `version` used for optimistic concurrency in the examples above — and keep deserialization backward compatible. Use upcasters to transform legacy events into the current shape during replay.
- **Synchronous projections blocking commands.** Running projections in the same transaction as command execution kills write throughput and couples the read and write models. Projections must be asynchronous. Use a message broker or stream processor (Kafka, RabbitMQ, AWS EventBridge) with retry and dead-letter queue handling.
- **Missing idempotency controls.** Distributed systems retry. Without idempotency, duplicate commands create duplicate events and redelivered events corrupt read models. Implement command deduplication using a `command_id` table with a unique constraint. Event handlers should check processed event IDs before applying projections.
- **Over-partitioning aggregates.** Creating an aggregate per UI form or database table leads to chatty systems with high latency and complex consistency rules. Aggregates must represent true consistency boundaries. If two entities must never be in an invalid state together, they belong in the same aggregate. Otherwise, use eventual consistency and sagas.
- **Assuming strong consistency across models.** CQRS with asynchronous projections is eventually consistent between the command and query sides. Teams that demand strong consistency across both defeat the pattern's purpose. Design the UI/UX to handle pending states, use optimistic updates, and implement conflict resolution strategies at the domain level.
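- **Storing derived state instead of events.** Recording `totalAmount` in the event payload without also storing the raw line items breaks auditability, because the total can no longer be re-derived or verified. Events should contain the minimal data required to reconstruct state; derived values belong in projections, not events. (The `OrderCreated` example above carries `totalAmount` alongside its line items, so the total remains verifiable; it is a convenience, not the source of truth.)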
Best practices from production:
- Implement a dedicated projection worker with consumer lag monitoring.
- Use structured logging with correlation IDs across command → event → projection.
- Run periodic replay tests in staging to validate projection correctness.
- Enforce event naming conventions: `EntityActioned` (past tense, domain language).
- Separate infrastructure concerns: event store, message broker, and read DB should be independently scalable.
## Production Bundle

### Action Checklist

### Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| High read asymmetry (10:1 read:write) | CQRS + Denormalized Read DB | Read models optimized for specific queries reduce latency and offload primary DB | Moderate infra cost, lower query compute |
| Strict audit/compliance requirements | Event Sourcing + Append-Only Store | Immutable event log enables deterministic replay and regulatory reporting | Higher storage cost, lower audit engineering cost |
| Simple CRUD domain (user profiles, settings) | Traditional RDBMS with soft deletes | Overhead of ES/CQRS outweighs benefits for low-complexity state | Lowest dev/infra cost |
| Real-time sync across multiple clients | CQRS + WebSocket/SSE projections | Read models push updates asynchronously without blocking commands | Moderate infra, high UX value |
| Multi-tenant SaaS with complex billing | Event Sourcing + Saga orchestration | Financial state requires auditability and distributed transaction compensation | High initial dev cost, low compliance risk |
### Configuration Template
// infrastructure/event-bridge.config.ts
/**
 * Static settings for the event store, message broker, projection worker and
 * idempotency tables. The two connection values are read from environment
 * variables when this module is evaluated.
 * NOTE(review): how each value is consumed is not visible here — confirm
 * against the code that reads this object.
 */
export const EventBridgeConfig = {
// Event store connection and concurrency settings.
eventStore: {
type: 'postgresql',
// Undefined when EVENT_STORE_DB_URL is not set; nothing here validates it.
connectionString: process.env.EVENT_STORE_DB_URL,
maxRetries: 3,
concurrencyCheck: 'version_based'
},
// Message broker settings.
messageBroker: {
type: 'rabbitmq',
// Undefined when RABBITMQ_URL is not set; nothing here validates it.
url: process.env.RABBITMQ_URL,
exchange: 'domain_events',
// Presumably prepended to each projection's queue name — TODO confirm.
queuePrefix: 'projection_',
prefetchCount: 10,
deadLetterQueue: 'dlq_failed_events'
},
// Per-projection worker settings, keyed by projection name.
projections: {
orderReadModel: {
targetDb: 'read_replica',
batchSize: 50,
checkpointTable: 'projection_checkpoints',
// Milliseconds, going by the name.
retryDelayMs: 1000,
maxRetryAttempts: 5
}
},
// Names of the deduplication tables and how long entries are kept.
idempotency: {
commandTable: 'processed_commands',
eventTable: 'processed_events',
// Hours, going by the name.
ttlHours: 72
}
};
### Quick Start Guide
- Initialize local infrastructure: Run `docker compose up -d` with PostgreSQL (event store), PostgreSQL (read model), and RabbitMQ. Use the provided `EventBridgeConfig` with local connection strings.
- Scaffold the domain: Create the aggregate root, event types, and command handler using the TypeScript examples. Ensure `version` tracking and `loadFromHistory` are implemented.
- Deploy the projection worker: Start a separate Node.js process that subscribes to the `domain_events` exchange, reads events in batches, applies the `OrderReadModelProjection`, and updates `projection_checkpoints`.
- Validate the pipeline: Send a `CreateOrder` command. Verify the event is appended to the event store, the projection worker processes it asynchronously, and the read model contains the denormalized order. Check `processed_commands` for idempotency enforcement.
- Test replay capability: Drop the read model table. Restart the projection worker with checkpoint reset. Verify it rebuilds state deterministically from the event log without manual intervention.