lity.
# 01_bronze_ingestion.py
# Bronze layer: land the raw Online Retail CSV unchanged (every column kept as a
# string) plus lineage metadata, appending to a Delta table on every run.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp, input_file_name, lit

spark = SparkSession.builder.getOrCreate()

RAW_SOURCE = "/databricks-datasets/online_retail/data-001/data.csv"
# Kept for reference only: Unity Catalog does not allow a table's location to be
# inside a volume, so Bronze is stored as a managed table (BRONZE_TABLE).
BRONZE_PATH = "/Volumes/warehouse/raw_ecommerce/transactions_raw"
BRONZE_TABLE = "raw_ecommerce.transactions_raw"

# Read the raw CSV without inferSchema: every column stays a string, exactly as
# it appears in the file. Typing and validation are Silver's job, so Bronze never
# silently nulls a value it could not parse.
raw_df = spark.read.option("header", "true").csv(RAW_SOURCE)

# Attach ingestion metadata for lineage tracking.
bronze_df = (
    raw_df
    .withColumn("ingest_timestamp", current_timestamp())
    # _metadata.file_path replaces input_file_name(), which is deprecated on
    # Databricks and unsupported on Unity Catalog shared-access clusters.
    .withColumn("source_path", col("_metadata.file_path"))
    .withColumn("system_origin", lit("uk_retail_csv"))
)

# Append (never overwrite) so every ingestion run is preserved. mergeSchema lets
# new source columns be added to the table; incompatible type changes still fail.
# NOTE: each run appends the full file again, so Bronze accumulates one copy of
# the source per run; de-duplication happens in Silver.
(
    bronze_df.write.format("delta")
    .mode("append")
    .option("mergeSchema", "true")
    .saveAsTable(BRONZE_TABLE)
)

# Rows ingested by this run (re-scans the source file).
print(f"Bronze ingestion complete. Rows: {bronze_df.count():,}")
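Architecture Rationale:
- `mode("append")` preserves every ingestion run as a historical snapshot. Overwriting Bronze would destroy the audit trail. Each run appends the full source file again, so duplicates across runs are expected in Bronze and are removed in Silver.
- `mergeSchema` lets the table pick up new source columns without a pipeline failure. Incompatible type changes still fail the write and need explicit handling.
- Metadata columns (`ingest_timestamp`, `source_path`, `system_origin`) enable data lineage and debugging without altering source records.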
Silver transforms raw data into a trusted, query-ready format. This layer enforces business rules, handles nulls, removes duplicates, and standardizes naming conventions.
# 02_silver_conformance.py
# Silver layer: turn the raw Bronze strings into a typed, filtered, de-duplicated
# table with snake_case business columns, partitioned by transaction year and
# month. The table is rebuilt in full on every run.
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, to_timestamp, round, year, month, when,
    trim, upper, current_timestamp
)

# Reuse the active session (Databricks notebooks provide one); getOrCreate()
# also lets this script run on its own.
spark = SparkSession.builder.getOrCreate()

BRONZE_TABLE = "raw_ecommerce.transactions_raw"
# Kept for reference only: Unity Catalog does not allow a table's location to be
# inside a volume, so Silver is stored as a managed table (SILVER_TABLE).
SILVER_PATH = "/Volumes/warehouse/curated_ecommerce/transactions_clean"
SILVER_TABLE = "curated_ecommerce.transactions_clean"

raw = spark.table(BRONZE_TABLE)

# Define conformance rules
conformed = (
    raw
    # 1. Cast the numeric source columns explicitly. Bronze keeps every value as a
    #    string, and comparing a string column with a numeric literal would
    #    otherwise depend on Spark's implicit coercion rules.
    .withColumn("Quantity", col("Quantity").cast("int"))
    .withColumn("UnitPrice", col("UnitPrice").cast("double"))
    # 2. Remove anonymous/invalid sessions
    .filter(col("CustomerID").isNotNull())
    # 3. Filter out system returns and pricing errors BEFORE de-duplicating, so an
    #    invalid line is never kept in place of a valid line sharing its
    #    (InvoiceNo, StockCode) key. Values that cast to null also fail here.
    .filter((col("Quantity") > 0) & (col("UnitPrice") > 0))
    # 4. Deduplicate on transaction + product key. This also collapses the repeated
    #    copies Bronze accumulates when the same file is re-ingested.
    #    NOTE(review): distinct lines for the same product on one invoice are
    #    collapsed as well; confirm that is intended.
    .dropDuplicates(["InvoiceNo", "StockCode"])
    # 5. Standardize types and formats
    .withColumn("customer_id", col("CustomerID").cast("int"))
    # NOTE(review): confirm the source dates use four-digit years; a value such as
    # "12/1/10 8:26" may not parse with this pattern.
    .withColumn("txn_date", to_timestamp(col("InvoiceDate"), "M/d/yyyy H:mm"))
    .withColumn("unit_price", round(col("UnitPrice"), 2))
    # 6. Derive business metrics
    .withColumn("line_total", round(col("Quantity") * col("UnitPrice"), 2))
    .withColumn("product_name", upper(trim(col("Description"))))
    .withColumn("txn_year", year(col("txn_date")))
    .withColumn("txn_month", month(col("txn_date")))
    .withColumn(
        "spend_tier",
        when(col("line_total") >= 500, "Premium")
        .when(col("line_total") >= 100, "Standard")
        .otherwise("Entry"),
    )
    # 7. Rename to consistent snake_case
    .withColumnRenamed("InvoiceNo", "txn_id")
    .withColumnRenamed("StockCode", "sku")
    .withColumnRenamed("Country", "region")
    # 8. Drop raw metadata, add processing timestamp
    .drop("ingest_timestamp", "source_path", "system_origin")
    .withColumn("processed_at", current_timestamp())
)

# Write partitioned by temporal keys for efficient filtering. Silver is fully
# rebuilt from Bronze on every run, so data and schema are replaced together.
(
    conformed.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .partitionBy("txn_year", "txn_month")
    .saveAsTable(SILVER_TABLE)
)

# Count the committed table instead of re-running the whole transformation.
print(f"Silver conformance complete. Rows: {spark.table(SILVER_TABLE).count():,}")
Architecture Rationale:
- Partitioning by `txn_year` and `txn_month` aligns with common BI query patterns, enabling partition pruning.
- Explicit type casting and timestamp parsing prevent silent data corruption downstream.
- Overwriting Silver is acceptable here because Silver can be rebuilt in full from the append-only Bronze table on every run. An incremental or streaming pipeline would instead need `MERGE INTO` to upsert only new records.
Phase 4: Gold Aggregation (Business-Ready Metrics)
Gold tables serve pre-computed aggregates optimized for reporting, dashboards, and machine learning features. Aggregation logic lives in its own notebook, separate from the Bronze and Silver transformation code.
# 03_gold_aggregations.py
# Gold layer: recompute three business-ready aggregate tables from Silver on every
# run (regional revenue trends, product metrics, customer lifetime value).
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, count, avg, countDistinct, round
# col and when are used below; importing them here keeps this script independent
# of names imported by earlier notebooks.
from pyspark.sql.functions import col, when

spark = SparkSession.builder.getOrCreate()

CLEAN_TABLE = "curated_ecommerce.transactions_clean"
# Kept for reference only: Unity Catalog does not allow a table's location to be
# inside a volume, so Gold tables are stored as managed tables in GOLD_SCHEMA.
GOLD_BASE = "/Volumes/warehouse/analytics_ecommerce"
GOLD_SCHEMA = "analytics_ecommerce"

clean = spark.table(CLEAN_TABLE)


def _publish(df, table_name):
    """Replace ``analytics_ecommerce.<table_name>`` with the contents of ``df``.

    Gold is fully recomputed from Silver on each run, so data and schema are
    overwritten together. Rows are written unsorted: table reads have no
    guaranteed row order, so consumers should ORDER BY at query time.
    """
    (
        df.write.format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .saveAsTable(f"{GOLD_SCHEMA}.{table_name}")
    )


# Gold 1: Regional Revenue Trends. Silver rows are invoice lines, so an order is a
# distinct txn_id and basket size is revenue per order, not the value of one line.
regional_revenue = (
    clean.groupBy("txn_year", "txn_month", "region")
    .agg(
        round(sum("line_total"), 2).alias("gross_revenue"),
        countDistinct("txn_id").alias("order_count"),
        round(sum("line_total") / countDistinct("txn_id"), 2).alias("avg_basket_size"),
        countDistinct("customer_id").alias("active_shoppers"),
    )
)
_publish(regional_revenue, "regional_revenue")

# Gold 2: Product Velocity & Revenue
product_metrics = (
    clean.groupBy("sku", "product_name")
    .agg(
        round(sum("line_total"), 2).alias("total_revenue"),
        # Cast so units are summed as integers even if Quantity is still stored
        # as a string in Silver.
        sum(col("Quantity").cast("long")).alias("units_moved"),
        count("txn_id").alias("purchase_frequency"),
        countDistinct("customer_id").alias("buyer_reach"),
        round(avg("unit_price"), 2).alias("realized_price"),
    )
)
_publish(product_metrics, "product_metrics")

# Gold 3: Customer Lifetime Value Segments. Orders are distinct txn_ids and the
# average order value is lifetime revenue divided by that order count.
# NOTE(review): grouping by region as well as customer_id yields one row per
# (customer, region) pair if a customer has orders in several regions.
customer_ltv = (
    clean.groupBy("customer_id", "region")
    .agg(
        round(sum("line_total"), 2).alias("ltv"),
        countDistinct("txn_id").alias("total_orders"),
        round(sum("line_total") / countDistinct("txn_id"), 2).alias("avg_order_val"),
        # Number of distinct products bought; Silver derives no category column.
        countDistinct("sku").alias("category_breadth"),
    )
    .withColumn(
        "segment",
        when(col("ltv") >= 5000, "VIP")
        .when(col("ltv") >= 1000, "Core")
        .when(col("ltv") >= 200, "Active")
        .otherwise("Dormant"),
    )
)
_publish(customer_ltv, "customer_ltv")

print("Gold aggregation pipeline complete.")
Architecture Rationale:
- Gold tables are denormalized for query performance. Aggregates, and any joins a report needs, are computed once here instead of at query time.
- Separating business logic into Gold allows BI teams to query directly without understanding transformation pipelines.
- Delta's `OVERWRITE` is safe here because Gold is fully recomputed from Silver on each run.
Pitfall Guide
| Pitfall | Explanation | Production Fix |
|---|---|---|
| Overwriting Bronze | Replacing raw data destroys audit trails and makes debugging impossible. | Always use mode("append") or MERGE INTO for Bronze. Keep raw files immutable. |
| Schema Drift in Silver | Source systems add/remove columns without warning, breaking pipelines. | Enable mergeSchema in Delta writes. Implement schema validation checks before Silver transformation. |
| Partition Explosion | Partitioning by high-cardinality columns (e.g., customer_id) creates millions of small files. | Partition only by low-cardinality temporal or categorical keys (year, month, region). Use Z-ORDER for filtering. |
| Hardcoded Storage Paths | Tying pipelines to absolute paths breaks across environments (dev/stage/prod). | Use Databricks Volumes or Unity Catalog managed tables. Parameterize paths via config files or environment variables. |
| Ignoring Data Skew | Aggregating on skewed keys (e.g., popular products) causes executor OOM errors. | Apply salting techniques, increase spark.sql.shuffle.partitions, or use skewJoin hints. Monitor Spark UI for task imbalance. |
| Mixing Layer Logic | Applying business rules in Bronze or raw filtering in Gold creates maintenance debt. | Enforce strict layer contracts: Bronze = raw, Silver = conformed, Gold = aggregated. Use code reviews to validate boundaries. |
| Skipping Time Travel | Failing to leverage Delta's versioning makes rollback impossible after bad deployments. | Query DESCRIBE HISTORY table_name before overwrites. Implement automated snapshot retention policies. |
Production Bundle
Action Checklist
Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| Batch historical load | Full overwrite per layer | Simpler orchestration, predictable compute | Low (one-time) |
| Near-real-time streaming | MERGE INTO with watermarking | Handles late arrivals, maintains ACID guarantees | Medium (continuous compute) |
| High-cardinality filtering | Z-ORDER on Silver tables | Improves read performance without partition overhead | Low (periodic OPTIMIZE compute) |
| Multi-tenant analytics | Unity Catalog schema isolation | Separates tenants through catalog/schema permissions with audit logs; add row filters where row-level security is required | Medium (license cost) |
| Cost-constrained environment | Partition by month, aggregate daily | Reduces scan volume, optimizes cache hits | Low (compute savings) |
Configuration Template
-- Delta Table Optimization & Retention Policy
-- Silver: optimized writes + auto compaction, 30 days of transaction-log history,
-- and 7 days before files removed by a rewrite become eligible for VACUUM.
-- The file-retention key is delta.deletedFileRetentionDuration; Delta rejects
-- unrecognized delta.* properties, so a misspelled key fails the whole statement.
-- NOTE: Silver is overwritten on every run, so time travel to older versions is
-- effectively bounded by the 7-day file retention, not the 30-day log retention.
ALTER TABLE curated_ecommerce.transactions_clean SET TBLPROPERTIES (
  'delta.autoOptimize.optimizeWrite' = 'true',
  'delta.autoOptimize.autoCompact' = 'true',
  'delta.logRetentionDuration' = 'interval 30 days',
  'delta.deletedFileRetentionDuration' = 'interval 7 days',
  'delta.appendOnly' = 'false'
);
-- Enable Auto Compaction for Gold Tables (all three Gold outputs)
ALTER TABLE analytics_ecommerce.regional_revenue SET TBLPROPERTIES (
  'delta.autoOptimize.optimizeWrite' = 'true',
  'delta.autoOptimize.autoCompact' = 'true'
);
ALTER TABLE analytics_ecommerce.product_metrics SET TBLPROPERTIES (
  'delta.autoOptimize.optimizeWrite' = 'true',
  'delta.autoOptimize.autoCompact' = 'true'
);
ALTER TABLE analytics_ecommerce.customer_ltv SET TBLPROPERTIES (
  'delta.autoOptimize.optimizeWrite' = 'true',
  'delta.autoOptimize.autoCompact' = 'true'
);
Quick Start Guide
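- Provision Storage & Catalog: Create three schemas (`raw_ecommerce`, `curated_ecommerce`, `analytics_ecommerce`) in the `warehouse` catalog. Make `warehouse` the session's default catalog so the two-part table names used in the notebooks resolve there. Create any Unity Catalog volumes referenced by the notebooks' `/Volumes/...` paths (volumes are created, not mounted).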
- Run Bronze Ingestion: Execute the raw CSV ingestion notebook. Verify row counts and metadata columns. Confirm append-only behavior.
- Execute Silver Conformance: Run the transformation notebook. Validate null removal, deduplication, and partition layout. Check the `txn_year`/`txn_month` distribution.
- Generate Gold Aggregates: Execute the aggregation notebook. Query Gold tables via SQL or BI connector. Verify metric consistency against Silver.
- Schedule & Monitor: Wrap notebooks in a Databricks Workflow. Configure email alerts on row count deltas. Schedule weekly `OPTIMIZE` and monthly `VACUUM` jobs.
This architecture transforms raw transactional noise into reliable, query-optimized analytics. By enforcing layer boundaries, leveraging Delta Lake's transactional guarantees, and isolating business logic, teams can scale data products without accumulating technical debt.