Duration: Days 6-8 | 6-8 hours total
Goal: Ingest raw CSV data into Delta Lake tables with proper schema and validation
In Phase 3, you will mount the Azure storage containers in Databricks and ingest the five raw CSV datasets (customers, products, orders, order items, and web events) into Bronze Delta Lake tables.
Bronze Layer Philosophy: Store raw data exactly as received with minimal transformation, add metadata for tracking, preserve data lineage.
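In practice, "add metadata for tracking" means appending a few audit columns at ingestion time. A minimal sketch of the pattern used in every ingestion notebook below (df_raw stands for any DataFrame just read from a raw CSV):
from pyspark.sql.functions import current_timestamp, current_date, input_file_name, lit

df_bronze = (df_raw
    .withColumn("ingestion_timestamp", current_timestamp())   # when the record was loaded
    .withColumn("source_file", input_file_name())             # original file name
    .withColumn("bronze_processing_date", current_date())     # processing date
    .withColumn("data_source", lit("csv_upload")))            # source system identifier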
Before starting Phase 3, review the data flow for this phase:
Raw CSV Files (raw-landing)
↓
Bronze Notebooks
↓
Delta Lake Tables (bronze)
↓
[customers]
[products]
[orders]
[order_items]
[web_events]
You'll create a Databricks notebook to mount Azure storage containers.
Open Databricks workspace in browser
Create new notebook: mount_storage
Copy this code into the notebook:
# Databricks notebook source
# MAGIC %md
# MAGIC # Mount Azure Data Lake Storage to Databricks
# MAGIC
# MAGIC This notebook mounts Azure storage containers to Databricks file system.
# MAGIC Run this once to establish connections.
# COMMAND ----------
# MAGIC %md
# MAGIC ## Configuration
# MAGIC
# MAGIC Update these values with your actual Azure storage information
# COMMAND ----------
# Storage account configuration
storage_account_name = "ecommercedata001" # UPDATE THIS
container_configs = {
"/mnt/bronze": "bronze",
"/mnt/silver": "silver",
"/mnt/gold": "gold",
"/mnt/raw-landing": "raw-landing",
"/mnt/logs": "logs"
}
# COMMAND ----------
# MAGIC %md
# MAGIC ## Get Storage Key from Databricks Secrets
# MAGIC
# MAGIC First, set up a secrets scope (one-time setup) and store the storage account key in it
# COMMAND ----------
# Preferred: read the storage key from a Databricks secrets scope
storage_account_key = dbutils.secrets.get(scope="azure-storage", key="storage-key")

# If secrets aren't set up yet, comment out the line above and temporarily
# hard-code the key instead (NOT recommended for production):
# storage_account_key = "YOUR_STORAGE_KEY_HERE"
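# COMMAND ----------
# Optional sanity check (illustrative, not required): confirm the secrets scope
# and key exist before relying on dbutils.secrets.get above.
scopes = [s.name for s in dbutils.secrets.listScopes()]
if "azure-storage" in scopes:
    keys = [k.key for k in dbutils.secrets.listSecrets("azure-storage")]
    print(f"✅ Scope 'azure-storage' found with keys: {keys}")
else:
    print("❌ Scope 'azure-storage' not found - create it (one-time setup) or use the temporary key above")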
# COMMAND ----------
# MAGIC %md
# MAGIC ## Mount Function
# COMMAND ----------
def mount_storage(mount_point, container_name):
    """
    Mount an Azure storage container to Databricks.

    Args:
        mount_point: DBFS mount point (e.g., /mnt/bronze)
        container_name: Azure storage container name
    """
    try:
        # Check if already mounted
        if any(mount.mountPoint == mount_point for mount in dbutils.fs.mounts()):
            print(f"✅ {mount_point} already mounted to {container_name}")
            return True

        # Configuration for mounting
        configs = {
            f"fs.azure.account.key.{storage_account_name}.dfs.core.windows.net": storage_account_key
        }

        # Perform mount
        dbutils.fs.mount(
            source=f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/",
            mount_point=mount_point,
            extra_configs=configs
        )

        print(f"✅ Successfully mounted {container_name} to {mount_point}")
        return True

    except Exception as e:
        print(f"❌ Error mounting {container_name}: {str(e)}")
        return False
# COMMAND ----------
# MAGIC %md
# MAGIC ## Mount All Containers
# COMMAND ----------
print("=" * 70)
print("MOUNTING AZURE STORAGE CONTAINERS")
print("=" * 70)
success_count = 0
for mount_point, container_name in container_configs.items():
    if mount_storage(mount_point, container_name):
        success_count += 1
print("\n" + "=" * 70)
print(f"✅ Successfully mounted {success_count}/{len(container_configs)} containers")
print("=" * 70)
# COMMAND ----------
# MAGIC %md
# MAGIC ## Verify Mounts
# COMMAND ----------
# Display all current mounts
display(dbutils.fs.mounts())
# COMMAND ----------
# MAGIC %md
# MAGIC ## Test Access to raw-landing
# COMMAND ----------
print("Testing access to raw-landing container...\n")
try:
    files = dbutils.fs.ls("/mnt/raw-landing")
    print(f"✅ Found {len(files)} files in /mnt/raw-landing:\n")
    for file in files:
        size_mb = file.size / (1024 * 1024)
        print(f"   📄 {file.name:<25} {size_mb:>8.2f} MB")
except Exception as e:
    print(f"❌ Error accessing raw-landing: {str(e)}")
# COMMAND ----------
# MAGIC %md
# MAGIC ## Create Bronze Database
# COMMAND ----------
# Create database for bronze layer tables
spark.sql("CREATE DATABASE IF NOT EXISTS bronze")
spark.sql("USE bronze")
print("✅ Bronze database created and set as default")
# Show current database
current_db = spark.sql("SELECT current_database()").collect()[0][0]
print(f"Current database: {current_db}")
Update the storage account name in the notebook (cell 3)
Set up the storage key:
storage_account_key = "YOUR_STORAGE_KEY_HERE"
Run the notebook. You should see output similar to:
======================================================================
MOUNTING AZURE STORAGE CONTAINERS
======================================================================
✅ Successfully mounted bronze to /mnt/bronze
✅ Successfully mounted silver to /mnt/silver
✅ Successfully mounted gold to /mnt/gold
✅ Successfully mounted raw-landing to /mnt/raw-landing
✅ Successfully mounted logs to /mnt/logs
======================================================================
✅ Successfully mounted 5/5 containers
======================================================================
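If a mount was created with the wrong key or account name, the already-mounted check in mount_storage will skip it on re-run. Unmount the affected mount point first, then run the notebook again (sketch, using /mnt/bronze as the example):
dbutils.fs.unmount("/mnt/bronze")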
Create a notebook to ingest customer data into the Bronze layer.
Create new notebook: bronze_customers
Copy this code:
# Databricks notebook source
# MAGIC %md
# MAGIC # Bronze Layer: Customer Data Ingestion
# MAGIC
# MAGIC Ingests raw customer CSV into Delta Lake with:
# MAGIC - Explicit schema definition
# MAGIC - Data quality validation
# MAGIC - Metadata tracking
# MAGIC - Partitioning for performance
# COMMAND ----------
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime
# COMMAND ----------
# MAGIC %md
# MAGIC ## Configuration
# COMMAND ----------
SOURCE_PATH = "/mnt/raw-landing/customers.csv"
TARGET_PATH = "/mnt/bronze/customers"
TABLE_NAME = "bronze.customers"
print(f"Source: {SOURCE_PATH}")
print(f"Target: {TARGET_PATH}")
print(f"Table: {TABLE_NAME}")
# COMMAND ----------
# MAGIC %md
# MAGIC ## Define Schema
# MAGIC
# MAGIC Explicit schemas are a best practice; never rely on schema inference in production
# COMMAND ----------
customer_schema = StructType([
StructField("customer_id", StringType(), False),
StructField("email", StringType(), False),
StructField("first_name", StringType(), True),
StructField("last_name", StringType(), True),
StructField("registration_date", DateType(), False),
StructField("country", StringType(), True),
StructField("state", StringType(), True),
StructField("segment", StringType(), False),
StructField("marketing_opt_in", BooleanType(), False),
StructField("created_at", TimestampType(), False),
StructField("updated_at", TimestampType(), False)
])
print("✅ Schema defined with 11 columns")
# COMMAND ----------
# MAGIC %md
# MAGIC ## Read Raw CSV
# COMMAND ----------
df_raw = spark.read \
.format("csv") \
.option("header", "true") \
.option("dateFormat", "yyyy-MM-dd") \
.option("timestampFormat", "yyyy-MM-dd HH:mm:ss") \
.schema(customer_schema) \
.load(SOURCE_PATH)
raw_count = df_raw.count()
print(f"📊 Read {raw_count:,} raw records from CSV")
# COMMAND ----------
# MAGIC %md
# MAGIC ## Add Bronze Layer Metadata
# COMMAND ----------
df_bronze = df_raw \
.withColumn("ingestion_timestamp", current_timestamp()) \
.withColumn("source_file", input_file_name()) \
.withColumn("bronze_processing_date", current_date()) \
.withColumn("data_source", lit("csv_upload"))
print("✅ Added 4 metadata columns")
# COMMAND ----------
# MAGIC %md
# MAGIC ## Data Quality Checks
# COMMAND ----------
print("=" * 70)
print("DATA QUALITY CHECKS")
print("=" * 70)
# Basic counts
total_records = df_bronze.count()
print(f"\n📊 Total records: {total_records:,}")
# Null checks for required fields
null_customer_ids = df_bronze.filter(col("customer_id").isNull()).count()
null_emails = df_bronze.filter(col("email").isNull()).count()
null_segments = df_bronze.filter(col("segment").isNull()).count()
print(f"\n🔍 Null Value Checks:")
print(f" Null customer_ids: {null_customer_ids}")
print(f" Null emails: {null_emails}")
print(f" Null segments: {null_segments}")
# Duplicate checks
duplicate_count = df_bronze.groupBy("customer_id").count() \
.filter(col("count") > 1).count()
print(f"\n🔍 Duplicate Check:")
print(f" Duplicate customer_ids: {duplicate_count}")
# Segment distribution
print(f"\n📈 Segment Distribution:")
segment_dist = df_bronze.groupBy("segment").count().orderBy(col("count").desc())
segment_dist.show()
# Country distribution
print(f"\n🌍 Top 5 Countries:")
country_dist = df_bronze.groupBy("country").count().orderBy(col("count").desc()).limit(5)
country_dist.show()
print("=" * 70)
# COMMAND ----------
# MAGIC %md
# MAGIC ## Display Sample Records
# COMMAND ----------
print("Sample of 10 records:")
display(df_bronze.limit(10))
# COMMAND ----------
# MAGIC %md
# MAGIC ## Write to Delta Lake
# MAGIC
# MAGIC Partitioned by registration_date for query performance
# COMMAND ----------
print("Writing to Delta Lake...")
df_bronze.write \
.format("delta") \
.mode("overwrite") \
.option("mergeSchema", "true") \
.option("overwriteSchema", "true") \
.partitionBy("registration_date") \
.save(TARGET_PATH)
print(f"✅ Successfully wrote {total_records:,} records to {TARGET_PATH}")
# COMMAND ----------
# MAGIC %md
# MAGIC ## Create/Register Delta Table
# COMMAND ----------
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {TABLE_NAME}
USING DELTA
LOCATION '{TARGET_PATH}'
""")
print(f"✅ Table {TABLE_NAME} created/registered")
# COMMAND ----------
# MAGIC %md
# MAGIC ## Verify Table Creation
# COMMAND ----------
# Query the table
result = spark.sql(f"SELECT COUNT(*) as count FROM {TABLE_NAME}").collect()[0][0]
print(f"✅ Verified: {result:,} records in {TABLE_NAME}")
# Show sample
print("\nSample from table:")
spark.sql(f"SELECT * FROM {TABLE_NAME} LIMIT 5").show()
# COMMAND ----------
# MAGIC %md
# MAGIC ## Table Statistics
# COMMAND ----------
# Detailed table information
print("Table Details:")
spark.sql(f"DESCRIBE DETAIL {TABLE_NAME}").show(truncate=False)
# Table schema
print("\nTable Schema:")
spark.sql(f"DESCRIBE {TABLE_NAME}").show(truncate=False)
# COMMAND ----------
# MAGIC %md
# MAGIC ## Partition Information
# COMMAND ----------
print("Partition Statistics:")
spark.sql(f"""
SELECT registration_date, COUNT(*) as count
FROM {TABLE_NAME}
GROUP BY registration_date
ORDER BY registration_date DESC
LIMIT 10
""").show()
# COMMAND ----------
# MAGIC %md
# MAGIC ## Success Summary
# COMMAND ----------
print("=" * 70)
print("✅ CUSTOMER DATA INGESTION COMPLETE")
print("=" * 70)
print(f"Source: {SOURCE_PATH}")
print(f"Target: {TARGET_PATH}")
print(f"Table: {TABLE_NAME}")
print(f"Records: {total_records:,}")
print(f"Partitions: By registration_date")
print(f"Format: Delta Lake")
print("=" * 70)
Create new notebook: bronze_products
Copy this code:
# Databricks notebook source
# MAGIC %md
# MAGIC # Bronze Layer: Product Data Ingestion
# COMMAND ----------
from pyspark.sql.functions import *
from pyspark.sql.types import *
# COMMAND ----------
SOURCE_PATH = "/mnt/raw-landing/products.csv"
TARGET_PATH = "/mnt/bronze/products"
TABLE_NAME = "bronze.products"
# COMMAND ----------
# Product schema
product_schema = StructType([
StructField("product_id", StringType(), False),
StructField("product_name", StringType(), False),
StructField("category", StringType(), False),
StructField("subcategory", StringType(), True),
StructField("price", DecimalType(10,2), False),
StructField("cost", DecimalType(10,2), False),
StructField("stock_quantity", IntegerType(), False),
StructField("supplier", StringType(), True),
StructField("created_at", TimestampType(), False),
StructField("is_active", BooleanType(), False)
])
# COMMAND ----------
# Read CSV
df_raw = spark.read \
.format("csv") \
.option("header", "true") \
.schema(product_schema) \
.load(SOURCE_PATH)
print(f"📊 Read {df_raw.count():,} products")
# COMMAND ----------
# Add metadata and calculated columns
df_bronze = df_raw \
.withColumn("ingestion_timestamp", current_timestamp()) \
.withColumn("source_file", input_file_name()) \
.withColumn("bronze_processing_date", current_date()) \
.withColumn("data_source", lit("csv_upload")) \
.withColumn("profit_margin",
round(((col("price") - col("cost")) / col("price") * 100), 2))
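# COMMAND ----------
# Optional extra check (illustrative): profit_margin divides by price, so flag any
# rows where price is null or non-positive before trusting the margin numbers.
bad_price_count = df_bronze.filter(col("price").isNull() | (col("price") <= 0)).count()
print(f"Rows with null or non-positive price: {bad_price_count}")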
# COMMAND ----------
# Quality checks
print("=" * 70)
print("DATA QUALITY CHECKS")
print("=" * 70)
total = df_bronze.count()
active = df_bronze.filter(col("is_active") == True).count()
zero_stock = df_bronze.filter(col("stock_quantity") == 0).count()
print(f"\n📊 Total products: {total:,}")
print(f" Active: {active:,}")
print(f" Zero stock: {zero_stock}")
print(f"\n📈 Category Distribution:")
df_bronze.groupBy("category").count().orderBy(col("count").desc()).show()
print(f"\n💰 Price Statistics:")
df_bronze.select(
round(min("price"), 2).alias("min_price"),
round(avg("price"), 2).alias("avg_price"),
round(max("price"), 2).alias("max_price")
).show()
# COMMAND ----------
# Display sample
display(df_bronze.limit(10))
# COMMAND ----------
# Write to Delta
df_bronze.write \
.format("delta") \
.mode("overwrite") \
.option("overwriteSchema", "true") \
.save(TARGET_PATH)
print(f"✅ Wrote {total:,} records to Delta")
# COMMAND ----------
# Create table
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {TABLE_NAME}
USING DELTA
LOCATION '{TARGET_PATH}'
""")
print(f"✅ Table {TABLE_NAME} created")
# COMMAND ----------
# Verify
result = spark.sql(f"SELECT COUNT(*) as count FROM {TABLE_NAME}").collect()[0][0]
print(f"✅ Verified: {result:,} records")
spark.sql(f"SELECT * FROM {TABLE_NAME} LIMIT 5").show()
# COMMAND ----------
# Category analysis
print("\n📊 Category Analysis:")
spark.sql(f"""
SELECT
category,
COUNT(*) as product_count,
ROUND(AVG(price), 2) as avg_price,
ROUND(AVG(profit_margin), 2) as avg_margin,
SUM(CASE WHEN is_active THEN 1 ELSE 0 END) as active_count
FROM {TABLE_NAME}
GROUP BY category
ORDER BY product_count DESC
""").show()
# COMMAND ----------
print("=" * 70)
print("✅ PRODUCT DATA INGESTION COMPLETE")
print("=" * 70)
print(f"Records: {total:,}")
print(f"Active Products: {active:,}")
print("=" * 70)
Create new notebook: bronze_orders
Copy this code:
# Databricks notebook source
# MAGIC %md
# MAGIC # Bronze Layer: Orders and Order Items Ingestion
# MAGIC
# MAGIC Ingests both orders and order_items tables
# COMMAND ----------
from pyspark.sql.functions import *
from pyspark.sql.types import *
# COMMAND ----------
# MAGIC %md
# MAGIC ## Part 1: Orders Table
# COMMAND ----------
SOURCE_PATH_ORDERS = "/mnt/raw-landing/orders.csv"
TARGET_PATH_ORDERS = "/mnt/bronze/orders"
TABLE_NAME_ORDERS = "bronze.orders"
# COMMAND ----------
# Orders schema
orders_schema = StructType([
StructField("order_id", StringType(), False),
StructField("customer_id", StringType(), False),
StructField("order_date", DateType(), False),
StructField("order_timestamp", TimestampType(), False),
StructField("status", StringType(), False),
StructField("payment_method", StringType(), False),
StructField("subtotal", DecimalType(10,2), False),
StructField("discount", DecimalType(10,2), False),
StructField("shipping_cost", DecimalType(10,2), False),
StructField("tax", DecimalType(10,2), False),
StructField("total", DecimalType(10,2), False),
StructField("shipping_address", StringType(), True),
StructField("shipping_city", StringType(), True),
StructField("shipping_state", StringType(), True),
StructField("shipping_country", StringType(), True),
StructField("created_at", TimestampType(), False),
StructField("updated_at", TimestampType(), False)
])
# COMMAND ----------
# Read orders
df_orders_raw = spark.read \
.format("csv") \
.option("header", "true") \
.schema(orders_schema) \
.load(SOURCE_PATH_ORDERS)
print(f"📊 Read {df_orders_raw.count():,} orders")
# COMMAND ----------
# Add metadata
df_orders_bronze = df_orders_raw \
.withColumn("ingestion_timestamp", current_timestamp()) \
.withColumn("source_file", input_file_name()) \
.withColumn("bronze_processing_date", current_date()) \
.withColumn("data_source", lit("csv_upload"))
# COMMAND ----------
# Quality checks for orders
print("=" * 70)
print("ORDERS DATA QUALITY CHECKS")
print("=" * 70)
total_orders = df_orders_bronze.count()
print(f"\n📊 Total orders: {total_orders:,}")
print(f"\n📈 Order Status Distribution:")
df_orders_bronze.groupBy("status").count().orderBy(col("count").desc()).show()
print(f"\n💳 Payment Method Distribution:")
df_orders_bronze.groupBy("payment_method").count().orderBy(col("count").desc()).show()
print(f"\n💰 Revenue Statistics:")
df_orders_bronze.select(
round(sum("total"), 2).alias("total_revenue"),
round(avg("total"), 2).alias("avg_order_value"),
round(min("total"), 2).alias("min_order"),
round(max("total"), 2).alias("max_order")
).show()
# COMMAND ----------
# Display sample
display(df_orders_bronze.limit(10))
# COMMAND ----------
# Write orders to Delta (partitioned by order_date)
df_orders_bronze.write \
.format("delta") \
.mode("overwrite") \
.option("overwriteSchema", "true") \
.partitionBy("order_date") \
.save(TARGET_PATH_ORDERS)
print(f"✅ Wrote {total_orders:,} orders to Delta")
# COMMAND ----------
# Create orders table
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {TABLE_NAME_ORDERS}
USING DELTA
LOCATION '{TARGET_PATH_ORDERS}'
""")
print(f"✅ Table {TABLE_NAME_ORDERS} created")
# COMMAND ----------
# MAGIC %md
# MAGIC ## Part 2: Order Items Table
# COMMAND ----------
SOURCE_PATH_ITEMS = "/mnt/raw-landing/order_items.csv"
TARGET_PATH_ITEMS = "/mnt/bronze/order_items"
TABLE_NAME_ITEMS = "bronze.order_items"
# COMMAND ----------
# Order items schema
order_items_schema = StructType([
StructField("order_id", StringType(), False),
StructField("product_id", StringType(), False),
StructField("quantity", IntegerType(), False),
StructField("unit_price", DecimalType(10,2), False),
StructField("total_price", DecimalType(10,2), False)
])
# COMMAND ----------
# Read order items
df_items_raw = spark.read \
.format("csv") \
.option("header", "true") \
.schema(order_items_schema) \
.load(SOURCE_PATH_ITEMS)
print(f"📊 Read {df_items_raw.count():,} order items")
# COMMAND ----------
# Add metadata
df_items_bronze = df_items_raw \
.withColumn("ingestion_timestamp", current_timestamp()) \
.withColumn("source_file", input_file_name()) \
.withColumn("bronze_processing_date", current_date()) \
.withColumn("data_source", lit("csv_upload"))
# COMMAND ----------
# Quality checks for order items
print("=" * 70)
print("ORDER ITEMS DATA QUALITY CHECKS")
print("=" * 70)
total_items = df_items_bronze.count()
print(f"\n📊 Total order items: {total_items:,}")
print(f"\n📦 Quantity Statistics:")
df_items_bronze.select(
round(avg("quantity"), 2).alias("avg_quantity"),
max("quantity").alias("max_quantity"),
round(sum("quantity"), 0).alias("total_units_sold")
).show()
print(f"\n📈 Items per Order:")
items_per_order = df_items_bronze.groupBy("order_id").count()
items_per_order.select(
round(avg("count"), 2).alias("avg_items_per_order"),
min("count").alias("min_items"),
max("count").alias("max_items")
).show()
# COMMAND ----------
# Display sample
display(df_items_bronze.limit(10))
# COMMAND ----------
# Write order items to Delta
df_items_bronze.write \
.format("delta") \
.mode("overwrite") \
.option("overwriteSchema", "true") \
.save(TARGET_PATH_ITEMS)
print(f"✅ Wrote {total_items:,} order items to Delta")
# COMMAND ----------
# Create order items table
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {TABLE_NAME_ITEMS}
USING DELTA
LOCATION '{TARGET_PATH_ITEMS}'
""")
print(f"✅ Table {TABLE_NAME_ITEMS} created")
# COMMAND ----------
# MAGIC %md
# MAGIC ## Verification and Analysis
# COMMAND ----------
# Verify both tables
orders_count = spark.sql(f"SELECT COUNT(*) FROM {TABLE_NAME_ORDERS}").collect()[0][0]
items_count = spark.sql(f"SELECT COUNT(*) FROM {TABLE_NAME_ITEMS}").collect()[0][0]
print(f"✅ {TABLE_NAME_ORDERS}: {orders_count:,} records")
print(f"✅ {TABLE_NAME_ITEMS}: {items_count:,} records")
# COMMAND ----------
# Join analysis
print("\n📊 Order Analysis (with items):")
spark.sql(f"""
SELECT
o.status,
COUNT(DISTINCT o.order_id) as order_count,
COUNT(oi.product_id) as total_items,
ROUND(SUM(o.total), 2) as total_revenue
FROM {TABLE_NAME_ORDERS} o
LEFT JOIN {TABLE_NAME_ITEMS} oi ON o.order_id = oi.order_id
GROUP BY o.status
ORDER BY total_revenue DESC
""").show()
# COMMAND ----------
# Monthly order trends
print("\n📊 Monthly Order Trends:")
spark.sql(f"""
SELECT
DATE_FORMAT(order_date, 'yyyy-MM') as month,
COUNT(*) as order_count,
ROUND(SUM(total), 2) as revenue
FROM {TABLE_NAME_ORDERS}
GROUP BY DATE_FORMAT(order_date, 'yyyy-MM')
ORDER BY month DESC
LIMIT 12
""").show()
# COMMAND ----------
print("=" * 70)
print("✅ ORDERS & ORDER ITEMS INGESTION COMPLETE")
print("=" * 70)
print(f"Orders: {orders_count:,}")
print(f"Order Items: {items_count:,}")
print(f"Avg Items per Order: {items_count / orders_count:.2f}")
print("=" * 70)
Actions:
Create new notebook: bronze_web_events
Copy this code:
# Databricks notebook source
# MAGIC %md
# MAGIC # Bronze Layer: Web Events Ingestion
# COMMAND ----------
from pyspark.sql.functions import *
from pyspark.sql.types import *
# COMMAND ----------
SOURCE_PATH = "/mnt/raw-landing/web_events.csv"
TARGET_PATH = "/mnt/bronze/web_events"
TABLE_NAME = "bronze.web_events"
# COMMAND ----------
# Web events schema
web_events_schema = StructType([
StructField("event_id", StringType(), False),
StructField("session_id", StringType(), False),
StructField("customer_id", StringType(), True),
StructField("event_timestamp", TimestampType(), False),
StructField("event_type", StringType(), False),
StructField("product_id", StringType(), True),
StructField("search_query", StringType(), True),
StructField("device_type", StringType(), False),
StructField("browser", StringType(), False),
StructField("page_url", StringType(), True),
StructField("referrer_url", StringType(), True)
])
# COMMAND ----------
# Read web events
df_raw = spark.read \
.format("csv") \
.option("header", "true") \
.schema(web_events_schema) \
.load(SOURCE_PATH)
print(f"📊 Read {df_raw.count():,} web events")
# COMMAND ----------
# Add metadata and derived columns
df_bronze = df_raw \
.withColumn("ingestion_timestamp", current_timestamp()) \
.withColumn("source_file", input_file_name()) \
.withColumn("bronze_processing_date", current_date()) \
.withColumn("data_source", lit("csv_upload")) \
.withColumn("event_date", to_date(col("event_timestamp"))) \
.withColumn("event_hour", hour(col("event_timestamp"))) \
.withColumn("is_authenticated", when(col("customer_id").isNotNull(), True).otherwise(False))
# COMMAND ----------
# Quality checks
print("=" * 70)
print("WEB EVENTS DATA QUALITY CHECKS")
print("=" * 70)
total_events = df_bronze.count()
authenticated = df_bronze.filter(col("is_authenticated") == True).count()
anonymous = total_events - authenticated
print(f"\n📊 Total events: {total_events:,}")
print(f" Authenticated: {authenticated:,} ({authenticated/total_events*100:.1f}%)")
print(f" Anonymous: {anonymous:,} ({anonymous/total_events*100:.1f}%)")
print(f"\n📈 Event Type Distribution:")
df_bronze.groupBy("event_type").count().orderBy(col("count").desc()).show()
print(f"\n📱 Device Type Distribution:")
df_bronze.groupBy("device_type").count().orderBy(col("count").desc()).show()
print(f"\n🌐 Browser Distribution:")
df_bronze.groupBy("browser").count().orderBy(col("count").desc()).show()
print(f"\n🔍 Events with Product IDs:")
with_product = df_bronze.filter(col("product_id").isNotNull()).count()
print(f" {with_product:,} events ({with_product/total_events*100:.1f}%)")
print(f"\n🔎 Search Events:")
search_events = df_bronze.filter(col("event_type") == "search").count()
print(f" {search_events:,} search events")
# COMMAND ----------
# Display sample
display(df_bronze.limit(10))
# COMMAND ----------
# Write to Delta (partitioned by event_date for performance)
df_bronze.write \
.format("delta") \
.mode("overwrite") \
.option("overwriteSchema", "true") \
.partitionBy("event_date") \
.save(TARGET_PATH)
print(f"✅ Wrote {total_events:,} web events to Delta")
# COMMAND ----------
# Create table
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {TABLE_NAME}
USING DELTA
LOCATION '{TARGET_PATH}'
""")
print(f"✅ Table {TABLE_NAME} created")
# COMMAND ----------
# Verify
result = spark.sql(f"SELECT COUNT(*) as count FROM {TABLE_NAME}").collect()[0][0]
print(f"✅ Verified: {result:,} records")
# COMMAND ----------
# Hourly activity analysis
print("\n📊 Hourly Activity Pattern:")
spark.sql(f"""
SELECT
event_hour,
COUNT(*) as event_count
FROM {TABLE_NAME}
GROUP BY event_hour
ORDER BY event_hour
""").show(24)
# COMMAND ----------
# Daily trends
print("\n📊 Daily Event Trends (Last 30 days):")
spark.sql(f"""
SELECT
event_date,
COUNT(*) as event_count,
COUNT(DISTINCT session_id) as unique_sessions,
COUNT(DISTINCT customer_id) as unique_customers
FROM {TABLE_NAME}
GROUP BY event_date
ORDER BY event_date DESC
LIMIT 30
""").show()
# COMMAND ----------
# Session analysis
print("\n📊 Session Analysis:")
spark.sql(f"""
SELECT
COUNT(DISTINCT session_id) as total_sessions,
ROUND(COUNT(*) / COUNT(DISTINCT session_id), 2) as avg_events_per_session
FROM {TABLE_NAME}
""").show()
# COMMAND ----------
print("=" * 70)
print("✅ WEB EVENTS INGESTION COMPLETE")
print("=" * 70)
print(f"Events: {total_events:,}")
print(f"Date Range: Partitioned by event_date")
print(f"Authenticated Users: {authenticated/total_events*100:.1f}%")
print("=" * 70)
Expected Output:
✅ CHECKPOINT
Create a notebook to verify all Bronze tables and provide summary statistics.
Actions:
Create new notebook: bronze_summary
Copy this code:
# Databricks notebook source
# MAGIC %md
# MAGIC # Bronze Layer: Summary and Verification
# MAGIC
# MAGIC Verifies all Bronze tables and provides comprehensive statistics
# COMMAND ----------
from pyspark.sql.functions import *
# COMMAND ----------
# MAGIC %md
# MAGIC ## List All Bronze Tables
# COMMAND ----------
print("=" * 70)
print("BRONZE LAYER TABLES")
print("=" * 70)
tables = spark.sql("SHOW TABLES IN bronze").collect()
for table in tables:
    table_name = f"bronze.{table.tableName}"
    count = spark.sql(f"SELECT COUNT(*) FROM {table_name}").collect()[0][0]
    print(f"✅ {table_name:<30} {count:>12,} records")
print("=" * 70)
# COMMAND ----------
# MAGIC %md
# MAGIC ## Table Details
# COMMAND ----------
# MAGIC %md
# MAGIC ### Customers Table
# COMMAND ----------
print("📊 CUSTOMERS TABLE SUMMARY")
print("-" * 70)
spark.sql("""
SELECT
COUNT(*) as total_customers,
COUNT(DISTINCT country) as countries,
COUNT(DISTINCT segment) as segments,
MIN(registration_date) as earliest_registration,
MAX(registration_date) as latest_registration
FROM bronze.customers
""").show()
print("\nSegment Distribution:")
spark.sql("""
SELECT segment, COUNT(*) as count
FROM bronze.customers
GROUP BY segment
ORDER BY count DESC
""").show()
print("\nTop Countries:")
spark.sql("""
SELECT country, COUNT(*) as count
FROM bronze.customers
GROUP BY country
ORDER BY count DESC
LIMIT 5
""").show()
# COMMAND ----------
# MAGIC %md
# MAGIC ### Products Table
# COMMAND ----------
print("📦 PRODUCTS TABLE SUMMARY")
print("-" * 70)
spark.sql("""
SELECT
COUNT(*) as total_products,
SUM(CASE WHEN is_active THEN 1 ELSE 0 END) as active_products,
COUNT(DISTINCT category) as categories,
ROUND(AVG(price), 2) as avg_price,
ROUND(AVG(profit_margin), 2) as avg_margin
FROM bronze.products
""").show()
print("\nCategory Breakdown:")
spark.sql("""
SELECT
category,
COUNT(*) as product_count,
ROUND(AVG(price), 2) as avg_price,
ROUND(AVG(profit_margin), 2) as avg_margin
FROM bronze.products
GROUP BY category
ORDER BY product_count DESC
""").show()
# COMMAND ----------
# MAGIC %md
# MAGIC ### Orders Table
# COMMAND ----------
print("🛒 ORDERS TABLE SUMMARY")
print("-" * 70)
spark.sql("""
SELECT
COUNT(*) as total_orders,
ROUND(SUM(total), 2) as total_revenue,
ROUND(AVG(total), 2) as avg_order_value,
MIN(order_date) as earliest_order,
MAX(order_date) as latest_order
FROM bronze.orders
""").show()
print("\nOrder Status:")
spark.sql("""
SELECT
status,
COUNT(*) as count,
ROUND(SUM(total), 2) as revenue
FROM bronze.orders
GROUP BY status
ORDER BY revenue DESC
""").show()
print("\nPayment Methods:")
spark.sql("""
SELECT
payment_method,
COUNT(*) as count,
ROUND(AVG(total), 2) as avg_value
FROM bronze.orders
GROUP BY payment_method
ORDER BY count DESC
""").show()
# COMMAND ----------
# MAGIC %md
# MAGIC ### Order Items Table
# COMMAND ----------
print("📋 ORDER ITEMS TABLE SUMMARY")
print("-" * 70)
spark.sql("""
SELECT
COUNT(*) as total_items,
COUNT(DISTINCT order_id) as distinct_orders,
COUNT(DISTINCT product_id) as distinct_products,
ROUND(COUNT(*) * 1.0 / COUNT(DISTINCT order_id), 2) as avg_items_per_order,
SUM(quantity) as total_units_sold
FROM bronze.order_items
""").show()
print("\nTop 10 Products by Quantity Sold:")
spark.sql("""
SELECT
product_id,
SUM(quantity) as total_quantity,
COUNT(DISTINCT order_id) as times_ordered,
ROUND(SUM(total_price), 2) as total_revenue
FROM bronze.order_items
GROUP BY product_id
ORDER BY total_quantity DESC
LIMIT 10
""").show()
# COMMAND ----------
# MAGIC %md
# MAGIC ### Web Events Table
# COMMAND ----------
print("🌐 WEB EVENTS TABLE SUMMARY")
print("-" * 70)
spark.sql("""
SELECT
COUNT(*) as total_events,
COUNT(DISTINCT session_id) as unique_sessions,
COUNT(DISTINCT customer_id) as unique_customers,
ROUND(COUNT(*) * 1.0 / COUNT(DISTINCT session_id), 2) as avg_events_per_session
FROM bronze.web_events
""").show()
print("\nEvent Type Distribution:")
spark.sql("""
SELECT
event_type,
COUNT(*) as count,
ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER(), 2) as percentage
FROM bronze.web_events
GROUP BY event_type
ORDER BY count DESC
""").show()
print("\nDevice and Browser:")
spark.sql("""
SELECT
device_type,
COUNT(*) as count
FROM bronze.web_events
GROUP BY device_type
ORDER BY count DESC
""").show()
# COMMAND ----------
# MAGIC %md
# MAGIC ## Data Quality Summary
# COMMAND ----------
print("=" * 70)
print("DATA QUALITY SUMMARY")
print("=" * 70)
# Check for nulls in key fields
print("\n🔍 Null Value Checks:")
tables_to_check = [
("bronze.customers", "customer_id"),
("bronze.products", "product_id"),
("bronze.orders", "order_id"),
("bronze.order_items", "order_id"),
("bronze.web_events", "event_id")
]
for table, key_col in tables_to_check:
    null_count = spark.sql(f"""
        SELECT COUNT(*)
        FROM {table}
        WHERE {key_col} IS NULL
    """).collect()[0][0]
    total = spark.sql(f"SELECT COUNT(*) FROM {table}").collect()[0][0]
    status = "✅" if null_count == 0 else "❌"
    print(f"{status} {table:<25} Null {key_col}: {null_count}/{total}")
# COMMAND ----------
# MAGIC %md
# MAGIC ## Referential Integrity Checks
# COMMAND ----------
print("\n🔗 Referential Integrity Checks:")
# Orders -> Customers
orphan_orders = spark.sql("""
SELECT COUNT(*)
FROM bronze.orders o
LEFT JOIN bronze.customers c ON o.customer_id = c.customer_id
WHERE c.customer_id IS NULL
""").collect()[0][0]
print(f"{'✅' if orphan_orders == 0 else '❌'} Orders with invalid customer_id: {orphan_orders}")
# Order Items -> Orders
orphan_items = spark.sql("""
SELECT COUNT(*)
FROM bronze.order_items oi
LEFT JOIN bronze.orders o ON oi.order_id = o.order_id
WHERE o.order_id IS NULL
""").collect()[0][0]
print(f"{'✅' if orphan_items == 0 else '❌'} Order items with invalid order_id: {orphan_items}")
# Order Items -> Products
orphan_items_products = spark.sql("""
SELECT COUNT(*)
FROM bronze.order_items oi
LEFT JOIN bronze.products p ON oi.product_id = p.product_id
WHERE p.product_id IS NULL
""").collect()[0][0]
print(f"{'✅' if orphan_items_products == 0 else '❌'} Order items with invalid product_id: {orphan_items_products}")
# Web Events -> Customers (null customer_id is allowed for anonymous)
authenticated_events = spark.sql("""
SELECT COUNT(*)
FROM bronze.web_events
WHERE customer_id IS NOT NULL
""").collect()[0][0]
invalid_customer_events = spark.sql("""
SELECT COUNT(*)
FROM bronze.web_events w
LEFT JOIN bronze.customers c ON w.customer_id = c.customer_id
WHERE w.customer_id IS NOT NULL AND c.customer_id IS NULL
""").collect()[0][0]
print(f"{'✅' if invalid_customer_events == 0 else '❌'} Authenticated web events with invalid customer_id: {invalid_customer_events}/{authenticated_events}")
# COMMAND ----------
# MAGIC %md
# MAGIC ## Storage Statistics
# COMMAND ----------
print("\n💾 Storage Statistics:")
tables = ["customers", "products", "orders", "order_items", "web_events"]
for table in tables:
    details = spark.sql(f"DESCRIBE DETAIL bronze.{table}").collect()[0]
    size_mb = details.sizeInBytes / (1024 * 1024)
    num_files = details.numFiles
    print(f"bronze.{table:<15} Size: {size_mb:>8.2f} MB, Files: {num_files}")
# COMMAND ----------
# MAGIC %md
# MAGIC ## Bronze Layer Complete
# COMMAND ----------
print("\n" + "=" * 70)
print("✅ BRONZE LAYER VERIFICATION COMPLETE")
print("=" * 70)
print("\nAll Tables:")
print(" • bronze.customers - 10,000 records")
print(" • bronze.products - 500 records")
print(" • bronze.orders - 50,000 records")
print(" • bronze.order_items - 100,000+ records")
print(" • bronze.web_events - 200,000 records")
print("\nTotal Records: 360,500+")
print("Storage Format: Delta Lake")
print("Partitioning: Applied where appropriate")
print("Data Quality: All checks passed")
print("=" * 70)
✅ CHECKPOINT
Actions:
Download all notebooks from Databricks:
Organize locally:
# Create directory for notebook exports
mkdir -p databricks/notebook_exports
# Move downloaded DBC file there
mv ~/Downloads/notebooks.dbc databricks/notebook_exports/bronze_layer_notebooks.dbc
Create docs/bronze_layer_guide.md:
# Bronze Layer Documentation
## Overview
The Bronze layer contains raw data ingested from CSV files into Delta Lake format with minimal transformation.
## Tables Created
### 1. bronze.customers
- **Records:** 10,000
- **Partitioning:** registration_date
- **Key Columns:** customer_id (PK), email, segment, country
- **Purpose:** Customer master data
### 2. bronze.products
- **Records:** 500
- **Partitioning:** None
- **Key Columns:** product_id (PK), category, price, is_active
- **Purpose:** Product catalog
### 3. bronze.orders
- **Records:** 50,000
- **Partitioning:** order_date
- **Key Columns:** order_id (PK), customer_id (FK), total, status
- **Purpose:** Order transactions
### 4. bronze.order_items
- **Records:** 100,000+
- **Partitioning:** None
- **Key Columns:** order_id (FK), product_id (FK), quantity
- **Purpose:** Order line items
### 5. bronze.web_events
- **Records:** 200,000
- **Partitioning:** event_date
- **Key Columns:** event_id (PK), session_id, event_type
- **Purpose:** Website interaction tracking
## Metadata Columns
All Bronze tables include:
- `ingestion_timestamp` - When record was loaded
- `source_file` - Original file name
- `bronze_processing_date` - Processing date
- `data_source` - Source system identifier
## Data Quality
- Zero null values in primary keys
- All foreign key relationships valid
- No duplicate records
- Data types validated against schema
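Optionally, before committing, sanity-check the metadata columns from a notebook (illustrative sketch; it simply reports the most recent load time per table):
for t in ["customers", "products", "orders", "order_items", "web_events"]:
    last_load = spark.sql(f"SELECT MAX(ingestion_timestamp) FROM bronze.{t}").collect()[0][0]
    print(f"bronze.{t:<12} last loaded at {last_load}")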
git status
git add docs/bronze_layer_guide.md
git add databricks/notebook_exports/
git commit -m "Phase 3 complete: Bronze layer implementation
- Created 5 Bronze Delta tables with 360,500+ records
- Implemented explicit schemas for all tables
- Added metadata tracking columns
- Configured partitioning for performance
- All data quality checks passed
- Referential integrity verified"
git push origin main
✅ CHECKPOINT
✅ Data Ingestion Infrastructure
✅ Bronze Layer Tables (360,500+ records)
• bronze.customers - 10,000 customer records
• bronze.products - 500 product records
• bronze.orders - 50,000 order transactions
• bronze.order_items - 100,000+ line items
• bronze.web_events - 200,000 interaction events
✅ Performance Optimizations
✅ Data Quality
✅ Best Practices
Phase 3 Costs:
In Phase 4, you will:
Estimated Time: 6-8 hours over Days 9-11
Issue: Mount fails with "403 Forbidden"
Solution: Verify storage key is correct, check container permissions
Issue: CSV read fails with schema mismatch
Solution: Check date/timestamp formats match schema definition
Issue: Table already exists error
Solution: Use CREATE TABLE IF NOT EXISTS, or drop and re-register the table (see the sketch after this list)
Issue: Partitioning creates too many small files
Solution: Reduce partition granularity, or compact the table with OPTIMIZE and Z-ordering (also sketched after this list)
Issue: Out of memory when reading large files
Solution: Increase cluster size or read in chunks
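Hedged sketches for two of the fixes above (table names are the ones created in this phase; the Z-order column is just an example choice):
# "Table already exists": drop the metastore entry, then re-register it against the same Delta location
spark.sql("DROP TABLE IF EXISTS bronze.customers")
spark.sql("CREATE TABLE bronze.customers USING DELTA LOCATION '/mnt/bronze/customers'")

# Too many small files: compact the table and co-locate related rows with Z-ordering
spark.sql("OPTIMIZE bronze.web_events ZORDER BY (event_type)")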
Phase 3 Manual Version 1.0
Last Updated: 2025-01-01