strated by a declarative framework.
Step-by-Step Implementation
- Ingestion Layer (Bronze): Capture raw data with minimal transformation. Use Change Data Capture (CDC) for databases and idempotent API polling for external sources. Store data in a format that preserves schema history.
- Curation Layer (Silver): Clean, deduplicate, and conform data. Enforce data types, handle nulls, and apply business keys. This layer serves as the trusted dataset.
- Serving Layer (Gold): Aggregate data for specific consumption patterns. Create star schemas for BI, flattened tables for ML, or materialized views for low-latency APIs.
- Orchestration & Idempotency: Implement pipelines as idempotent functions. Every run must produce the same result regardless of execution count. Use watermarking and upsert semantics to handle late-arriving data.
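The curation step can be illustrated with a minimal in-memory sketch. The record shapes (`BronzeOrder`, `SilverOrder`) and the `curateOrders` function are hypothetical names introduced here for illustration; a real Silver layer would run in the warehouse engine rather than in application memory.

```typescript
// Hypothetical shape of a raw (Bronze) order row; field names are assumptions.
interface BronzeOrder {
  orderId: string | null;
  totalAmount: string; // raw sources often deliver numbers as strings
  ingestedAt: number;  // epoch milliseconds assigned at ingestion
}

interface SilverOrder {
  orderId: string;
  totalAmount: number;
  ingestedAt: number;
}

// Curate Bronze rows into Silver: drop rows without a business key,
// enforce a numeric type, and keep only the latest version of each key.
function curateOrders(rows: BronzeOrder[]): SilverOrder[] {
  const latest = new Map<string, SilverOrder>();
  for (const row of rows) {
    if (row.orderId === null) continue;      // no business key
    const amount = Number(row.totalAmount);
    if (!Number.isFinite(amount)) continue;  // value is not a valid number
    const current = latest.get(row.orderId);
    if (!current || row.ingestedAt > current.ingestedAt) {
      latest.set(row.orderId, {
        orderId: row.orderId,
        totalAmount: amount,
        ingestedAt: row.ingestedAt,
      });
    }
  }
  return Array.from(latest.values());
}

const bronze: BronzeOrder[] = [
  { orderId: 'o1', totalAmount: '10.50', ingestedAt: 100 },
  { orderId: 'o1', totalAmount: '12.00', ingestedAt: 200 }, // newer version of o1
  { orderId: null, totalAmount: '5', ingestedAt: 150 },     // missing key
  { orderId: 'o2', totalAmount: 'abc', ingestedAt: 160 },   // bad type
  { orderId: 'o3', totalAmount: '7', ingestedAt: 170 },
];
const silver = curateOrders(bronze);
```

Because the function depends only on its input, running it again on the same Bronze rows yields the same Silver rows, which is the property the orchestration step relies on.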
Code Implementation
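The following TypeScript example sketches a pipeline runner with retry logic, exponential backoff, and an idempotency check. Transient failures are retried, and a run whose idempotency key already has a recorded success returns the stored result instead of executing again. This keeps data state intact across retries, provided the load function itself uses upsert semantics.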
// Types for Pipeline Configuration
interface PipelineConfig {
  id: string;
  source: string;
  sink: string;
  batchSize: number;
  maxRetries: number;
  idempotencyKey: string;
}

interface ExecutionResult {
  success: boolean;
  recordsProcessed: number;
  idempotencyKey: string;
  timestamp: number;
}

// Idempotency Store Interface
interface IdempotencyStore {
  get(key: string): Promise<ExecutionResult | null>;
  set(key: string, result: ExecutionResult): Promise<void>;
}

class PipelineRunner {
  constructor(
    private config: PipelineConfig,
    private store: IdempotencyStore,
    private transformFn: (batch: any[]) => Promise<any[]>,
    private loadFn: (records: any[]) => Promise<number>
  ) {}

  async execute(): Promise<ExecutionResult> {
    // Check idempotency cache: a recorded success for this key short-circuits the run
    const existing = await this.store.get(this.config.idempotencyKey);
    if (existing) {
      console.log(`Pipeline ${this.config.id}: Idempotent hit. Returning cached result.`);
      return existing;
    }

    let attempts = 0;
    const maxRetries = this.config.maxRetries;

    while (attempts <= maxRetries) {
      try {
        console.log(`Pipeline ${this.config.id}: Attempt ${attempts + 1}/${maxRetries + 1}`);

        // 1. Extract
        const batch = await this.extractBatch();
        // 2. Transform
        const transformed = await this.transformFn(batch);
        // 3. Load with Upsert semantics (loadFn must be safe to repeat)
        const count = await this.loadFn(transformed);

        const result: ExecutionResult = {
          success: true,
          recordsProcessed: count,
          idempotencyKey: this.config.idempotencyKey,
          timestamp: Date.now(),
        };

        // Persist success state
        await this.store.set(this.config.idempotencyKey, result);
        return result;
      } catch (error) {
        attempts++;
        if (attempts > maxRetries) {
          console.error(`Pipeline ${this.config.id}: Max retries exceeded.`);
          throw error;
        }
        // Exponential backoff: 2s, 4s, 8s, ...
        const delay = Math.pow(2, attempts) * 1000;
        console.warn(`Pipeline ${this.config.id}: Error. Retrying in ${delay}ms...`);
        await new Promise(res => setTimeout(res, delay));
      }
    }

    // Unreachable at runtime; present so every code path returns or throws
    throw new Error("Pipeline execution failed after retries.");
  }

  private async extractBatch(): Promise<any[]> {
    // Implementation specific to source (CDC, API, File)
    // Must support pagination and offset tracking
    return [];
  }
}

// Usage Example
const config: PipelineConfig = {
  id: 'orders-ingest-v1',
  source: 'postgres.orders',
  sink: 'warehouse.orders_silver',
  batchSize: 5000,
  maxRetries: 3,
  idempotencyKey: `orders-ingest-${new Date().toISOString().split('T')[0]}`,
};
// The key is derived from the current UTC date, so a rerun on the same UTC day
// finds the recorded success and returns it instead of loading the data again.
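The usage example derives the key from the wall-clock date at the moment the config is built, so a backfill executed on a later day would produce a different key for the same data. One possible alternative, sketched below, derives the key from the logical window being processed. The helper name `idempotencyKeyFor` and the sample date are assumptions made for illustration.

```typescript
// Build an idempotency key from the pipeline id and the start of the
// logical window (in UTC), so the key does not depend on when the run happens.
function idempotencyKeyFor(pipelineId: string, windowStart: Date): string {
  const day = windowStart.toISOString().slice(0, 10); // YYYY-MM-DD in UTC
  return `${pipelineId}-${day}`;
}

// Example window: 15 January 2024 (UTC). The value is illustrative only.
const windowStart = new Date(Date.UTC(2024, 0, 15));

// A scheduled run and a later backfill of the same window share one key.
const scheduledRunKey = idempotencyKeyFor('orders-ingest', windowStart);
const backfillKey = idempotencyKeyFor('orders-ingest', windowStart);
```

With this approach the orchestrator passes the window start to each run, and the store lookup behaves the same whether the window is processed on schedule or replayed weeks later.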
SQL Transformation (Silver Layer)
Transformations should be declarative SQL to leverage the compute engine's optimization. The following pattern deduplicates the incoming window with a window function and applies the result with a MERGE statement, which is the basis for idempotency in the Silver layer.
-- Silver Layer Transformation: Deduplication and Idempotent Upsert
-- Uses MERGE so that reprocessing the same window yields the same rows
MERGE INTO silver.orders AS target
USING (
  SELECT order_id, customer_id, order_date, total_amount
  FROM (
    SELECT
      order_id,
      customer_id,
      order_date,
      total_amount,
      ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY _ingest_timestamp DESC) AS rn
    FROM bronze.orders
    WHERE _ingest_timestamp >= :watermark_start
      AND _ingest_timestamp < :watermark_end
  ) AS ranked
  WHERE rn = 1  -- keep only the latest version of each order in the window
) AS source
ON target.order_id = source.order_id
WHEN MATCHED THEN
  UPDATE SET
    customer_id = source.customer_id,
    order_date = source.order_date,
    total_amount = source.total_amount,
    updated_at = CURRENT_TIMESTAMP
WHEN NOT MATCHED THEN
  INSERT (order_id, customer_id, order_date, total_amount, created_at)
  VALUES (source.order_id, source.customer_id, source.order_date, source.total_amount, CURRENT_TIMESTAMP);
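The `:watermark_start` and `:watermark_end` parameters have to come from the orchestrator. A minimal sketch of how they might be computed is shown below, assuming the end of the last successful window is stored somewhere durable. The names `WatermarkWindow` and `nextWindow` and the two-hour span are hypothetical.

```typescript
interface WatermarkWindow {
  start: Date; // inclusive, bound to :watermark_start
  end: Date;   // exclusive, bound to :watermark_end
}

// Compute the next half-open window [start, end).
// lastWatermark is the end of the previous successful window.
// maxSpanMs caps the window so a long outage is replayed in bounded chunks.
function nextWindow(lastWatermark: Date, now: Date, maxSpanMs: number): WatermarkWindow | null {
  const endMs = Math.min(now.getTime(), lastWatermark.getTime() + maxSpanMs);
  if (endMs <= lastWatermark.getTime()) return null; // nothing new to process
  return { start: lastWatermark, end: new Date(endMs) };
}

const twoHoursMs = 2 * 60 * 60 * 1000;
const lastWatermark = new Date(Date.UTC(2024, 0, 1, 0, 0, 0));
const currentTime = new Date(Date.UTC(2024, 0, 1, 5, 0, 0));

// First window after the stored watermark: 00:00 to 02:00 UTC.
const firstWindow = nextWindow(lastWatermark, currentTime, twoHoursMs);
```

Half-open windows avoid processing a boundary row twice, matching the `>=` and `<` comparisons in the query. The stored watermark should advance only after the MERGE commits.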
Architecture Decisions
- Transactional Storage: Use Delta Lake or Apache Iceberg over raw object storage. These formats provide ACID transactions, schema enforcement, and time travel, which are non-negotiable for reliable pipelines.
- Idempotency via Business Keys: Rely on natural business keys (e.g., order_id) for deduplication rather than ingestion timestamps. Ingestion timestamps can be duplicated in high-throughput scenarios; business keys provide a deterministic merge point.
- Watermarking: Implement watermarking to handle late-arriving data. Define a threshold (e.g., 24 hours) after which late data is dropped or routed to a dead-letter queue, preventing unbounded state growth in streaming contexts.
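The watermarking decision can be sketched as a routing function. This is an in-memory illustration only: the `OrderEvent` shape, the `routeByLateness` name, and the sample timestamps are assumptions, and a streaming engine would normally apply the same rule through its own watermark configuration.

```typescript
// Hypothetical event shape; eventTime is epoch milliseconds from the source system.
interface OrderEvent {
  orderId: string;
  eventTime: number;
}

interface RoutedEvents {
  accepted: OrderEvent[];
  deadLetter: OrderEvent[];
}

// Events older than (watermark - allowed lateness) go to the dead-letter list
// instead of being merged, which keeps the amount of reopened state bounded.
function routeByLateness(
  events: OrderEvent[],
  watermarkMs: number,
  allowedLatenessMs: number
): RoutedEvents {
  const cutoff = watermarkMs - allowedLatenessMs;
  const routed: RoutedEvents = { accepted: [], deadLetter: [] };
  for (const e of events) {
    if (e.eventTime >= cutoff) {
      routed.accepted.push(e);
    } else {
      routed.deadLetter.push(e);
    }
  }
  return routed;
}

const oneDayMs = 24 * 60 * 60 * 1000;
const watermarkMs = 100000000; // illustrative watermark position
const routed = routeByLateness(
  [
    { orderId: 'o1', eventTime: 20000000 }, // within the 24-hour threshold
    { orderId: 'o2', eventTime: 10000000 }, // older than the threshold
    { orderId: 'o3', eventTime: 13600000 }, // exactly at the cutoff
  ],
  watermarkMs,
  oneDayMs
);
```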
Pitfall Guide
- Ignoring Idempotency:
  - Mistake: Pipelines append data on every run. Reruns due to failure cause duplicates.
  - Correction: Every pipeline must be idempotent. Use MERGE/UPSERT logic or partition overwrites keyed by execution date. Validate that COUNT(*) remains constant across multiple runs of the same logical window.
- Schema Drift Blindness:
  - Mistake: Upstream schema changes (column drops, type changes) break downstream pipelines silently or crash them.
  - Correction: Implement schema registry and contract testing. Use storage formats that support schema evolution (e.g., mergeSchema: true in Delta) but enforce strict contracts on the Silver layer. Alert on schema changes immediately.
- Lack of Data Quality Gates:
  - Mistake: Pipelines move data without validating it. Garbage in propagates to Gold, destroying trust.
  - Correction: Insert Data Quality (DQ) checks between layers. Check for nulls in primary keys, value ranges, and row count anomalies. If DQ checks fail, halt the pipeline and alert; do not propagate bad data.
- Over-Engineering Streaming:
  - Mistake: Implementing Kafka/Flink for use cases that only require hourly updates.
  - Correction: Evaluate latency requirements strictly. If the business can tolerate 15-minute latency, micro-batch is superior in cost and simplicity. Reserve pure streaming for sub-second requirements like fraud detection.
- Monolithic Pipeline Logic:
  - Mistake: A single pipeline handles extraction, complex transformation, and loading for multiple tables.
  - Correction: Decompose pipelines by domain or table. Small, focused pipelines fail independently and are easier to debug, test, and scale. Use orchestration tools to manage dependencies between pipelines.
- No Backpressure Handling:
  - Mistake: Ingestion rate exceeds processing rate, causing memory overflow or sink failures.
  - Correction: Implement backpressure mechanisms. In batch, use rate limiting. In streaming, configure consumer lag thresholds and auto-scaling policies. Monitor sink throughput and alert on saturation.
- Treating Pipelines as Scripts:
  - Mistake: No version control, no unit tests, no CI/CD for pipeline code.
  - Correction: Pipeline code is production code. Enforce code reviews, unit tests for transformation logic, integration tests against mock data, and automated deployment via CI/CD pipelines.
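The data quality gate described in the pitfalls above can be sketched as a function that either returns the rows unchanged or throws to halt the run. The row shape, the threshold fields, and the function names are hypothetical, and the expected row count is assumed to come from a history of previous runs.

```typescript
// Hypothetical row shape and thresholds, for illustration only.
interface OrderRow {
  orderId: string | null;
  totalAmount: number;
}

interface QualityThresholds {
  minAmount: number;
  maxAmount: number;
  expectedRows: number;    // assumed > 0, e.g. a trailing average of earlier runs
  maxRowDeviation: number; // allowed relative deviation, 0.5 means 50%
}

// Collect every failed check instead of stopping at the first one,
// so an alert can describe the full problem.
function checkQuality(rows: OrderRow[], t: QualityThresholds): string[] {
  const failures: string[] = [];
  const nullKeys = rows.filter(r => r.orderId === null).length;
  if (nullKeys > 0) failures.push(`${nullKeys} row(s) with null primary key`);
  const outOfRange = rows.filter(
    r => r.totalAmount < t.minAmount || r.totalAmount > t.maxAmount
  ).length;
  if (outOfRange > 0) failures.push(`${outOfRange} row(s) with amount out of range`);
  const deviation = Math.abs(rows.length - t.expectedRows) / t.expectedRows;
  if (deviation > t.maxRowDeviation) {
    failures.push(`row count ${rows.length} deviates from expected ${t.expectedRows}`);
  }
  return failures;
}

// Gate between layers: throw to halt the pipeline rather than pass bad data on.
function qualityGate(rows: OrderRow[], t: QualityThresholds): OrderRow[] {
  const failures = checkQuality(rows, t);
  if (failures.length > 0) {
    throw new Error(`Data quality gate failed: ${failures.join('; ')}`);
  }
  return rows;
}
```

Placing `qualityGate` between the transform and load steps means a failed check surfaces as an ordinary pipeline error, which the orchestrator can alert on without any data reaching the next layer.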
Production Bundle
Action Checklist
Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| T+1 Reporting & Compliance | Batch (Daily) | Relaxed latency requirements; maximizes compute efficiency via burst processing. | Low (Optimized for throughput) |
| Operational Dashboards (15m lag) | Micro-Batch Lakehouse | Balances latency with operational simplicity; unified architecture reduces maintenance. | Medium (Efficient streaming compute) |
| Fraud Detection (<1s) | Pure Streaming | Sub-second latency required; stateful processing for pattern detection. | High (Always-on resources) |
| Ad-hoc Analysis on Raw Data | Data Lake + Query Engine | Schema-on-read flexibility; low cost storage for exploratory work. | Low (Storage focused) |
| ML Feature Store | Lakehouse with Feature Table | Supports both batch training and real-time inference; versioning capabilities. | Medium (High availability storage) |
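The latency-driven rows of the matrix can be expressed as a small helper, which is sometimes useful for documenting the decision in code review. The cut-off values below are assumptions read off the matrix (sub-second for streaming, a full day for daily batch) and not fixed rules; the `recommendApproach` name is hypothetical.

```typescript
type Approach = 'pure-streaming' | 'micro-batch' | 'daily-batch';

// Map the maximum tolerable latency (in seconds) to an approach.
// Thresholds are illustrative assumptions, not fixed rules.
function recommendApproach(maxLatencySeconds: number): Approach {
  if (maxLatencySeconds < 1) return 'pure-streaming';           // e.g. fraud detection
  if (maxLatencySeconds < 24 * 60 * 60) return 'micro-batch';   // e.g. 15-minute dashboards
  return 'daily-batch';                                         // e.g. T+1 reporting
}

const dashboardApproach = recommendApproach(15 * 60);
```

The helper ignores the non-latency rows (ad-hoc analysis, feature stores), which depend on access patterns and not on a single latency number.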
Configuration Template
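Use this TypeScript configuration template to define pipelines declaratively, enabling infrastructure-as-code practices. The entries use a richer shape (nested source and sink objects, qualityChecks, schedule) than the PipelineConfig interface in the runner example, so the type exported from ./types needs to be extended to match.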
// pipeline.config.ts
import { PipelineConfig } from './types';

export const pipelines: PipelineConfig[] = [
  {
    id: 'customers-ingest',
    type: 'CDC',
    source: {
      connection: 'postgres-primary',
      table: 'public.customers',
      cdcMode: 'log_based',
    },
    sink: {
      path: 's3://warehouse/raw/customers',
      format: 'parquet',
      partitionBy: ['_ingest_date'],
    },
    qualityChecks: [
      { field: 'customer_id', rule: 'not_null' },
      { field: 'email', rule: 'unique' },
    ],
    schedule: { cron: '*/5 * * * *' }, // Micro-batch every 5 mins
    retries: 3,
    backoff: 'exponential',
  },
  {
    id: 'orders-transform-silver',
    type: 'SQL',
    source: {
      path: 's3://warehouse/raw/orders',
      format: 'delta',
    },
    sink: {
      path: 's3://warehouse/silver/orders',
      format: 'delta',
    },
    transformation: 'sql/transforms/orders_silver.sql',
    watermark: '24h',
    schedule: { cron: '0 */2 * * *' }, // Every 2 hours
    retries: 2,
    backoff: 'linear',
  },
];
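Declarative configs are easier to trust when they are validated before deployment. The sketch below checks a few fields from the template; the `PipelineDefinition` type is a reduced, hypothetical subset of the template's shape, and the rules (five-field cron, two backoff modes) are assumptions inferred from the values shown.

```typescript
// Reduced, hypothetical subset of the template's fields.
interface PipelineDefinition {
  id: string;
  schedule: { cron: string };
  retries: number;
  backoff: string;
}

const allowedBackoff = ['exponential', 'linear'];

// Return a list of human-readable problems; an empty list means the config passed.
function validatePipelines(defs: PipelineDefinition[]): string[] {
  const errors: string[] = [];
  const seenIds = new Set<string>();
  for (const def of defs) {
    if (seenIds.has(def.id)) errors.push(`${def.id}: duplicate id`);
    seenIds.add(def.id);
    // Only the field count is checked here, not the cron syntax itself.
    if (def.schedule.cron.trim().split(/\s+/).length !== 5) {
      errors.push(`${def.id}: cron must have 5 fields`);
    }
    if (!Number.isInteger(def.retries) || def.retries < 0) {
      errors.push(`${def.id}: retries must be a non-negative integer`);
    }
    if (allowedBackoff.indexOf(def.backoff) === -1) {
      errors.push(`${def.id}: unknown backoff '${def.backoff}'`);
    }
  }
  return errors;
}

const templateErrors = validatePipelines([
  { id: 'customers-ingest', schedule: { cron: '*/5 * * * *' }, retries: 3, backoff: 'exponential' },
  { id: 'orders-transform-silver', schedule: { cron: '0 */2 * * *' }, retries: 2, backoff: 'linear' },
]);
```

Running a check like this in CI catches duplicate ids or malformed schedules before the orchestrator ever loads the file.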
Quick Start Guide
- Initialize Local Stack: Create a docker-compose.yml with MinIO (S3 compatible), Postgres, and a compute engine (e.g., Spark or DuckDB). Run docker-compose up -d.
- Define Pipeline: Copy the configuration template and define a simple CDC pipeline from Postgres to MinIO. Set up the schema in Postgres and create the target bucket in MinIO.
- Execute Ingestion: Run the pipeline runner script. Insert test records into Postgres. Verify that Parquet files appear in MinIO with the correct partition structure.
- Validate Idempotency: Rerun the pipeline twice. Query the sink and confirm that record counts match the source, with no duplicates.
- Add Quality Check: Insert a record with a null primary key. Run the pipeline and verify that the record is rejected or routed to the DLQ based on your DQ configuration. Confirm the pipeline logs the validation failure.
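The idempotency validation step can be rehearsed in memory before touching the local stack. The sketch below contrasts an upsert keyed by business key with a plain append; the `Customer` shape and the `upsert` helper are hypothetical stand-ins for the real sink.

```typescript
// Hypothetical record shape keyed by customerId.
interface Customer {
  customerId: string;
  email: string;
}

// Upsert into a sink keyed by business key; returns the sink's row count.
function upsert(sink: Map<string, Customer>, batch: Customer[]): number {
  for (const record of batch) {
    sink.set(record.customerId, record); // insert or overwrite by key
  }
  return sink.size;
}

const customerBatch: Customer[] = [
  { customerId: 'c1', email: 'a@example.com' },
  { customerId: 'c2', email: 'b@example.com' },
];

// Upsert sink: the count stays constant when the same batch is replayed.
const upsertSink = new Map<string, Customer>();
const countAfterFirstRun = upsert(upsertSink, customerBatch);
const countAfterRerun = upsert(upsertSink, customerBatch);

// Append sink: replaying the same batch doubles the rows.
const appendSink: Customer[] = [];
appendSink.push(...customerBatch);
appendSink.push(...customerBatch);
```

Against the real stack, the equivalent check is a row count on the sink after each rerun, compared with the source count.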