/Spark), this requires AST (Abstract Syntax Tree) analysis or runtime hooks.
2. Parsing and Extraction
Do not rely on execution logs: they are unstructured and lossy. Use a SQL parser to extract input/output tables and column mappings. For computed columns, map each expression to one lineage edge per input column it references.
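To make "map expressions to lineage edges" concrete, the sketch below walks a parsed expression tree and emits one edge per distinct input column, classifying the edge by the expression's top-level node. The `Expr` union is a hypothetical, simplified stand-in for whatever AST your SQL parser produces; only the walking pattern is meant to carry over.

```typescript
// Hypothetical, minimal expression AST; a real SQL parser's node shapes will differ.
type Expr =
  | { kind: 'column'; table: string; name: string }
  | { kind: 'literal'; value: string | number }
  | { kind: 'binary'; op: string; left: Expr; right: Expr }
  | { kind: 'call'; fn: string; args: Expr[] };

interface ExprEdge {
  source: string; // "table.column" read by the expression
  target: string; // "table.column" being written
  transformation: 'copy' | 'expression' | 'aggregate';
}

const AGGREGATES = new Set<string>(['SUM', 'COUNT', 'AVG', 'MIN', 'MAX']);

// Collect every column reference an expression reads, depth-first, left to right.
function columnsIn(expr: Expr): string[] {
  switch (expr.kind) {
    case 'column':
      return [`${expr.table}.${expr.name}`];
    case 'literal':
      return [];
    case 'binary':
      return [...columnsIn(expr.left), ...columnsIn(expr.right)];
    case 'call':
      return expr.args.reduce<string[]>((acc, arg) => acc.concat(columnsIn(arg)), []);
  }
}

// A bare column is a copy, a known aggregate call is an aggregate, anything else is an expression.
function classify(expr: Expr): ExprEdge['transformation'] {
  if (expr.kind === 'column') return 'copy';
  if (expr.kind === 'call' && AGGREGATES.has(expr.fn.toUpperCase())) return 'aggregate';
  return 'expression';
}

// One edge per distinct input column: `price * quantity AS revenue`
// yields two 'expression' edges into `revenue`.
function expressionToEdges(target: string, expr: Expr): ExprEdge[] {
  const transformation = classify(expr);
  const seen = new Set<string>();
  const edges: ExprEdge[] = [];
  for (const source of columnsIn(expr)) {
    if (seen.has(source)) continue;
    seen.add(source);
    edges.push({ source, target, transformation });
  }
  return edges;
}
```

Deduplicating sources keeps `a + a` from producing two identical edges; a real implementation would also resolve table aliases before building the `table.column` ids.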
3. Graph Construction
Store lineage as a directed acyclic graph (DAG). Nodes represent datasets and columns; edges represent data flow and transformation types.
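A minimal in-memory version of this graph helps show the two properties that matter: edges follow the direction of data flow, and a new edge is rejected if it would close a cycle. `LineageDag` and its methods are hypothetical names for illustration; a graph database replaces this in production.

```typescript
// Minimal in-memory lineage DAG. Node ids are dataset or column ids;
// edges point from source to target, i.e. the direction data flows.
class LineageDag {
  private readonly downstream = new Map<string, Set<string>>();
  private readonly upstreamIndex = new Map<string, Set<string>>();

  // Adds source -> target, refusing any edge that would make the graph cyclic.
  addEdge(source: string, target: string): void {
    if (source === target || this.reaches(target, source)) {
      throw new Error(`Edge ${source} -> ${target} would create a cycle`);
    }
    this.link(this.downstream, source, target);
    this.link(this.upstreamIndex, target, source);
  }

  // Every node that feeds `id`, directly or transitively (breadth-first order).
  upstream(id: string): string[] {
    return this.walk(this.upstreamIndex, id);
  }

  // Every node affected by a change to `id` (impact analysis).
  downstreamOf(id: string): string[] {
    return this.walk(this.downstream, id);
  }

  private reaches(from: string, to: string): boolean {
    return this.walk(this.downstream, from).indexOf(to) !== -1;
  }

  private link(index: Map<string, Set<string>>, from: string, to: string): void {
    let neighbours = index.get(from);
    if (!neighbours) {
      neighbours = new Set<string>();
      index.set(from, neighbours);
    }
    neighbours.add(to);
  }

  // Breadth-first traversal; the start node itself is not included in the result.
  private walk(index: Map<string, Set<string>>, start: string): string[] {
    const visited = new Set<string>([start]);
    const order: string[] = [];
    const queue: string[] = [start];
    while (queue.length > 0) {
      const node = queue.shift()!;
      const next = index.get(node);
      if (!next) continue;
      next.forEach(n => {
        if (!visited.has(n)) {
          visited.add(n);
          order.push(n);
          queue.push(n);
        }
      });
    }
    return order;
  }
}
```

Keeping a reverse index makes upstream queries as cheap as downstream ones, at the cost of storing each edge twice.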
4. Temporal Versioning
Lineage changes as schemas evolve. Each lineage snapshot must be versioned with a timestamp or run ID to support historical analysis.
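One simple way to version lineage by run is to keep an append-only list of snapshots keyed by run ID and capture time, then answer "what did lineage look like then?" by taking the latest snapshot at or before a timestamp. `SnapshotStore` and its methods are hypothetical names; a production store would persist snapshots (for example as graph edges carrying a run ID) rather than hold them in memory.

```typescript
interface SnapshotEdge {
  source: string; // "dataset.column"
  target: string;
}

interface LineageSnapshot {
  runId: string;
  capturedAt: number; // epoch millis of the pipeline run
  edges: SnapshotEdge[];
}

// Append-only store: every run adds a snapshot and nothing is overwritten,
// so lineage can be read back as it stood at any earlier run.
class SnapshotStore {
  private readonly snapshots: LineageSnapshot[] = [];

  record(snapshot: LineageSnapshot): void {
    this.snapshots.push(snapshot);
    // Keep chronological order even when a run reports late.
    this.snapshots.sort((a, b) => a.capturedAt - b.capturedAt);
  }

  byRun(runId: string): LineageSnapshot | undefined {
    return this.snapshots.find(s => s.runId === runId);
  }

  // Latest snapshot captured at or before `at`; undefined if none existed yet.
  asOf(at: number): LineageSnapshot | undefined {
    let latest: LineageSnapshot | undefined;
    for (const s of this.snapshots) {
      if (s.capturedAt > at) break;
      latest = s;
    }
    return latest;
  }
}
```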
Code Examples (TypeScript)
A. SQL Lineage Extractor
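This module uses node-sql-parser to extract lineage from `INSERT ... SELECT` statements. It maps source columns to target columns, classifies each column's transformation, and stamps every edge with the run ID.
```typescript
import { Parser, AST } from 'node-sql-parser';

export interface LineageEdge {
  sourceNodeId: string;
  targetNodeId: string;
  sourceColumn: string;
  targetColumn: string;
  transformation: 'copy' | 'expression' | 'aggregate' | 'join';
  runId: string; // ties each edge to a pipeline run (see Temporal Versioning)
  timestamp: number;
}

interface ColumnRef {
  schema: string;
  table: string;
  column: string;
}

interface SourceColumn {
  ref: ColumnRef;
  transformation: LineageEdge['transformation'];
}

export class SQLLineageExtractor {
  private readonly parser = new Parser();

  extract(sql: string, runId: string): LineageEdge[] {
    // astify() returns a single AST, or an array when the input has several statements
    const parsed = this.parser.astify(sql);
    const statements: AST[] = Array.isArray(parsed) ? parsed : [parsed];
    const edges: LineageEdge[] = [];
    const timestamp = Date.now();

    for (const stmt of statements) {
      // A bare SELECT writes nothing, so it has no lineage target; handle INSERT ... SELECT
      if (stmt.type !== 'insert') continue;
      const targets = this.extractTargets(stmt);
      const sources = this.extractSources(stmt);
      // Pair the i-th target column with the i-th projected expression instead of
      // taking a cross product. Production code needs full expression analysis,
      // since one target column can read several source columns.
      targets.forEach((tgt, i) => {
        const src = sources[i];
        if (!src) return;
        edges.push({
          sourceNodeId: `${src.ref.schema}.${src.ref.table}`,
          targetNodeId: `${tgt.schema}.${tgt.table}`,
          sourceColumn: src.ref.column,
          targetColumn: tgt.column,
          transformation: src.transformation,
          runId,
          timestamp,
        });
      });
    }
    return edges;
  }

  private extractTargets(stmt: AST): ColumnRef[] {
    // Implementation details: read the INSERT target table and its column list.
    // AST field names differ between node-sql-parser versions; check yours.
    return [];
  }

  private extractSources(stmt: AST): SourceColumn[] {
    // Implementation details: resolve FROM/JOIN aliases to tables, then classify
    // each projected expression with detectTransformation().
    return [];
  }

  // Classify one projected expression, not the whole statement:
  // a WHERE clause filters rows but does not transform any column.
  private detectTransformation(expr: { type: string }, fromJoin: boolean): LineageEdge['transformation'] {
    if (expr.type === 'aggr_func') return 'aggregate'; // node-sql-parser's aggregate node type
    if (expr.type !== 'column_ref') return 'expression';
    return fromJoin ? 'join' : 'copy';
  }
}
```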
B. Graph Storage Service
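A graph database (e.g., Neo4j, Amazon Neptune) makes upstream/downstream traversal efficient. The following TypeScript interface defines the storage contract, followed by a Neo4j implementation.
```typescript
import { Driver } from 'neo4j-driver';

type NodeType = 'dataset' | 'column';
type EdgeType = 'DERIVED_FROM' | 'DEPENDS_ON';
type LineageRef = { id: string; type: string };

export interface GraphLineageService {
  upsertNode(nodeId: string, type: NodeType, metadata: Record<string, unknown>): Promise<void>;
  upsertEdge(sourceId: string, targetId: string, edgeType: EdgeType, properties: Record<string, unknown>): Promise<void>;
  getUpstream(nodeId: string, depth: number): Promise<LineageRef[]>;
  getDownstreamImpact(nodeId: string): Promise<LineageRef[]>;
}

// Labels must match the schema constraints (:Dataset, :Column). Cypher cannot take
// labels or relationship types as parameters, so only whitelisted values are interpolated.
const LABELS: Record<NodeType, string> = { dataset: 'Dataset', column: 'Column' };
const EDGE_TYPES = new Set<EdgeType>(['DERIVED_FROM', 'DEPENDS_ON']);

export class Neo4jLineageService implements GraphLineageService {
  constructor(private readonly driver: Driver) {}

  // Opens a session per query and always closes it.
  private async run(cypher: string, params: Record<string, unknown>) {
    const session = this.driver.session();
    try {
      return await session.run(cypher, params);
    } finally {
      await session.close();
    }
  }

  async upsertNode(nodeId: string, type: NodeType, metadata: Record<string, unknown>): Promise<void> {
    await this.run(
      `MERGE (n:${LABELS[type]} {id: $id})
       SET n += $metadata, n.updatedAt = datetime()`,
      { id: nodeId, metadata }
    );
  }

  async upsertEdge(sourceId: string, targetId: string, edgeType: EdgeType, properties: Record<string, unknown>): Promise<void> {
    if (!EDGE_TYPES.has(edgeType)) throw new Error(`Unsupported edge type: ${edgeType}`);
    // Unlabeled MATCH cannot use the label-scoped id constraints; add labels when known.
    await this.run(
      `MATCH (s {id: $sourceId}), (t {id: $targetId})
       MERGE (s)-[r:${edgeType}]->(t)
       SET r += $properties`,
      { sourceId, targetId, properties }
    );
  }

  async getUpstream(nodeId: string, depth: number): Promise<LineageRef[]> {
    // Variable-length bounds cannot be parameterized, so validate before interpolating
    if (!Number.isInteger(depth) || depth < 1) throw new Error(`Invalid depth: ${depth}`);
    const result = await this.run(
      `MATCH (n {id: $nodeId})<-[*1..${depth}]-(upstream)
       RETURN DISTINCT upstream.id AS id, labels(upstream)[0] AS type`,
      { nodeId }
    );
    return result.records.map(r => ({ id: r.get('id'), type: r.get('type') }));
  }

  async getDownstreamImpact(nodeId: string): Promise<LineageRef[]> {
    // Unbounded traversal; cap the depth on very large graphs
    const result = await this.run(
      `MATCH (n {id: $nodeId})-[*1..]->(downstream)
       RETURN DISTINCT downstream.id AS id, labels(downstream)[0] AS type`,
      { nodeId }
    );
    return result.records.map(r => ({ id: r.get('id'), type: r.get('type') }));
  }
}
```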
C. Architecture Decisions
- Async Ingestion: Lineage collection must never block pipeline execution. Use a message queue (Kafka/SQS) to decouple extraction from storage. Pipelines emit lineage events; a consumer service persists them to the graph.
- OpenLineage Standard: Adopt the OpenLineage spec for event payloads. This ensures interoperability with tools like Airflow, Spark, dbt, and Flink, reducing vendor lock-in.
- Hybrid Storage: Store high-frequency lineage events in a time-series store for auditing, and the relationships derived from them in the graph DB for traversal. Each store then serves the query pattern it suits, which keeps both cost and query latency down.
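For the OpenLineage payloads mentioned above, the sketch below builds a `COMPLETE` run event whose output dataset carries a column-lineage facet. Field names follow the OpenLineage RunEvent and ColumnLineageDatasetFacet definitions as we understand them; the producer URI, the schema URL versions and the `buildCompleteEvent` helper are placeholder assumptions, so check them against the spec version your backend accepts.

```typescript
// Trimmed-down shapes of the OpenLineage RunEvent and column-lineage facet.
interface DatasetRef {
  namespace: string;
  name: string;
}
interface InputField extends DatasetRef {
  field: string; // column in the input dataset
}

interface RunEvent {
  eventType: 'START' | 'RUNNING' | 'COMPLETE' | 'ABORT' | 'FAIL' | 'OTHER';
  eventTime: string; // ISO-8601
  run: { runId: string }; // UUID per run
  job: { namespace: string; name: string };
  inputs: DatasetRef[];
  outputs: Array<DatasetRef & { facets?: Record<string, unknown> }>;
  producer: string;
  schemaURL: string;
}

// Placeholders (assumptions): substitute your producer URI and the spec versions you target.
const PRODUCER = 'https://example.com/hypothetical-lineage-emitter';
const RUN_EVENT_SCHEMA = 'https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent';
const COLUMN_LINEAGE_SCHEMA =
  'https://openlineage.io/spec/facets/1-0-1/ColumnLineageDatasetFacet.json#/$defs/ColumnLineageDatasetFacet';

// columnMap: output column -> the input columns it is computed from.
function buildCompleteEvent(
  runId: string,
  job: { namespace: string; name: string },
  inputs: DatasetRef[],
  output: DatasetRef,
  columnMap: Record<string, InputField[]>,
  eventTime: Date = new Date(),
): RunEvent {
  const fields: Record<string, { inputFields: InputField[] }> = {};
  for (const outputColumn of Object.keys(columnMap)) {
    fields[outputColumn] = { inputFields: columnMap[outputColumn] };
  }
  return {
    eventType: 'COMPLETE',
    eventTime: eventTime.toISOString(),
    run: { runId },
    job,
    inputs,
    outputs: [
      {
        ...output,
        facets: {
          columnLineage: { _producer: PRODUCER, _schemaURL: COLUMN_LINEAGE_SCHEMA, fields },
        },
      },
    ],
    producer: PRODUCER,
    schemaURL: RUN_EVENT_SCHEMA,
  };
}
```

Because the event is plain JSON, the same payload can go to an HTTP endpoint or onto a Kafka topic without change.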
Pitfall Guide
Production lineage systems fail due to specific architectural and operational mistakes. Avoid these pitfalls to ensure reliability.
1. The Dynamic SQL Trap
- Mistake: Relying on static analysis for pipelines that construct SQL dynamically via string concatenation or template engines.
- Impact: Lineage gaps for critical transformations. The parser sees a variable, not the query structure.
- Best Practice: Instrument the runtime execution layer. Capture the actual SQL text executed against the database, or use runtime hooks in the execution engine (e.g., Spark listeners) to extract lineage from the logical plan.
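Where the engine offers hooks (such as Spark listeners), those are the natural capture point. For an application that sends SQL through its own client function, the same idea can be sketched as a wrapper that records the final SQL text, after all templating and concatenation, before executing it. `Executor`, `withSqlCapture` and the sink callback are hypothetical names; the sink would typically forward to the async ingestion queue.

```typescript
// Any function that sends SQL to the database (hypothetical signature).
type Executor<R> = (sql: string, params?: unknown[]) => Promise<R>;

interface CapturedStatement {
  runId: string;
  sql: string; // final text that actually runs, not the template
  capturedAt: number;
}

// Wraps an executor so lineage sees the statement that really executed.
// Capture happens synchronously before the query and can never fail it.
function withSqlCapture<R>(
  exec: Executor<R>,
  runId: string,
  sink: (stmt: CapturedStatement) => void,
): Executor<R> {
  return async (sql, params) => {
    try {
      sink({ runId, sql, capturedAt: Date.now() });
    } catch {
      // Lineage capture must not break the pipeline; drop the record instead.
    }
    return exec(sql, params);
  };
}
```

The captured text can then go through the same SQL parser used for static analysis, which now sees a concrete query instead of a variable.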
2. Ignoring Schema Evolution
- Mistake: Treating lineage as immutable. When a column is renamed or dropped, existing lineage edges become orphaned or incorrect.
- Impact: False positives in impact analysis. Engineers trust the lineage less over time.
- Best Practice: Implement schema change detection. When a DDL event occurs, update the graph nodes and edges. Use unique column identifiers (UUIDs) rather than names to maintain lineage continuity across renames.
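The UUID approach can be sketched as a registry that mints a stable id per column (here with Node's `crypto.randomUUID()`), keeps the current name as a mutable attribute, and applies rename and drop DDL events to it. `ColumnRegistry` and its methods are hypothetical; lineage edges would store only the ids.

```typescript
import { randomUUID } from 'crypto';

interface ColumnNode {
  id: string; // stable identity referenced by lineage edges
  dataset: string;
  name: string; // current name; changes on rename
  droppedAt?: number; // set when a DROP COLUMN is observed
}

class ColumnRegistry {
  private readonly byId = new Map<string, ColumnNode>();
  private readonly byName = new Map<string, string>(); // "dataset.name" -> id of the live column

  // Returns the id of an existing live column, or mints a new UUID.
  register(dataset: string, name: string): string {
    const key = `${dataset}.${name}`;
    const existing = this.byName.get(key);
    if (existing) return existing;
    const id = randomUUID();
    this.byId.set(id, { id, dataset, name });
    this.byName.set(key, id);
    return id;
  }

  // DDL rename: only the name moves, so edges keyed by id stay valid.
  rename(dataset: string, from: string, to: string): void {
    const id = this.liveId(dataset, from);
    this.byName.delete(`${dataset}.${from}`);
    this.byName.set(`${dataset}.${to}`, id);
    this.byId.get(id)!.name = to;
  }

  // DDL drop: the node stays resolvable for historical lineage but is marked dropped.
  drop(dataset: string, name: string, at: number): void {
    const id = this.liveId(dataset, name);
    this.byName.delete(`${dataset}.${name}`);
    this.byId.get(id)!.droppedAt = at;
  }

  idOf(dataset: string, name: string): string | undefined {
    return this.byName.get(`${dataset}.${name}`);
  }

  resolve(id: string): ColumnNode | undefined {
    return this.byId.get(id);
  }

  private liveId(dataset: string, name: string): string {
    const id = this.byName.get(`${dataset}.${name}`);
    if (!id) throw new Error(`Unknown column ${dataset}.${name}`);
    return id;
  }
}
```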
3. Over-Granularity vs. Under-Granularity
- Mistake: Tracking row-level lineage by default or stopping at dataset-level.
- Impact: Row-level lineage inflates graph size and storage costs to unmanageable levels, while dataset-level lineage cannot show which column an error came from.
- Best Practice: Default to column-level granularity. Enable row-level lineage only for specific high-value assets or compliance requirements via sampling or watermarking.
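One way to implement the sampling option (our choice here; the text does not prescribe a method, and watermarking is not shown) is deterministic hash-based sampling: hash a stable row key and track the row only if the hash falls below the sample rate. Because the decision depends only on the key, every pipeline stage makes the same choice for the same row. The function names are hypothetical; FNV-1a is used purely for bucketing, not security.

```typescript
// 32-bit FNV-1a over UTF-16 code units; cheap and well spread for bucketing.
function fnv1a32(input: string): number {
  let hash = 0x811c9dc5;
  for (let i = 0; i < input.length; i++) {
    hash ^= input.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193);
  }
  return hash >>> 0; // reinterpret as unsigned
}

// Deterministic row sampler: the same key always yields the same decision,
// so a sampled row can be followed end to end through the pipeline.
function shouldTrackRow(rowKey: string, sampleRate: number): boolean {
  if (sampleRate <= 0) return false;
  if (sampleRate >= 1) return true;
  return fnv1a32(rowKey) / 0x100000000 < sampleRate;
}
```

For compliance-driven assets, the row key can be the business key (for example an order ID), so an auditor can check whether a specific record was tracked.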
4. Lack of Temporal Context
- Mistake: Storing only the current state of lineage.
- Impact: Unable to debug historical incidents. If a bug existed for three months, you cannot trace data back to its state during that period.
- Best Practice: Version lineage snapshots. Associate lineage edges with `valid_from` and `valid_to` timestamps, and query lineage using the timestamp of the data run.
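The `valid_from`/`valid_to` scheme can be sketched as a reconciliation step run after each pipeline run: edges that disappeared are closed at the run time, new edges are opened, unchanged edges are left alone, and nothing is deleted. `applyRun` and `edgesAsOf` are hypothetical helpers; validity is treated as the half-open interval [validFrom, validTo).

```typescript
interface TemporalEdge {
  source: string;
  target: string;
  validFrom: number;
  validTo: number | null; // null means the edge is still current
}

type ObservedEdge = { source: string; target: string };

const edgeKey = (e: ObservedEdge): string => `${e.source}->${e.target}`;

// Reconcile stored history with the edges observed in a run at time `runAt`.
function applyRun(history: TemporalEdge[], observed: ObservedEdge[], runAt: number): TemporalEdge[] {
  const seen = new Set<string>(observed.map(edgeKey));
  const open = new Set<string>();
  const next: TemporalEdge[] = history.map((e): TemporalEdge => {
    if (e.validTo !== null) return e; // already closed: keep as history
    if (seen.has(edgeKey(e))) {
      open.add(edgeKey(e));
      return e; // still present: unchanged
    }
    return { ...e, validTo: runAt }; // disappeared: close it
  });
  for (const e of observed) {
    if (!open.has(edgeKey(e))) {
      next.push({ source: e.source, target: e.target, validFrom: runAt, validTo: null });
      open.add(edgeKey(e));
    }
  }
  return next;
}

// Edges that were in effect at time `at`.
function edgesAsOf(history: TemporalEdge[], at: number): TemporalEdge[] {
  return history.filter(e => e.validFrom <= at && (e.validTo === null || at < e.validTo));
}
```

In the graph schema, the same fields map onto the `validFrom`/`validTo` properties of `:DERIVED_FROM` relationships.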
5. Business Logic Blindness
- Mistake: Assuming technical lineage equals business understanding. The SQL `price * quantity` is technically lineage, but its business meaning ("Revenue") is lost.
- Impact: Compliance audits fail; business users cannot use the lineage tool.
- Best Practice: Enrich technical lineage with semantic tags. Integrate with the data catalog to map technical columns to business glossary terms. Store transformation descriptions in metadata.
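A small sketch of the enrichment step: join column-level lineage with glossary terms keyed by column id, and list columns that still lack a business term. The `Map` stands in for a data catalog lookup; `GlossaryTerm`, `enrich` and `untaggedColumns` are hypothetical names, not a catalog API.

```typescript
interface GlossaryTerm {
  term: string; // e.g. "Revenue"
  definition: string;
  owner: string;
}

interface ColumnLineage {
  targetColumn: string; // e.g. "mart.sales.revenue"
  sourceColumns: string[];
  expression?: string; // e.g. "price * quantity"
}

interface EnrichedLineage extends ColumnLineage {
  businessTerm?: GlossaryTerm;
  description?: string;
}

// Attach business meaning to technical lineage; the glossary map stands in for a catalog.
function enrich(lineage: ColumnLineage[], glossary: Map<string, GlossaryTerm>): EnrichedLineage[] {
  return lineage.map(l => {
    const term = glossary.get(l.targetColumn);
    return {
      ...l,
      businessTerm: term,
      // Human-readable transformation description stored alongside the edge metadata.
      description: term && l.expression ? `${term.term} = ${l.expression}` : l.expression,
    };
  });
}

// Columns with no glossary term: candidates for catalog curation before an audit.
function untaggedColumns(enriched: EnrichedLineage[]): string[] {
  return enriched.filter(e => !e.businessTerm).map(e => e.targetColumn);
}
```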
6. Performance Degradation of Graph DB
- Mistake: Writing lineage synchronously within the critical path or creating massive graphs without indexing.
- Impact: Pipeline slowdowns or slow impact analysis queries.
- Best Practice: Use async ingestion. Index node IDs (in Neo4j, uniqueness constraints create a backing index) and the relationship properties you filter on, such as run IDs. Partition the graph by domain or date range if scale exceeds single-node capacity.
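To keep lineage writes off the critical path, the pipeline can hand events to a bounded in-memory buffer that never awaits or throws, while a separate loop flushes batches to the real transport. `LineageBuffer` is a hypothetical class and `send` stands in for a Kafka, SQS or HTTP client; the sizes are arbitrary defaults.

```typescript
// Bounded, non-blocking lineage buffer. emit() only appends to memory;
// flushOnce() is meant to be driven by a single background loop.
class LineageBuffer<E> {
  private queue: E[] = [];
  private dropped = 0;

  constructor(
    private readonly send: (batch: E[]) => Promise<void>, // hypothetical transport call
    private readonly maxSize = 10000,
    private readonly batchSize = 500,
  ) {}

  // Hot path: when full, drop and count rather than slow the pipeline down.
  emit(event: E): void {
    if (this.queue.length >= this.maxSize) {
      this.dropped++;
      return;
    }
    this.queue.push(event);
  }

  // Sends one batch; on failure the batch goes back to the front for a later retry.
  async flushOnce(): Promise<number> {
    if (this.queue.length === 0) return 0;
    const batch = this.queue.splice(0, this.batchSize);
    try {
      await this.send(batch);
      return batch.length;
    } catch {
      const merged = batch.concat(this.queue);
      this.dropped += Math.max(0, merged.length - this.maxSize);
      this.queue = merged.slice(0, this.maxSize);
      return 0;
    }
  }

  get droppedCount(): number {
    return this.dropped;
  }

  get pending(): number {
    return this.queue.length;
  }
}
```

Exposing `droppedCount` as a metric matters: silent drops would recreate the lineage gaps this design is meant to avoid.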
7. Single Point of Failure
- Mistake: Centralizing lineage storage in the same region/cluster as operational data.
- Impact: If the operational region fails, lineage is unavailable for disaster recovery analysis.
- Best Practice: Replicate lineage metadata to a secondary region. Treat lineage as critical infrastructure with its own SLA.
Production Bundle
Action Checklist
Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| Startup / MVP | Dataset Lineage + dbt docs | Low overhead; sufficient for early stage debugging. | Low |
| Enterprise / Compliance | Column Lineage + OpenLineage + Graph DB | Supports GDPR/CCPA audits (e.g., locating personal data); enables precise impact analysis. | High (Storage/Compute) |
| Real-time Streaming | Runtime Hooks + Kafka | Static parsing fails on streaming; runtime capture ensures accuracy. | Medium |
| Multi-Cloud Hybrid | Agnostic Graph + Event Bus | Decouples lineage from cloud providers; ensures portability. | Medium |
| Legacy Monolith | SQL Parser Middleware | Non-intrusive injection; captures lineage without refactoring code. | Low |
Configuration Template
OpenLineage Agent Configuration (YAML)
Use this template as a starting point for configuring OpenLineage on a Spark job with column-level extraction enabled. The key names are illustrative: the OpenLineage Spark integration itself reads `spark.openlineage.*` Spark properties, so check the exact keys against the documentation for your integration version.
```yaml
openlineage:
  transport:
    type: http
    url: https://lineage-api.internal.company.com/api/v1/lineage
  facet:
    column-lineage:
      enabled: true
      sql-parser: true
    ownership:
      enabled: true
      default-owner: data-engineering-team
  integration:
    spark:
      sparkListener: true
      extraProperties:
        spark.openlineage.debug.enabled: false
        spark.openlineage.facets.sql.columnLineage.enabled: true
```
Graph Schema Definition (Cypher)
Initial schema setup for Neo4j to support lineage traversal and versioning.
```cypher
// Create constraints for uniqueness
CREATE CONSTRAINT dataset_id_unique IF NOT EXISTS
FOR (d:Dataset) REQUIRE d.id IS UNIQUE;
CREATE CONSTRAINT column_id_unique IF NOT EXISTS
FOR (c:Column) REQUIRE c.id IS UNIQUE;

// Create indexes for performance
CREATE INDEX column_name_idx IF NOT EXISTS FOR (c:Column) ON (c.name);
CREATE INDEX edge_run_id_idx IF NOT EXISTS FOR ()-[r:DERIVED_FROM]-() ON (r.runId);

// Node labels
// :Dataset {id, name, type, location, updatedAt}
// :Column {id, name, type, dataType, description, updatedAt}

// Relationship types
// :DERIVED_FROM {sourceColumnId, targetColumnId, transformation, runId, validFrom, validTo}
```
Quick Start Guide
- Install the Collector: Deploy the OpenLineage agent to your pipeline execution environment. Configure the transport URL to point to your ingestion service.
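```shell
# Example for Python environment
# (on Airflow 2.7+, the apache-airflow-providers-openlineage provider supersedes openlineage-airflow)
pip install openlineage-python openlineage-airflow
export OPENLINEAGE_URL=https://lineage-api.internal.company.com
```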
- Configure Storage: Initialize your graph database with the schema constraints and indexes. Ensure the ingestion service has write permissions.
- Run a Test Pipeline: Execute a simple ETL job with lineage enabled. Verify that events are emitted and persisted in the graph.
```sql
-- Test Query
INSERT INTO target_table (col_a, col_b)
SELECT source_col_1, source_col_2 FROM source_table;
```
- Validate Lineage: Query the graph to confirm edges exist.
```cypher
MATCH (s:Column)-[r:DERIVED_FROM]->(t:Column)
WHERE s.name = 'source_col_1' AND t.name = 'col_a'
RETURN s, r, t;
```
- Integrate Impact Analysis: Add the impact analysis API endpoint to your deployment dashboard. Simulate a schema change on `source_table` and verify that downstream dependencies are listed correctly.
Implementing data lineage tracking is not a metadata exercise; it is an engineering discipline. By enforcing column-level granularity, decoupling ingestion, and versioning relationships, you transform lineage from a static report into a dynamic operational asset that reduces risk, accelerates debugging, and ensures compliance at scale.