E-Commerce Analytics Platform

Phase 3: Bronze Layer Implementation

Duration: Days 6-8 | 6-8 hours total
Goal: Ingest raw CSV data into Delta Lake tables with proper schema and validation


OVERVIEW

In Phase 3, you will mount the Azure storage containers in Databricks, ingest the five raw CSV datasets (customers, products, orders, order items, web events) into Bronze Delta Lake tables, run data quality and referential-integrity checks, and document and commit your work.

Bronze Layer Philosophy: Store raw data exactly as received with minimal transformation, add metadata for tracking, preserve data lineage.
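
The pattern is easier to see in miniature before the full notebooks below. This is a minimal sketch of the Bronze approach, assuming a Databricks notebook where spark is already available; the example schema and paths are placeholders, not part of the project datasets.

# Condensed sketch of the Bronze pattern used throughout this phase
# (assumes a Databricks notebook where `spark` is already defined).
from pyspark.sql.functions import current_timestamp, current_date, input_file_name, lit
from pyspark.sql.types import StructType, StructField, StringType

# 1. Read raw data with an explicit schema (no inference).
schema = StructType([StructField("id", StringType(), False)])  # illustrative only
df_raw = spark.read.format("csv").option("header", "true").schema(schema) \
    .load("/mnt/raw-landing/example.csv")  # hypothetical path

# 2. Add lineage metadata, but do not reshape or clean the data itself.
df_bronze = (df_raw
    .withColumn("ingestion_timestamp", current_timestamp())
    .withColumn("source_file", input_file_name())
    .withColumn("bronze_processing_date", current_date())
    .withColumn("data_source", lit("csv_upload")))

# 3. Persist as a Delta table so history and lineage are preserved.
df_bronze.write.format("delta").mode("overwrite").save("/mnt/bronze/example")

Every ingestion notebook in Steps 3.2-3.5 follows this same read-with-schema, tag-with-metadata, write-Delta shape.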


PREREQUISITES

Before starting Phase 3, confirm that Phase 2 is complete: the Azure storage account (ecommercedata001) and its containers exist, the raw CSV files are uploaded to the raw-landing container, the ecommerce-analytics-cluster is available, and you have the storage account access key.


ARCHITECTURE: BRONZE LAYER

Raw CSV Files (raw-landing)
        |
        v
Bronze Notebooks
        |
        v
Delta Lake Tables (bronze)
    [customers]  [products]  [orders]  [order_items]  [web_events]

STEP 3.1: Mount Azure Storage to Databricks (30 minutes)

You'll create a Databricks notebook to mount Azure storage containers.

Actions:

  1. Open Databricks workspace in browser

  2. Create new notebook:

    • Click "Create" → "Notebook"
    • Name: mount_storage
    • Default Language: Python
    • Cluster: Select your ecommerce-analytics-cluster
  3. Copy this code into the notebook:

# Databricks notebook source
# MAGIC %md
# MAGIC # Mount Azure Data Lake Storage to Databricks
# MAGIC
# MAGIC This notebook mounts Azure storage containers to Databricks file system.
# MAGIC Run this once to establish connections.

# COMMAND ----------

# MAGIC %md
# MAGIC ## Configuration
# MAGIC
# MAGIC Update these values with your actual Azure storage information

# COMMAND ----------

# Storage account configuration
storage_account_name = "ecommercedata001"  # UPDATE THIS

container_configs = {
    "/mnt/bronze": "bronze",
    "/mnt/silver": "silver",
    "/mnt/gold": "gold",
    "/mnt/raw-landing": "raw-landing",
    "/mnt/logs": "logs"
}

# COMMAND ----------

# MAGIC %md
# MAGIC ## Get Storage Key from Databricks Secrets
# MAGIC
# MAGIC First, we need to set up secrets scope (one-time setup)

# COMMAND ----------

# For now, we'll use direct key (NOT recommended for production)
# You should set this up in Databricks secrets scope

# TEMPORARY: Enter your storage key here for development
storage_account_key = dbutils.secrets.get(scope="azure-storage", key="storage-key")

# If secrets aren't set up yet, uncomment and use this:
# storage_account_key = "YOUR_STORAGE_KEY_HERE"

# COMMAND ----------

# MAGIC %md
# MAGIC ## Mount Function

# COMMAND ----------

def mount_storage(mount_point, container_name):
    """
    Mount an Azure storage container to Databricks

    Args:
        mount_point: DBFS mount point (e.g., /mnt/bronze)
        container_name: Azure storage container name
    """
    try:
        # Check if already mounted
        if any(mount.mountPoint == mount_point for mount in dbutils.fs.mounts()):
            print(f"{mount_point} already mounted to {container_name}")
            return True

        # Configuration for mounting
        configs = {
            f"fs.azure.account.key.{storage_account_name}.dfs.core.windows.net": storage_account_key
        }

        # Perform mount
        dbutils.fs.mount(
            source=f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/",
            mount_point=mount_point,
            extra_configs=configs
        )

        print(f"✅ Successfully mounted {container_name} to {mount_point}")
        return True

    except Exception as e:
        print(f"❌ Error mounting {container_name}: {str(e)}")
        return False

# COMMAND ----------

# MAGIC %md
# MAGIC ## Mount All Containers

# COMMAND ----------

print("=" * 70)
print("MOUNTING AZURE STORAGE CONTAINERS")
print("=" * 70)

success_count = 0
for mount_point, container_name in container_configs.items():
    if mount_storage(mount_point, container_name):
        success_count += 1

print("\n" + "=" * 70)
print(f"✅ Successfully mounted {success_count}/{len(container_configs)} containers")
print("=" * 70)

# COMMAND ----------

# MAGIC %md
# MAGIC ## Verify Mounts

# COMMAND ----------

# Display all current mounts
display(dbutils.fs.mounts())

# COMMAND ----------

# MAGIC %md
# MAGIC ## Test Access to raw-landing

# COMMAND ----------

print("Testing access to raw-landing container...\n")

try:
    files = dbutils.fs.ls("/mnt/raw-landing")
    print(f"✅ Found {len(files)} files in /mnt/raw-landing:\n")
    for file in files:
        size_mb = file.size / (1024 * 1024)
        print(f"  📄 {file.name:<25} {size_mb:>8.2f} MB")
except Exception as e:
    print(f"❌ Error accessing raw-landing: {str(e)}")

# COMMAND ----------

# MAGIC %md
# MAGIC ## Create Bronze Database

# COMMAND ----------

# Create database for bronze layer tables
spark.sql("CREATE DATABASE IF NOT EXISTS bronze")
spark.sql("USE bronze")

print("✅ Bronze database created and set as default")

# Show current database
current_db = spark.sql("SELECT current_database()").collect()[0][0]
print(f"Current database: {current_db}")
  4. Update the storage account name in the notebook (cell 3)

  5. Set up the storage key:

    • Comment out the line storage_account_key = dbutils.secrets.get(...) and uncomment storage_account_key = "YOUR_STORAGE_KEY_HERE"
    • Replace the placeholder with your actual key from Phase 2
    • (We'll set up proper Databricks secrets in Phase 8; see the sketch after this list)
  6. Run the notebook:

    • Click "Run All" at top
    • Watch each cell execute
    • Should take 1-2 minutes
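
If you'd rather not hard-code the key at all, the azure-storage secret scope that dbutils.secrets.get() expects can be created ahead of Phase 8. The sketch below uses the Databricks REST API secrets endpoints; it assumes a workspace URL and personal access token in environment variables, and you should confirm the endpoint paths against the Databricks secrets API documentation for your workspace.

# Sketch: create the "azure-storage" secret scope and store the storage key
# via the Databricks REST API. Assumes DATABRICKS_HOST and DATABRICKS_TOKEN
# are set; verify the 2.0 secrets endpoints for your workspace.
import os
import requests

host = os.environ["DATABRICKS_HOST"]  # e.g. https://adb-xxxx.azuredatabricks.net
headers = {"Authorization": f"Bearer {os.environ['DATABRICKS_TOKEN']}"}

# Create the scope (a RESOURCE_ALREADY_EXISTS response just means it's already there).
requests.post(f"{host}/api/2.0/secrets/scopes/create",
              headers=headers,
              json={"scope": "azure-storage"})

# Store the storage account key under the name the mount notebook reads.
requests.post(f"{host}/api/2.0/secrets/put",
              headers=headers,
              json={"scope": "azure-storage",
                    "key": "storage-key",
                    "string_value": "<YOUR_STORAGE_KEY>"})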

Expected Output:

======================================================================
MOUNTING AZURE STORAGE CONTAINERS
======================================================================
✅ Successfully mounted bronze to /mnt/bronze
✅ Successfully mounted silver to /mnt/silver
✅ Successfully mounted gold to /mnt/gold
✅ Successfully mounted raw-landing to /mnt/raw-landing
✅ Successfully mounted logs to /mnt/logs

======================================================================
✅ Successfully mounted 5/5 containers
======================================================================

(On a re-run, already-mounted containers print "/mnt/... already mounted to ..." instead; the 5/5 count is unchanged.)

✅ CHECKPOINT: All 5 containers are mounted, the files in raw-landing are listed, and the bronze database exists.


STEP 3.2: Ingest Customers Data (1 hour)

Create a notebook to ingest customer data into Bronze layer.

Actions:

  1. Create new notebook:

    • Name: bronze_customers
    • Language: Python
    • Cluster: ecommerce-analytics-cluster
  2. Copy this code:

# Databricks notebook source
# MAGIC %md
# MAGIC # Bronze Layer: Customer Data Ingestion
# MAGIC
# MAGIC Ingests raw customer CSV into Delta Lake with:
# MAGIC - Explicit schema definition
# MAGIC - Data quality validation
# MAGIC - Metadata tracking
# MAGIC - Partitioning for performance

# COMMAND ----------

from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime

# COMMAND ----------

# MAGIC %md
# MAGIC ## Configuration

# COMMAND ----------

SOURCE_PATH = "/mnt/raw-landing/customers.csv"
TARGET_PATH = "/mnt/bronze/customers"
TABLE_NAME = "bronze.customers"

print(f"Source: {SOURCE_PATH}")
print(f"Target: {TARGET_PATH}")
print(f"Table: {TABLE_NAME}")

# COMMAND ----------

# MAGIC %md
# MAGIC ## Define Schema
# MAGIC
# MAGIC Explicit schemas are best practice - never rely on inference in production

# COMMAND ----------

customer_schema = StructType([
    StructField("customer_id", StringType(), False),
    StructField("email", StringType(), False),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("registration_date", DateType(), False),
    StructField("country", StringType(), True),
    StructField("state", StringType(), True),
    StructField("segment", StringType(), False),
    StructField("marketing_opt_in", BooleanType(), False),
    StructField("created_at", TimestampType(), False),
    StructField("updated_at", TimestampType(), False)
])

print("✅ Schema defined with 11 columns")

# COMMAND ----------

# MAGIC %md
# MAGIC ## Read Raw CSV

# COMMAND ----------

df_raw = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("dateFormat", "yyyy-MM-dd") \
    .option("timestampFormat", "yyyy-MM-dd HH:mm:ss") \
    .schema(customer_schema) \
    .load(SOURCE_PATH)

raw_count = df_raw.count()
print(f"📊 Read {raw_count:,} raw records from CSV")

# COMMAND ----------

# MAGIC %md
# MAGIC ## Add Bronze Layer Metadata

# COMMAND ----------

df_bronze = df_raw \
    .withColumn("ingestion_timestamp", current_timestamp()) \
    .withColumn("source_file", input_file_name()) \
    .withColumn("bronze_processing_date", current_date()) \
    .withColumn("data_source", lit("csv_upload"))

print("✅ Added 4 metadata columns")

# COMMAND ----------

# MAGIC %md
# MAGIC ## Data Quality Checks

# COMMAND ----------

print("=" * 70)
print("DATA QUALITY CHECKS")
print("=" * 70)

# Basic counts
total_records = df_bronze.count()
print(f"\n📊 Total records: {total_records:,}")

# Null checks for required fields
null_customer_ids = df_bronze.filter(col("customer_id").isNull()).count()
null_emails = df_bronze.filter(col("email").isNull()).count()
null_segments = df_bronze.filter(col("segment").isNull()).count()

print(f"\n🔍 Null Value Checks:")
print(f"  Null customer_ids: {null_customer_ids}")
print(f"  Null emails: {null_emails}")
print(f"  Null segments: {null_segments}")

# Duplicate checks
duplicate_count = df_bronze.groupBy("customer_id").count() \
    .filter(col("count") > 1).count()

print(f"\n🔍 Duplicate Check:")
print(f"  Duplicate customer_ids: {duplicate_count}")

# Segment distribution
print(f"\n📈 Segment Distribution:")
segment_dist = df_bronze.groupBy("segment").count().orderBy(col("count").desc())
segment_dist.show()

# Country distribution
print(f"\n🌍 Top 5 Countries:")
country_dist = df_bronze.groupBy("country").count().orderBy(col("count").desc()).limit(5)
country_dist.show()

print("=" * 70)

# COMMAND ----------

# MAGIC %md
# MAGIC ## Display Sample Records

# COMMAND ----------

print("Sample of 10 records:")
display(df_bronze.limit(10))

# COMMAND ----------

# MAGIC %md
# MAGIC ## Write to Delta Lake
# MAGIC
# MAGIC Partitioned by registration_date for query performance

# COMMAND ----------

print("Writing to Delta Lake...")

df_bronze.write \
    .format("delta") \
    .mode("overwrite") \
    .option("mergeSchema", "true") \
    .option("overwriteSchema", "true") \
    .partitionBy("registration_date") \
    .save(TARGET_PATH)

print(f"✅ Successfully wrote {total_records:,} records to {TARGET_PATH}")

# COMMAND ----------

# MAGIC %md
# MAGIC ## Create/Register Delta Table

# COMMAND ----------

spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {TABLE_NAME}
    USING DELTA
    LOCATION '{TARGET_PATH}'
""")

print(f"✅ Table {TABLE_NAME} created/registered")

# COMMAND ----------

# MAGIC %md
# MAGIC ## Verify Table Creation

# COMMAND ----------

# Query the table
result = spark.sql(f"SELECT COUNT(*) as count FROM {TABLE_NAME}").collect()[0][0]
print(f"✅ Verified: {result:,} records in {TABLE_NAME}")

# Show sample
print("\nSample from table:")
spark.sql(f"SELECT * FROM {TABLE_NAME} LIMIT 5").show()

# COMMAND ----------

# MAGIC %md
# MAGIC ## Table Statistics

# COMMAND ----------

# Detailed table information
print("Table Details:")
spark.sql(f"DESCRIBE DETAIL {TABLE_NAME}").show(truncate=False)

# Table schema
print("\nTable Schema:")
spark.sql(f"DESCRIBE {TABLE_NAME}").show(truncate=False)

# COMMAND ----------

# MAGIC %md
# MAGIC ## Partition Information

# COMMAND ----------

print("Partition Statistics:")
spark.sql(f"""
    SELECT registration_date, COUNT(*) as count
    FROM {TABLE_NAME}
    GROUP BY registration_date
    ORDER BY registration_date DESC
    LIMIT 10
""").show()

# COMMAND ----------

# MAGIC %md
# MAGIC ## Success Summary

# COMMAND ----------

print("=" * 70)
print("✅ CUSTOMER DATA INGESTION COMPLETE")
print("=" * 70)
print(f"Source: {SOURCE_PATH}")
print(f"Target: {TARGET_PATH}")
print(f"Table: {TABLE_NAME}")
print(f"Records: {total_records:,}")
print(f"Partitions: By registration_date")
print(f"Format: Delta Lake")
print("=" * 70)
  3. Run the notebook (Run All)

Expected Output Summary:

  • 10,000 raw records read from customers.csv
  • 4 metadata columns added (ingestion_timestamp, source_file, bronze_processing_date, data_source)
  • Zero nulls in customer_id, email, and segment; zero duplicate customer_ids
  • bronze.customers Delta table created, partitioned by registration_date

✅ CHECKPOINT: bronze.customers exists and its record count matches the raw CSV.


STEP 3.3: Ingest Products Data (45 minutes)

Actions:

  1. Create new notebook:

    • Name: bronze_products
  2. Copy this code:

# Databricks notebook source
# MAGIC %md
# MAGIC # Bronze Layer: Product Data Ingestion

# COMMAND ----------

from pyspark.sql.functions import *
from pyspark.sql.types import *

# COMMAND ----------

SOURCE_PATH = "/mnt/raw-landing/products.csv"
TARGET_PATH = "/mnt/bronze/products"
TABLE_NAME = "bronze.products"

# COMMAND ----------

# Product schema
product_schema = StructType([
    StructField("product_id", StringType(), False),
    StructField("product_name", StringType(), False),
    StructField("category", StringType(), False),
    StructField("subcategory", StringType(), True),
    StructField("price", DecimalType(10,2), False),
    StructField("cost", DecimalType(10,2), False),
    StructField("stock_quantity", IntegerType(), False),
    StructField("supplier", StringType(), True),
    StructField("created_at", TimestampType(), False),
    StructField("is_active", BooleanType(), False)
])

# COMMAND ----------

# Read CSV
df_raw = spark.read \
    .format("csv") \
    .option("header", "true") \
    .schema(product_schema) \
    .load(SOURCE_PATH)

print(f"📊 Read {df_raw.count():,} products")

# COMMAND ----------

# Add metadata and calculated columns
df_bronze = df_raw \
    .withColumn("ingestion_timestamp", current_timestamp()) \
    .withColumn("source_file", input_file_name()) \
    .withColumn("bronze_processing_date", current_date()) \
    .withColumn("data_source", lit("csv_upload")) \
    .withColumn("profit_margin", round(((col("price") - col("cost")) / col("price") * 100), 2))

# COMMAND ----------

# Quality checks
print("=" * 70)
print("DATA QUALITY CHECKS")
print("=" * 70)

total = df_bronze.count()
active = df_bronze.filter(col("is_active") == True).count()
zero_stock = df_bronze.filter(col("stock_quantity") == 0).count()

print(f"\n📊 Total products: {total:,}")
print(f"  Active: {active:,}")
print(f"  Zero stock: {zero_stock}")

print(f"\n📈 Category Distribution:")
df_bronze.groupBy("category").count().orderBy(col("count").desc()).show()

print(f"\n💰 Price Statistics:")
df_bronze.select(
    round(min("price"), 2).alias("min_price"),
    round(avg("price"), 2).alias("avg_price"),
    round(max("price"), 2).alias("max_price")
).show()

# COMMAND ----------

# Display sample
display(df_bronze.limit(10))

# COMMAND ----------

# Write to Delta
df_bronze.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .save(TARGET_PATH)

print(f"✅ Wrote {total:,} records to Delta")

# COMMAND ----------

# Create table
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {TABLE_NAME}
    USING DELTA
    LOCATION '{TARGET_PATH}'
""")

print(f"✅ Table {TABLE_NAME} created")

# COMMAND ----------

# Verify
result = spark.sql(f"SELECT COUNT(*) as count FROM {TABLE_NAME}").collect()[0][0]
print(f"✅ Verified: {result:,} records")

spark.sql(f"SELECT * FROM {TABLE_NAME} LIMIT 5").show()

# COMMAND ----------

# Category analysis
print("\n📊 Category Analysis:")
spark.sql(f"""
    SELECT
        category,
        COUNT(*) as product_count,
        ROUND(AVG(price), 2) as avg_price,
        ROUND(AVG(profit_margin), 2) as avg_margin,
        SUM(CASE WHEN is_active THEN 1 ELSE 0 END) as active_count
    FROM {TABLE_NAME}
    GROUP BY category
    ORDER BY product_count DESC
""").show()

# COMMAND ----------

print("=" * 70)
print("✅ PRODUCT DATA INGESTION COMPLETE")
print("=" * 70)
print(f"Records: {total:,}")
print(f"Active Products: {active:,}")
print("=" * 70)
  3. Run the notebook

✅ CHECKPOINT: bronze.products exists with 500 records and the category analysis runs without errors.


STEP 3.4: Ingest Orders and Order Items (1.5 hours)

Actions:

  1. Create new notebook:

    • Name: bronze_orders
  2. Copy this code:

# Databricks notebook source
# MAGIC %md
# MAGIC # Bronze Layer: Orders and Order Items Ingestion
# MAGIC
# MAGIC Ingests both orders and order_items tables

# COMMAND ----------

from pyspark.sql.functions import *
from pyspark.sql.types import *

# COMMAND ----------

# MAGIC %md
# MAGIC ## Part 1: Orders Table

# COMMAND ----------

SOURCE_PATH_ORDERS = "/mnt/raw-landing/orders.csv"
TARGET_PATH_ORDERS = "/mnt/bronze/orders"
TABLE_NAME_ORDERS = "bronze.orders"

# COMMAND ----------

# Orders schema
orders_schema = StructType([
    StructField("order_id", StringType(), False),
    StructField("customer_id", StringType(), False),
    StructField("order_date", DateType(), False),
    StructField("order_timestamp", TimestampType(), False),
    StructField("status", StringType(), False),
    StructField("payment_method", StringType(), False),
    StructField("subtotal", DecimalType(10,2), False),
    StructField("discount", DecimalType(10,2), False),
    StructField("shipping_cost", DecimalType(10,2), False),
    StructField("tax", DecimalType(10,2), False),
    StructField("total", DecimalType(10,2), False),
    StructField("shipping_address", StringType(), True),
    StructField("shipping_city", StringType(), True),
    StructField("shipping_state", StringType(), True),
    StructField("shipping_country", StringType(), True),
    StructField("created_at", TimestampType(), False),
    StructField("updated_at", TimestampType(), False)
])

# COMMAND ----------

# Read orders
df_orders_raw = spark.read \
    .format("csv") \
    .option("header", "true") \
    .schema(orders_schema) \
    .load(SOURCE_PATH_ORDERS)

print(f"📊 Read {df_orders_raw.count():,} orders")

# COMMAND ----------

# Add metadata
df_orders_bronze = df_orders_raw \
    .withColumn("ingestion_timestamp", current_timestamp()) \
    .withColumn("source_file", input_file_name()) \
    .withColumn("bronze_processing_date", current_date()) \
    .withColumn("data_source", lit("csv_upload"))

# COMMAND ----------

# Quality checks for orders
print("=" * 70)
print("ORDERS DATA QUALITY CHECKS")
print("=" * 70)

total_orders = df_orders_bronze.count()
print(f"\n📊 Total orders: {total_orders:,}")

print(f"\n📈 Order Status Distribution:")
df_orders_bronze.groupBy("status").count().orderBy(col("count").desc()).show()

print(f"\n💳 Payment Method Distribution:")
df_orders_bronze.groupBy("payment_method").count().orderBy(col("count").desc()).show()

print(f"\n💰 Revenue Statistics:")
df_orders_bronze.select(
    round(sum("total"), 2).alias("total_revenue"),
    round(avg("total"), 2).alias("avg_order_value"),
    round(min("total"), 2).alias("min_order"),
    round(max("total"), 2).alias("max_order")
).show()

# COMMAND ----------

# Display sample
display(df_orders_bronze.limit(10))

# COMMAND ----------

# Write orders to Delta (partitioned by order_date)
df_orders_bronze.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .partitionBy("order_date") \
    .save(TARGET_PATH_ORDERS)

print(f"✅ Wrote {total_orders:,} orders to Delta")

# COMMAND ----------

# Create orders table
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {TABLE_NAME_ORDERS}
    USING DELTA
    LOCATION '{TARGET_PATH_ORDERS}'
""")

print(f"✅ Table {TABLE_NAME_ORDERS} created")

# COMMAND ----------

# MAGIC %md
# MAGIC ## Part 2: Order Items Table

# COMMAND ----------

SOURCE_PATH_ITEMS = "/mnt/raw-landing/order_items.csv"
TARGET_PATH_ITEMS = "/mnt/bronze/order_items"
TABLE_NAME_ITEMS = "bronze.order_items"

# COMMAND ----------

# Order items schema
order_items_schema = StructType([
    StructField("order_id", StringType(), False),
    StructField("product_id", StringType(), False),
    StructField("quantity", IntegerType(), False),
    StructField("unit_price", DecimalType(10,2), False),
    StructField("total_price", DecimalType(10,2), False)
])

# COMMAND ----------

# Read order items
df_items_raw = spark.read \
    .format("csv") \
    .option("header", "true") \
    .schema(order_items_schema) \
    .load(SOURCE_PATH_ITEMS)

print(f"📊 Read {df_items_raw.count():,} order items")

# COMMAND ----------

# Add metadata
df_items_bronze = df_items_raw \
    .withColumn("ingestion_timestamp", current_timestamp()) \
    .withColumn("source_file", input_file_name()) \
    .withColumn("bronze_processing_date", current_date()) \
    .withColumn("data_source", lit("csv_upload"))

# COMMAND ----------

# Quality checks for order items
print("=" * 70)
print("ORDER ITEMS DATA QUALITY CHECKS")
print("=" * 70)

total_items = df_items_bronze.count()
print(f"\n📊 Total order items: {total_items:,}")

print(f"\n📦 Quantity Statistics:")
df_items_bronze.select(
    round(avg("quantity"), 2).alias("avg_quantity"),
    max("quantity").alias("max_quantity"),
    round(sum("quantity"), 0).alias("total_units_sold")
).show()

print(f"\n📈 Items per Order:")
items_per_order = df_items_bronze.groupBy("order_id").count()
items_per_order.select(
    round(avg("count"), 2).alias("avg_items_per_order"),
    min("count").alias("min_items"),
    max("count").alias("max_items")
).show()

# COMMAND ----------

# Display sample
display(df_items_bronze.limit(10))

# COMMAND ----------

# Write order items to Delta
df_items_bronze.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .save(TARGET_PATH_ITEMS)

print(f"✅ Wrote {total_items:,} order items to Delta")

# COMMAND ----------

# Create order items table
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {TABLE_NAME_ITEMS}
    USING DELTA
    LOCATION '{TARGET_PATH_ITEMS}'
""")

print(f"✅ Table {TABLE_NAME_ITEMS} created")

# COMMAND ----------

# MAGIC %md
# MAGIC ## Verification and Analysis

# COMMAND ----------

# Verify both tables
orders_count = spark.sql(f"SELECT COUNT(*) FROM {TABLE_NAME_ORDERS}").collect()[0][0]
items_count = spark.sql(f"SELECT COUNT(*) FROM {TABLE_NAME_ITEMS}").collect()[0][0]

print(f"{TABLE_NAME_ORDERS}: {orders_count:,} records")
print(f"{TABLE_NAME_ITEMS}: {items_count:,} records")

# COMMAND ----------

# Join analysis
# Note: items are aggregated per order first, so each order's total is counted
# once; a direct join would repeat o.total for every item row and inflate revenue.
print("\n📊 Order Analysis (with items):")
spark.sql(f"""
    SELECT
        o.status,
        COUNT(DISTINCT o.order_id) as order_count,
        SUM(COALESCE(oi.item_count, 0)) as total_items,
        ROUND(SUM(o.total), 2) as total_revenue
    FROM {TABLE_NAME_ORDERS} o
    LEFT JOIN (
        SELECT order_id, COUNT(*) as item_count
        FROM {TABLE_NAME_ITEMS}
        GROUP BY order_id
    ) oi ON o.order_id = oi.order_id
    GROUP BY o.status
    ORDER BY total_revenue DESC
""").show()

# COMMAND ----------

# Monthly order trends
print("\n📊 Monthly Order Trends:")
spark.sql(f"""
    SELECT
        DATE_FORMAT(order_date, 'yyyy-MM') as month,
        COUNT(*) as order_count,
        ROUND(SUM(total), 2) as revenue
    FROM {TABLE_NAME_ORDERS}
    GROUP BY DATE_FORMAT(order_date, 'yyyy-MM')
    ORDER BY month DESC
    LIMIT 12
""").show()

# COMMAND ----------

print("=" * 70)
print("✅ ORDERS & ORDER ITEMS INGESTION COMPLETE")
print("=" * 70)
print(f"Orders: {orders_count:,}")
print(f"Order Items: {items_count:,}")
print(f"Avg Items per Order: {items_count / orders_count:.2f}")
print("=" * 70)
  3. Run the notebook

✅ CHECKPOINT: bronze.orders and bronze.order_items exist and the verification queries return the expected counts.


STEP 3.5: Ingest Web Events Data (1 hour)

Actions:

  1. Create new notebook:

    • Name: bronze_web_events
  2. Copy this code:

# Databricks notebook source
# MAGIC %md
# MAGIC # Bronze Layer: Web Events Ingestion

# COMMAND ----------

from pyspark.sql.functions import *
from pyspark.sql.types import *

# COMMAND ----------

SOURCE_PATH = "/mnt/raw-landing/web_events.csv"
TARGET_PATH = "/mnt/bronze/web_events"
TABLE_NAME = "bronze.web_events"

# COMMAND ----------

# Web events schema
web_events_schema = StructType([
    StructField("event_id", StringType(), False),
    StructField("session_id", StringType(), False),
    StructField("customer_id", StringType(), True),
    StructField("event_timestamp", TimestampType(), False),
    StructField("event_type", StringType(), False),
    StructField("product_id", StringType(), True),
    StructField("search_query", StringType(), True),
    StructField("device_type", StringType(), False),
    StructField("browser", StringType(), False),
    StructField("page_url", StringType(), True),
    StructField("referrer_url", StringType(), True)
])

# COMMAND ----------

# Read web events
df_raw = spark.read \
    .format("csv") \
    .option("header", "true") \
    .schema(web_events_schema) \
    .load(SOURCE_PATH)

print(f"📊 Read {df_raw.count():,} web events")

# COMMAND ----------

# Add metadata and derived columns
df_bronze = df_raw \
    .withColumn("ingestion_timestamp", current_timestamp()) \
    .withColumn("source_file", input_file_name()) \
    .withColumn("bronze_processing_date", current_date()) \
    .withColumn("data_source", lit("csv_upload")) \
    .withColumn("event_date", to_date(col("event_timestamp"))) \
    .withColumn("event_hour", hour(col("event_timestamp"))) \
    .withColumn("is_authenticated", when(col("customer_id").isNotNull(), True).otherwise(False))

# COMMAND ----------

# Quality checks
print("=" * 70)
print("WEB EVENTS DATA QUALITY CHECKS")
print("=" * 70)

total_events = df_bronze.count()
authenticated = df_bronze.filter(col("is_authenticated") == True).count()
anonymous = total_events - authenticated

print(f"\n📊 Total events: {total_events:,}")
print(f"  Authenticated: {authenticated:,} ({authenticated/total_events*100:.1f}%)")
print(f"  Anonymous: {anonymous:,} ({anonymous/total_events*100:.1f}%)")

print(f"\n📈 Event Type Distribution:")
df_bronze.groupBy("event_type").count().orderBy(col("count").desc()).show()

print(f"\n📱 Device Type Distribution:")
df_bronze.groupBy("device_type").count().orderBy(col("count").desc()).show()

print(f"\n🌐 Browser Distribution:")
df_bronze.groupBy("browser").count().orderBy(col("count").desc()).show()

print(f"\n🔍 Events with Product IDs:")
with_product = df_bronze.filter(col("product_id").isNotNull()).count()
print(f"  {with_product:,} events ({with_product/total_events*100:.1f}%)")

print(f"\n🔎 Search Events:")
search_events = df_bronze.filter(col("event_type") == "search").count()
print(f"  {search_events:,} search events")

# COMMAND ----------

# Display sample
display(df_bronze.limit(10))

# COMMAND ----------

# Write to Delta (partitioned by event_date for performance)
df_bronze.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .partitionBy("event_date") \
    .save(TARGET_PATH)

print(f"✅ Wrote {total_events:,} web events to Delta")

# COMMAND ----------

# Create table
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {TABLE_NAME}
    USING DELTA
    LOCATION '{TARGET_PATH}'
""")

print(f"✅ Table {TABLE_NAME} created")

# COMMAND ----------

# Verify
result = spark.sql(f"SELECT COUNT(*) as count FROM {TABLE_NAME}").collect()[0][0]
print(f"✅ Verified: {result:,} records")

# COMMAND ----------

# Hourly activity analysis
print("\n📊 Hourly Activity Pattern:")
spark.sql(f"""
    SELECT event_hour, COUNT(*) as event_count
    FROM {TABLE_NAME}
    GROUP BY event_hour
    ORDER BY event_hour
""").show(24)

# COMMAND ----------

# Daily trends
print("\n📊 Daily Event Trends (Last 30 days):")
spark.sql(f"""
    SELECT
        event_date,
        COUNT(*) as event_count,
        COUNT(DISTINCT session_id) as unique_sessions,
        COUNT(DISTINCT customer_id) as unique_customers
    FROM {TABLE_NAME}
    GROUP BY event_date
    ORDER BY event_date DESC
    LIMIT 30
""").show()

# COMMAND ----------

# Session analysis
print("\n📊 Session Analysis:")
spark.sql(f"""
    SELECT
        COUNT(DISTINCT session_id) as total_sessions,
        ROUND(COUNT(*) / COUNT(DISTINCT session_id), 2) as avg_events_per_session
    FROM {TABLE_NAME}
""").show()

# COMMAND ----------

print("=" * 70)
print("✅ WEB EVENTS INGESTION COMPLETE")
print("=" * 70)
print(f"Events: {total_events:,}")
print(f"Date Range: Partitioned by event_date")
print(f"Authenticated Users: {authenticated/total_events*100:.1f}%")
print("=" * 70)
  3. Run the notebook

Expected Output:

  • 200,000 web events read and ingested
  • event_date, event_hour, and is_authenticated columns derived
  • bronze.web_events Delta table created, partitioned by event_date

✅ CHECKPOINT: bronze.web_events exists and the event count matches the raw CSV.


STEP 3.6: Create Bronze Layer Summary Notebook (30 minutes)

Create a notebook to verify all Bronze tables and provide summary statistics.

Actions:

  1. Create new notebook:

    • Name: bronze_summary
  2. Copy this code:

# Databricks notebook source
# MAGIC %md
# MAGIC # Bronze Layer: Summary and Verification
# MAGIC
# MAGIC Verifies all Bronze tables and provides comprehensive statistics

# COMMAND ----------

from pyspark.sql.functions import *

# COMMAND ----------

# MAGIC %md
# MAGIC ## List All Bronze Tables

# COMMAND ----------

print("=" * 70)
print("BRONZE LAYER TABLES")
print("=" * 70)

tables = spark.sql("SHOW TABLES IN bronze").collect()
for table in tables:
    table_name = f"bronze.{table.tableName}"
    count = spark.sql(f"SELECT COUNT(*) FROM {table_name}").collect()[0][0]
    print(f"{table_name:<30} {count:>12,} records")

print("=" * 70)

# COMMAND ----------

# MAGIC %md
# MAGIC ## Table Details

# COMMAND ----------

# MAGIC %md
# MAGIC ### Customers Table

# COMMAND ----------

print("📊 CUSTOMERS TABLE SUMMARY")
print("-" * 70)

spark.sql("""
    SELECT
        COUNT(*) as total_customers,
        COUNT(DISTINCT country) as countries,
        COUNT(DISTINCT segment) as segments,
        MIN(registration_date) as earliest_registration,
        MAX(registration_date) as latest_registration
    FROM bronze.customers
""").show()

print("\nSegment Distribution:")
spark.sql("""
    SELECT segment, COUNT(*) as count
    FROM bronze.customers
    GROUP BY segment
    ORDER BY count DESC
""").show()

print("\nTop Countries:")
spark.sql("""
    SELECT country, COUNT(*) as count
    FROM bronze.customers
    GROUP BY country
    ORDER BY count DESC
    LIMIT 5
""").show()

# COMMAND ----------

# MAGIC %md
# MAGIC ### Products Table

# COMMAND ----------

print("📦 PRODUCTS TABLE SUMMARY")
print("-" * 70)

spark.sql("""
    SELECT
        COUNT(*) as total_products,
        SUM(CASE WHEN is_active THEN 1 ELSE 0 END) as active_products,
        COUNT(DISTINCT category) as categories,
        ROUND(AVG(price), 2) as avg_price,
        ROUND(AVG(profit_margin), 2) as avg_margin
    FROM bronze.products
""").show()

print("\nCategory Breakdown:")
spark.sql("""
    SELECT
        category,
        COUNT(*) as product_count,
        ROUND(AVG(price), 2) as avg_price,
        ROUND(AVG(profit_margin), 2) as avg_margin
    FROM bronze.products
    GROUP BY category
    ORDER BY product_count DESC
""").show()

# COMMAND ----------

# MAGIC %md
# MAGIC ### Orders Table

# COMMAND ----------

print("🛒 ORDERS TABLE SUMMARY")
print("-" * 70)

spark.sql("""
    SELECT
        COUNT(*) as total_orders,
        ROUND(SUM(total), 2) as total_revenue,
        ROUND(AVG(total), 2) as avg_order_value,
        MIN(order_date) as earliest_order,
        MAX(order_date) as latest_order
    FROM bronze.orders
""").show()

print("\nOrder Status:")
spark.sql("""
    SELECT status, COUNT(*) as count, ROUND(SUM(total), 2) as revenue
    FROM bronze.orders
    GROUP BY status
    ORDER BY revenue DESC
""").show()

print("\nPayment Methods:")
spark.sql("""
    SELECT payment_method, COUNT(*) as count, ROUND(AVG(total), 2) as avg_value
    FROM bronze.orders
    GROUP BY payment_method
    ORDER BY count DESC
""").show()

# COMMAND ----------

# MAGIC %md
# MAGIC ### Order Items Table

# COMMAND ----------

print("📋 ORDER ITEMS TABLE SUMMARY")
print("-" * 70)

spark.sql("""
    SELECT
        COUNT(*) as total_items,
        COUNT(DISTINCT order_id) as distinct_orders,
        COUNT(DISTINCT product_id) as distinct_products,
        ROUND(COUNT(*) * 1.0 / COUNT(DISTINCT order_id), 2) as avg_items_per_order,
        SUM(quantity) as total_units_sold
    FROM bronze.order_items
""").show()

print("\nTop 10 Products by Quantity Sold:")
spark.sql("""
    SELECT
        product_id,
        SUM(quantity) as total_quantity,
        COUNT(DISTINCT order_id) as times_ordered,
        ROUND(SUM(total_price), 2) as total_revenue
    FROM bronze.order_items
    GROUP BY product_id
    ORDER BY total_quantity DESC
    LIMIT 10
""").show()

# COMMAND ----------

# MAGIC %md
# MAGIC ### Web Events Table

# COMMAND ----------

print("🌐 WEB EVENTS TABLE SUMMARY")
print("-" * 70)

spark.sql("""
    SELECT
        COUNT(*) as total_events,
        COUNT(DISTINCT session_id) as unique_sessions,
        COUNT(DISTINCT customer_id) as unique_customers,
        ROUND(COUNT(*) * 1.0 / COUNT(DISTINCT session_id), 2) as avg_events_per_session
    FROM bronze.web_events
""").show()

print("\nEvent Type Distribution:")
spark.sql("""
    SELECT
        event_type,
        COUNT(*) as count,
        ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER(), 2) as percentage
    FROM bronze.web_events
    GROUP BY event_type
    ORDER BY count DESC
""").show()

print("\nDevice and Browser:")
spark.sql("""
    SELECT device_type, COUNT(*) as count
    FROM bronze.web_events
    GROUP BY device_type
    ORDER BY count DESC
""").show()

# COMMAND ----------

# MAGIC %md
# MAGIC ## Data Quality Summary

# COMMAND ----------

print("=" * 70)
print("DATA QUALITY SUMMARY")
print("=" * 70)

# Check for nulls in key fields
print("\n🔍 Null Value Checks:")

tables_to_check = [
    ("bronze.customers", "customer_id"),
    ("bronze.products", "product_id"),
    ("bronze.orders", "order_id"),
    ("bronze.order_items", "order_id"),
    ("bronze.web_events", "event_id")
]

for table, key_col in tables_to_check:
    null_count = spark.sql(f"""
        SELECT COUNT(*) FROM {table} WHERE {key_col} IS NULL
    """).collect()[0][0]
    total = spark.sql(f"SELECT COUNT(*) FROM {table}").collect()[0][0]
    status = "✅" if null_count == 0 else "❌"
    print(f"{status} {table:<25} Null {key_col}: {null_count}/{total}")

# COMMAND ----------

# MAGIC %md
# MAGIC ## Referential Integrity Checks

# COMMAND ----------

print("\n🔗 Referential Integrity Checks:")

# Orders -> Customers
orphan_orders = spark.sql("""
    SELECT COUNT(*)
    FROM bronze.orders o
    LEFT JOIN bronze.customers c ON o.customer_id = c.customer_id
    WHERE c.customer_id IS NULL
""").collect()[0][0]
print(f"{'✅' if orphan_orders == 0 else '❌'} Orders with invalid customer_id: {orphan_orders}")

# Order Items -> Orders
orphan_items = spark.sql("""
    SELECT COUNT(*)
    FROM bronze.order_items oi
    LEFT JOIN bronze.orders o ON oi.order_id = o.order_id
    WHERE o.order_id IS NULL
""").collect()[0][0]
print(f"{'✅' if orphan_items == 0 else '❌'} Order items with invalid order_id: {orphan_items}")

# Order Items -> Products
orphan_items_products = spark.sql("""
    SELECT COUNT(*)
    FROM bronze.order_items oi
    LEFT JOIN bronze.products p ON oi.product_id = p.product_id
    WHERE p.product_id IS NULL
""").collect()[0][0]
print(f"{'✅' if orphan_items_products == 0 else '❌'} Order items with invalid product_id: {orphan_items_products}")

# Web Events -> Customers (null customer_id is allowed for anonymous)
authenticated_events = spark.sql("""
    SELECT COUNT(*) FROM bronze.web_events WHERE customer_id IS NOT NULL
""").collect()[0][0]

invalid_customer_events = spark.sql("""
    SELECT COUNT(*)
    FROM bronze.web_events w
    LEFT JOIN bronze.customers c ON w.customer_id = c.customer_id
    WHERE w.customer_id IS NOT NULL AND c.customer_id IS NULL
""").collect()[0][0]
print(f"{'✅' if invalid_customer_events == 0 else '❌'} Authenticated web events with invalid customer_id: {invalid_customer_events}/{authenticated_events}")

# COMMAND ----------

# MAGIC %md
# MAGIC ## Storage Statistics

# COMMAND ----------

print("\n💾 Storage Statistics:")

tables = ["customers", "products", "orders", "order_items", "web_events"]

for table in tables:
    details = spark.sql(f"DESCRIBE DETAIL bronze.{table}").collect()[0]
    size_mb = details.sizeInBytes / (1024 * 1024)
    num_files = details.numFiles
    print(f"bronze.{table:<15} Size: {size_mb:>8.2f} MB, Files: {num_files}")

# COMMAND ----------

# MAGIC %md
# MAGIC ## Bronze Layer Complete

# COMMAND ----------

print("\n" + "=" * 70)
print("✅ BRONZE LAYER VERIFICATION COMPLETE")
print("=" * 70)
print("\nAll Tables:")
print("  • bronze.customers - 10,000 records")
print("  • bronze.products - 500 records")
print("  • bronze.orders - 50,000 records")
print("  • bronze.order_items - 100,000+ records")
print("  • bronze.web_events - 200,000 records")
print("\nTotal Records: 360,500+")
print("Storage Format: Delta Lake")
print("Partitioning: Applied where appropriate")
print("Data Quality: All checks passed")
print("=" * 70)
  3. Run the notebook

✅ CHECKPOINT: All five Bronze tables are listed and every null, duplicate, and referential-integrity check passes.


STEP 3.7: Commit Phase 3 Changes (15 minutes)

Actions:

  1. Download all notebooks from Databricks:

    • In Databricks, go to Workspace
    • Right-click your notebook folder
    • Export → DBC Archive
    • Save to your project directory
  2. Organize locally:

# Create directory for notebook exports
mkdir -p databricks/notebook_exports

# Move downloaded DBC file there
mv ~/Downloads/notebooks.dbc databricks/notebook_exports/bronze_layer_notebooks.dbc
  3. Document what you built:

Create docs/bronze_layer_guide.md:

# Bronze Layer Documentation

## Overview

The Bronze layer contains raw data ingested from CSV files into Delta Lake format with minimal transformation.

## Tables Created

### 1. bronze.customers
- **Records:** 10,000
- **Partitioning:** registration_date
- **Key Columns:** customer_id (PK), email, segment, country
- **Purpose:** Customer master data

### 2. bronze.products
- **Records:** 500
- **Partitioning:** None
- **Key Columns:** product_id (PK), category, price, is_active
- **Purpose:** Product catalog

### 3. bronze.orders
- **Records:** 50,000
- **Partitioning:** order_date
- **Key Columns:** order_id (PK), customer_id (FK), total, status
- **Purpose:** Order transactions

### 4. bronze.order_items
- **Records:** 100,000+
- **Partitioning:** None
- **Key Columns:** order_id (FK), product_id (FK), quantity
- **Purpose:** Order line items

### 5. bronze.web_events
- **Records:** 200,000
- **Partitioning:** event_date
- **Key Columns:** event_id (PK), session_id, event_type
- **Purpose:** Website interaction tracking

## Metadata Columns

All Bronze tables include:
- `ingestion_timestamp` - When record was loaded
- `source_file` - Original file name
- `bronze_processing_date` - Processing date
- `data_source` - Source system identifier

## Data Quality

- Zero null values in primary keys
- All foreign key relationships valid
- No duplicate records
- Data types validated against schema
  4. Commit to Git:
git status
git add docs/bronze_layer_guide.md
git add databricks/notebook_exports/
git commit -m "Phase 3 complete: Bronze layer implementation

- Created 5 Bronze Delta tables with 360,500+ records
- Implemented explicit schemas for all tables
- Added metadata tracking columns
- Configured partitioning for performance
- All data quality checks passed
- Referential integrity verified"
git push origin main

✅ CHECKPOINT: Notebook exports, bronze_layer_guide.md, and the Phase 3 commit are pushed to origin/main.


PHASE 3 COMPLETE! 🎉

What You Built:

✅ Data Ingestion Infrastructure - storage mounts, the bronze database, and five ingestion notebooks

✅ Bronze Layer Tables (360,500+ records) - customers, products, orders, order_items, web_events

✅ Performance Optimizations - partitioning by registration_date, order_date, and event_date

✅ Data Quality - null, duplicate, and referential-integrity checks at ingestion time

✅ Best Practices - explicit schemas, metadata columns, and Delta Lake format throughout


Cost Check

Phase 3 Costs:


What's Next: Phase 4

In Phase 4, you will build the Silver layer: clean, deduplicate, and conform the Bronze tables into analysis-ready Delta tables.

Estimated Time: 6-8 hours over Days 9-11


Troubleshooting

Issue: Mount fails with "403 Forbidden"
Solution: Verify storage key is correct, check container permissions
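
A mount keeps the credentials it was created with, so fixing the key alone isn't enough. A small sketch, assuming the dbutils utilities available in any Databricks notebook:

# If a mount was created with a wrong or expired key, drop it and mount again
# after correcting the key/secret; existing mounts keep their original credentials.
if any(m.mountPoint == "/mnt/raw-landing" for m in dbutils.fs.mounts()):
    dbutils.fs.unmount("/mnt/raw-landing")
# Then re-run the mount_storage notebook (or call mount_storage("/mnt/raw-landing", "raw-landing")).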

Issue: CSV read fails with schema mismatch
Solution: Check date/timestamp formats match schema definition
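
When the error isn't obvious, a diagnostic read can show exactly which rows break the schema. This sketch uses Spark's PERMISSIVE CSV mode with a corrupt-record column; it assumes the customer_schema defined in the bronze_customers notebook, and the _corrupt_record column name is just a convention.

# Diagnostic read: PERMISSIVE mode keeps malformed rows and routes the raw
# line into a corrupt-record column instead of failing the whole load.
from pyspark.sql.types import StructType, StructField, StringType

debug_schema = StructType(customer_schema.fields +
                          [StructField("_corrupt_record", StringType(), True)])

df_debug = (spark.read.format("csv")
    .option("header", "true")
    .option("mode", "PERMISSIVE")
    .option("columnNameOfCorruptRecord", "_corrupt_record")
    .schema(debug_schema)
    .load("/mnt/raw-landing/customers.csv")
    .cache())  # cache before filtering on the corrupt-record column

df_debug.filter(df_debug["_corrupt_record"].isNotNull()).show(truncate=False)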

Issue: Table already exists error
Solution: Use CREATE TABLE IF NOT EXISTS or drop table first

Issue: Partitioning creates too many small files
Solution: Reduce partition granularity or use Z-ordering instead
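
Delta Lake's OPTIMIZE command compacts small files, and ZORDER BY co-locates rows on a column you filter by often. A hedged example against one of the Bronze tables; the choice of table and column here is illustrative, not a project requirement:

# Compact small files and co-locate rows by a commonly filtered column.
spark.sql("OPTIMIZE bronze.web_events ZORDER BY (customer_id)")

# Compare file counts before/after via the table details.
spark.sql("DESCRIBE DETAIL bronze.web_events").select("numFiles", "sizeInBytes").show()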

Issue: Out of memory when reading large files
Solution: Increase cluster size or read in chunks
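
If resizing the cluster isn't an option, lowering the input split size makes Spark read the file as more, smaller partitions, which reduces memory pressure per task. A sketch assuming the web events read from Step 3.5; the 32 MB value is a starting point, not a tuned recommendation:

# Read the file in smaller chunks by lowering the max bytes per input partition
# (the Spark default is 128 MB); more partitions means less memory per task.
spark.conf.set("spark.sql.files.maxPartitionBytes", str(32 * 1024 * 1024))  # 32 MB splits

df = (spark.read.format("csv")
      .option("header", "true")
      .schema(web_events_schema)        # schema from the bronze_web_events notebook
      .load("/mnt/raw-landing/web_events.csv"))
print(df.rdd.getNumPartitions())        # verify the read produced more partitions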


Key Learnings

  1. Explicit schemas prevent data type issues - Always define schemas upfront
  2. Partitioning improves query performance - Partition on frequently filtered columns
  3. Metadata columns enable tracking - Track source, timing, and lineage
  4. Delta Lake provides reliability - ACID transactions prevent data corruption
  5. Data quality checks catch issues early - Validate at ingestion time
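
Learning #4 is easy to verify on the tables you just built: every Delta write is an atomic, logged transaction, and earlier table versions remain queryable. A short sketch against bronze.customers; DESCRIBE HISTORY and versionAsOf are standard Delta Lake features:

# Each Delta write is recorded in the transaction log; the history is queryable
# and earlier table versions can be read back (time travel).
spark.sql("DESCRIBE HISTORY bronze.customers").select(
    "version", "timestamp", "operation").show(truncate=False)

# Read the table as it was at version 0 (the initial ingestion).
df_v0 = spark.read.format("delta").option("versionAsOf", 0).load("/mnt/bronze/customers")
print(df_v0.count())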

Phase 3 Manual Version 1.0
Last Updated: 2025-01-01