uld be field-centric only.
*/
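export function generateDiscrepancyFingerprint(ctx: DiscrepancyContext): string {
  const payload = {
    entity: ctx.entityId,
    field: ctx.fieldName,
    // Sort the value pair so the result does not depend on which side is
    // the source and which is the target
    values: [String(ctx.sourceValue), String(ctx.targetValue)].sort(),
  };
  // The array replacer fixes the key order (entity, field, values), which
  // keeps serialization deterministic. It also acts as a key allow-list at
  // every nesting level, which is safe here because payload is flat.
  const canonical = JSON.stringify(payload, Object.keys(payload).sort());
  return createHash('sha256').update(canonical).digest('hex').substring(0, 24);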
}
```
#### 2. State Machine and Lifecycle
The discrepancy tracker manages four primary states:
* **OPEN:** A new discrepancy detected. Alerts are emitted.
* **ACKNOWLEDGED:** An operator has reviewed the discrepancy. No alerts are emitted on re-detection.
* **RESOLVED:** The discrepancy is no longer present in the current run.
* **ESCALATED:** An OPEN discrepancy has persisted beyond a defined threshold.

The engine processes each finding by looking up its fingerprint in the store and applying the transition that matches the record's current status.
```typescript
enum DiscrepancyStatus {
  OPEN = 'OPEN',
  ACKNOWLEDGED = 'ACKNOWLEDGED',
  RESOLVED = 'RESOLVED',
  ESCALATED = 'ESCALATED',
}

interface DiscrepancyRecord {
  fingerprint: string;
  status: DiscrepancyStatus;
  lastSeenAt: Date;
  createdAt: Date;
  scopeTag: string; // Tracks the comparison scope for resolution safety
}

interface DiscrepancyStore {
  upsert(record: DiscrepancyRecord): Promise<void>;
  findByFingerprint(fp: string): Promise<DiscrepancyRecord | null>;
  findByStatus(status: DiscrepancyStatus, scopeTag?: string): Promise<DiscrepancyRecord[]>;
}

interface AlertService {
  notify(record: DiscrepancyRecord): Promise<void>;
}

export class ReconciliationEngine {
  constructor(
    private store: DiscrepancyStore,
    private alerts: AlertService,
    private escalationThresholdMs: number
  ) {}

  // sourceValue and targetValue are accepted for callers that log or audit
  // them; the state transitions below depend only on the fingerprint.
  async processFinding(
    fingerprint: string,
    scopeTag: string,
    sourceValue: unknown,
    targetValue: unknown
  ): Promise<void> {
    const now = new Date();
    const existing = await this.store.findByFingerprint(fingerprint);

    if (!existing || existing.status === DiscrepancyStatus.RESOLVED) {
      // New discrepancy, or a resolved one that has reappeared: open it and
      // alert. Resetting createdAt restarts the escalation clock.
      const record: DiscrepancyRecord = {
        fingerprint,
        status: DiscrepancyStatus.OPEN,
        lastSeenAt: now,
        createdAt: now,
        scopeTag,
      };
      await this.store.upsert(record);
      await this.alerts.notify(record);
      return;
    }

    if (existing.status === DiscrepancyStatus.OPEN) {
      // Re-detected open discrepancy: refresh lastSeenAt and escalate if it
      // has been open longer than the threshold. A single write prevents the
      // escalation update from overwriting the refreshed lastSeenAt.
      const age = now.getTime() - existing.createdAt.getTime();
      const escalate = age > this.escalationThresholdMs;
      const updated: DiscrepancyRecord = {
        ...existing,
        lastSeenAt: now,
        status: escalate ? DiscrepancyStatus.ESCALATED : DiscrepancyStatus.OPEN,
      };
      await this.store.upsert(updated);
      if (escalate) {
        await this.alerts.notify(updated);
      }
      return;
    }

    // ACKNOWLEDGED or ESCALATED: already known, so no new alert is emitted.
    // lastSeenAt is still refreshed for audit purposes.
    await this.store.upsert({ ...existing, lastSeenAt: now });
  }

  async resolveClearedDiscrepancies(
    currentRunFingerprints: Set<string>,
    scopeTag: string
  ): Promise<void> {
    // Only resolve discrepancies within the current scope to avoid false
    // resolutions during incremental runs. ESCALATED and ACKNOWLEDGED records
    // are included so they can also close once the discrepancy clears.
    const activeStatuses = [
      DiscrepancyStatus.OPEN,
      DiscrepancyStatus.ESCALATED,
      DiscrepancyStatus.ACKNOWLEDGED,
    ];
    for (const status of activeStatuses) {
      const active = await this.store.findByStatus(status, scopeTag);
      for (const disc of active) {
        if (!currentRunFingerprints.has(disc.fingerprint)) {
          await this.store.upsert({ ...disc, status: DiscrepancyStatus.RESOLVED });
        }
      }
    }
  }
}
```

#### 3. Architecture Decisions
- TypeScript Implementation: Strong typing enforces strict contracts for state transitions and reduces runtime errors in complex reconciliation logic.
- Scope Tagging: Every discrepancy is tagged with the comparison scope (e.g., date range, partition ID). Resolution logic only considers discrepancies within the current scope. This prevents incremental runs from incorrectly marking discrepancies as resolved simply because they were not included in the current batch.
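- Upsert Pattern: The store uses upsert semantics so that writes are idempotent: if a record exists, it is updated; if not, it is created. This avoids duplicate-key errors when the same finding is processed more than once. Upsert alone does not prevent lost updates between the engine's read and its write, so concurrent runs still depend on an atomic or conditional write in the storage layer.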
- Separation of Concerns: The engine handles logic, the store handles persistence, and the alert service handles notifications. This allows swapping storage backends (SQL, NoSQL, Redis) or alert channels without modifying core logic.
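
To make the separation of concerns concrete, here is a minimal sketch of an in-memory store and a recording alert service that satisfy the same interfaces as the engine's dependencies. `InMemoryDiscrepancyStore` and `RecordingAlertService` are hypothetical names introduced for illustration, and the types are re-declared only so the snippet stands alone. A real backend would also need durable storage and atomic writes, which this sketch does not attempt.

```typescript
// Types re-declared from the engine section so the sketch is self-contained.
enum DiscrepancyStatus {
  OPEN = 'OPEN',
  ACKNOWLEDGED = 'ACKNOWLEDGED',
  RESOLVED = 'RESOLVED',
  ESCALATED = 'ESCALATED',
}

interface DiscrepancyRecord {
  fingerprint: string;
  status: DiscrepancyStatus;
  lastSeenAt: Date;
  createdAt: Date;
  scopeTag: string;
}

interface DiscrepancyStore {
  upsert(record: DiscrepancyRecord): Promise<void>;
  findByFingerprint(fp: string): Promise<DiscrepancyRecord | null>;
  findByStatus(status: DiscrepancyStatus, scopeTag?: string): Promise<DiscrepancyRecord[]>;
}

interface AlertService {
  notify(record: DiscrepancyRecord): Promise<void>;
}

// Hypothetical in-memory backend: a Map keyed by fingerprint.
class InMemoryDiscrepancyStore implements DiscrepancyStore {
  private records = new Map<string, DiscrepancyRecord>();

  async upsert(record: DiscrepancyRecord): Promise<void> {
    // Map.set creates or overwrites, which gives upsert semantics.
    this.records.set(record.fingerprint, { ...record });
  }

  async findByFingerprint(fp: string): Promise<DiscrepancyRecord | null> {
    const found = this.records.get(fp);
    return found ? { ...found } : null;
  }

  async findByStatus(status: DiscrepancyStatus, scopeTag?: string): Promise<DiscrepancyRecord[]> {
    // When scopeTag is omitted, records from every scope are returned.
    return Array.from(this.records.values())
      .filter((r) => r.status === status && (scopeTag === undefined || r.scopeTag === scopeTag))
      .map((r) => ({ ...r }));
  }
}

// Hypothetical alert channel that records notifications instead of sending them.
class RecordingAlertService implements AlertService {
  readonly sent: DiscrepancyRecord[] = [];

  async notify(record: DiscrepancyRecord): Promise<void> {
    this.sent.push({ ...record });
  }
}
```

Both objects can be passed to the `ReconciliationEngine` constructor in place of a production store and alert channel, which is one way to exercise the state transitions in tests without external services.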
### Pitfall Guide
Production reconciliation systems encounter specific failure modes that can undermine idempotency and data integrity.
1. **Floating-Point Equality Errors**
   - Explanation: Direct comparison of floating-point numbers often fails due to precision differences between systems. In IEEE 754 double precision, `0.1 + 0.2` does not equal `0.3` exactly, which causes spurious discrepancies.
   - Fix: Implement epsilon-based comparison for numeric fields. Define a tolerance threshold (e.g., `Math.abs(a - b) < 1e-9`) and apply it consistently. For values of large magnitude, a relative tolerance is usually more appropriate than an absolute one.
2. **Temporal Drift and Timezone Ambiguity**
   - Explanation: Timestamps may differ due to timezone offsets, subsecond precision, or formatting variations. Comparing `2023-10-01T00:00:00Z` with `2023-10-01T00:00:00.000Z` as strings triggers a false positive even though both denote the same instant.
   - Fix: Normalize all timestamps to UTC and strip subsecond precision before comparison. Use a canonical format or epoch integer for hashing.
3. **Partial Scope Resolution Fallacy**
   - Explanation: In incremental reconciliation, if a run only processes a subset of records, discrepancies for unprocessed records may be incorrectly marked as resolved.
   - Fix: Tag discrepancies with scope metadata. Resolution logic must filter by scope, ensuring only discrepancies covered by the current run are evaluated for resolution.
4. **Non-Deterministic Hashing**
   - Explanation: If the fingerprint generation relies on object serialization without sorting keys, the same discrepancy may produce different hashes across runs.
   - Fix: Always sort object keys before serialization. Use a deterministic hashing algorithm and verify output consistency with unit tests.
5. **Race Conditions in State Updates**
   - Explanation: Concurrent reconciliation runs may attempt to update the same discrepancy simultaneously, leading to lost updates or inconsistent states.
   - Fix: Use atomic upsert operations in the storage layer. Implement distributed locks for critical sections if necessary. Ensure the state machine is idempotent even under concurrent access.
6. **Acknowledgment Black Hole**
   - Explanation: Operators may acknowledge discrepancies without documenting the rationale, leading to permanent suppression of valid alerts.
   - Fix: Require metadata (reason, reviewer) for acknowledgments. Implement expiration policies for acknowledgments to force periodic review. Maintain an audit trail of all state transitions.
7. **Null Handling Inconsistency**
   - Explanation: Treating `null`, `undefined`, and missing fields differently across comparison logic can cause non-deterministic results.
   - Fix: Define explicit null-handling rules. Treat missing fields and null values consistently. Normalize nulls to a canonical representation before comparison.
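
The fixes for floating-point, timestamp, and null handling can be sketched as small normalization helpers applied before values are compared or fingerprinted. The function names, the `NULL_TOKEN` constant, and the choice of whole epoch seconds are assumptions made for illustration; the appropriate tolerance and timestamp precision depend on the data being reconciled.

```typescript
// Hypothetical helpers; names and defaults are illustrative only.

// Floating-point: compare numbers with an absolute tolerance.
function floatsEqual(a: number, b: number, tolerance = 1e-9): boolean {
  return Math.abs(a - b) < tolerance;
}

// Timestamps: reduce any parseable timestamp to whole epoch seconds, which
// removes timezone offsets, subsecond precision, and formatting differences.
function toEpochSeconds(value: string | Date): number {
  const ms = value instanceof Date ? value.getTime() : Date.parse(value);
  if (Number.isNaN(ms)) {
    throw new Error(`Unparseable timestamp: ${String(value)}`);
  }
  return Math.floor(ms / 1000);
}

// Nulls: map null, undefined, and missing fields to one canonical token.
const NULL_TOKEN = '<null>';

function canonicalNull(value: unknown): unknown {
  return value === null || value === undefined ? NULL_TOKEN : value;
}
```

With these in place, `toEpochSeconds('2023-10-01T00:00:00Z')` and `toEpochSeconds('2023-10-01T00:00:00.000Z')` yield the same integer, so the two spellings no longer register as a discrepancy.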
### Production Bundle

#### Action Checklist

#### Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| High Volume, Low Latency | Incremental Runs + Redis Cache | Reduces processing time; cache speeds up fingerprint lookups. | Higher infrastructure cost for Redis; complex scope management. |
| Compliance Audit Requirements | Full Runs + SQL Ledger | Ensures complete traceability; SQL supports complex queries and audit trails. | Higher compute cost per run; slower execution. |
| Mixed Criticality Fields | Tiered Alerting + Ack Expiration | Critical fields trigger immediate alerts; non-critical fields use ack expiration. | Moderate complexity; improves signal quality. |
| Multi-Region Data | Distributed State Store + Scope Tagging | Handles regional partitions; scope tagging prevents cross-region resolution errors. | Higher storage cost; network latency considerations. |

#### Configuration Template
```typescript
interface ReconciliationConfig {
  // Comparison settings
  comparison: {
    floatTolerance: number;
    timestampNormalization: 'UTC' | 'EPOCH';
    nullHandling: 'TREAT_AS_EQUAL' | 'TREAT_AS_DIFFERENCE';
  };
  // State management
  state: {
    storeType: 'SQL' | 'REDIS' | 'DYNAMODB';
    escalationThresholdMs: number;
    ackExpirationDays: number;
    scopeStrategy: 'FULL' | 'INCREMENTAL_DATE' | 'INCREMENTAL_ID';
  };
  // Alerting
  alerting: {
    channels: string[];
    deduplicationWindowMs: number;
    escalationChannels: string[];
  };
  // Monitoring
  monitoring: {
    metricsPrefix: string;
    logLevel: 'DEBUG' | 'INFO' | 'WARN' | 'ERROR';
  };
}

export const defaultConfig: ReconciliationConfig = {
  comparison: {
    floatTolerance: 1e-9,
    timestampNormalization: 'UTC',
    nullHandling: 'TREAT_AS_EQUAL',
  },
  state: {
    storeType: 'SQL',
    escalationThresholdMs: 86400000, // 24 hours
    ackExpirationDays: 30,
    scopeStrategy: 'INCREMENTAL_DATE',
  },
  alerting: {
    channels: ['slack-data-quality'],
    deduplicationWindowMs: 3600000, // 1 hour
    escalationChannels: ['pagerduty-critical'],
  },
  monitoring: {
    metricsPrefix: 'reconciliation',
    logLevel: 'INFO',
  },
};
```
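
The `ackExpirationDays` setting implies a periodic check on acknowledged discrepancies. The sketch below shows one way such a check might look. `AckMetadata` and `isAckExpired` are hypothetical: the record type shown earlier carries no acknowledgment fields, so this assumes they are stored alongside it.

```typescript
// Hypothetical acknowledgment metadata; not part of the original record type.
interface AckMetadata {
  acknowledgedAt: Date;
  reviewer: string;
  reason: string;
}

const MS_PER_DAY = 24 * 60 * 60 * 1000;

// Returns true once the acknowledgment is older than ackExpirationDays.
// `now` is injectable so the check can be tested deterministically.
function isAckExpired(
  ack: AckMetadata,
  ackExpirationDays: number,
  now: Date = new Date()
): boolean {
  const ageMs = now.getTime() - ack.acknowledgedAt.getTime();
  return ageMs > ackExpirationDays * MS_PER_DAY;
}
```

What happens to an expired acknowledgment is a design choice. One option is to move the record back to OPEN so that the next detection alerts again and forces a fresh review.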

#### Quick Start Guide
1. Define Schema: Create the discrepancy table with fields for fingerprint, status, timestamps, and scope tag. Add indexes on fingerprint and status.
2. Implement Fingerprinting: Write the deterministic fingerprint function. Unit test with various input combinations to ensure stability.
3. Wire State Store: Implement the `DiscrepancyStore` interface using your chosen backend. Ensure upsert logic is atomic.
4. Deploy Dry Run: Execute the engine in dry-run mode to validate comparison logic and fingerprint generation without emitting alerts.
5. Enable Production: Switch to live mode. Monitor initial runs for alert volume and resolution accuracy. Adjust thresholds and scope strategies as needed.
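
For the fingerprinting step, a stability test is easier to write when serialization is isolated from hashing. The following is a minimal sketch of a canonical serializer that sorts object keys at every nesting level. `canonicalize` is a hypothetical helper rather than part of the engine, and it assumes plain JSON-like input (no functions, symbols, or `Date` objects, which should be normalized first).

```typescript
// Hypothetical helper: serialize a JSON-like value with object keys sorted at
// every nesting level, so key insertion order cannot change the output.
function canonicalize(value: unknown): string {
  if (value === null || typeof value !== 'object') {
    // Primitives; undefined is mapped to null so the result is always a string.
    return JSON.stringify(value === undefined ? null : value);
  }
  if (Array.isArray(value)) {
    // Array order is meaningful and is preserved.
    return '[' + value.map((item) => canonicalize(item)).join(',') + ']';
  }
  const obj = value as Record<string, unknown>;
  const parts = Object.keys(obj)
    .sort()
    .map((key) => JSON.stringify(key) + ':' + canonicalize(obj[key]));
  return '{' + parts.join(',') + '}';
}

// Two payloads with the same content but different key order.
const first = canonicalize({ field: 'amount', entity: 'order-1', values: ['10', '12'] });
const second = canonicalize({ values: ['10', '12'], entity: 'order-1', field: 'amount' });
console.log(first === second); // true
```

The resulting string can be fed to the hashing step of the fingerprint function, and a unit test can assert that differently ordered inputs produce identical output.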