Duration: Days 15-16 | 4-6 hours total
Goal: Implement comprehensive data quality framework with automated testing
In Phase 6, you will set up Great Expectations, write expectation suites for the key tables, extend dbt with custom quality tests, build a quality metrics model and dashboard, add z-score anomaly detection, and automate the whole suite with a runner script.
Data Quality Philosophy: trust, but verify. Automated testing ensures data reliability and catches issues before they reach end users.
Before starting Phase 6, make sure the earlier phases are complete: your virtual environment works, your Databricks cluster is running, and the Silver and Gold dbt models (dim_customers, fact_orders, customer_metrics, product_performance) are built.
Data Sources
↓
Quality Checks → [Great Expectations]
↓
dbt Tests → [Schema + Custom Tests]
↓
Quality Dashboard → [Metrics + Alerts]
↓
Data Consumers
# Activate virtual environment
source venv/bin/activate
# Install Great Expectations
pip install great-expectations==0.17.10
# Verify installation
great_expectations --version
# Create directory
mkdir -p great_expectations
cd great_expectations
# Initialize
great_expectations init
When prompted, accept the defaults (answer Y) so Great Expectations can scaffold the project structure.
Create great_expectations/great_expectations.yml:
config_version: 3.0

datasources:
  ecommerce_databricks:
    class_name: Datasource
    execution_engine:
      class_name: SparkDFExecutionEngine
      spark_config:
        spark.master: local[*]
        spark.app.name: great_expectations
    data_connectors:
      default_runtime_data_connector:
        class_name: RuntimeDataConnector
        batch_identifiers:
          - default_identifier_name

stores:
  expectations_store:
    class_name: ExpectationsStore
    store_backend:
      class_name: TupleFilesystemStoreBackend
      base_directory: expectations/
  validations_store:
    class_name: ValidationsStore
    store_backend:
      class_name: TupleFilesystemStoreBackend
      base_directory: uncommitted/validations/
  evaluation_parameter_store:
    class_name: EvaluationParameterStore
  checkpoint_store:
    class_name: CheckpointStore
    store_backend:
      class_name: TupleFilesystemStoreBackend
      base_directory: checkpoints/

expectations_store_name: expectations_store
validations_store_name: validations_store
evaluation_parameter_store_name: evaluation_parameter_store
checkpoint_store_name: checkpoint_store

data_docs_sites:
  local_site:
    class_name: SiteBuilder
    store_backend:
      class_name: TupleFilesystemStoreBackend
      base_directory: uncommitted/data_docs/local_site/
    site_index_builder:
      class_name: DefaultSiteIndexBuilder
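The RuntimeDataConnector is the key piece of this configuration: instead of pointing Great Expectations at files or tables on disk, it accepts in-memory batches at validation time, which is how the connector script below hands Spark DataFrames to the validator.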
Create scripts/ge_databricks_connector.py:
"""
Great Expectations connector for Databricks
"""
import great_expectations as ge
from great_expectations.core.batch import RuntimeBatchRequest
from pyspark.sql import SparkSession
import os
from dotenv import load_dotenv
load_dotenv()
# Initialize Spark session for Databricks
def get_spark_session():
"""Create Spark session configured for Databricks"""
return SparkSession.builder \
.appName("GreatExpectations") \
.config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
def get_ge_context():
"""Get Great Expectations context"""
context = ge.get_context()
return context
def validate_table(table_name, expectation_suite_name):
"""
Validate a Databricks table using Great Expectations
Args:
table_name: Full table name (e.g., 'silver.dim_customers')
expectation_suite_name: Name of expectation suite to use
"""
context = get_ge_context()
spark = get_spark_session()
# Read table
df = spark.table(table_name)
# Create batch request
batch_request = RuntimeBatchRequest(
datasource_name="ecommerce_databricks",
data_connector_name="default_runtime_data_connector",
data_asset_name=table_name,
runtime_parameters={"batch_data": df},
batch_identifiers={"default_identifier_name": table_name}
)
# Run validation
results = context.run_checkpoint(
checkpoint_name=f"{expectation_suite_name}_checkpoint",
validations=[
{
"batch_request": batch_request,
"expectation_suite_name": expectation_suite_name
}
]
)
return results
if __name__ == "__main__":
print("Great Expectations Databricks Connector initialized")
✅ CHECKPOINT
Create data quality expectations for key tables.
Create great_expectations/expectations/dim_customers_suite.json:
{
  "expectation_suite_name": "dim_customers_suite",
  "data_asset_type": "Dataset",
  "expectations": [
    {
      "expectation_type": "expect_table_row_count_to_be_between",
      "kwargs": { "min_value": 1000, "max_value": 100000 }
    },
    {
      "expectation_type": "expect_column_to_exist",
      "kwargs": { "column": "customer_id" }
    },
    {
      "expectation_type": "expect_column_values_to_be_unique",
      "kwargs": { "column": "customer_id" }
    },
    {
      "expectation_type": "expect_column_values_to_not_be_null",
      "kwargs": { "column": "customer_id" }
    },
    {
      "expectation_type": "expect_column_values_to_not_be_null",
      "kwargs": { "column": "email" }
    },
    {
      "expectation_type": "expect_column_values_to_match_regex",
      "kwargs": {
        "column": "email",
        "regex": "^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$"
      }
    },
    {
      "expectation_type": "expect_column_values_to_be_in_set",
      "kwargs": {
        "column": "segment",
        "value_set": ["Premium", "Regular", "Occasional", "New"]
      }
    },
    {
      "expectation_type": "expect_column_values_to_be_between",
      "kwargs": { "column": "lifetime_value", "min_value": 0, "max_value": 100000 }
    },
    {
      "expectation_type": "expect_column_values_to_be_in_set",
      "kwargs": {
        "column": "value_segment",
        "value_set": ["Never Purchased", "Low Value", "Medium Value", "High Value", "VIP"]
      }
    },
    {
      "expectation_type": "expect_column_mean_to_be_between",
      "kwargs": { "column": "lifetime_value", "min_value": 0, "max_value": 5000 }
    }
  ],
  "meta": {
    "great_expectations_version": "0.17.10"
  }
}
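As a quick sanity check, you can exercise the email regex from the suite in plain Python before relying on it:
import re

# Same pattern as the expect_column_values_to_match_regex expectation above
EMAIL_RE = re.compile(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$")

assert EMAIL_RE.match("jane.doe@example.com")   # valid address
assert not EMAIL_RE.match("not-an-email")       # no @ or domain
assert not EMAIL_RE.match("missing@tld")        # no dot-separated TLD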
Create great_expectations/expectations/fact_orders_suite.json (two notes: the pair-equality check assumes the items total lives in a column named items_total, and mostly: 0.99 on the total check tolerates up to 1% out-of-range values):
{
  "expectation_suite_name": "fact_orders_suite",
  "data_asset_type": "Dataset",
  "expectations": [
    {
      "expectation_type": "expect_table_row_count_to_be_between",
      "kwargs": { "min_value": 10000, "max_value": 1000000 }
    },
    {
      "expectation_type": "expect_column_values_to_be_unique",
      "kwargs": { "column": "order_id" }
    },
    {
      "expectation_type": "expect_column_values_to_not_be_null",
      "kwargs": { "column": "customer_id" }
    },
    {
      "expectation_type": "expect_column_values_to_be_in_set",
      "kwargs": {
        "column": "status",
        "value_set": ["completed", "pending", "cancelled", "returned"]
      }
    },
    {
      "expectation_type": "expect_column_values_to_be_between",
      "kwargs": {
        "column": "total",
        "min_value": 0,
        "max_value": 10000,
        "mostly": 0.99
      }
    },
    {
      "expectation_type": "expect_column_values_to_be_between",
      "kwargs": { "column": "discount", "min_value": 0, "max_value": 1000 }
    },
    {
      "expectation_type": "expect_column_pair_values_to_be_equal",
      "kwargs": {
        "column_A": "subtotal",
        "column_B": "items_total"
      },
      "meta": {
        "note": "Subtotal should equal the items total; items_total is the assumed name of that column"
      }
    },
    {
      "expectation_type": "expect_column_values_to_be_dateutil_parseable",
      "kwargs": { "column": "order_date" }
    },
    {
      "expectation_type": "expect_column_max_to_be_between",
      "kwargs": {
        "column": "order_date",
        "min_value": "2020-01-01",
        "max_value": "2030-12-31",
        "parse_strings_as_datetimes": true
      }
    }
  ],
  "meta": {
    "great_expectations_version": "0.17.10"
  }
}
Create great_expectations/expectations/product_performance_suite.json:
{
  "expectation_suite_name": "product_performance_suite",
  "data_asset_type": "Dataset",
  "expectations": [
    {
      "expectation_type": "expect_column_values_to_be_unique",
      "kwargs": { "column": "product_id" }
    },
    {
      "expectation_type": "expect_column_values_to_be_between",
      "kwargs": { "column": "price", "min_value": 0, "max_value": 10000 }
    },
    {
      "expectation_type": "expect_column_values_to_be_between",
      "kwargs": { "column": "profit_margin_pct", "min_value": 0, "max_value": 100 }
    },
    {
      "expectation_type": "expect_column_values_to_be_between",
      "kwargs": { "column": "product_health_score", "min_value": 0, "max_value": 100 }
    },
    {
      "expectation_type": "expect_column_values_to_not_be_null",
      "kwargs": { "column": "category" }
    },
    {
      "expectation_type": "expect_column_mean_to_be_between",
      "kwargs": { "column": "profit_margin_pct", "min_value": 20, "max_value": 80 }
    }
  ]
}
Create great_expectations/checkpoints/data_quality_checkpoint.yml:
name: data_quality_checkpoint
config_version: 1.0
class_name: SimpleCheckpoint
run_name_template: "%Y%m%d-%H%M%S-data-quality"
validations:
  - batch_request:
      datasource_name: ecommerce_databricks
      data_connector_name: default_runtime_data_connector
      data_asset_name: silver.dim_customers
    expectation_suite_name: dim_customers_suite
  - batch_request:
      datasource_name: ecommerce_databricks
      data_connector_name: default_runtime_data_connector
      data_asset_name: silver.fact_orders
    expectation_suite_name: fact_orders_suite
  - batch_request:
      datasource_name: ecommerce_databricks
      data_connector_name: default_runtime_data_connector
      data_asset_name: gold.product_performance
    expectation_suite_name: product_performance_suite
action_list:
  - name: store_validation_result
    action:
      class_name: StoreValidationResultAction
  - name: update_data_docs
    action:
      class_name: UpdateDataDocsAction
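You can trigger this checkpoint from Python as well as the CLI; this is a sketch using the 0.17.x context API (the UpdateDataDocsAction above refreshes Data Docs automatically after each run):
import great_expectations as ge

context = ge.get_context()

# Runs all three validations defined in the checkpoint
result = context.run_checkpoint(checkpoint_name="data_quality_checkpoint")
print("All validations passed:", result.success)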
✅ CHECKPOINT
Expand dbt testing with custom quality checks.
Create dbt/tests/generic/test_revenue_consistency.sql:
{% test revenue_consistency(model, column_name) %}

with validation as (

    select
        {{ column_name }},
        subtotal,
        discount,
        shipping_cost,
        tax,
        total,
        abs(
            (subtotal - discount + shipping_cost + tax) - total
        ) as calculation_diff
    from {{ model }}
    where {{ column_name }} is not null

)

select *
from validation
where calculation_diff > 0.01  -- Allow 1 cent rounding

{% endtest %}
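For example, an order with subtotal 100.00, discount 10.00, shipping 5.00, and tax 8.00 must have a total of 103.00; anything more than a cent off appears in the test failures.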
Create dbt/tests/generic/test_date_logic.sql:
{% test date_logic(model, start_date_column, end_date_column) %}
select *
from {{ model }}
where {{ start_date_column }} > {{ end_date_column }}
{% endtest %}
Create dbt/tests/generic/test_no_orphan_records.sql:
{% test no_orphan_records(model, column_name, parent_model, parent_column) %}

with child_keys as (

    select distinct {{ column_name }} as key_value
    from {{ model }}
    where {{ column_name }} is not null

),

parent_keys as (

    select distinct {{ parent_column }} as key_value
    from {{ ref(parent_model) }}

)

select c.key_value
from child_keys c
left join parent_keys p on c.key_value = p.key_value
where p.key_value is null

{% endtest %}
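This covers similar ground to dbt's built-in relationships test, but it returns the distinct orphaned keys directly, which makes failures easier to debug.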
Create dbt/tests/generic/test_metric_within_bounds.sql:
{% test metric_within_bounds(model, column_name, min_value, max_value, error_tolerance=0.1) %}

with validation as (

    select
        count(*) as total_records,
        sum(
            case
                when {{ column_name }} < {{ min_value }}
                  or {{ column_name }} > {{ max_value }}
                then 1
                else 0
            end
        ) as out_of_bounds_count
    from {{ model }}
    where {{ column_name }} is not null

)

select *
from validation
where cast(out_of_bounds_count as float) / total_records > {{ error_tolerance }}

{% endtest %}
Update dbt/models/gold/schema.yml:
version: 2

models:
  - name: customer_metrics
    columns:
      - name: customer_id
        tests:
          - unique
          - not_null
      - name: lifetime_value
        tests:
          - metric_within_bounds:
              min_value: 0
              max_value: 50000
              error_tolerance: 0.05
      - name: customer_quality_score
        tests:
          - metric_within_bounds:
              min_value: 0
              max_value: 100

  - name: fact_orders
    tests:
      - revenue_consistency:
          column_name: order_id
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: customer_id
        tests:
          - no_orphan_records:
              parent_model: dim_customers
              parent_column: customer_id

  - name: product_performance
    columns:
      - name: profit_margin_pct
        tests:
          - metric_within_bounds:
              min_value: 0
              max_value: 100
cd dbt
dbt test
Expected output: all tests pass. To iterate on a single model, run dbt test --models fact_orders.
✅ CHECKPOINT
Build a dashboard to monitor data quality metrics.
Create dbt/models/gold/metrics/data_quality_metrics.sql:
{{
    config(
        materialized='table',
        schema='gold'
    )
}}

with bronze_quality as (

    select
        'Bronze' as layer,
        'customers' as table_name,
        count(*) as row_count,
        count(distinct customer_id) as unique_keys,
        sum(case when customer_id is null then 1 else 0 end) as null_keys,
        sum(case when email is null or email = '' then 1 else 0 end) as null_critical_fields,
        round(
            sum(case when customer_id is null then 1 else 0 end) * 100.0 / count(*),
            2
        ) as null_key_pct
    from {{ source('bronze', 'customers') }}

    union all

    select
        'Bronze' as layer,
        'products' as table_name,
        count(*) as row_count,
        count(distinct product_id) as unique_keys,
        sum(case when product_id is null then 1 else 0 end) as null_keys,
        sum(case when price is null or price < 0 then 1 else 0 end) as null_critical_fields,
        round(
            sum(case when product_id is null then 1 else 0 end) * 100.0 / count(*),
            2
        ) as null_key_pct
    from {{ source('bronze', 'products') }}

    union all

    select
        'Bronze' as layer,
        'orders' as table_name,
        count(*) as row_count,
        count(distinct order_id) as unique_keys,
        sum(case when order_id is null then 1 else 0 end) as null_keys,
        sum(case when customer_id is null or total is null then 1 else 0 end) as null_critical_fields,
        round(
            sum(case when order_id is null then 1 else 0 end) * 100.0 / count(*),
            2
        ) as null_key_pct
    from {{ source('bronze', 'orders') }}

),

silver_quality as (

    select
        'Silver' as layer,
        'dim_customers' as table_name,
        count(*) as row_count,
        count(distinct customer_id) as unique_keys,
        sum(case when customer_id is null then 1 else 0 end) as null_keys,
        sum(case when lifetime_value < 0 then 1 else 0 end) as null_critical_fields,
        round(
            sum(case when customer_id is null then 1 else 0 end) * 100.0 / count(*),
            2
        ) as null_key_pct
    from {{ ref('dim_customers') }}

    union all

    select
        'Silver' as layer,
        'fact_orders' as table_name,
        count(*) as row_count,
        count(distinct order_id) as unique_keys,
        sum(case when order_id is null then 1 else 0 end) as null_keys,
        sum(case when total < 0 then 1 else 0 end) as null_critical_fields,
        round(
            sum(case when order_id is null then 1 else 0 end) * 100.0 / count(*),
            2
        ) as null_key_pct
    from {{ ref('fact_orders') }}

),

gold_quality as (

    select
        'Gold' as layer,
        'customer_metrics' as table_name,
        count(*) as row_count,
        count(distinct customer_id) as unique_keys,
        sum(case when customer_id is null then 1 else 0 end) as null_keys,
        sum(case when customer_quality_score < 0 or customer_quality_score > 100 then 1 else 0 end) as null_critical_fields,
        round(
            sum(case when customer_id is null then 1 else 0 end) * 100.0 / count(*),
            2
        ) as null_key_pct
    from {{ ref('customer_metrics') }}

    union all

    select
        'Gold' as layer,
        'product_performance' as table_name,
        count(*) as row_count,
        count(distinct product_id) as unique_keys,
        sum(case when product_id is null then 1 else 0 end) as null_keys,
        sum(case when product_health_score < 0 or product_health_score > 100 then 1 else 0 end) as null_critical_fields,
        round(
            sum(case when product_id is null then 1 else 0 end) * 100.0 / count(*),
            2
        ) as null_key_pct
    from {{ ref('product_performance') }}

),

all_quality as (

    select * from bronze_quality
    union all
    select * from silver_quality
    union all
    select * from gold_quality

),

scored as (

    select
        *,
        -- Quality score (0-100): null keys (40 pts), key uniqueness (30 pts),
        -- critical-field completeness (30 pts)
        least(100,
            case when null_key_pct = 0 then 40 else 40 - (null_key_pct * 10) end +
            case when row_count = unique_keys then 30 else 0 end +
            case when null_critical_fields = 0 then 30 else 30 - least(30, null_critical_fields) end
        ) as quality_score
    from all_quality

)

select
    layer,
    table_name,
    row_count,
    unique_keys,
    null_keys,
    null_critical_fields,
    null_key_pct,
    quality_score,
    -- Quality grade derived from the score
    case
        when quality_score >= 90 then 'A'
        when quality_score >= 80 then 'B'
        when quality_score >= 70 then 'C'
        else 'D'
    end as quality_grade,
    current_timestamp() as metric_updated_at
from scored
Create dbt/models/gold/metrics/data_anomalies.sql:
{{
    config(
        materialized='table',
        schema='gold'
    )
}}

with daily_stats as (

    select
        order_date,
        count(*) as order_count,
        sum(total) as revenue,
        avg(total) as avg_order_value
    from {{ ref('fact_orders') }}
    where status = 'completed'
    group by order_date

),

stats_with_moving_avg as (

    select
        order_date,
        order_count,
        revenue,
        avg_order_value,
        avg(order_count) over (
            order by order_date rows between 7 preceding and 1 preceding
        ) as avg_orders_7d,
        stddev(order_count) over (
            order by order_date rows between 7 preceding and 1 preceding
        ) as stddev_orders_7d,
        avg(revenue) over (
            order by order_date rows between 7 preceding and 1 preceding
        ) as avg_revenue_7d,
        stddev(revenue) over (
            order by order_date rows between 7 preceding and 1 preceding
        ) as stddev_revenue_7d
    from daily_stats

),

anomalies as (

    select
        order_date,
        order_count,
        revenue,
        avg_order_value,
        -- Z-scores
        case
            when stddev_orders_7d > 0
            then (order_count - avg_orders_7d) / stddev_orders_7d
            else 0
        end as orders_z_score,
        case
            when stddev_revenue_7d > 0
            then (revenue - avg_revenue_7d) / stddev_revenue_7d
            else 0
        end as revenue_z_score,
        -- Anomaly flags (z-score > 2 or < -2)
        case
            when stddev_orders_7d > 0
             and abs((order_count - avg_orders_7d) / stddev_orders_7d) > 2
            then true
            else false
        end as is_orders_anomaly,
        case
            when stddev_revenue_7d > 0
             and abs((revenue - avg_revenue_7d) / stddev_revenue_7d) > 2
            then true
            else false
        end as is_revenue_anomaly
    from stats_with_moving_avg
    where avg_orders_7d is not null

)

select
    order_date,
    order_count,
    revenue,
    avg_order_value,
    round(orders_z_score, 2) as orders_z_score,
    round(revenue_z_score, 2) as revenue_z_score,
    is_orders_anomaly,
    is_revenue_anomaly,
    case
        when is_orders_anomaly or is_revenue_anomaly then 'Anomaly Detected'
        else 'Normal'
    end as status,
    current_timestamp() as detected_at
from anomalies
order by order_date desc
dbt run --models data_quality_metrics data_anomalies
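Before trusting the model, you can replay the z-score logic on a toy series in pandas; the shifted 7-row window mirrors the SQL's rows between 7 preceding and 1 preceding frame, and the numbers here are made up:
import pandas as pd

# Seven ordinary days followed by one spike
orders = pd.Series([100, 105, 98, 102, 99, 101, 103, 180])

# Window excludes the current row, matching the SQL frame;
# pandas' rolling std is the sample stddev, same as Spark's stddev()
rolling = orders.shift(1).rolling(window=7)
z = (orders - rolling.mean()) / rolling.std()

print(z.round(2).tolist())   # NaN until a full window exists
assert z.iloc[-1] > 2        # the spike crosses the ±2 threshold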
Create Databricks notebook data_quality_dashboard:
# Databricks notebook source
# MAGIC %md
# MAGIC # Data Quality Dashboard
# COMMAND ----------
import matplotlib.pyplot as plt
import seaborn as sns
# COMMAND ----------
# MAGIC %md
# MAGIC ## Overall Data Quality Scores
# COMMAND ----------
quality_df = spark.sql("""
SELECT
layer,
table_name,
row_count,
quality_score,
quality_grade,
null_key_pct,
null_critical_fields
FROM gold.data_quality_metrics
ORDER BY layer, quality_score DESC
""").toPandas()
display(quality_df)
# COMMAND ----------
# Quality score by layer
fig, ax = plt.subplots(figsize=(10, 6))
quality_df.groupby('layer')['quality_score'].mean().plot(kind='bar', ax=ax, color=['#1f77b4', '#ff7f0e', '#2ca02c'])
ax.set_title('Average Quality Score by Layer', fontsize=16)
ax.set_ylabel('Quality Score')
ax.set_xlabel('Layer')
ax.set_ylim([0, 100])
plt.tight_layout()
display(fig)
# COMMAND ----------
# Quality grade distribution
print("\n📊 Quality Grade Distribution:")
print(quality_df['quality_grade'].value_counts().sort_index())
# COMMAND ----------
# MAGIC %md
# MAGIC ## Data Anomalies
# COMMAND ----------
anomalies_df = spark.sql("""
SELECT
order_date,
order_count,
revenue,
orders_z_score,
revenue_z_score,
status
FROM gold.data_anomalies
WHERE order_date >= date_sub(current_date(), 90)
ORDER BY order_date DESC
""").toPandas()
# Show anomalies
anomaly_count = len(anomalies_df[anomalies_df['status'] == 'Anomaly Detected'])
print(f"🚨 Anomalies detected in last 90 days: {anomaly_count}")
if anomaly_count > 0:
    print("\nRecent Anomalies:")
    display(anomalies_df[anomalies_df['status'] == 'Anomaly Detected'].head(10))
# COMMAND ----------
# Plot z-scores
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(14, 10))
# Orders z-score
ax1.plot(anomalies_df['order_date'], anomalies_df['orders_z_score'], marker='o', linewidth=1)
ax1.axhline(y=2, color='r', linestyle='--', label='Upper threshold')
ax1.axhline(y=-2, color='r', linestyle='--', label='Lower threshold')
ax1.set_title('Order Count Anomalies (Z-Score)', fontsize=14)
ax1.set_ylabel('Z-Score')
ax1.legend()
ax1.grid(True, alpha=0.3)
# Revenue z-score
ax2.plot(anomalies_df['order_date'], anomalies_df['revenue_z_score'], marker='o', linewidth=1, color='orange')
ax2.axhline(y=2, color='r', linestyle='--', label='Upper threshold')
ax2.axhline(y=-2, color='r', linestyle='--', label='Lower threshold')
ax2.set_title('Revenue Anomalies (Z-Score)', fontsize=14)
ax2.set_ylabel('Z-Score')
ax2.set_xlabel('Date')
ax2.legend()
ax2.grid(True, alpha=0.3)
plt.tight_layout()
display(fig)
# COMMAND ----------
# MAGIC %md
# MAGIC ## Test Results Summary
# COMMAND ----------
print("=" * 70)
print("DATA QUALITY TEST SUMMARY")
print("=" * 70)
# Show tables by quality grade
for grade in ['A', 'B', 'C', 'D']:
    tables = quality_df[quality_df['quality_grade'] == grade]
    if len(tables) > 0:
        print(f"\n{grade} Grade ({len(tables)} tables):")
        for _, row in tables.iterrows():
            print(f"  • {row['layer']}.{row['table_name']} - Score: {row['quality_score']}")
# COMMAND ----------
# MAGIC %md
# MAGIC ## Recommendations
# COMMAND ----------
print("\n📋 DATA QUALITY RECOMMENDATIONS:\n")
# Find tables with issues
issues = quality_df[quality_df['quality_score'] < 90]
if len(issues) == 0:
    print("✅ All tables have excellent quality scores (90+)!")
else:
    print(f"⚠️ {len(issues)} tables need attention:\n")
    for _, row in issues.iterrows():
        print(f"Table: {row['layer']}.{row['table_name']}")
        print(f"  Score: {row['quality_score']} (Grade: {row['quality_grade']})")
        if row['null_key_pct'] > 0:
            print(f"  ⚠️ {row['null_key_pct']:.1f}% null primary keys")
        if row['null_critical_fields'] > 0:
            print(f"  ⚠️ {row['null_critical_fields']} records with null critical fields")
        print()
# COMMAND ----------
print("\n" + "=" * 70)
print("✅ DATA QUALITY DASHBOARD COMPLETE")
print("=" * 70)
✅ CHECKPOINT
Set up automated quality monitoring that runs on schedule.
Create scripts/run_quality_checks.py:
"""
Automated data quality checks runner
Executes Great Expectations validations and dbt tests
"""
import subprocess
import sys
from datetime import datetime
import json
import os
def run_dbt_tests():
"""Run all dbt tests"""
print("\n" + "=" * 70)
print("RUNNING DBT TESTS")
print("=" * 70)
try:
result = subprocess.run(
["dbt", "test"],
cwd="dbt",
capture_output=True,
text=True
)
print(result.stdout)
if result.returncode == 0:
print("✅ All dbt tests passed")
return True
else:
print("❌ Some dbt tests failed")
print(result.stderr)
return False
except Exception as e:
print(f"❌ Error running dbt tests: {str(e)}")
return False
def run_quality_models():
"""Run data quality metric models"""
print("\n" + "=" * 70)
print("RUNNING QUALITY METRIC MODELS")
print("=" * 70)
try:
result = subprocess.run(
["dbt", "run", "--models", "data_quality_metrics", "data_anomalies"],
cwd="dbt",
capture_output=True,
text=True
)
print(result.stdout)
if result.returncode == 0:
print("✅ Quality models updated successfully")
return True
else:
print("❌ Quality models failed")
print(result.stderr)
return False
except Exception as e:
print(f"❌ Error running quality models: {str(e)}")
return False
def generate_quality_report():
"""Generate quality report summary"""
print("\n" + "=" * 70)
print("QUALITY CHECK SUMMARY")
print("=" * 70)
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
report = {
"timestamp": timestamp,
"dbt_tests": "Completed",
"quality_models": "Updated",
"status": "Success"
}
# Save report
report_path = f"reports/quality_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
os.makedirs("reports", exist_ok=True)
with open(report_path, 'w') as f:
json.dump(report, f, indent=2)
print(f"\n✅ Quality report saved to: {report_path}")
print(json.dumps(report, indent=2))
def main():
"""Run all quality checks"""
print("=" * 70)
print("AUTOMATED DATA QUALITY CHECKS")
print(f"Started at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("=" * 70)
# Run checks
dbt_success = run_dbt_tests()
quality_success = run_quality_models()
# Generate report
if dbt_success and quality_success:
generate_quality_report()
print("\n✅ All quality checks completed successfully!")
return 0
else:
print("\n❌ Some quality checks failed!")
return 1
if __name__ == "__main__":
sys.exit(main())
chmod +x scripts/run_quality_checks.py
python scripts/run_quality_checks.py
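Any scheduler works here (cron, a Databricks Job, or an orchestrator like Airflow). As a minimal local sketch, assuming the third-party schedule package (pip install schedule):
import subprocess
import time

import schedule  # third-party: pip install schedule


def quality_job():
    """Invoke the quality runner exactly as you would by hand."""
    subprocess.run(["python", "scripts/run_quality_checks.py"], check=False)


# Run the full quality suite every day at 06:00; adjust to taste
schedule.every().day.at("06:00").do(quality_job)

while True:
    schedule.run_pending()
    time.sleep(60)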
Create scripts/quality_monitoring.sql:
-- Data Quality Monitoring Dashboard
-- Run these queries to get the overall quality status

-- Overall quality summary
SELECT
    'Overall Quality Status' as metric,
    CONCAT(
        ROUND(AVG(quality_score), 1),
        '% (',
        COUNT(*),
        ' tables)'
    ) as value
FROM gold.data_quality_metrics

UNION ALL

-- Quality by layer
SELECT
    CONCAT('Average Score - ', layer) as metric,
    CONCAT(ROUND(AVG(quality_score), 1), '%') as value
FROM gold.data_quality_metrics
GROUP BY layer

UNION ALL

-- Tables with issues
SELECT
    'Tables Below 90%' as metric,
    CAST(COUNT(*) as STRING) as value
FROM gold.data_quality_metrics
WHERE quality_score < 90

UNION ALL

-- Recent anomalies
SELECT
    'Anomalies (Last 7 Days)' as metric,
    CAST(COUNT(*) as STRING) as value
FROM gold.data_anomalies
WHERE order_date >= date_sub(current_date(), 7)
  AND status = 'Anomaly Detected'
ORDER BY metric;

-- Detailed quality breakdown
SELECT
    layer,
    table_name,
    quality_score,
    quality_grade,
    row_count,
    null_key_pct,
    null_critical_fields
FROM gold.data_quality_metrics
ORDER BY quality_score ASC, layer, table_name;

-- Recent anomalies detail
SELECT
    order_date,
    order_count,
    revenue,
    orders_z_score,
    revenue_z_score,
    status
FROM gold.data_anomalies
WHERE status = 'Anomaly Detected'
  AND order_date >= date_sub(current_date(), 30)
ORDER BY order_date DESC;
✅ CHECKPOINT
Create docs/data_quality_guide.md:
# Data Quality Framework
## Overview
This document describes the data quality framework implemented across the e-commerce analytics platform.
## Quality Layers
### 1. Schema Testing (dbt)
- **Uniqueness tests** on primary keys
- **Not null tests** on required fields
- **Referential integrity** tests between tables
- **Accepted values** tests for categorical fields
### 2. Custom Business Logic Tests (dbt)
- **Revenue consistency** - Validates order total calculations
- **Date logic** - Ensures dates are in correct order
- **No orphan records** - Validates foreign key relationships
- **Metric bounds** - Ensures metrics are within expected ranges
### 3. Data Validation (Great Expectations)
- **Statistical tests** on numeric columns
- **Pattern matching** for text fields (e.g., email format)
- **Distribution tests** for value ranges
- **Table row counts** within expected bounds
### 4. Quality Metrics
- **Quality Score** (0-100) based on:
- Null key percentage (40 points)
- Uniqueness of keys (30 points)
- Critical field completeness (30 points)
- **Quality Grade** (A/B/C/D) for quick assessment
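For example, a table with no null keys (40 points), fully unique keys (30 points), and 2 records with null critical fields (30 − 2 = 28 points) scores 98, which is an A.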
### 5. Anomaly Detection
- **Z-score analysis** on daily metrics
- **Moving average comparison** (7-day window)
- **Threshold alerting** (±2 standard deviations)
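Concretely, each day's value $x_t$ is compared with its trailing 7-day window:

$$ z_t = \frac{x_t - \mu_{7d}}{\sigma_{7d}} $$

and the day is flagged when $|z_t| > 2$ (the same sample standard deviation that Spark's stddev() computes).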
## Quality Standards
### Bronze Layer
- **Minimum Quality Score:** 70
- **Requirements:**
- No null primary keys
- All raw data preserved
- Metadata columns present
### Silver Layer
- **Minimum Quality Score:** 85
- **Requirements:**
- No null primary keys
- Foreign keys valid
- Data types correct
- Business rules applied
### Gold Layer
- **Minimum Quality Score:** 90
- **Requirements:**
- All aggregations validated
- Metrics within expected ranges
- No calculation errors
- Complete audit trail
## Running Quality Checks
### Manual Execution
```bash
# Run all dbt tests
cd dbt && dbt test
# Run quality models
dbt run --models data_quality_metrics data_anomalies
# Run automated checks
python scripts/run_quality_checks.py
```

### Scheduled Execution

Quality checks should run on a regular schedule (daily is a sensible default) and after any pipeline change; `scripts/run_quality_checks.py` wraps the full suite.

## Monitoring

Access the quality dashboard via:

- The `data_quality_dashboard` Databricks notebook
- The queries in `scripts/quality_monitoring.sql`

Quality alerts are triggered when a table's quality score falls below its layer minimum or a daily metric moves more than ±2 standard deviations from its 7-day average.

When quality issues are detected, review `null_key_pct` and `null_critical_fields` for the affected table, investigate the source data, and re-run the quality models.

## Metric Reference

- `quality_score` - Overall quality (0-100)
- `quality_grade` - Letter grade (A-D)
- `null_key_pct` - Percentage of null primary keys
- `null_critical_fields` - Count of null important fields
- `orders_z_score` - Standard deviations from mean
- `revenue_z_score` - Standard deviations from mean
- `status` - Normal or Anomaly Detected
Create `docs/quality_checklist.md`:
# Data Quality Testing Checklist
## Pre-Deployment Checklist
Before deploying any data changes, verify:
- [ ] All dbt tests passing (`dbt test`)
- [ ] Quality scores > 85 for all tables
- [ ] No new anomalies detected
- [ ] Schema changes documented
- [ ] Backward compatibility maintained
## New Model Checklist
When creating a new model:
- [ ] Primary key uniqueness test added
- [ ] Not null tests on required fields
- [ ] Foreign key relationship tests added
- [ ] Accepted values tests for categorical fields
- [ ] Custom business logic tests if applicable
- [ ] Model documented in schema.yml
- [ ] Quality metrics include new table
## Daily Monitoring Checklist
Each day, verify:
- [ ] Review quality dashboard
- [ ] Check for anomalies in last 24 hours
- [ ] Verify row counts are within expected range
- [ ] No critical test failures
- [ ] Quality grades all B or better
## Weekly Review Checklist
Each week:
- [ ] Review quality score trends
- [ ] Investigate any declining scores
- [ ] Update test thresholds if needed
- [ ] Review and close quality incidents
- [ ] Update documentation as needed
## Monthly Assessment
Each month:
- [ ] Full quality framework review
- [ ] Update quality standards
- [ ] Review test coverage
- [ ] Optimize slow-running tests
- [ ] Quality metrics analysis
✅ CHECKPOINT
# Check status
git status
# Add all quality files
git add great_expectations/
git add dbt/tests/
git add dbt/models/gold/metrics/data_quality_metrics.sql
git add dbt/models/gold/metrics/data_anomalies.sql
git add scripts/run_quality_checks.py
git add scripts/quality_monitoring.sql
git add docs/data_quality_guide.md
git add docs/quality_checklist.md
# Commit
git commit -m "Phase 6 complete: Data quality framework
- Installed and configured Great Expectations
- Created 3 expectation suites for key tables
- Built 4 custom dbt test types
- Created data quality metrics table (9 tables monitored)
- Implemented anomaly detection with z-score analysis
- Built quality dashboard with visualizations
- Created automated quality check script
- Documented quality framework and standards
- All quality checks passing"
# Push to GitHub
git push origin main
✅ CHECKPOINT
✅ Testing Framework
✅ Quality Monitoring
- data_quality_metrics - Monitors 9 key tables
- data_anomalies - Z-score anomaly detection
✅ Automation
✅ Dashboards
✅ Documentation
Test Coverage:
Quality Scores:
Anomaly Detection:
In Phase 7, you will:
Estimated Time: 6-8 hours over Days 17-19
Issue: Great Expectations tests fail to connect
Solution: Verify the Databricks connection string and check that the cluster is running
Issue: dbt tests timing out
Solution: Increase the cluster size, or run OPTIMIZE with ZORDER BY on the slow tables (Delta tables have no traditional indexes)
Issue: Quality scores lower than expected
Solution: Review null_key_pct and null_critical_fields, investigate source data
Issue: Too many false positive anomalies
Solution: Adjust z-score threshold from 2 to 3, increase moving average window
Issue: Tests passing but dashboard shows issues
Solution: Refresh the quality models: dbt run --models data_quality_metrics data_anomalies
Phase 6 Manual Version 1.0
Last Updated: 2025-01-01