Duration: Days 9-11 | 6-8 hours total
Goal: Transform Bronze data into clean, validated Silver layer tables using dbt
In Phase 4, you will:
Silver Layer Philosophy: Clean, conform, and validate data. Apply business rules, standardize formats, and ensure data quality.
Before starting Phase 4:
Bronze Tables (raw data)
↓
dbt Models (SQL)
↓
Silver Tables (cleaned)
↓
[stg_customers]
[stg_products]
[stg_orders]
[dim_customers]
[dim_products]
[fact_orders]
# Activate virtual environment
source venv/bin/activate
# Install dbt-databricks
pip install dbt-databricks==1.6.2
# Verify installation
dbt --version
Expected output:
installed version: 1.6.2
# Navigate to project root
cd ecommerce-data-platform
# Initialize dbt project in the dbt folder
dbt init ecommerce_dbt --profiles-dir ./dbt
When prompted:
# dbt init creates a subfolder, we want the contents in our dbt folder
mv ecommerce_dbt/* dbt/
rmdir ecommerce_dbt
Edit dbt/dbt_project.yml:
name: 'ecommerce_analytics'
version: '1.0.0'
config-version: 2

profile: 'ecommerce_dbt'

model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

target-path: "target"
clean-targets:
  - "target"
  - "dbt_packages"

models:
  ecommerce_analytics:
    # Bronze layer - just document, don't transform
    bronze:
      +materialized: view
      +schema: bronze
    # Staging models - clean and standardize
    staging:
      +materialized: view
      +schema: silver
    # Silver dimensional models
    silver:
      +materialized: table
      +schema: silver

vars:
  # Date range for incremental loads
  start_date: '2023-01-01'
  end_date: '2024-12-31'
Create/edit dbt/profiles.yml:
ecommerce_dbt:
  target: dev
  outputs:
    dev:
      type: databricks
      host: YOUR-WORKSPACE.azuredatabricks.net
      http_path: /sql/1.0/warehouses/YOUR-HTTP-PATH
      token: YOUR-DATABRICKS-TOKEN
      schema: silver
      threads: 4
    prod:
      type: databricks
      host: YOUR-WORKSPACE.azuredatabricks.net
      http_path: /sql/1.0/warehouses/YOUR-HTTP-PATH
      token: YOUR-DATABRICKS-TOKEN
      schema: silver
      threads: 8
Note: Replace placeholders with your actual values from .env file.
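Hard-coding a token in profiles.yml risks committing it to Git. dbt's built-in `env_var()` Jinja function can pull credentials from the environment instead. A sketch of that variant, assuming the variable names match the `.env` file from earlier phases:

```yaml
# Hypothetical profiles.yml that reads credentials from environment variables
ecommerce_dbt:
  target: dev
  outputs:
    dev:
      type: databricks
      host: "{{ env_var('DATABRICKS_HOST') }}"
      http_path: "{{ env_var('DATABRICKS_HTTP_PATH') }}"
      token: "{{ env_var('DATABRICKS_TOKEN') }}"
      schema: silver
      threads: 4
```

Export the variables (or `source` your `.env`) before running dbt commands.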
cd dbt
dbt debug
You should see:
All checks passed!
✅ CHECKPOINT
Define Bronze tables as dbt sources.
mkdir -p models/sources
Create models/sources/bronze_sources.yml:

version: 2
sources:
  - name: bronze
    description: "Raw data ingested from CSV files"
    database: default
    schema: bronze
    tables:
      - name: customers
        description: "Customer master data"
        columns:
          - name: customer_id
            description: "Unique customer identifier"
            tests:
              - unique
              - not_null
          - name: email
            tests:
              - not_null
          - name: segment
            tests:
              - accepted_values:
                  values: ['Premium', 'Regular', 'Occasional', 'New']
      - name: products
        description: "Product catalog"
        columns:
          - name: product_id
            description: "Unique product identifier"
            tests:
              - unique
              - not_null
          - name: price
            tests:
              - not_null
          - name: is_active
            tests:
              - not_null
      - name: orders
        description: "Order transactions"
        columns:
          - name: order_id
            description: "Unique order identifier"
            tests:
              - unique
              - not_null
          - name: customer_id
            description: "Foreign key to customers"
            tests:
              - not_null
              - relationships:
                  to: source('bronze', 'customers')
                  field: customer_id
          - name: status
            tests:
              - accepted_values:
                  values: ['completed', 'pending', 'cancelled', 'returned']
      - name: order_items
        description: "Order line items"
        columns:
          - name: order_id
            tests:
              - not_null
              - relationships:
                  to: source('bronze', 'orders')
                  field: order_id
          - name: product_id
            tests:
              - not_null
              - relationships:
                  to: source('bronze', 'products')
                  field: product_id
      - name: web_events
        description: "Website interaction events"
        columns:
          - name: event_id
            tests:
              - unique
              - not_null
          - name: event_type
            tests:
              - not_null
Optionally check source freshness (note: dbt source freshness only reports results for sources that declare a loaded_at_field and a freshness block; without them there is nothing for it to check):

dbt source freshness
✅ CHECKPOINT
Staging models clean and standardize data from Bronze sources.
mkdir -p models/staging
Create models/staging/stg_customers.sql:

{{
config(
materialized='view',
schema='silver'
)
}}
with source as (
select * from {{ source('bronze', 'customers') }}
),
cleaned as (
select
-- Primary key
customer_id,
-- Customer info (cleaned)
lower(trim(email)) as email,
trim(first_name) as first_name,
trim(last_name) as last_name,
-- Full name
concat(
coalesce(trim(first_name), ''),
' ',
coalesce(trim(last_name), '')
) as full_name,
-- Dates
registration_date,
date_format(registration_date, 'yyyy-MM') as registration_month,
year(registration_date) as registration_year,
-- Location (standardized)
upper(trim(country)) as country,
upper(trim(state)) as state,
-- Segment (cleaned)
trim(segment) as segment,
-- Marketing
marketing_opt_in,
-- Metadata
created_at,
updated_at,
ingestion_timestamp,
-- Calculated fields
datediff(current_date(), registration_date) as days_since_registration,
case
when segment = 'Premium' then 1
when segment = 'Regular' then 2
when segment = 'Occasional' then 3
when segment = 'New' then 4
else 5
end as segment_rank,
-- Data quality flags
case
when email is null or email = '' then false
when first_name is null or first_name = '' then false
when last_name is null or last_name = '' then false
else true
end as is_complete_profile
from source
)
select * from cleaned
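The cleaning rules above can be sanity-checked outside dbt before relying on them. A minimal Python sketch of the same logic (illustration only, not part of the dbt project):

```python
def clean_customer(record: dict) -> dict:
    """Mirror the stg_customers rules: trim/lowercase the email, trim names,
    build full_name, and flag incomplete profiles."""
    email = (record.get("email") or "").strip().lower()
    first = (record.get("first_name") or "").strip()
    last = (record.get("last_name") or "").strip()
    return {
        "email": email,
        "full_name": f"{first} {last}",
        "is_complete_profile": bool(email and first and last),
    }

row = clean_customer({"email": "  Ada@Example.COM ", "first_name": " Ada ", "last_name": None})
print(row)
```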
Create models/staging/stg_products.sql:

{{
config(
materialized='view',
schema='silver'
)
}}
with source as (
select * from {{ source('bronze', 'products') }}
),
cleaned as (
select
-- Primary key
product_id,
-- Product info
trim(product_name) as product_name,
trim(category) as category,
trim(subcategory) as subcategory,
-- Pricing
cast(price as decimal(10,2)) as price,
cast(cost as decimal(10,2)) as cost,
cast(price - cost as decimal(10,2)) as profit,
round((price - cost) / nullif(price, 0) * 100, 2) as profit_margin_pct,  -- nullif avoids divide-by-zero
-- Inventory
stock_quantity,
case
when stock_quantity = 0 then 'Out of Stock'
when stock_quantity < 50 then 'Low Stock'
when stock_quantity < 200 then 'Medium Stock'
else 'High Stock'
end as stock_status,
-- Supplier
trim(supplier) as supplier,
-- Status
is_active,
-- Price tier
case
when price < 50 then 'Budget'
when price < 200 then 'Mid-Range'
when price < 500 then 'Premium'
else 'Luxury'
end as price_tier,
-- Metadata
created_at,
ingestion_timestamp
from source
)
select * from cleaned
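The tiering CASE expressions above encode simple threshold ladders; a Python sketch of the same thresholds makes them easy to unit-test (illustration only):

```python
def price_tier(price: float) -> str:
    """Same thresholds as the price_tier CASE expression in stg_products."""
    if price < 50:
        return "Budget"
    if price < 200:
        return "Mid-Range"
    if price < 500:
        return "Premium"
    return "Luxury"

def stock_status(qty: int) -> str:
    """Same thresholds as the stock_status CASE expression."""
    if qty == 0:
        return "Out of Stock"
    if qty < 50:
        return "Low Stock"
    if qty < 200:
        return "Medium Stock"
    return "High Stock"

print(price_tier(149.99), stock_status(75))
```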
Create models/staging/stg_orders.sql:

{{
config(
materialized='view',
schema='silver'
)
}}
with source as (
select * from {{ source('bronze', 'orders') }}
),
cleaned as (
select
-- Primary keys
order_id,
customer_id,
-- Dates
order_date,
order_timestamp,
date_format(order_date, 'yyyy-MM') as order_month,
year(order_date) as order_year,
quarter(order_date) as order_quarter,
dayofweek(order_date) as day_of_week,
case
when dayofweek(order_date) in (1, 7) then true
else false
end as is_weekend,
-- Status
lower(trim(status)) as status,
case
when lower(trim(status)) = 'completed' then 'Completed'
when lower(trim(status)) = 'pending' then 'Pending'
when lower(trim(status)) = 'cancelled' then 'Cancelled'
when lower(trim(status)) = 'returned' then 'Returned'
else 'Unknown'
end as status_clean,
-- Payment
lower(trim(payment_method)) as payment_method,
-- Amounts
cast(subtotal as decimal(10,2)) as subtotal,
cast(discount as decimal(10,2)) as discount,
cast(shipping_cost as decimal(10,2)) as shipping_cost,
cast(tax as decimal(10,2)) as tax,
cast(total as decimal(10,2)) as total,
-- Discount metrics
case when discount > 0 then true else false end as has_discount,
round(discount / nullif(subtotal, 0) * 100, 2) as discount_pct,  -- nullif avoids divide-by-zero
-- Shipping
case when shipping_cost = 0 then true else false end as free_shipping,
trim(shipping_address) as shipping_address,
trim(shipping_city) as shipping_city,
upper(trim(shipping_state)) as shipping_state,
upper(trim(shipping_country)) as shipping_country,
-- Revenue classification
case
when total < 50 then 'Small'
when total < 150 then 'Medium'
when total < 300 then 'Large'
else 'Extra Large'
end as order_size,
-- Metadata
created_at,
updated_at,
ingestion_timestamp
from source
)
select * from cleaned
Create models/staging/stg_order_items.sql:

{{
config(
materialized='view',
schema='silver'
)
}}
with source as (
select * from {{ source('bronze', 'order_items') }}
),
cleaned as (
select
-- Keys
order_id,
product_id,
-- Quantities
quantity,
-- Pricing
cast(unit_price as decimal(10,2)) as unit_price,
cast(total_price as decimal(10,2)) as total_price,
-- Calculated
round(total_price / nullif(quantity, 0), 2) as calculated_unit_price,
-- Validation flag
case
when abs(unit_price * quantity - total_price) > 0.01 then false
else true
end as price_calculation_valid,
-- Metadata
ingestion_timestamp
from source
)
select * from cleaned
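The price_calculation_valid flag tolerates up to one cent of rounding drift. A small Python sketch of the same check (illustration only):

```python
def price_calculation_valid(unit_price: float, quantity: int, total_price: float,
                            tolerance: float = 0.01) -> bool:
    """Mirror the validation flag in stg_order_items: a line item is valid
    when unit_price * quantity matches total_price within one cent."""
    return abs(unit_price * quantity - total_price) <= tolerance

print(price_calculation_valid(9.99, 3, 29.97))
```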
Create models/staging/stg_web_events.sql:

{{
config(
materialized='view',
schema='silver'
)
}}
with source as (
select * from {{ source('bronze', 'web_events') }}
),
cleaned as (
select
-- Keys
event_id,
session_id,
customer_id,
product_id,
-- Timestamp
event_timestamp,
date(event_timestamp) as event_date,
hour(event_timestamp) as event_hour,
date_format(event_timestamp, 'yyyy-MM') as event_month,
-- Time of day
case
when hour(event_timestamp) between 6 and 11 then 'Morning'
when hour(event_timestamp) between 12 and 17 then 'Afternoon'
when hour(event_timestamp) between 18 and 21 then 'Evening'
else 'Night'
end as time_of_day,
-- Event details
lower(trim(event_type)) as event_type,
trim(search_query) as search_query,
-- Device & browser
lower(trim(device_type)) as device_type,
lower(trim(browser)) as browser,
-- URLs
trim(page_url) as page_url,
trim(referrer_url) as referrer_url,
-- User classification
case
when customer_id is not null then 'Authenticated'
else 'Anonymous'
end as user_type,
-- Event classification
case
when event_type in ('add_to_cart', 'wishlist_add') then 'Conversion'
when event_type = 'search' then 'Search'
when event_type in ('page_view', 'product_view') then 'Browse'
else 'Other'
end as event_category,
-- Metadata
ingestion_timestamp
from source
)
select * from cleaned
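The two classification CASE expressions above can be expressed compactly in Python for testing the bucket boundaries (illustration only):

```python
def time_of_day(hour: int) -> str:
    """Same buckets as the time_of_day CASE expression in stg_web_events."""
    if 6 <= hour <= 11:
        return "Morning"
    if 12 <= hour <= 17:
        return "Afternoon"
    if 18 <= hour <= 21:
        return "Evening"
    return "Night"

def event_category(event_type: str) -> str:
    """Same mapping as the event classification CASE expression."""
    if event_type in ("add_to_cart", "wishlist_add"):
        return "Conversion"
    if event_type == "search":
        return "Search"
    if event_type in ("page_view", "product_view"):
        return "Browse"
    return "Other"

print(time_of_day(9), event_category("search"))
```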
Create models/staging/schema.yml:

version: 2
models:
  - name: stg_customers
    description: "Cleaned and standardized customer data"
    columns:
      - name: customer_id
        description: "Unique customer identifier"
        tests:
          - unique
          - not_null
      - name: email
        tests:
          - not_null
      - name: segment
        tests:
          - accepted_values:
              values: ['Premium', 'Regular', 'Occasional', 'New']
  - name: stg_products
    description: "Cleaned and standardized product data"
    columns:
      - name: product_id
        tests:
          - unique
          - not_null
      - name: profit_margin_pct
        description: "Profit margin as percentage"
        tests:
          - not_null
  - name: stg_orders
    description: "Cleaned and standardized order data"
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: customer_id
        tests:
          - not_null
          - relationships:
              to: ref('stg_customers')
              field: customer_id
      - name: total
        tests:
          - not_null
  - name: stg_order_items
    description: "Cleaned and standardized order line items"
    columns:
      - name: order_id
        tests:
          - not_null
      - name: product_id
        tests:
          - not_null
      - name: price_calculation_valid
        description: "Flag for price calculation accuracy"
        tests:
          - accepted_values:
              values: [true]
  - name: stg_web_events
    description: "Cleaned and standardized web events"
    columns:
      - name: event_id
        tests:
          - unique
          - not_null
dbt run --models staging
Expected output:
Completed successfully
Done. PASS=5 WARN=0 ERROR=0 SKIP=0 TOTAL=5
dbt test --models staging
✅ CHECKPOINT
Build dimension and fact tables for analytics.
mkdir -p models/silver
Create models/silver/dim_customers.sql:

{{
config(
materialized='table',
schema='silver'
)
}}
with customers as (
select * from {{ ref('stg_customers') }}
),
-- Aggregate customer order history
customer_orders as (
select
customer_id,
count(distinct order_id) as lifetime_orders,
sum(case when status = 'completed' then 1 else 0 end) as completed_orders,
sum(case when status = 'completed' then total else 0 end) as lifetime_value,
min(case when status = 'completed' then order_date end) as first_order_date,
max(case when status = 'completed' then order_date end) as last_order_date,
avg(case when status = 'completed' then total end) as avg_order_value,
max(total) as largest_order_value
from {{ ref('stg_orders') }}
group by customer_id
),
final as (
select
-- Customer info
c.customer_id,
c.email,
c.first_name,
c.last_name,
c.full_name,
c.country,
c.state,
c.segment,
c.segment_rank,
c.marketing_opt_in,
c.is_complete_profile,
-- Dates
c.registration_date,
c.registration_month,
c.registration_year,
c.days_since_registration,
-- Order metrics
coalesce(co.lifetime_orders, 0) as lifetime_orders,
coalesce(co.completed_orders, 0) as completed_orders,
coalesce(co.lifetime_value, 0) as lifetime_value,
coalesce(co.avg_order_value, 0) as avg_order_value,
coalesce(co.largest_order_value, 0) as largest_order_value,
co.first_order_date,
co.last_order_date,
-- Customer classification
case
when coalesce(co.lifetime_value, 0) = 0 then 'Never Purchased'
when coalesce(co.lifetime_value, 0) < 100 then 'Low Value'
when coalesce(co.lifetime_value, 0) < 500 then 'Medium Value'
when coalesce(co.lifetime_value, 0) < 1000 then 'High Value'
else 'VIP'
end as value_segment,
case
when co.last_order_date is null then 'No Orders'
when datediff(current_date(), co.last_order_date) <= 30 then 'Active'
when datediff(current_date(), co.last_order_date) <= 90 then 'At Risk'
else 'Churned'
end as lifecycle_stage,
-- Timestamps
c.created_at,
c.updated_at,
current_timestamp() as dbt_updated_at
from customers c
left join customer_orders co on c.customer_id = co.customer_id
)
select * from final
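The two customer classifications in the model above (value_segment and lifecycle_stage) are threshold ladders over lifetime value and recency. A Python sketch, useful for spot-checking the boundaries (illustration only):

```python
from datetime import date

def lifecycle_stage(last_order_date, today):
    """Mirror the lifecycle_stage CASE expression in dim_customers."""
    if last_order_date is None:
        return "No Orders"
    days = (today - last_order_date).days
    if days <= 30:
        return "Active"
    if days <= 90:
        return "At Risk"
    return "Churned"

def value_segment(lifetime_value: float) -> str:
    """Mirror the value_segment thresholds."""
    if lifetime_value == 0:
        return "Never Purchased"
    if lifetime_value < 100:
        return "Low Value"
    if lifetime_value < 500:
        return "Medium Value"
    if lifetime_value < 1000:
        return "High Value"
    return "VIP"

print(lifecycle_stage(date(2024, 1, 1), date(2024, 1, 20)), value_segment(750))
```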
Create models/silver/dim_products.sql:

{{
config(
materialized='table',
schema='silver'
)
}}
with products as (
select * from {{ ref('stg_products') }}
),
-- Product sales metrics
product_sales as (
select
oi.product_id,
count(distinct oi.order_id) as times_ordered,
sum(oi.quantity) as total_quantity_sold,
sum(oi.total_price) as total_revenue,
avg(oi.quantity) as avg_quantity_per_order,
max(o.order_date) as last_ordered_date
from {{ ref('stg_order_items') }} oi
join {{ ref('stg_orders') }} o on oi.order_id = o.order_id
where o.status = 'completed'
group by oi.product_id
),
-- Product view events
product_views as (
select
product_id,
count(*) as total_views
from {{ ref('stg_web_events') }}
where event_type = 'product_view'
and product_id is not null
group by product_id
),
final as (
select
-- Product info
p.product_id,
p.product_name,
p.category,
p.subcategory,
-- Pricing
p.price,
p.cost,
p.profit,
p.profit_margin_pct,
p.price_tier,
-- Inventory
p.stock_quantity,
p.stock_status,
p.supplier,
p.is_active,
-- Sales metrics
coalesce(ps.times_ordered, 0) as times_ordered,
coalesce(ps.total_quantity_sold, 0) as total_quantity_sold,
coalesce(ps.total_revenue, 0) as total_revenue,
coalesce(ps.avg_quantity_per_order, 0) as avg_quantity_per_order,
ps.last_ordered_date,
-- Web metrics
coalesce(pv.total_views, 0) as total_views,
-- Conversion rate
case
when coalesce(pv.total_views, 0) > 0
then round(cast(coalesce(ps.times_ordered, 0) as decimal) / pv.total_views * 100, 2)
else 0
end as view_to_order_rate,
-- Performance classification
case
when coalesce(ps.total_quantity_sold, 0) = 0 then 'Never Sold'
when coalesce(ps.total_quantity_sold, 0) < 10 then 'Low Seller'
when coalesce(ps.total_quantity_sold, 0) < 50 then 'Medium Seller'
when coalesce(ps.total_quantity_sold, 0) < 100 then 'High Seller'
else 'Top Seller'
end as sales_category,
-- Timestamps
p.created_at,
current_timestamp() as dbt_updated_at
from products p
left join product_sales ps on p.product_id = ps.product_id
left join product_views pv on p.product_id = pv.product_id
)
select * from final
Create models/silver/fact_orders.sql:

{{
config(
materialized='table',
schema='silver'
)
}}
with orders as (
select * from {{ ref('stg_orders') }}
),
order_items as (
select * from {{ ref('stg_order_items') }}
),
-- Aggregate order items
order_aggregates as (
select
order_id,
count(*) as num_items,
count(distinct product_id) as num_unique_products,
sum(quantity) as total_quantity,
sum(total_price) as items_subtotal
from order_items
group by order_id
),
final as (
select
-- Keys
o.order_id,
o.customer_id,
-- Dates
o.order_date,
o.order_timestamp,
o.order_month,
o.order_year,
o.order_quarter,
o.day_of_week,
o.is_weekend,
-- Status
o.status,
o.status_clean,
o.payment_method,
-- Amounts
o.subtotal,
o.discount,
o.shipping_cost,
o.tax,
o.total,
-- Metrics
o.has_discount,
o.discount_pct,
o.free_shipping,
o.order_size,
-- Order composition
oa.num_items,
oa.num_unique_products,
oa.total_quantity,
-- Location
o.shipping_city,
o.shipping_state,
o.shipping_country,
-- Timestamps
o.created_at,
o.updated_at,
current_timestamp() as dbt_updated_at
from orders o
left join order_aggregates oa on o.order_id = oa.order_id
)
select * from final
Create models/silver/fact_order_items.sql:

{{
config(
materialized='table',
schema='silver'
)
}}
with order_items as (
select * from {{ ref('stg_order_items') }}
),
orders as (
select
order_id,
customer_id,
order_date,
status
from {{ ref('stg_orders') }}
),
products as (
select
product_id,
product_name,
category,
profit_margin_pct
from {{ ref('stg_products') }}
),
final as (
select
-- Keys
oi.order_id,
o.customer_id,
oi.product_id,
-- Order info
o.order_date,
o.status,
-- Product info
p.product_name,
p.category,
-- Quantities and prices
oi.quantity,
oi.unit_price,
oi.total_price,
-- Calculated metrics
oi.total_price / nullif(oi.quantity, 0) as effective_unit_price,
oi.total_price * (p.profit_margin_pct / 100) as estimated_profit,
-- Timestamps
current_timestamp() as dbt_updated_at
from order_items oi
join orders o on oi.order_id = o.order_id
join products p on oi.product_id = p.product_id
)
select * from final
Create models/silver/fact_web_events.sql:

{{
config(
materialized='table',
schema='silver'
)
}}
with events as (
select * from {{ ref('stg_web_events') }}
),
final as (
select
-- Keys
event_id,
session_id,
customer_id,
product_id,
-- Timestamp
event_timestamp,
event_date,
event_hour,
event_month,
time_of_day,
-- Event details
event_type,
event_category,
search_query,
-- Device
device_type,
browser,
-- Classification
user_type,
-- Timestamps
current_timestamp() as dbt_updated_at
from events
)
select * from final
Create models/silver/schema.yml:

version: 2
models:
  - name: dim_customers
    description: "Customer dimension with lifetime metrics"
    columns:
      - name: customer_id
        description: "Primary key"
        tests:
          - unique
          - not_null
      - name: lifetime_value
        description: "Total revenue from customer"
      - name: value_segment
        description: "Customer value classification"
      - name: lifecycle_stage
        description: "Customer activity status"
  - name: dim_products
    description: "Product dimension with sales performance"
    columns:
      - name: product_id
        description: "Primary key"
        tests:
          - unique
          - not_null
      - name: total_revenue
        description: "Total revenue generated by product"
      - name: sales_category
        description: "Sales performance classification"
  - name: fact_orders
    description: "Order transactions fact table"
    columns:
      - name: order_id
        description: "Primary key"
        tests:
          - unique
          - not_null
      - name: customer_id
        description: "Foreign key to dim_customers"
        tests:
          - not_null
          - relationships:
              to: ref('dim_customers')
              field: customer_id
      - name: total
        description: "Order total amount"
        tests:
          - not_null
  - name: fact_order_items
    description: "Order line items fact table"
    columns:
      - name: order_id
        description: "Foreign key to fact_orders"
        tests:
          - not_null
      - name: product_id
        description: "Foreign key to dim_products"
        tests:
          - not_null
          - relationships:
              to: ref('dim_products')
              field: product_id
  - name: fact_web_events
    description: "Web interaction events fact table"
    columns:
      - name: event_id
        description: "Primary key"
        tests:
          - unique
          - not_null
      - name: event_type
        tests:
          - not_null
dbt run --models silver
Expected output:
Completed successfully
Done. PASS=5 WARN=0 ERROR=0 SKIP=0 TOTAL=5
dbt test --models silver
Go to Databricks SQL Editor and run:
-- Check tables created
SHOW TABLES IN silver;
-- Check record counts
SELECT 'dim_customers' as table_name, COUNT(*) as row_count FROM silver.dim_customers
UNION ALL
SELECT 'dim_products', COUNT(*) FROM silver.dim_products
UNION ALL
SELECT 'fact_orders', COUNT(*) FROM silver.fact_orders
UNION ALL
SELECT 'fact_order_items', COUNT(*) FROM silver.fact_order_items
UNION ALL
SELECT 'fact_web_events', COUNT(*) FROM silver.fact_web_events;
✅ CHECKPOINT
Create custom data quality tests specific to your business logic.
mkdir -p tests
Create tests/assert_positive_order_totals.sql:

-- Test that all completed orders have positive totals
select
order_id,
total
from {{ ref('fact_orders') }}
where status = 'completed'
and total <= 0
Create tests/assert_customer_lifetime_value_positive.sql:

-- Test that customers with orders have positive lifetime value
select
customer_id,
lifetime_value,
lifetime_orders
from {{ ref('dim_customers') }}
where lifetime_orders > 0
and lifetime_value <= 0
Create tests/assert_product_profit_margin_reasonable.sql:

-- Test that profit margins are within reasonable range (0-100%)
select
product_id,
product_name,
profit_margin_pct
from {{ ref('dim_products') }}
where profit_margin_pct < 0
or profit_margin_pct > 100
Create tests/assert_order_items_match_order_subtotal.sql:

-- Test that order items total matches order subtotal
with order_totals as (
select
order_id,
sum(total_price) as items_total
from {{ ref('fact_order_items') }}
group by order_id
),
orders as (
select
order_id,
subtotal
from {{ ref('fact_orders') }}
)
select
o.order_id,
o.subtotal,
ot.items_total,
abs(o.subtotal - ot.items_total) as difference
from orders o
join order_totals ot on o.order_id = ot.order_id
where abs(o.subtotal - ot.items_total) > 0.01 -- Allow for rounding
Create tests/assert_no_future_dates.sql:

-- Test that no records have future dates
select 'orders' as source, order_id as id, order_date as date_field
from {{ ref('fact_orders') }}
where order_date > current_date()
union all
select 'web_events', event_id, cast(event_date as date)
from {{ ref('fact_web_events') }}
where event_date > current_date()
union all
select 'customers', customer_id, registration_date
from {{ ref('dim_customers') }}
where registration_date > current_date()
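Singular tests like the subtotal reconciliation above pass when they return zero rows. The same check is easy to prototype in Python against sample data before writing the SQL (illustration only):

```python
def unreconciled_orders(orders, order_items, tolerance=0.01):
    """Sketch of assert_order_items_match_order_subtotal: return the
    order_ids whose summed line-item totals drift from the order subtotal
    by more than the rounding tolerance."""
    item_sums = {}
    for item in order_items:
        item_sums[item["order_id"]] = (
            item_sums.get(item["order_id"], 0.0) + item["total_price"]
        )
    return [
        o["order_id"]
        for o in orders
        if abs(o["subtotal"] - item_sums.get(o["order_id"], 0.0)) > tolerance
    ]

orders = [{"order_id": 1, "subtotal": 30.0}, {"order_id": 2, "subtotal": 10.0}]
items = [
    {"order_id": 1, "total_price": 15.0},
    {"order_id": 1, "total_price": 15.0},
    {"order_id": 2, "total_price": 9.0},
]
print(unreconciled_orders(orders, items))
```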
dbt test --select test_type:singular
✅ CHECKPOINT
Generate comprehensive dbt documentation.
Edit models/silver/schema.yml to add more detailed descriptions:
version: 2
models:
- name: dim_customers
description: |
Customer dimension table containing demographic information and
lifetime value metrics. One row per customer.
Includes:
- Basic customer information (name, email, location)
- Segmentation (Premium, Regular, Occasional, New)
- Lifetime value and order history
- Lifecycle stage (Active, At Risk, Churned)
columns:
- name: customer_id
description: "Unique identifier for customer - Primary Key"
tests:
- unique
- not_null
- name: lifetime_value
description: |
Total revenue generated from this customer from completed orders.
Excludes cancelled and returned orders.
- name: value_segment
description: |
Classification based on lifetime value:
- Never Purchased: $0
- Low Value: < $100
- Medium Value: $100-500
- High Value: $500-1000
- VIP: > $1000
- name: lifecycle_stage
description: |
Current engagement status:
- No Orders: Never purchased
- Active: Purchased within 30 days
- At Risk: Last purchase 31-90 days ago
- Churned: No purchase in 90+ days
# Add similar detailed descriptions for other models
dbt docs generate
dbt docs serve
This will open documentation in your browser at http://localhost:8080
✅ CHECKPOINT
Create reusable SQL macros.
Create macros/cents_to_dollars.sql:

{% macro cents_to_dollars(column_name, precision=2) %}
round({{ column_name }} / 100.0, {{ precision }})
{% endmacro %}
Create macros/get_date_parts.sql:

{% macro get_date_parts(date_column) %}
date({{ date_column }}) as date,
year({{ date_column }}) as year,
quarter({{ date_column }}) as quarter,
month({{ date_column }}) as month,
dayofmonth({{ date_column }}) as day,
dayofweek({{ date_column }}) as day_of_week
{% endmacro %}
Create macros/generate_surrogate_key.sql:

{% macro generate_surrogate_key(field_list) %}
md5(concat(
{% for field in field_list %}
coalesce(cast({{ field }} as string), '')
{% if not loop.last %},{% endif %}
{% endfor %}
))
{% endmacro %}
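What this macro computes is just an md5 over the stringified, null-safe concatenation of the fields. A Python sketch of the equivalent (illustration only):

```python
import hashlib

def surrogate_key(*fields) -> str:
    """Equivalent of the generate_surrogate_key macro above: md5 of each
    field cast to string (NULL -> ''), concatenated with no separator."""
    joined = "".join("" if f is None else str(f) for f in fields)
    return hashlib.md5(joined.encode("utf-8")).hexdigest()

print(surrogate_key("2024-01-01", 42))
```

One caveat of the no-separator design: `('ab', 'c')` and `('a', 'bc')` hash to the same key. The dbt_utils version of this macro inserts a separator between fields to avoid such collisions, which is worth considering if your keys combine free-form strings.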
Create macros/pivot_metric.sql:

{% macro pivot_metric(column_name, values, agg='sum', alias_prefix='') %}
{% for value in values %}
{{ agg }}(case when {{ column_name }} = '{{ value }}' then 1 else 0 end)
as {{ alias_prefix }}{{ value | replace(' ', '_') | lower }}
{% if not loop.last %},{% endif %}
{% endfor %}
{% endmacro %}
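To make the Jinja loop above concrete, here is a Python sketch that renders the same SQL the macro expands to (illustration only, not part of the project):

```python
def pivot_metric_sql(column_name, values, agg="sum", alias_prefix=""):
    """Render the SQL that the pivot_metric macro would emit: one aggregated
    case expression per value, aliased with the prefix."""
    parts = []
    for value in values:
        alias = alias_prefix + value.replace(" ", "_").lower()
        parts.append(
            f"{agg}(case when {column_name} = '{value}' then 1 else 0 end) as {alias}"
        )
    return ",\n".join(parts)

print(pivot_metric_sql("status", ["completed", "pending"], alias_prefix="orders_"))
```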
Create models/silver/dim_date.sql:
{{
config(
materialized='table',
schema='silver'
)
}}
-- Generate a date dimension using macro
with date_spine as (
{{ dbt_utils.date_spine(
datepart="day",
start_date="cast('2023-01-01' as date)",
end_date="cast('2025-12-31' as date)"
) }}
),
final as (
select
{{ generate_surrogate_key(['date_day']) }} as date_key,
-- the `date` column itself is emitted by get_date_parts below
{{ get_date_parts('date_day') }},
case when dayofweek(date_day) in (1, 7) then true else false end as is_weekend,
case
when month(date_day) in (12, 1, 2) then 'Winter'
when month(date_day) in (3, 4, 5) then 'Spring'
when month(date_day) in (6, 7, 8) then 'Summer'
else 'Fall'
end as season
from date_spine
)
select * from final
Note: This requires dbt_utils package. Install it first:
Create packages.yml in dbt directory:
packages:
  - package: dbt-labs/dbt_utils
    version: 1.1.1
Install packages:
dbt deps
dbt run --models dim_date
✅ CHECKPOINT
dbt build
This runs models, tests, seeds, and snapshots in dependency order with a single command (documentation is still generated separately with dbt docs generate).
dbt run --models silver
dbt test
dbt docs generate
Create notebook silver_validation:
# Databricks notebook source
# MAGIC %md
# MAGIC # Silver Layer Validation
# COMMAND ----------
print("=" * 70)
print("SILVER LAYER TABLE COUNTS")
print("=" * 70)
tables = [
'stg_customers',
'stg_products',
'stg_orders',
'stg_order_items',
'stg_web_events',
'dim_customers',
'dim_products',
'fact_orders',
'fact_order_items',
'fact_web_events'
]
for table in tables:
count = spark.sql(f"SELECT COUNT(*) FROM silver.{table}").collect()[0][0]
print(f"silver.{table:<25} {count:>12,} records")
# COMMAND ----------
# MAGIC %md
# MAGIC ## Customer Metrics
# COMMAND ----------
print("Customer Segmentation:")
spark.sql("""
SELECT
value_segment,
lifecycle_stage,
COUNT(*) as customer_count,
ROUND(AVG(lifetime_value), 2) as avg_ltv,
ROUND(AVG(lifetime_orders), 2) as avg_orders
FROM silver.dim_customers
GROUP BY value_segment, lifecycle_stage
ORDER BY value_segment, lifecycle_stage
""").show()
# COMMAND ----------
# MAGIC %md
# MAGIC ## Product Performance
# COMMAND ----------
print("Top 10 Products by Revenue:")
spark.sql("""
SELECT
product_name,
category,
total_revenue,
total_quantity_sold,
times_ordered,
sales_category
FROM silver.dim_products
ORDER BY total_revenue DESC
LIMIT 10
""").show(truncate=False)
# COMMAND ----------
# MAGIC %md
# MAGIC ## Revenue Analysis
# COMMAND ----------
print("Monthly Revenue Trend:")
spark.sql("""
SELECT
order_month,
COUNT(DISTINCT order_id) as orders,
SUM(total) as revenue,
ROUND(AVG(total), 2) as avg_order_value
FROM silver.fact_orders
WHERE status = 'completed'
GROUP BY order_month
ORDER BY order_month DESC
LIMIT 12
""").show()
# COMMAND ----------
print("=" * 70)
print("✅ SILVER LAYER VALIDATION COMPLETE")
print("=" * 70)
✅ CHECKPOINT
Create dbt/README.md:

# E-Commerce Analytics dbt Project
## Overview
This dbt project transforms raw e-commerce data from Bronze layer into
cleaned and modeled Silver layer tables.
## Project Structure
dbt/
├── models/
│   ├── sources/          # Source definitions for Bronze tables
│   ├── staging/          # Cleaned staging models
│   └── silver/           # Dimensional models
├── tests/                # Custom data quality tests
├── macros/               # Reusable SQL macros
└── dbt_project.yml       # Project configuration
## Models
### Staging Layer
- `stg_customers` - Cleaned customer data
- `stg_products` - Cleaned product data
- `stg_orders` - Cleaned order data
- `stg_order_items` - Cleaned order line items
- `stg_web_events` - Cleaned web events
### Silver Layer
- `dim_customers` - Customer dimension with lifetime metrics
- `dim_products` - Product dimension with sales performance
- `dim_date` - Date dimension for time-based analysis
- `fact_orders` - Order transaction facts
- `fact_order_items` - Order line item facts
- `fact_web_events` - Web event facts
## Running the Project
### Run all models
```bash
dbt run
```

### Run a specific layer
```bash
dbt run --models staging
dbt run --models silver
```

### Run a single model
```bash
dbt run --models dim_customers
```

### Test and document
```bash
dbt test
dbt docs generate
dbt docs serve
```
The project includes:
- `dbt run --models <model_name>`
- `dbt test --models <model_name>`
- `dbt docs generate`
2. **Create `docs/silver_layer_guide.md`:**
```markdown
# Silver Layer Documentation
## Overview
The Silver layer contains cleaned, conformed, and validated data ready
for analytics.
## Data Model
### Dimensions
- **dim_customers**: Customer master with demographics and lifetime metrics
- **dim_products**: Product catalog with sales performance
- **dim_date**: Calendar dimension for time-based analysis
### Facts
- **fact_orders**: One row per order transaction
- **fact_order_items**: One row per order line item
- **fact_web_events**: One row per web interaction event
## Key Metrics
### Customer Metrics
- `lifetime_value`: Total revenue from completed orders
- `lifetime_orders`: Count of all orders
- `avg_order_value`: Average order amount
- `value_segment`: Customer value classification
- `lifecycle_stage`: Current engagement status
### Product Metrics
- `total_revenue`: Total sales revenue
- `total_quantity_sold`: Units sold
- `times_ordered`: Number of orders containing product
- `profit_margin_pct`: Profit margin percentage
- `sales_category`: Sales performance classification
### Order Metrics
- `num_items`: Number of line items in order
- `total_quantity`: Total units in order
- `discount_pct`: Discount as percentage of subtotal
- `order_size`: Classification (Small/Medium/Large/Extra Large)
## Data Quality
All Silver tables include:
- Primary key uniqueness tests
- Not null tests on required fields
- Referential integrity tests
- Custom business logic validation
## Refresh Schedule
- **Staging models**: Run on demand (views)
- **Silver dimensions**: Daily refresh (tables)
- **Silver facts**: Daily refresh (tables)
## Usage Examples
### Customer Analysis
```sql
SELECT
segment,
value_segment,
lifecycle_stage,
COUNT(*) as customers,
AVG(lifetime_value) as avg_ltv
FROM silver.dim_customers
GROUP BY segment, value_segment, lifecycle_stage;
```

### Product Analysis
```sql
SELECT
category,
sales_category,
COUNT(*) as products,
SUM(total_revenue) as revenue
FROM silver.dim_products
WHERE is_active = true
GROUP BY category, sales_category;
```

### Monthly Revenue
```sql
SELECT
order_month,
COUNT(*) as orders,
SUM(total) as revenue,
AVG(total) as aov
FROM silver.fact_orders
WHERE status = 'completed'
GROUP BY order_month
ORDER BY order_month;
```
**✅ CHECKPOINT**
- dbt project documentation created
- Silver layer guide documented
- Usage examples provided
---
## STEP 4.10: Commit Phase 4 to Git (15 minutes)
### Actions:
```bash
# Check status
git status
# Add all dbt files
git add dbt/
git add docs/silver_layer_guide.md
git add databricks/notebook_exports/ # if you exported validation notebook
# Commit
git commit -m "Phase 4 complete: Silver layer with dbt
- Installed and configured dbt for Databricks
- Created source definitions for Bronze tables
- Built 5 staging models with data cleaning
- Created 3 dimension tables (customers, products, date)
- Created 3 fact tables (orders, order_items, web_events)
- Implemented 15+ data quality tests
- Created custom macros for reusability
- Generated comprehensive documentation
- All models tested and validated"
# Push to GitHub
git push origin main
✅ CHECKPOINT
✅ dbt Infrastructure
✅ Staging Layer (5 models)
✅ Silver Dimensional Models (6 tables)
- dim_customers - 10,000 customers with lifetime metrics
- dim_products - 500 products with performance data
- dim_date - Date dimension for analytics
- fact_orders - 50,000 order transactions
- fact_order_items - 100,000+ line items
- fact_web_events - 200,000 web interactions
✅ Data Quality
✅ Documentation
✅ Reusability
Data Volume:
Development Time:
In Phase 5, you will:
Estimated Time: 6-8 hours over Days 12-14
Issue: dbt debug fails with connection error
Solution: Verify DATABRICKS_HOST, token, and http_path in profiles.yml
Issue: Model fails with "table not found"
Solution: Ensure Bronze tables exist, run dbt run --models bronze first
Issue: Tests failing on relationships
Solution: Check that parent tables have all required keys
Issue: "Permission denied" when creating tables
Solution: Verify Databricks token has CREATE TABLE permissions
Issue: dbt runs slowly
Solution: Increase threads in profiles.yml, ensure cluster has enough workers
Phase 4 Manual Version 1.0
Last Updated: 2025-01-01