// types/data-contract.ts
/**
 * Declarative constraints for a single field of a record.
 */
export interface FieldRule {
/** Key under which the value is looked up on the record. */
field: string;
/** Expected kind of value for this field. */
type: 'string' | 'number' | 'boolean' | 'date';
/** When true, a missing (`undefined`) or `null` value is accepted. */
nullable?: boolean;
/** Regular-expression source (no delimiters or flags) that string values are tested against. */
pattern?: string;
/** Lower bound for the value. NOTE(review): inclusivity is not defined here; confirm with the consumer. */
min?: number;
/** Upper bound for the value. NOTE(review): inclusivity is not defined here; confirm with the consumer. */
max?: number;
/** Closed list of allowed values, expressed as strings. */
enum?: string[];
}
/**
 * A named, versioned set of validation rules for one dataset.
 */
export interface DataContract {
/** Contract identifier. */
name: string;
/** Contract version string. */
version: string;
/** Per-field rules; each one is checked against every record. */
fields: FieldRule[];
/** Optional rules that inspect the whole record rather than a single field. */
crossFieldRules?: CrossFieldRule[];
}
/**
 * A business rule evaluated against a whole record.
 */
export interface CrossFieldRule {
/** Rule identifier; reported as the `field` of the resulting validation error. */
id: string;
/** Human-readable explanation; used as the error message when the rule fails. */
description: string;
/** Predicate that returns `true` when the record SATISFIES the rule; `false` produces a violation. */
condition: (record: Record<string, any>) => boolean;
/** Severity attached to the violation when the predicate returns `false`. */
severity: 'warning' | 'error' | 'critical';
}
### Step 2: Build a Declarative Rule Engine
The engine evaluates records against contracts without embedding logic in transformation code. It supports field-level validation and cross-field (business rule) checks within a single record.
```typescript
// engine/validator.ts
import { DataContract, FieldRule } from '../types/data-contract';
/**
 * Validates individual records against a declarative data contract.
 *
 * For every field rule the validator checks, in order: presence (unless the
 * rule is nullable), type, pattern (strings only), numeric range (`min`/`max`,
 * inclusive) and enum membership. Cross-field rules are then evaluated against
 * the whole record.
 */
export class DataQualityValidator {
  private contract: DataContract;

  /**
   * Compiled regular expressions keyed by their source string, so a pattern is
   * compiled once instead of once per validated record. Patterns carry no
   * flags, so the cached RegExp objects hold no `lastIndex` state.
   */
  private readonly patternCache = new Map<string, RegExp>();

  /**
   * @param contract Contract whose field and cross-field rules are applied.
   */
  constructor(contract: DataContract) {
    this.contract = contract;
  }

  /**
   * Checks one record against the contract.
   *
   * @param record Record to validate; fields are looked up by `rule.field`.
   * @returns The collected violations, a `valid` flag that is true only when
   *   there are no violations of any severity, and an ISO-8601 timestamp.
   * @throws SyntaxError if a rule's `pattern` is not a valid regular expression.
   */
  validate(record: Record<string, any>): ValidationResult {
    const errors: ValidationError[] = [];

    for (const rule of this.contract.fields) {
      const value = record[rule.field];

      if (value === undefined || value === null) {
        if (!rule.nullable) {
          errors.push({ field: rule.field, message: `Field ${rule.field} is required`, severity: 'error' });
        }
        // Nothing else can be checked on an absent value.
        continue;
      }

      if (!this.matchesType(value, rule.type)) {
        errors.push({ field: rule.field, message: `Expected ${rule.type}, got ${typeof value}`, severity: 'error' });
      }

      if (rule.pattern && typeof value === 'string' && !this.compilePattern(rule.pattern).test(value)) {
        errors.push({ field: rule.field, message: `Value does not match pattern ${rule.pattern}`, severity: 'warning' });
      }

      // Range bounds only make sense for numbers; a non-number has already
      // been reported by the type check above.
      if (typeof value === 'number') {
        if (rule.min != null && value < rule.min) {
          errors.push({ field: rule.field, message: `Value ${value} is below minimum ${rule.min}`, severity: 'error' });
        }
        if (rule.max != null && value > rule.max) {
          errors.push({ field: rule.field, message: `Value ${value} is above maximum ${rule.max}`, severity: 'error' });
        }
      }

      if (rule.enum && rule.enum.length > 0 && !rule.enum.includes(String(value))) {
        errors.push({ field: rule.field, message: `Value not in allowed enum: ${rule.enum.join(', ')}`, severity: 'error' });
      }
    }

    // Cross-field validation: a condition returning false is a violation.
    for (const rule of this.contract.crossFieldRules || []) {
      if (!rule.condition(record)) {
        errors.push({ field: rule.id, message: rule.description, severity: rule.severity });
      }
    }

    return {
      valid: errors.length === 0,
      errors,
      timestamp: new Date().toISOString()
    };
  }

  /**
   * Reports whether `value` conforms to the contract type `expected`.
   *
   * `typeof` never yields `'date'`, so dates are handled explicitly: a valid
   * `Date` instance or a string that `Date.parse` understands is accepted.
   * Only ISO-8601 strings parse identically across JavaScript engines.
   */
  private matchesType(value: unknown, expected: string): boolean {
    if (expected === 'date') {
      if (value instanceof Date) {
        return !Number.isNaN(value.getTime());
      }
      return typeof value === 'string' && !Number.isNaN(Date.parse(value));
    }
    return typeof value === expected;
  }

  /** Returns the cached RegExp for `source`, compiling it on first use. */
  private compilePattern(source: string): RegExp {
    let compiled = this.patternCache.get(source);
    if (compiled === undefined) {
      compiled = new RegExp(source);
      this.patternCache.set(source, compiled);
    }
    return compiled;
  }
}
/**
 * Outcome of validating a single record.
 */
export type ValidationResult = {
/** True only when `errors` is empty (violations of any severity make it false). */
valid: boolean;
/** Every violation found for the record. */
errors: ValidationError[];
/** ISO-8601 timestamp taken when the result was built. */
timestamp: string;
};
/**
 * A single rule violation.
 */
export type ValidationError = {
/** Name of the offending field, or the rule id for cross-field violations. */
field: string;
/** Human-readable description of the violation. */
message: string;
/** How serious the violation is. */
severity: 'warning' | 'error' | 'critical';
};
```
### Step 3: Implement Async Validation Pipeline
Validation must not block ingestion. Use an event-driven or batch-async pattern where raw data lands in a staging zone, validation runs independently, and results route to quarantine or downstream tables.
// pipeline/quality-gate.ts
import { DataQualityValidator } from '../engine/validator';
import { DataContract } from '../types/data-contract';
/**
 * Runs batches of records through a validator and sorts them into passed and
 * failed sets, keeping an in-memory queue of records with critical violations.
 */
export class QualityGatePipeline {
  private validator: DataQualityValidator;
  private quarantineQueue: any[] = [];

  /**
   * @param contract Contract used to build the underlying validator.
   */
  constructor(contract: DataContract) {
    this.validator = new DataQualityValidator(contract);
  }

  /**
   * Validates every record in the batch, in order.
   *
   * @param records Records to validate.
   * @returns Records that passed untouched, failed records annotated with
   *   `_dq_errors`, and the total size of the quarantine queue. The queue
   *   persists across calls, so the count is cumulative, not per batch.
   */
  async processBatch(records: Record<string, any>[]): Promise<BatchResult> {
    const accepted: Record<string, any>[] = [];
    const rejected: Record<string, any>[] = [];

    records.forEach((entry) => {
      const outcome = this.validator.validate(entry);

      if (outcome.valid) {
        accepted.push(entry);
        return;
      }

      const violations = outcome.errors;
      rejected.push({ ...entry, _dq_errors: violations });

      // Quarantine gets its own copy of the annotated record.
      const hasCritical = violations.some((violation) => violation.severity === 'critical');
      if (hasCritical) {
        this.quarantineQueue.push({ ...entry, _dq_errors: violations });
      }
    });

    const quarantineCount = this.quarantineQueue.length;
    return { passed: accepted, failed: rejected, quarantineCount };
  }

  /**
   * @returns A shallow copy of the quarantine queue, so callers cannot mutate
   *   the internal array.
   */
  getQuarantine(): any[] {
    return Array.from(this.quarantineQueue);
  }
}
/**
 * Summary returned by `QualityGatePipeline.processBatch`.
 */
export type BatchResult = {
/** Records that validated cleanly, unchanged. */
passed: Record<string, any>[];
/** Records that failed validation, each carrying a `_dq_errors` property. */
failed: Record<string, any>[];
/** Length of the pipeline's quarantine queue when the batch finished. */
quarantineCount: number;
};
### Step 4: Integrate Observability and Alerting
Validation results must emit metrics, logs, and traces. Route critical violations to PagerDuty/Slack, warnings to data dashboards, and maintain SLA tracking.
// observability/metrics-emitter.ts
/**
 * Skeleton for publishing validation outcomes to a metrics/alerting backend.
 * The actual backend calls are left as commented placeholders.
 */
export class DQMetricsEmitter {
  /**
   * Derives metric labels and the number of critical violations from a
   * validation result. Currently emits nothing; the integration points are
   * marked by the commented-out calls.
   *
   * @param result Validation result; reads `valid` and, when invalid, `errors`.
   * @param pipelineName Pipeline name used as a metric label.
   */
  static emitValidationMetrics(result: any, pipelineName: string) {
    // Pseudocode for Prometheus/CloudWatch integration
    const status = result.valid ? 'pass' : 'fail';
    const labels = { pipeline: pipelineName, status };

    // Increment counters
    // metrics.counter('dq_validation_total').labels(labels).inc();
    // metrics.histogram('dq_validation_duration_ms').observe(duration);

    if (result.valid) {
      return;
    }

    // `errors` may be absent; the count is then undefined and no alert fires.
    const criticalCount = result.errors?.filter((issue: any) => issue.severity === 'critical').length;
    if (!(criticalCount > 0)) {
      return;
    }
    // alerting.trigger('critical_dq_violation', { pipeline: pipelineName, count: criticalCount });
  }
}
Architecture Decisions and Rationale
- Declarative over Imperative: Rules live in version-controlled contracts, not pipeline code. This enables cross-team reuse, auditability, and zero-downtime rule updates.
- Async Validation: Synchronous validation blocks ingestion and increases latency. Async gates allow raw data to land safely while validation runs in parallel, enabling quarantine routing without pipeline backpressure.
- Severity-Based Routing: Not all violations require halting pipelines. Warnings feed dashboards, errors trigger retries, critical violations trigger quarantine and on-call alerts. This prevents alert fatigue while preserving data integrity.
- Policy-as-Code Integration: Contracts integrate with CI/CD, enforcing validation before deployment. Schema drift is caught at merge time, not production time.
- Idempotent Checks: Validation functions are pure where possible. Stateful checks (e.g., uniqueness, referential integrity) use deterministic hashing and batch-windowed deduplication to avoid race conditions.
Pitfall Guide
- **Treating Data Quality as a Migration Task, Not a Runtime SLA**
  Teams often run a one-time data cleanse and assume quality is resolved. Data degrades continuously through schema changes, API updates, and upstream bugs. Quality must be enforced at ingestion and monitored continuously.
- **Hardcoding Thresholds Without Drift Detection**
  Static rules fail when data distributions shift. A max value that was valid last quarter may indicate a bug today. Implement statistical baselines and automated drift detection (e.g., PSI, KL divergence) to trigger rule reviews.
- **Overlapping Rule Definitions Across Teams**
  When multiple teams define validation for the same dataset, conflicts emerge. Centralize rule registries, enforce naming conventions, and use contract versioning to prevent duplication and contradictory checks.
- **Ignoring Data Lineage and Impact Analysis**
  Validating a field without knowing its downstream consumers leads to false confidence. Integrate lineage tracking so violations can be mapped to affected services, models, or reports, enabling targeted remediation.
- **Alert Fatigue from Untriaged Violations**
  Flooding on-call channels with every schema mismatch desensitizes teams. Implement severity tiers, aggregation windows, and auto-suppression for known maintenance periods. Route warnings to dashboards, not pages.
- **Neglecting Validation Performance Overhead**
  Running heavy cross-record or regex checks on every row can degrade pipeline throughput. Use sampling strategies, columnar validation for large datasets, and push compute to distributed engines (Spark, DuckDB) when scale exceeds a defined threshold.
- **Lack of Clear Data Stewardship Ownership**
  Frameworks fail without accountability. Assign data owners per contract, define SLA targets, and tie quality metrics to team OKRs. Tooling without governance becomes abandoned infrastructure.
Production Bundle
Action Checklist
Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| Batch ETL (Daily/Hourly) | Rule-based + Contract Validation | Predictable windows allow synchronous validation without latency impact | Low: Standard tooling, minimal infra |
| Real-time Streaming (Kafka/PubSub) | Async Framework + Sampling + Quarantine | Latency sensitivity requires non-blocking validation and burst tolerance | Medium: Stream processing overhead, quarantine storage |
| ML Feature Store | Drift Detection + Schema Enforcement | Model performance degrades silently without distribution monitoring | High: Requires statistical baselines and versioned features |
| Compliance-Heavy (HIPAA/GDPR) | Policy-as-Code + Audit Logging + Lineage | Regulatory audits require immutable validation trails and impact mapping | High: Governance tooling, audit storage, compliance reviews |
Configuration Template
# dq-contracts/v1/user_events.yaml
# Example data contract for the user_events dataset: field rules, one
# cross-field rule, and observability/alert routing settings.
contract:
name: user_events
version: "1.2.0"
description: "Customer interaction events from web and mobile clients"
fields:
- field: event_id
type: string
nullable: false
# Lowercase hex in 8-4-4-4-12 groups (UUID-style layout).
pattern: "^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}$"
- field: user_id
type: string
nullable: false
- field: timestamp
type: date
nullable: false
- field: event_type
type: string
nullable: false
enum: ["page_view", "click", "purchase", "signup"]
- field: revenue
type: number
nullable: true
min: 0
max: 99999.99
# NOTE(review): the TypeScript DataContract interface names this property
# `crossFieldRules` and types `condition` as a function, whereas this file uses
# snake_case and a string expression. A loader must rename the key and turn the
# string into a function; confirm how (and avoid eval on untrusted contracts).
cross_field_rules:
- id: revenue_requires_purchase
description: "Revenue must be present when event_type is purchase"
condition: "record => record.event_type !== 'purchase' || (record.revenue != null && record.revenue > 0)"
severity: error
# Alert routing per severity and the location for quarantined records.
observability:
metrics_endpoint: "/metrics/dq"
alert_channels:
critical: ["pagerduty", "slack-dq-alerts"]
warning: ["data-dashboards"]
quarantine_bucket: "s3://dq-quarantine/user_events/"
Quick Start Guide
- Install Dependencies: `npm install zod ajv @opentelemetry/api` (or use the provided TS validator engine)
- Define Contract: Create a YAML/JSON contract matching your ingestion schema. Map field types, constraints, and business rules.
- Initialize Validator: Instantiate `DataQualityValidator` with the contract. Attach `QualityGatePipeline` to your ingestion endpoint or stream consumer.
- Deploy Async Gate: Route raw data to staging, run `processBatch()`, emit metrics, and forward passed records to production tables. Quarantine failed records for review.
- Verify in CI: Add contract validation to your pipeline's pre-merge step. Fail builds if schema changes break existing rules without a version bump.
Data quality frameworks are not optional infrastructure. They are the control plane for data reliability. Implement them declaratively, enforce them continuously, and measure them rigorously. The cost of adoption is measured in hours; the cost of omission is measured in downtime, compliance penalties, and eroded engineering trust.