E-Commerce Analytics Platform

Phase 6: Data Quality & Testing

Duration: Days 15-16 | 4-6 hours total
Goal: Implement comprehensive data quality framework with automated testing


OVERVIEW

In Phase 6, you will:

  - Install and configure Great Expectations for data validation
  - Create expectation suites for key Silver and Gold tables
  - Build custom dbt data quality tests
  - Create data quality metrics and anomaly-detection models
  - Build a quality dashboard and automate quality checks
  - Document the data quality framework

Data Quality Philosophy: Trust but verify. Automated testing ensures data reliability and catches issues before they reach end users.


PREREQUISITES

Before starting Phase 6:


ARCHITECTURE: DATA QUALITY FRAMEWORK

```
Data Sources
    ↓
Quality Checks     → [Great Expectations]
    ↓
dbt Tests          → [Schema + Custom Tests]
    ↓
Quality Dashboard  → [Metrics + Alerts]
    ↓
Data Consumers
```

STEP 6.1: Install Great Expectations (30 minutes)

Actions:

  1. Install Great Expectations:
```bash
# Activate virtual environment
source venv/bin/activate

# Install Great Expectations
pip install great-expectations==0.17.10

# Verify installation
great_expectations --version
```
  2. Initialize Great Expectations project:

```bash
# Run from the project root; this creates the great_expectations/ directory and its scaffolding
great_expectations init
```

When prompted, confirm so Great Expectations can create its default folder structure.

  3. Configure Databricks connection:

Create great_expectations/great_expectations.yml:

```yaml
config_version: 3.0

datasources:
  ecommerce_databricks:
    class_name: Datasource
    execution_engine:
      class_name: SparkDFExecutionEngine
      spark_config:
        spark.master: local[*]
        spark.app.name: great_expectations
    data_connectors:
      default_runtime_data_connector:
        class_name: RuntimeDataConnector
        batch_identifiers:
          - default_identifier_name

stores:
  expectations_store:
    class_name: ExpectationsStore
    store_backend:
      class_name: TupleFilesystemStoreBackend
      base_directory: expectations/
  validations_store:
    class_name: ValidationsStore
    store_backend:
      class_name: TupleFilesystemStoreBackend
      base_directory: uncommitted/validations/
  evaluation_parameter_store:
    class_name: EvaluationParameterStore
  checkpoint_store:
    class_name: CheckpointStore
    store_backend:
      class_name: TupleFilesystemStoreBackend
      base_directory: checkpoints/

expectations_store_name: expectations_store
validations_store_name: validations_store
evaluation_parameter_store_name: evaluation_parameter_store
checkpoint_store_name: checkpoint_store

data_docs_sites:
  local_site:
    class_name: SiteBuilder
    store_backend:
      class_name: TupleFilesystemStoreBackend
      base_directory: uncommitted/data_docs/local_site/
    site_index_builder:
      class_name: DefaultSiteIndexBuilder
```
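Before wiring up Spark, it is worth confirming that the YAML parses and the stores resolve. The Great Expectations 0.17.x CLI includes a config checker for this:

```bash
# Verify great_expectations.yml is well-formed and all stores resolve
great_expectations project check-config
```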
  4. Create Python script to connect to Databricks:

Create scripts/ge_databricks_connector.py:

```python
"""
Great Expectations connector for Databricks
"""
import great_expectations as ge
from great_expectations.core.batch import RuntimeBatchRequest
from pyspark.sql import SparkSession
import os
from dotenv import load_dotenv

load_dotenv()


def get_spark_session():
    """Create Spark session configured for Databricks"""
    return SparkSession.builder \
        .appName("GreatExpectations") \
        .config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0") \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .getOrCreate()


def get_ge_context():
    """Get Great Expectations context"""
    context = ge.get_context()
    return context


def validate_table(table_name, expectation_suite_name):
    """
    Validate a Databricks table using Great Expectations

    Args:
        table_name: Full table name (e.g., 'silver.dim_customers')
        expectation_suite_name: Name of expectation suite to use
    """
    context = get_ge_context()
    spark = get_spark_session()

    # Read table
    df = spark.table(table_name)

    # Create batch request
    batch_request = RuntimeBatchRequest(
        datasource_name="ecommerce_databricks",
        data_connector_name="default_runtime_data_connector",
        data_asset_name=table_name,
        runtime_parameters={"batch_data": df},
        batch_identifiers={"default_identifier_name": table_name}
    )

    # Run validation
    results = context.run_checkpoint(
        checkpoint_name=f"{expectation_suite_name}_checkpoint",
        validations=[
            {
                "batch_request": batch_request,
                "expectation_suite_name": expectation_suite_name
            }
        ]
    )

    return results


if __name__ == "__main__":
    print("Great Expectations Databricks Connector initialized")

✅ CHECKPOINT


STEP 6.2: Create Expectation Suites (1.5 hours)

Create data quality expectations for key tables.

Actions:

  1. Create expectation suite for dim_customers:

Create great_expectations/expectations/dim_customers_suite.json:

```json
{
  "expectation_suite_name": "dim_customers_suite",
  "data_asset_type": "Dataset",
  "expectations": [
    {
      "expectation_type": "expect_table_row_count_to_be_between",
      "kwargs": {"min_value": 1000, "max_value": 100000}
    },
    {
      "expectation_type": "expect_column_to_exist",
      "kwargs": {"column": "customer_id"}
    },
    {
      "expectation_type": "expect_column_values_to_be_unique",
      "kwargs": {"column": "customer_id"}
    },
    {
      "expectation_type": "expect_column_values_to_not_be_null",
      "kwargs": {"column": "customer_id"}
    },
    {
      "expectation_type": "expect_column_values_to_not_be_null",
      "kwargs": {"column": "email"}
    },
    {
      "expectation_type": "expect_column_values_to_match_regex",
      "kwargs": {
        "column": "email",
        "regex": "^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$"
      }
    },
    {
      "expectation_type": "expect_column_values_to_be_in_set",
      "kwargs": {
        "column": "segment",
        "value_set": ["Premium", "Regular", "Occasional", "New"]
      }
    },
    {
      "expectation_type": "expect_column_values_to_be_between",
      "kwargs": {"column": "lifetime_value", "min_value": 0, "max_value": 100000}
    },
    {
      "expectation_type": "expect_column_values_to_be_in_set",
      "kwargs": {
        "column": "value_segment",
        "value_set": ["Never Purchased", "Low Value", "Medium Value", "High Value", "VIP"]
      }
    },
    {
      "expectation_type": "expect_column_mean_to_be_between",
      "kwargs": {"column": "lifetime_value", "min_value": 0, "max_value": 5000}
    }
  ],
  "meta": {
    "great_expectations_version": "0.17.10"
  }
}
```
  2. Create expectation suite for fact_orders:

Create great_expectations/expectations/fact_orders_suite.json:

```json
{
  "expectation_suite_name": "fact_orders_suite",
  "data_asset_type": "Dataset",
  "expectations": [
    {
      "expectation_type": "expect_table_row_count_to_be_between",
      "kwargs": {"min_value": 10000, "max_value": 1000000}
    },
    {
      "expectation_type": "expect_column_values_to_be_unique",
      "kwargs": {"column": "order_id"}
    },
    {
      "expectation_type": "expect_column_values_to_not_be_null",
      "kwargs": {"column": "customer_id"}
    },
    {
      "expectation_type": "expect_column_values_to_be_in_set",
      "kwargs": {
        "column": "status",
        "value_set": ["completed", "pending", "cancelled", "returned"]
      }
    },
    {
      "expectation_type": "expect_column_values_to_be_between",
      "kwargs": {"column": "total", "min_value": 0, "max_value": 10000, "mostly": 0.99}
    },
    {
      "expectation_type": "expect_column_values_to_be_between",
      "kwargs": {"column": "discount", "min_value": 0, "max_value": 1000}
    },
    {
      "expectation_type": "expect_column_pair_values_to_be_equal",
      "kwargs": {"column_A": "subtotal", "column_B": "subtotal"},
      "meta": {"note": "Subtotal should equal items total"}
    },
    {
      "expectation_type": "expect_column_values_to_be_dateutil_parseable",
      "kwargs": {"column": "order_date"}
    },
    {
      "expectation_type": "expect_column_max_to_be_between",
      "kwargs": {
        "column": "order_date",
        "min_value": "2020-01-01",
        "max_value": "2030-12-31",
        "parse_strings_as_datetimes": true
      }
    }
  ],
  "meta": {
    "great_expectations_version": "0.17.10"
  }
}
```
  3. Create expectation suite for product_performance:

Create great_expectations/expectations/product_performance_suite.json:

```json
{
  "expectation_suite_name": "product_performance_suite",
  "data_asset_type": "Dataset",
  "expectations": [
    {
      "expectation_type": "expect_column_values_to_be_unique",
      "kwargs": {"column": "product_id"}
    },
    {
      "expectation_type": "expect_column_values_to_be_between",
      "kwargs": {"column": "price", "min_value": 0, "max_value": 10000}
    },
    {
      "expectation_type": "expect_column_values_to_be_between",
      "kwargs": {"column": "profit_margin_pct", "min_value": 0, "max_value": 100}
    },
    {
      "expectation_type": "expect_column_values_to_be_between",
      "kwargs": {"column": "product_health_score", "min_value": 0, "max_value": 100}
    },
    {
      "expectation_type": "expect_column_values_to_not_be_null",
      "kwargs": {"column": "category"}
    },
    {
      "expectation_type": "expect_column_mean_to_be_between",
      "kwargs": {"column": "profit_margin_pct", "min_value": 20, "max_value": 80}
    }
  ]
}
```
  4. Create checkpoint configuration:

Create great_expectations/checkpoints/data_quality_checkpoint.yml:

```yaml
name: data_quality_checkpoint
config_version: 1.0
class_name: SimpleCheckpoint
run_name_template: "%Y%m%d-%H%M%S-data-quality"
validations:
  - batch_request:
      datasource_name: ecommerce_databricks
      data_connector_name: default_runtime_data_connector
      data_asset_name: silver.dim_customers
    expectation_suite_name: dim_customers_suite
  - batch_request:
      datasource_name: ecommerce_databricks
      data_connector_name: default_runtime_data_connector
      data_asset_name: silver.fact_orders
    expectation_suite_name: fact_orders_suite
  - batch_request:
      datasource_name: ecommerce_databricks
      data_connector_name: default_runtime_data_connector
      data_asset_name: gold.product_performance
    expectation_suite_name: product_performance_suite
action_list:
  - name: store_validation_result
    action:
      class_name: StoreValidationResultAction
  - name: update_data_docs
    action:
      class_name: UpdateDataDocsAction
```
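Because the datasource uses a RuntimeDataConnector, the batch data must be supplied at run time (which is what the connector script from Step 6.1 does). The CLI is still useful for confirming that the suites and checkpoint are registered:

```bash
# List registered expectation suites and checkpoints (GE 0.17.x CLI)
great_expectations suite list
great_expectations checkpoint list
```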

✅ CHECKPOINT


STEP 6.3: Create dbt Data Quality Tests (1 hour)

Expand dbt testing with custom quality checks.

Actions:

  1. Create custom dbt test for revenue consistency:

Create dbt/tests/generic/test_revenue_consistency.sql:

```sql
{% test revenue_consistency(model, column_name) %}

with validation as (

    select
        {{ column_name }},
        subtotal,
        discount,
        shipping_cost,
        tax,
        total,
        abs((subtotal - discount + shipping_cost + tax) - total) as calculation_diff
    from {{ model }}
    where {{ column_name }} is not null

)

select *
from validation
where calculation_diff > 0.01  -- Allow 1 cent rounding

{% endtest %}
```
  2. Create test for date logic (a usage sketch follows the test definition):

Create dbt/tests/generic/test_date_logic.sql:

```sql
{% test date_logic(model, start_date_column, end_date_column) %}

select *
from {{ model }}
where {{ start_date_column }} > {{ end_date_column }}

{% endtest %}
```
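The schema.yml updated later in this step applies the other custom tests but not this one. As a sketch, a model-level usage would look like the following; the column names here are placeholders, not columns guaranteed to exist in your models:

```yaml
# Hypothetical usage of the date_logic test; column names are placeholders
models:
  - name: dim_customers
    tests:
      - date_logic:
          start_date_column: first_order_date
          end_date_column: last_order_date
```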
  3. Create test for referential orphans:

Create dbt/tests/generic/test_no_orphan_records.sql:

```sql
{% test no_orphan_records(model, column_name, parent_model, parent_column) %}

with child_keys as (

    select distinct {{ column_name }} as key_value
    from {{ model }}
    where {{ column_name }} is not null

),

parent_keys as (

    select distinct {{ parent_column }} as key_value
    from {{ ref(parent_model) }}

)

select c.key_value
from child_keys c
left join parent_keys p
    on c.key_value = p.key_value
where p.key_value is null

{% endtest %}
```
  4. Create test for metric bounds:

Create dbt/tests/generic/test_metric_within_bounds.sql:

```sql
{% test metric_within_bounds(model, column_name, min_value, max_value, error_tolerance=0.1) %}

with validation as (

    select
        count(*) as total_records,
        sum(
            case
                when {{ column_name }} < {{ min_value }} or {{ column_name }} > {{ max_value }}
                then 1 else 0
            end
        ) as out_of_bounds_count
    from {{ model }}
    where {{ column_name }} is not null

)

select *
from validation
where cast(out_of_bounds_count as float) / total_records > {{ error_tolerance }}

{% endtest %}
```
  5. Update model schema with new tests:

Update dbt/models/gold/schema.yml:

```yaml
version: 2

models:
  - name: customer_metrics
    columns:
      - name: customer_id
        tests:
          - unique
          - not_null
      - name: lifetime_value
        tests:
          - metric_within_bounds:
              min_value: 0
              max_value: 50000
              error_tolerance: 0.05
      - name: customer_quality_score
        tests:
          - metric_within_bounds:
              min_value: 0
              max_value: 100

  - name: fact_orders
    tests:
      - revenue_consistency:
          column_name: order_id
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: customer_id
        tests:
          - no_orphan_records:
              parent_model: dim_customers
              parent_column: customer_id

  - name: product_performance
    columns:
      - name: profit_margin_pct
        tests:
          - metric_within_bounds:
              min_value: 0
              max_value: 100
```
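While iterating on a single model, it is usually faster to run only the tests attached to that model rather than the whole project; dbt's standard node selection supports this:

```bash
# Run only the tests defined on fact_orders
dbt test --select fact_orders
```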
  6. Run all dbt tests:

```bash
cd dbt
dbt test
```

Expected output: All tests should pass.

✅ CHECKPOINT


STEP 6.4: Create Data Quality Dashboard (1 hour)

Build a dashboard to monitor data quality metrics.

Actions:

  1. Create data quality metrics table in dbt:

Create dbt/models/gold/metrics/data_quality_metrics.sql:

```sql
{{ config(
    materialized='table',
    schema='gold'
) }}

with bronze_quality as (

    select
        'Bronze' as layer,
        'customers' as table_name,
        count(*) as row_count,
        count(distinct customer_id) as unique_keys,
        sum(case when customer_id is null then 1 else 0 end) as null_keys,
        sum(case when email is null or email = '' then 1 else 0 end) as null_critical_fields,
        round(sum(case when customer_id is null then 1 else 0 end) * 100.0 / count(*), 2) as null_key_pct
    from {{ source('bronze', 'customers') }}

    union all

    select
        'Bronze' as layer,
        'products' as table_name,
        count(*) as row_count,
        count(distinct product_id) as unique_keys,
        sum(case when product_id is null then 1 else 0 end) as null_keys,
        sum(case when price is null or price < 0 then 1 else 0 end) as null_critical_fields,
        round(sum(case when product_id is null then 1 else 0 end) * 100.0 / count(*), 2) as null_key_pct
    from {{ source('bronze', 'products') }}

    union all

    select
        'Bronze' as layer,
        'orders' as table_name,
        count(*) as row_count,
        count(distinct order_id) as unique_keys,
        sum(case when order_id is null then 1 else 0 end) as null_keys,
        sum(case when customer_id is null or total is null then 1 else 0 end) as null_critical_fields,
        round(sum(case when order_id is null then 1 else 0 end) * 100.0 / count(*), 2) as null_key_pct
    from {{ source('bronze', 'orders') }}

),

silver_quality as (

    select
        'Silver' as layer,
        'dim_customers' as table_name,
        count(*) as row_count,
        count(distinct customer_id) as unique_keys,
        sum(case when customer_id is null then 1 else 0 end) as null_keys,
        sum(case when lifetime_value < 0 then 1 else 0 end) as null_critical_fields,
        round(sum(case when customer_id is null then 1 else 0 end) * 100.0 / count(*), 2) as null_key_pct
    from {{ ref('dim_customers') }}

    union all

    select
        'Silver' as layer,
        'fact_orders' as table_name,
        count(*) as row_count,
        count(distinct order_id) as unique_keys,
        sum(case when order_id is null then 1 else 0 end) as null_keys,
        sum(case when total < 0 then 1 else 0 end) as null_critical_fields,
        round(sum(case when order_id is null then 1 else 0 end) * 100.0 / count(*), 2) as null_key_pct
    from {{ ref('fact_orders') }}

),

gold_quality as (

    select
        'Gold' as layer,
        'customer_metrics' as table_name,
        count(*) as row_count,
        count(distinct customer_id) as unique_keys,
        sum(case when customer_id is null then 1 else 0 end) as null_keys,
        sum(case when customer_quality_score < 0 or customer_quality_score > 100 then 1 else 0 end) as null_critical_fields,
        round(sum(case when customer_id is null then 1 else 0 end) * 100.0 / count(*), 2) as null_key_pct
    from {{ ref('customer_metrics') }}

    union all

    select
        'Gold' as layer,
        'product_performance' as table_name,
        count(*) as row_count,
        count(distinct product_id) as unique_keys,
        sum(case when product_id is null then 1 else 0 end) as null_keys,
        sum(case when product_health_score < 0 or product_health_score > 100 then 1 else 0 end) as null_critical_fields,
        round(sum(case when product_id is null then 1 else 0 end) * 100.0 / count(*), 2) as null_key_pct
    from {{ ref('product_performance') }}

),

all_quality as (

    select * from bronze_quality
    union all
    select * from silver_quality
    union all
    select * from gold_quality

),

scored as (

    select
        *,
        -- Quality score (0-100): key completeness (40) + key uniqueness (30) + critical fields (30)
        least(100,
            case when null_key_pct = 0 then 40 else 40 - (null_key_pct * 10) end
            + case when row_count = unique_keys then 30 else 0 end
            + case when null_critical_fields = 0 then 30 else 30 - least(30, null_critical_fields) end
        ) as quality_score
    from all_quality

)

select
    layer,
    table_name,
    row_count,
    unique_keys,
    null_keys,
    null_critical_fields,
    null_key_pct,
    quality_score,
    -- Quality grade
    case
        when quality_score >= 90 then 'A'
        when quality_score >= 80 then 'B'
        when quality_score >= 70 then 'C'
        else 'D'
    end as quality_grade,
    current_timestamp() as metric_updated_at
from scored
```
  2. Create anomaly detection model:

Create dbt/models/gold/metrics/data_anomalies.sql:

```sql
{{ config(
    materialized='table',
    schema='gold'
) }}

with daily_stats as (

    select
        order_date,
        count(*) as order_count,
        sum(total) as revenue,
        avg(total) as avg_order_value
    from {{ ref('fact_orders') }}
    where status = 'completed'
    group by order_date

),

stats_with_moving_avg as (

    select
        order_date,
        order_count,
        revenue,
        avg_order_value,
        avg(order_count) over (
            order by order_date
            rows between 7 preceding and 1 preceding
        ) as avg_orders_7d,
        stddev(order_count) over (
            order by order_date
            rows between 7 preceding and 1 preceding
        ) as stddev_orders_7d,
        avg(revenue) over (
            order by order_date
            rows between 7 preceding and 1 preceding
        ) as avg_revenue_7d,
        stddev(revenue) over (
            order by order_date
            rows between 7 preceding and 1 preceding
        ) as stddev_revenue_7d
    from daily_stats

),

anomalies as (

    select
        order_date,
        order_count,
        revenue,
        avg_order_value,
        -- Z-scores
        case
            when stddev_orders_7d > 0 then (order_count - avg_orders_7d) / stddev_orders_7d
            else 0
        end as orders_z_score,
        case
            when stddev_revenue_7d > 0 then (revenue - avg_revenue_7d) / stddev_revenue_7d
            else 0
        end as revenue_z_score,
        -- Anomaly flags (z-score > 2 or < -2)
        case
            when stddev_orders_7d > 0
                and abs((order_count - avg_orders_7d) / stddev_orders_7d) > 2
            then true else false
        end as is_orders_anomaly,
        case
            when stddev_revenue_7d > 0
                and abs((revenue - avg_revenue_7d) / stddev_revenue_7d) > 2
            then true else false
        end as is_revenue_anomaly
    from stats_with_moving_avg
    where avg_orders_7d is not null

)

select
    order_date,
    order_count,
    revenue,
    avg_order_value,
    round(orders_z_score, 2) as orders_z_score,
    round(revenue_z_score, 2) as revenue_z_score,
    is_orders_anomaly,
    is_revenue_anomaly,
    case
        when is_orders_anomaly or is_revenue_anomaly then 'Anomaly Detected'
        else 'Normal'
    end as status,
    current_timestamp() as detected_at
from anomalies
order by order_date desc
```
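The anomaly flag is a plain z-score test against the trailing 7-day window. A toy example with made-up numbers shows how the ±2 threshold behaves:

```python
# Toy illustration of the z-score rule used in data_anomalies.sql (numbers are made up)
orders_today = 480
avg_orders_7d = 400.0     # trailing 7-day mean
stddev_orders_7d = 30.0   # trailing 7-day standard deviation

z = (orders_today - avg_orders_7d) / stddev_orders_7d  # 2.67
is_anomaly = abs(z) > 2                                 # True -> 'Anomaly Detected'
print(f"z-score: {z:.2f}, anomaly: {is_anomaly}")
```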
  3. Run quality models:

```bash
dbt run --models data_quality_metrics data_anomalies
```
  4. Create validation notebook:

Create Databricks notebook data_quality_dashboard:

```python
# Databricks notebook source
# MAGIC %md
# MAGIC # Data Quality Dashboard

# COMMAND ----------

import matplotlib.pyplot as plt
import seaborn as sns

# COMMAND ----------

# MAGIC %md
# MAGIC ## Overall Data Quality Scores

# COMMAND ----------

quality_df = spark.sql("""
    SELECT
        layer,
        table_name,
        row_count,
        quality_score,
        quality_grade,
        null_key_pct,
        null_critical_fields
    FROM gold.data_quality_metrics
    ORDER BY layer, quality_score DESC
""").toPandas()

display(quality_df)

# COMMAND ----------

# Quality score by layer
fig, ax = plt.subplots(figsize=(10, 6))
quality_df.groupby('layer')['quality_score'].mean().plot(
    kind='bar', ax=ax, color=['#1f77b4', '#ff7f0e', '#2ca02c']
)
ax.set_title('Average Quality Score by Layer', fontsize=16)
ax.set_ylabel('Quality Score')
ax.set_xlabel('Layer')
ax.set_ylim([0, 100])
plt.tight_layout()
display(fig)

# COMMAND ----------

# Quality grade distribution
print("\n📊 Quality Grade Distribution:")
print(quality_df['quality_grade'].value_counts().sort_index())

# COMMAND ----------

# MAGIC %md
# MAGIC ## Data Anomalies

# COMMAND ----------

anomalies_df = spark.sql("""
    SELECT
        order_date,
        order_count,
        revenue,
        orders_z_score,
        revenue_z_score,
        status
    FROM gold.data_anomalies
    WHERE order_date >= date_sub(current_date(), 90)
    ORDER BY order_date DESC
""").toPandas()

# Show anomalies
anomaly_count = len(anomalies_df[anomalies_df['status'] == 'Anomaly Detected'])
print(f"🚨 Anomalies detected in last 90 days: {anomaly_count}")

if anomaly_count > 0:
    print("\nRecent Anomalies:")
    display(anomalies_df[anomalies_df['status'] == 'Anomaly Detected'].head(10))

# COMMAND ----------

# Plot z-scores
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(14, 10))

# Orders z-score
ax1.plot(anomalies_df['order_date'], anomalies_df['orders_z_score'], marker='o', linewidth=1)
ax1.axhline(y=2, color='r', linestyle='--', label='Upper threshold')
ax1.axhline(y=-2, color='r', linestyle='--', label='Lower threshold')
ax1.set_title('Order Count Anomalies (Z-Score)', fontsize=14)
ax1.set_ylabel('Z-Score')
ax1.legend()
ax1.grid(True, alpha=0.3)

# Revenue z-score
ax2.plot(anomalies_df['order_date'], anomalies_df['revenue_z_score'], marker='o', linewidth=1, color='orange')
ax2.axhline(y=2, color='r', linestyle='--', label='Upper threshold')
ax2.axhline(y=-2, color='r', linestyle='--', label='Lower threshold')
ax2.set_title('Revenue Anomalies (Z-Score)', fontsize=14)
ax2.set_ylabel('Z-Score')
ax2.set_xlabel('Date')
ax2.legend()
ax2.grid(True, alpha=0.3)

plt.tight_layout()
display(fig)

# COMMAND ----------

# MAGIC %md
# MAGIC ## Test Results Summary

# COMMAND ----------

print("=" * 70)
print("DATA QUALITY TEST SUMMARY")
print("=" * 70)

# Show tables by quality grade
for grade in ['A', 'B', 'C', 'D']:
    tables = quality_df[quality_df['quality_grade'] == grade]
    if len(tables) > 0:
        print(f"\n{grade} Grade ({len(tables)} tables):")
        for _, row in tables.iterrows():
            print(f"{row['layer']}.{row['table_name']} - Score: {row['quality_score']}")

# COMMAND ----------

# MAGIC %md
# MAGIC ## Recommendations

# COMMAND ----------

print("\n📋 DATA QUALITY RECOMMENDATIONS:\n")

# Find tables with issues
issues = quality_df[quality_df['quality_score'] < 90]

if len(issues) == 0:
    print("✅ All tables have excellent quality scores (90+)!")
else:
    print(f"⚠️ {len(issues)} tables need attention:\n")
    for _, row in issues.iterrows():
        print(f"Table: {row['layer']}.{row['table_name']}")
        print(f" Score: {row['quality_score']} (Grade: {row['quality_grade']})")
        if row['null_key_pct'] > 0:
            print(f" ⚠️ {row['null_key_pct']:.1f}% null primary keys")
        if row['null_critical_fields'] > 0:
            print(f" ⚠️ {row['null_critical_fields']} records with null critical fields")
        print()

# COMMAND ----------

print("\n" + "=" * 70)
print("✅ DATA QUALITY DASHBOARD COMPLETE")
print("=" * 70)
```
  5. Run the quality dashboard notebook.

✅ CHECKPOINT


STEP 6.5: Create Automated Quality Checks (45 minutes)

Set up automated quality monitoring that runs on schedule.

Actions:

  1. Create automated quality check script:

Create scripts/run_quality_checks.py:

```python
"""
Automated data quality checks runner
Executes Great Expectations validations and dbt tests
"""
import subprocess
import sys
from datetime import datetime
import json
import os


def run_dbt_tests():
    """Run all dbt tests"""
    print("\n" + "=" * 70)
    print("RUNNING DBT TESTS")
    print("=" * 70)

    try:
        result = subprocess.run(
            ["dbt", "test"],
            cwd="dbt",
            capture_output=True,
            text=True
        )
        print(result.stdout)

        if result.returncode == 0:
            print("✅ All dbt tests passed")
            return True
        else:
            print("❌ Some dbt tests failed")
            print(result.stderr)
            return False
    except Exception as e:
        print(f"❌ Error running dbt tests: {str(e)}")
        return False


def run_quality_models():
    """Run data quality metric models"""
    print("\n" + "=" * 70)
    print("RUNNING QUALITY METRIC MODELS")
    print("=" * 70)

    try:
        result = subprocess.run(
            ["dbt", "run", "--models", "data_quality_metrics", "data_anomalies"],
            cwd="dbt",
            capture_output=True,
            text=True
        )
        print(result.stdout)

        if result.returncode == 0:
            print("✅ Quality models updated successfully")
            return True
        else:
            print("❌ Quality models failed")
            print(result.stderr)
            return False
    except Exception as e:
        print(f"❌ Error running quality models: {str(e)}")
        return False


def generate_quality_report():
    """Generate quality report summary"""
    print("\n" + "=" * 70)
    print("QUALITY CHECK SUMMARY")
    print("=" * 70)

    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    report = {
        "timestamp": timestamp,
        "dbt_tests": "Completed",
        "quality_models": "Updated",
        "status": "Success"
    }

    # Save report
    report_path = f"reports/quality_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
    os.makedirs("reports", exist_ok=True)
    with open(report_path, 'w') as f:
        json.dump(report, f, indent=2)

    print(f"\n✅ Quality report saved to: {report_path}")
    print(json.dumps(report, indent=2))


def main():
    """Run all quality checks"""
    print("=" * 70)
    print("AUTOMATED DATA QUALITY CHECKS")
    print(f"Started at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print("=" * 70)

    # Run checks
    dbt_success = run_dbt_tests()
    quality_success = run_quality_models()

    # Generate report
    if dbt_success and quality_success:
        generate_quality_report()
        print("\n✅ All quality checks completed successfully!")
        return 0
    else:
        print("\n❌ Some quality checks failed!")
        return 1


if __name__ == "__main__":
    sys.exit(main())
```
  2. Make script executable and test:

```bash
chmod +x scripts/run_quality_checks.py
python scripts/run_quality_checks.py
```
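To actually run this on a schedule, a cron entry is the simplest option. The project path and interpreter below are assumptions you will need to adapt to your environment:

```bash
# Hypothetical crontab entry: run quality checks daily at 06:00
# (adjust the project path, Python interpreter, and log location to your setup)
0 6 * * * cd /path/to/ecommerce-analytics && ./venv/bin/python scripts/run_quality_checks.py >> logs/quality_checks.log 2>&1
```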
  3. Create quality monitoring dashboard SQL:

Create scripts/quality_monitoring.sql:

```sql
-- Data Quality Monitoring Dashboard
-- Run this query to get overall quality status

-- Overall Quality Summary
SELECT
    'Overall Quality Status' as metric,
    CONCAT(ROUND(AVG(quality_score), 1), '% (', COUNT(*), ' tables)') as value
FROM gold.data_quality_metrics

UNION ALL

-- Quality by Layer
SELECT
    CONCAT('Average Score - ', layer) as metric,
    CONCAT(ROUND(AVG(quality_score), 1), '%') as value
FROM gold.data_quality_metrics
GROUP BY layer

UNION ALL

-- Tables with issues
SELECT
    'Tables Below 90%' as metric,
    CAST(COUNT(*) as STRING) as value
FROM gold.data_quality_metrics
WHERE quality_score < 90

UNION ALL

-- Recent anomalies
SELECT
    'Anomalies (Last 7 Days)' as metric,
    CAST(COUNT(*) as STRING) as value
FROM gold.data_anomalies
WHERE order_date >= date_sub(current_date(), 7)
  AND status = 'Anomaly Detected'

ORDER BY metric;

-- Detailed Quality Breakdown
SELECT
    layer,
    table_name,
    quality_score,
    quality_grade,
    row_count,
    null_key_pct,
    null_critical_fields
FROM gold.data_quality_metrics
ORDER BY quality_score ASC, layer, table_name;

-- Recent Anomalies Detail
SELECT
    order_date,
    order_count,
    revenue,
    orders_z_score,
    revenue_z_score,
    status
FROM gold.data_anomalies
WHERE status = 'Anomaly Detected'
  AND order_date >= date_sub(current_date(), 30)
ORDER BY order_date DESC;
```

✅ CHECKPOINT


STEP 6.6: Document Data Quality Framework (30 minutes)

Actions:

  1. Create docs/data_quality_guide.md:

````markdown
# Data Quality Framework

## Overview

This document describes the data quality framework implemented across the e-commerce analytics platform.

## Quality Layers

### 1. Schema Testing (dbt)
- **Uniqueness tests** on primary keys
- **Not null tests** on required fields
- **Referential integrity** tests between tables
- **Accepted values** tests for categorical fields

### 2. Custom Business Logic Tests (dbt)
- **Revenue consistency** - Validates order total calculations
- **Date logic** - Ensures dates are in correct order
- **No orphan records** - Validates foreign key relationships
- **Metric bounds** - Ensures metrics are within expected ranges

### 3. Data Validation (Great Expectations)
- **Statistical tests** on numeric columns
- **Pattern matching** for text fields (e.g., email format)
- **Distribution tests** for value ranges
- **Table row counts** within expected bounds

### 4. Quality Metrics
- **Quality Score** (0-100) based on:
  - Null key percentage (40 points)
  - Uniqueness of keys (30 points)
  - Critical field completeness (30 points)
- **Quality Grade** (A/B/C/D) for quick assessment

### 5. Anomaly Detection
- **Z-score analysis** on daily metrics
- **Moving average comparison** (7-day window)
- **Threshold alerting** (±2 standard deviations)

## Quality Standards

### Bronze Layer
- **Minimum Quality Score:** 70
- **Requirements:**
  - No null primary keys
  - All raw data preserved
  - Metadata columns present

### Silver Layer
- **Minimum Quality Score:** 85
- **Requirements:**
  - No null primary keys
  - Foreign keys valid
  - Data types correct
  - Business rules applied

### Gold Layer
- **Minimum Quality Score:** 90
- **Requirements:**
  - All aggregations validated
  - Metrics within expected ranges
  - No calculation errors
  - Complete audit trail

## Running Quality Checks

### Manual Execution

```bash
# Run all dbt tests
cd dbt && dbt test

# Run quality models
dbt run --models data_quality_metrics data_anomalies

# Run automated checks
python scripts/run_quality_checks.py
```

### Scheduled Execution

Quality checks should run:

### Quality Dashboard

Access the quality dashboard via:

### Alerting

Quality alerts are triggered when:

## Remediation Process

When quality issues are detected:

1. **Identify** - Review quality dashboard
2. **Investigate** - Check failed tests and anomalies
3. **Fix** - Correct data or update rules
4. **Validate** - Re-run quality checks
5. **Document** - Update quality log

## Quality Metrics Reference

### Table: data_quality_metrics

### Table: data_anomalies

## Best Practices

1. **Test Early** - Add tests when creating models
2. **Document Assumptions** - Explain why tests exist
3. **Monitor Trends** - Track quality scores over time
4. **Investigate Anomalies** - Don't ignore outliers
5. **Automate Everything** - Manual checks are missed checks
````
  2. Create quality testing checklist:

Create docs/quality_checklist.md:

```markdown
# Data Quality Testing Checklist

## Pre-Deployment Checklist

Before deploying any data changes, verify:

- [ ] All dbt tests passing (`dbt test`)
- [ ] Quality scores > 85 for all tables
- [ ] No new anomalies detected
- [ ] Schema changes documented
- [ ] Backward compatibility maintained

## New Model Checklist

When creating a new model:

- [ ] Primary key uniqueness test added
- [ ] Not null tests on required fields
- [ ] Foreign key relationship tests added
- [ ] Accepted values tests for categorical fields
- [ ] Custom business logic tests if applicable
- [ ] Model documented in schema.yml
- [ ] Quality metrics include new table

## Daily Monitoring Checklist

Each day, verify:

- [ ] Review quality dashboard
- [ ] Check for anomalies in last 24 hours
- [ ] Verify row counts are within expected range
- [ ] No critical test failures
- [ ] Quality grades all B or better

## Weekly Review Checklist

Each week:

- [ ] Review quality score trends
- [ ] Investigate any declining scores
- [ ] Update test thresholds if needed
- [ ] Review and close quality incidents
- [ ] Update documentation as needed

## Monthly Assessment

Each month:

- [ ] Full quality framework review
- [ ] Update quality standards
- [ ] Review test coverage
- [ ] Optimize slow-running tests
- [ ] Quality metrics analysis
```

✅ CHECKPOINT


STEP 6.7: Commit Phase 6 to Git (15 minutes)

Actions:

```bash
# Check status
git status

# Add all quality files
git add great_expectations/
git add dbt/tests/
git add dbt/models/gold/metrics/data_quality_metrics.sql
git add dbt/models/gold/metrics/data_anomalies.sql
git add scripts/run_quality_checks.py
git add scripts/quality_monitoring.sql
git add docs/data_quality_guide.md
git add docs/quality_checklist.md

# Commit
git commit -m "Phase 6 complete: Data quality framework

- Installed and configured Great Expectations
- Created 3 expectation suites for key tables
- Built 4 custom dbt test types
- Created data quality metrics table (9 tables monitored)
- Implemented anomaly detection with z-score analysis
- Built quality dashboard with visualizations
- Created automated quality check script
- Documented quality framework and standards
- All quality checks passing"

# Push to GitHub
git push origin main
```

✅ CHECKPOINT


PHASE 6 COMPLETE! 🎉

What You Built:

✅ Testing Framework

✅ Quality Monitoring

✅ Automation

✅ Dashboards

✅ Documentation


Quality Metrics Summary

Test Coverage:

Quality Scores:

Anomaly Detection:


What's Next: Phase 7

In Phase 7, you will:

Estimated Time: 6-8 hours over Days 17-19


Troubleshooting

Issue: Great Expectations tests fail to connect
Solution: Verify Databricks connection string, check cluster is running

Issue: dbt tests timing out
Solution: Increase cluster size, or run OPTIMIZE and Z-ORDER on the underlying Delta tables

Issue: Quality scores lower than expected
Solution: Review null_key_pct and null_critical_fields, investigate source data

Issue: Too many false positive anomalies
Solution: Adjust z-score threshold from 2 to 3, increase moving average window

Issue: Tests passing but dashboard shows issues
Solution: Refresh quality models: dbt run --models data_quality_metrics


Best Practices Learned

  1. Test at every layer - Bronze, Silver, and Gold all need validation
  2. Automate everything - Manual testing doesn't scale
  3. Monitor trends - One-time checks miss slow degradation
  4. Set clear standards - Define what "good quality" means
  5. Document failures - Track issues for pattern recognition
  6. Balance coverage vs. speed - Not every column needs every test
  7. Alert thoughtfully - Too many alerts = ignored alerts

Resources


Phase 6 Manual Version 1.0
Last Updated: 2025-01-01