E-Commerce Analytics Platform

Phase 8: Orchestration & Monitoring

Duration: Days 20-22 | 6-8 hours total
Goal: Automate workflows, implement monitoring, and create operational dashboards


OVERVIEW

In Phase 8, you will:

Orchestration Philosophy: Automate everything, monitor everything, alert intelligently.


PREREQUISITES

Before starting Phase 8:


ARCHITECTURE: ORCHESTRATION PIPELINE

```
Scheduler (Databricks Jobs)
        │
        ▼
Data Ingestion (Bronze)
        │
        ▼
dbt Transformations (Silver)
        │
        ▼
Aggregations (Gold)
        │
        ▼
ML Predictions
        │
        ▼
Quality Checks → [Monitor] → [Alerts]
        │
        ▼
Operational Dashboard
```

STEP 8.1: Create Databricks Jobs (1.5 hours)

Set up automated job execution.

Actions:

  1. Create job configuration files:

Create databricks/jobs/bronze_ingestion_job.json:

```json
{
  "name": "Bronze Layer Ingestion",
  "tags": { "environment": "production", "layer": "bronze" },
  "tasks": [
    {
      "task_key": "mount_storage",
      "notebook_task": {
        "notebook_path": "/Workspace/Users/your_email@domain.com/01_setup/mount_storage",
        "base_parameters": {}
      },
      "existing_cluster_id": "YOUR_CLUSTER_ID",
      "timeout_seconds": 3600,
      "max_retries": 2
    },
    {
      "task_key": "ingest_customers",
      "depends_on": [ { "task_key": "mount_storage" } ],
      "notebook_task": {
        "notebook_path": "/Workspace/Users/your_email@domain.com/02_bronze_ingestion/bronze_customers",
        "base_parameters": {}
      },
      "existing_cluster_id": "YOUR_CLUSTER_ID",
      "timeout_seconds": 3600,
      "max_retries": 2
    },
    {
      "task_key": "ingest_products",
      "depends_on": [ { "task_key": "mount_storage" } ],
      "notebook_task": {
        "notebook_path": "/Workspace/Users/your_email@domain.com/02_bronze_ingestion/bronze_products",
        "base_parameters": {}
      },
      "existing_cluster_id": "YOUR_CLUSTER_ID",
      "timeout_seconds": 3600,
      "max_retries": 2
    },
    {
      "task_key": "ingest_orders",
      "depends_on": [ { "task_key": "mount_storage" } ],
      "notebook_task": {
        "notebook_path": "/Workspace/Users/your_email@domain.com/02_bronze_ingestion/bronze_orders",
        "base_parameters": {}
      },
      "existing_cluster_id": "YOUR_CLUSTER_ID",
      "timeout_seconds": 3600,
      "max_retries": 2
    },
    {
      "task_key": "ingest_web_events",
      "depends_on": [ { "task_key": "mount_storage" } ],
      "notebook_task": {
        "notebook_path": "/Workspace/Users/your_email@domain.com/02_bronze_ingestion/bronze_web_events",
        "base_parameters": {}
      },
      "existing_cluster_id": "YOUR_CLUSTER_ID",
      "timeout_seconds": 3600,
      "max_retries": 2
    }
  ],
  "schedule": {
    "quartz_cron_expression": "0 0 2 * * ?",
    "timezone_id": "America/New_York",
    "pause_status": "UNPAUSED"
  },
  "email_notifications": { "on_failure": ["your_email@domain.com"] },
  "timeout_seconds": 14400,
  "max_concurrent_runs": 1
}
```
  2. Create dbt transformation job:

Create databricks/jobs/silver_transformation_job.json:

```json
{
  "name": "Silver Layer Transformations (dbt)",
  "tags": { "environment": "production", "layer": "silver" },
  "tasks": [
    {
      "task_key": "dbt_run_staging",
      "python_wheel_task": {
        "package_name": "dbt",
        "entry_point": "run",
        "parameters": ["--models", "staging"]
      },
      "existing_cluster_id": "YOUR_CLUSTER_ID",
      "libraries": [ { "pypi": { "package": "dbt-databricks==1.6.2" } } ],
      "timeout_seconds": 3600,
      "max_retries": 2
    },
    {
      "task_key": "dbt_run_silver",
      "depends_on": [ { "task_key": "dbt_run_staging" } ],
      "python_wheel_task": {
        "package_name": "dbt",
        "entry_point": "run",
        "parameters": ["--models", "silver"]
      },
      "existing_cluster_id": "YOUR_CLUSTER_ID",
      "timeout_seconds": 3600,
      "max_retries": 2
    },
    {
      "task_key": "dbt_test",
      "depends_on": [ { "task_key": "dbt_run_silver" } ],
      "python_wheel_task": {
        "package_name": "dbt",
        "entry_point": "test"
      },
      "existing_cluster_id": "YOUR_CLUSTER_ID",
      "timeout_seconds": 1800,
      "max_retries": 1
    }
  ],
  "schedule": {
    "quartz_cron_expression": "0 30 2 * * ?",
    "timezone_id": "America/New_York",
    "pause_status": "UNPAUSED"
  },
  "email_notifications": { "on_failure": ["your_email@domain.com"] },
  "timeout_seconds": 7200,
  "max_concurrent_runs": 1
}
```
  3. Create Gold layer job:

Create databricks/jobs/gold_aggregation_job.json:

```json
{
  "name": "Gold Layer Aggregations",
  "tags": { "environment": "production", "layer": "gold" },
  "tasks": [
    {
      "task_key": "dbt_run_gold",
      "python_wheel_task": {
        "package_name": "dbt",
        "entry_point": "run",
        "parameters": ["--models", "gold"]
      },
      "existing_cluster_id": "YOUR_CLUSTER_ID",
      "timeout_seconds": 3600,
      "max_retries": 2
    },
    {
      "task_key": "run_ml_predictions",
      "depends_on": [ { "task_key": "dbt_run_gold" } ],
      "notebook_task": {
        "notebook_path": "/Workspace/Users/your_email@domain.com/06_ml_models/customer_segmentation",
        "base_parameters": {}
      },
      "existing_cluster_id": "YOUR_CLUSTER_ID",
      "timeout_seconds": 3600,
      "max_retries": 2
    },
    {
      "task_key": "run_churn_prediction",
      "depends_on": [ { "task_key": "dbt_run_gold" } ],
      "notebook_task": {
        "notebook_path": "/Workspace/Users/your_email@domain.com/06_ml_models/churn_prediction",
        "base_parameters": {}
      },
      "existing_cluster_id": "YOUR_CLUSTER_ID",
      "timeout_seconds": 3600,
      "max_retries": 2
    }
  ],
  "schedule": {
    "quartz_cron_expression": "0 0 3 * * ?",
    "timezone_id": "America/New_York",
    "pause_status": "UNPAUSED"
  },
  "email_notifications": { "on_failure": ["your_email@domain.com"] },
  "timeout_seconds": 7200,
  "max_concurrent_runs": 1
}
```
  4. Create master orchestration job:

Create databricks/jobs/master_pipeline_job.json:

```json
{
  "name": "Master Data Pipeline",
  "tags": { "environment": "production", "type": "orchestration" },
  "tasks": [
    {
      "task_key": "bronze_ingestion",
      "run_job_task": { "job_id": "BRONZE_JOB_ID" }
    },
    {
      "task_key": "silver_transformation",
      "depends_on": [ { "task_key": "bronze_ingestion" } ],
      "run_job_task": { "job_id": "SILVER_JOB_ID" }
    },
    {
      "task_key": "gold_aggregation",
      "depends_on": [ { "task_key": "silver_transformation" } ],
      "run_job_task": { "job_id": "GOLD_JOB_ID" }
    },
    {
      "task_key": "quality_checks",
      "depends_on": [ { "task_key": "gold_aggregation" } ],
      "notebook_task": {
        "notebook_path": "/Workspace/Users/your_email@domain.com/05_data_quality/data_quality_dashboard",
        "base_parameters": {}
      },
      "existing_cluster_id": "YOUR_CLUSTER_ID"
    }
  ],
  "schedule": {
    "quartz_cron_expression": "0 0 2 * * ?",
    "timezone_id": "America/New_York",
    "pause_status": "UNPAUSED"
  },
  "email_notifications": {
    "on_success": ["your_email@domain.com"],
    "on_failure": ["your_email@domain.com"]
  },
  "timeout_seconds": 14400,
  "max_concurrent_runs": 1
}
```
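The master config references the three layer jobs through the BRONZE_JOB_ID, SILVER_JOB_ID, and GOLD_JOB_ID placeholders. A minimal sketch of substituting real IDs before submitting the config (the `fill_job_ids` helper is illustrative, not part of the Databricks API):

```python
import json

def fill_job_ids(config_text: str, job_ids: dict) -> dict:
    """Replace quoted placeholder job IDs (e.g. "BRONZE_JOB_ID") in a raw
    job-config JSON string with real numeric IDs, then parse the result."""
    for placeholder, real_id in job_ids.items():
        config_text = config_text.replace(f'"{placeholder}"', str(real_id))
    return json.loads(config_text)

# Example: patch the master pipeline config after creating the three jobs
master = '{"tasks": [{"task_key": "bronze_ingestion", "run_job_task": {"job_id": "BRONZE_JOB_ID"}}]}'
patched = fill_job_ids(master, {"BRONZE_JOB_ID": 123})
print(patched["tasks"][0]["run_job_task"]["job_id"])  # → 123
```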
  5. Create job creation script:

Create scripts/create_databricks_jobs.py:

```python
"""
Create Databricks jobs from JSON configurations.
"""
import json
import os

from databricks_cli.sdk import ApiClient
from databricks_cli.jobs.api import JobsApi

# Initialize Databricks client
api_client = ApiClient(
    host=os.getenv('DATABRICKS_HOST'),
    token=os.getenv('DATABRICKS_TOKEN')
)
jobs_api = JobsApi(api_client)


def create_job_from_file(filepath):
    """Create a Databricks job from a JSON config file."""
    with open(filepath, 'r') as f:
        job_config = json.load(f)

    response = jobs_api.create_job(job_config)
    job_id = response['job_id']

    print(f"✅ Created job: {job_config['name']}")
    print(f"   Job ID: {job_id}")
    return job_id


def main():
    """Create all jobs."""
    job_configs = [
        'databricks/jobs/bronze_ingestion_job.json',
        'databricks/jobs/silver_transformation_job.json',
        'databricks/jobs/gold_aggregation_job.json'
    ]

    print("=" * 70)
    print("CREATING DATABRICKS JOBS")
    print("=" * 70)

    job_ids = {}
    for config_file in job_configs:
        if os.path.exists(config_file):
            job_id = create_job_from_file(config_file)
            job_name = config_file.split('/')[-1].replace('_job.json', '')
            job_ids[job_name] = job_id
        else:
            print(f"⚠️ Config not found: {config_file}")

    print("\n" + "=" * 70)
    print("✅ JOB CREATION COMPLETE")
    print("=" * 70)
    print("\nJob IDs:")
    for name, job_id in job_ids.items():
        print(f"  {name}: {job_id}")


if __name__ == "__main__":
    main()
```

Note: The master pipeline job references the other jobs by ID, so create the bronze, silver, and gold jobs first (in the Databricks UI or with the script above), then substitute the returned IDs for the BRONZE_JOB_ID, SILVER_JOB_ID, and GOLD_JOB_ID placeholders.
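Hand-edited job configs with `depends_on` chains are easy to break. A small sketch that checks a config's task graph before submission, using Kahn's algorithm for topological ordering (`validate_task_order` is a hypothetical helper, not part of the Databricks CLI or API):

```python
def validate_task_order(job_config: dict) -> list:
    """Return task_keys in a valid execution order, raising ValueError if a
    task depends on a missing task_key or the dependencies form a cycle."""
    deps = {t["task_key"]: {d["task_key"] for d in t.get("depends_on", [])}
            for t in job_config["tasks"]}
    order = []
    while deps:
        # A task is ready once all its dependencies are already in the order
        ready = [k for k, d in deps.items() if not d - set(order)]
        if not ready:
            raise ValueError(f"Unsatisfiable dependencies: {sorted(deps)}")
        for k in sorted(ready):
            order.append(k)
            del deps[k]
    return order
```

Running it over the bronze config above would confirm, for example, that every ingest task can only start after `mount_storage`.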

✅ CHECKPOINT


STEP 8.2: Create Monitoring Dashboard (2 hours)

Build operational monitoring dashboard.

Actions:

  1. Create monitoring notebook:

Create Databricks notebook operational_monitoring:

```python
# Databricks notebook source
# MAGIC %md
# MAGIC # Operational Monitoring Dashboard
# MAGIC
# MAGIC Real-time monitoring of pipeline health and performance

# COMMAND ----------

from datetime import datetime, timedelta

import pandas as pd
import matplotlib.pyplot as plt

# COMMAND ----------

# MAGIC %md
# MAGIC ## Pipeline Status Overview

# COMMAND ----------

print("=" * 70)
print("OPERATIONAL HEALTH CHECK")
print(f"Report Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("=" * 70)

# COMMAND ----------

# MAGIC %md
# MAGIC ## Table Freshness

# COMMAND ----------

# Check when tables were last updated
table_freshness = spark.sql("""
    SELECT 'bronze.customers' as table_name,
           MAX(ingestion_timestamp) as last_updated,
           COUNT(*) as row_count
    FROM bronze.customers
    UNION ALL
    SELECT 'silver.dim_customers', MAX(dbt_updated_at), COUNT(*)
    FROM silver.dim_customers
    UNION ALL
    SELECT 'gold.customer_metrics', MAX(metric_updated_at), COUNT(*)
    FROM gold.customer_metrics
    UNION ALL
    SELECT 'gold.daily_revenue', MAX(metric_updated_at), COUNT(*)
    FROM gold.daily_revenue
""").toPandas()

# Calculate freshness in hours
table_freshness['hours_since_update'] = (
    pd.Timestamp.now() - pd.to_datetime(table_freshness['last_updated'])
).dt.total_seconds() / 3600

# Add status
table_freshness['status'] = table_freshness['hours_since_update'].apply(
    lambda x: '✅ Fresh' if x < 24 else ('⚠️ Stale' if x < 48 else '❌ Very Stale')
)

print("\n📊 TABLE FRESHNESS:")
display(table_freshness[['table_name', 'last_updated', 'row_count',
                         'hours_since_update', 'status']])

# COMMAND ----------

# MAGIC %md
# MAGIC ## Data Quality Scores

# COMMAND ----------

quality_summary = spark.sql("""
    SELECT layer,
           COUNT(*) as table_count,
           ROUND(AVG(quality_score), 1) as avg_quality_score,
           MIN(quality_score) as min_quality_score,
           SUM(CASE WHEN quality_score < 85 THEN 1 ELSE 0 END) as tables_below_threshold
    FROM gold.data_quality_metrics
    GROUP BY layer
    ORDER BY layer
""").toPandas()

print("\n📈 QUALITY SUMMARY BY LAYER:")
display(quality_summary)

# Alert if any layer is below threshold
for _, row in quality_summary.iterrows():
    if row['min_quality_score'] < 85:
        print(f"⚠️ ALERT: {row['layer']} layer has quality score below 85!")

# COMMAND ----------

# MAGIC %md
# MAGIC ## Recent Anomalies

# COMMAND ----------

recent_anomalies = spark.sql("""
    SELECT order_date, order_count, revenue,
           orders_z_score, revenue_z_score, status
    FROM gold.data_anomalies
    WHERE order_date >= date_sub(current_date(), 7)
      AND status = 'Anomaly Detected'
    ORDER BY order_date DESC
""").toPandas()

anomaly_count = len(recent_anomalies)
print(f"\n🚨 ANOMALIES (Last 7 Days): {anomaly_count}")

if anomaly_count > 0:
    print("⚠️ ALERT: Anomalies detected!")
    display(recent_anomalies)
else:
    print("✅ No anomalies detected")

# COMMAND ----------

# MAGIC %md
# MAGIC ## Model Performance

# COMMAND ----------

# Churn model performance
churn_dist = spark.sql("""
    SELECT churn_risk,
           COUNT(*) as customer_count,
           ROUND(AVG(churn_probability), 3) as avg_probability
    FROM gold.customer_churn_predictions
    GROUP BY churn_risk
    ORDER BY CASE churn_risk
        WHEN 'High' THEN 1 WHEN 'Medium' THEN 2 WHEN 'Low' THEN 3 END
""").toPandas()

print("\n🎯 CHURN PREDICTION DISTRIBUTION:")
display(churn_dist)

# Guard against an empty 'High' bucket before indexing
high_risk = churn_dist.loc[churn_dist['churn_risk'] == 'High', 'customer_count']
high_risk_count = int(high_risk.iloc[0]) if len(high_risk) else 0
if high_risk_count > 500:
    print(f"⚠️ ALERT: {high_risk_count} high-risk customers!")

# COMMAND ----------

# MAGIC %md
# MAGIC ## System Metrics

# COMMAND ----------

# Record counts by layer
storage_usage = spark.sql("""
    SELECT 'Bronze' as layer, COUNT(*) as table_count FROM bronze.customers
    UNION ALL
    SELECT 'Silver', COUNT(*) FROM silver.dim_customers
    UNION ALL
    SELECT 'Gold', COUNT(*) FROM gold.customer_metrics
""").toPandas()

print("\n💾 RECORD COUNTS BY LAYER:")
display(storage_usage)

# COMMAND ----------

# MAGIC %md
# MAGIC ## Revenue Trends

# COMMAND ----------

# Last 30 days revenue
revenue_trend = spark.sql("""
    SELECT order_date, gross_revenue, total_orders, revenue_7d_avg
    FROM gold.daily_revenue
    WHERE order_date >= date_sub(current_date(), 30)
    ORDER BY order_date
""").toPandas()

# Plot
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(14, 10))

# Revenue plot
ax1.plot(revenue_trend['order_date'], revenue_trend['gross_revenue'],
         label='Daily Revenue', marker='o')
ax1.plot(revenue_trend['order_date'], revenue_trend['revenue_7d_avg'],
         label='7-Day Average', linestyle='--')
ax1.set_title('Revenue Trend (Last 30 Days)', fontsize=14)
ax1.set_ylabel('Revenue ($)')
ax1.legend()
ax1.grid(True, alpha=0.3)

# Orders plot
ax2.plot(revenue_trend['order_date'], revenue_trend['total_orders'],
         marker='o', color='orange')
ax2.set_title('Order Volume (Last 30 Days)', fontsize=14)
ax2.set_ylabel('Orders')
ax2.set_xlabel('Date')
ax2.grid(True, alpha=0.3)

plt.tight_layout()
display(fig)

# COMMAND ----------

# MAGIC %md
# MAGIC ## Health Check Summary

# COMMAND ----------

# Calculate overall health status
health_checks = {
    'Data Freshness': 'PASS' if all(table_freshness['hours_since_update'] < 24) else 'FAIL',
    'Quality Scores': 'PASS' if all(quality_summary['min_quality_score'] >= 85) else 'FAIL',
    'Anomaly Detection': 'PASS' if anomaly_count == 0 else 'WARNING',
    'Model Predictions': 'PASS' if high_risk_count < 500 else 'WARNING',
    'Data Volume': 'PASS'  # Assuming volumes are normal
}

print("\n" + "=" * 70)
print("HEALTH CHECK SUMMARY")
print("=" * 70)
for check, status in health_checks.items():
    emoji = '✅' if status == 'PASS' else ('⚠️' if status == 'WARNING' else '❌')
    print(f"{emoji} {check}: {status}")

# Overall status
if all(v == 'PASS' for v in health_checks.values()):
    print("\n✅ ALL SYSTEMS OPERATIONAL")
elif any(v == 'FAIL' for v in health_checks.values()):
    print("\n❌ SYSTEM ISSUES DETECTED - IMMEDIATE ACTION REQUIRED")
else:
    print("\n⚠️ SYSTEM WARNINGS - MONITOR CLOSELY")

# COMMAND ----------

# MAGIC %md
# MAGIC ## Alert Summary

# COMMAND ----------

alerts = []

# Freshness alerts
stale_tables = table_freshness[table_freshness['hours_since_update'] > 24]
for _, row in stale_tables.iterrows():
    alerts.append({
        'severity': 'HIGH' if row['hours_since_update'] > 48 else 'MEDIUM',
        'type': 'Freshness',
        'message': f"{row['table_name']} is {row['hours_since_update']:.1f} hours old"
    })

# Quality alerts
low_quality = quality_summary[quality_summary['tables_below_threshold'] > 0]
for _, row in low_quality.iterrows():
    alerts.append({
        'severity': 'HIGH',
        'type': 'Quality',
        'message': f"{row['layer']} layer has {row['tables_below_threshold']} tables below quality threshold"
    })

# Anomaly alerts
if anomaly_count > 0:
    alerts.append({
        'severity': 'MEDIUM',
        'type': 'Anomaly',
        'message': f"{anomaly_count} anomalies detected in last 7 days"
    })

# High-risk customer alerts
if high_risk_count > 500:
    alerts.append({
        'severity': 'MEDIUM',
        'type': 'Business',
        'message': f"{high_risk_count} customers at high churn risk"
    })

if alerts:
    print("\n🚨 ACTIVE ALERTS:")
    alerts_df = pd.DataFrame(alerts)
    display(alerts_df)
else:
    print("\n✅ No active alerts")

# COMMAND ----------

print("\n" + "=" * 70)
print("✅ OPERATIONAL MONITORING COMPLETE")
print("=" * 70)
print(f"Next Check: {(datetime.now() + timedelta(hours=1)).strftime('%Y-%m-%d %H:%M:%S')}")
```
  2. Run the monitoring notebook and verify every section executes
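The notebook's status rules can be pulled out into pure functions, which makes the thresholds easy to unit-test outside Databricks. A sketch, assuming the same thresholds as above (these helpers are not part of the notebook itself):

```python
def freshness_status(hours_since_update: float) -> str:
    """Classify table freshness with the dashboard's thresholds:
    fresh under 24h, stale under 48h, very stale beyond that."""
    if hours_since_update < 24:
        return '✅ Fresh'
    if hours_since_update < 48:
        return '⚠️ Stale'
    return '❌ Very Stale'


def overall_status(health_checks: dict) -> str:
    """Collapse individual PASS/WARNING/FAIL checks into one pipeline status,
    matching the notebook: any FAIL is critical, any WARNING is a warning."""
    statuses = set(health_checks.values())
    if 'FAIL' in statuses:
        return 'ISSUES DETECTED'
    if 'WARNING' in statuses:
        return 'WARNINGS'
    return 'ALL SYSTEMS OPERATIONAL'
```

Keeping the thresholds in one place like this also means the notebook and the alerting job cannot drift apart.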

✅ CHECKPOINT


STEP 8.3: Create Alerting System (1 hour)

Implement email alerting for critical issues.

Actions:

  1. Create alerting notebook:

Create Databricks notebook alerting_system:

```python
# Databricks notebook source
# MAGIC %md
# MAGIC # Alerting System
# MAGIC
# MAGIC Checks for issues and sends email alerts

# COMMAND ----------

import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from datetime import datetime

import pandas as pd

# COMMAND ----------

# Email configuration (keep credentials in Databricks secrets, never in code)
SMTP_SERVER = "smtp.gmail.com"
SMTP_PORT = 587
SENDER_EMAIL = "your_email@gmail.com"
SENDER_PASSWORD = dbutils.secrets.get(scope="email", key="password")
RECEIVER_EMAIL = "your_email@gmail.com"

# COMMAND ----------

def send_email(subject, body_html):
    """Send an email alert."""
    try:
        msg = MIMEMultipart('alternative')
        msg['Subject'] = subject
        msg['From'] = SENDER_EMAIL
        msg['To'] = RECEIVER_EMAIL

        html_part = MIMEText(body_html, 'html')
        msg.attach(html_part)

        with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server:
            server.starttls()
            server.login(SENDER_EMAIL, SENDER_PASSWORD)
            server.send_message(msg)

        print(f"✅ Alert email sent: {subject}")
        return True
    except Exception as e:
        print(f"❌ Failed to send email: {str(e)}")
        return False

# COMMAND ----------

# MAGIC %md
# MAGIC ## Check for Issues

# COMMAND ----------

issues = []

# Check 1: Stale data
stale_tables = spark.sql("""
    SELECT 'bronze.customers' as table_name,
           MAX(ingestion_timestamp) as last_updated
    FROM bronze.customers
    HAVING TIMESTAMPDIFF(HOUR, MAX(ingestion_timestamp), CURRENT_TIMESTAMP()) > 24
""").collect()

if stale_tables:
    issues.append({
        'severity': 'HIGH',
        'type': 'Stale Data',
        'details': f"Tables not updated in 24+ hours: {len(stale_tables)}"
    })

# Check 2: Quality issues
low_quality = spark.sql("""
    SELECT COUNT(*) as count
    FROM gold.data_quality_metrics
    WHERE quality_score < 85
""").collect()[0][0]

if low_quality > 0:
    issues.append({
        'severity': 'HIGH',
        'type': 'Data Quality',
        'details': f"{low_quality} tables below quality threshold"
    })

# Check 3: Anomalies
anomalies = spark.sql("""
    SELECT COUNT(*) as count
    FROM gold.data_anomalies
    WHERE order_date >= date_sub(current_date(), 1)
      AND status = 'Anomaly Detected'
""").collect()[0][0]

if anomalies > 0:
    issues.append({
        'severity': 'MEDIUM',
        'type': 'Anomaly',
        'details': f"{anomalies} anomalies in last 24 hours"
    })

print(f"Issues found: {len(issues)}")

# COMMAND ----------

# MAGIC %md
# MAGIC ## Send Alerts

# COMMAND ----------

if issues:
    # Create HTML email body
    html = f"""
    <html>
    <body>
      <h2>🚨 E-Commerce Analytics Platform - Alert</h2>
      <p>Alert Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>
      <h3>Issues Detected:</h3>
      <table border="1" cellpadding="5" cellspacing="0">
        <tr><th>Severity</th><th>Type</th><th>Details</th></tr>
    """

    for issue in issues:
        color = 'red' if issue['severity'] == 'HIGH' else 'orange'
        html += f"""
        <tr>
          <td style="color: {color}"><strong>{issue['severity']}</strong></td>
          <td>{issue['type']}</td>
          <td>{issue['details']}</td>
        </tr>
        """

    html += """
      </table>
      <h3>Actions Required:</h3>
      <ul>
        <li>Review operational monitoring dashboard</li>
        <li>Check Databricks job logs</li>
        <li>Investigate data quality issues</li>
      </ul>
      <p>
        <a href="https://your-databricks-workspace.azuredatabricks.net">
          View Databricks Workspace
        </a>
      </p>
    </body>
    </html>
    """

    subject = f"🚨 Alert: {len(issues)} Issues Detected - E-Commerce Platform"
    send_email(subject, html)
else:
    print("✅ No issues detected - no alert sent")

# COMMAND ----------

print("=" * 70)
print("✅ ALERTING SYSTEM CHECK COMPLETE")
print("=" * 70)
```
  2. Set up email secrets in Databricks:
    • Create a secret scope named email (for example with the CLI: databricks secrets create-scope --scope email)
    • Add a secret named password containing your email app password (databricks secrets put --scope email --key password)
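The HTML body assembly in the notebook can also be factored into a pure function so the formatting is testable without an SMTP server. A sketch mirroring the notebook's row coloring (`build_alert_html` is a hypothetical refactor, not part of the notebook above):

```python
def build_alert_html(issues: list) -> str:
    """Render the alert email body as an HTML table, coloring HIGH-severity
    rows red and everything else orange, as the alerting notebook does."""
    rows = []
    for issue in issues:
        color = 'red' if issue['severity'] == 'HIGH' else 'orange'
        rows.append(
            f'<tr><td style="color: {color}"><strong>{issue["severity"]}</strong></td>'
            f'<td>{issue["type"]}</td><td>{issue["details"]}</td></tr>'
        )
    return (
        '<html><body>'
        '<h2>🚨 E-Commerce Analytics Platform - Alert</h2>'
        '<table border="1" cellpadding="5">'
        '<tr><th>Severity</th><th>Type</th><th>Details</th></tr>'
        + ''.join(rows) +
        '</table></body></html>'
    )
```

With this split, `send_email(subject, build_alert_html(issues))` keeps the delivery and formatting concerns separate.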

✅ CHECKPOINT


STEP 8.4: Create Operational Dashboard SQL (30 minutes)

Create reusable SQL queries for monitoring.

Actions:

  1. Create scripts/operational_queries.sql:
```sql
-- ============================================================================
-- OPERATIONAL MONITORING QUERIES
-- Quick health checks for the e-commerce analytics platform
-- ============================================================================

-- QUERY 1: Pipeline Status Overview
-- Shows overall health of the data pipeline
SELECT
    'Pipeline Health' as metric,
    CASE
        WHEN stale_count = 0 AND low_quality_count = 0 THEN '✅ Healthy'
        WHEN stale_count > 0 OR low_quality_count > 2 THEN '❌ Critical'
        ELSE '⚠️ Warning'
    END as status,
    CONCAT(stale_count, ' stale tables, ', low_quality_count, ' low quality') as details
FROM (
    SELECT
        COUNT(DISTINCT CASE
            WHEN TIMESTAMPDIFF(HOUR, ingestion_timestamp, CURRENT_TIMESTAMP()) > 24
            THEN 'stale' END) as stale_count,
        (SELECT COUNT(*) FROM gold.data_quality_metrics WHERE quality_score < 85) as low_quality_count
    FROM bronze.customers
);

-- QUERY 2: Table Freshness Report
-- Last update time for each critical table
SELECT
    'Bronze - Customers' as table_name,
    MAX(ingestion_timestamp) as last_updated,
    TIMESTAMPDIFF(HOUR, MAX(ingestion_timestamp), CURRENT_TIMESTAMP()) as hours_old,
    COUNT(*) as row_count,
    CASE
        WHEN TIMESTAMPDIFF(HOUR, MAX(ingestion_timestamp), CURRENT_TIMESTAMP()) < 24 THEN '✅'
        WHEN TIMESTAMPDIFF(HOUR, MAX(ingestion_timestamp), CURRENT_TIMESTAMP()) < 48 THEN '⚠️'
        ELSE '❌'
    END as status
FROM bronze.customers
UNION ALL
SELECT
    'Bronze - Orders',
    MAX(ingestion_timestamp),
    TIMESTAMPDIFF(HOUR, MAX(ingestion_timestamp), CURRENT_TIMESTAMP()),
    COUNT(*),
    CASE
        WHEN TIMESTAMPDIFF(HOUR, MAX(ingestion_timestamp), CURRENT_TIMESTAMP()) < 24 THEN '✅'
        WHEN TIMESTAMPDIFF(HOUR, MAX(ingestion_timestamp), CURRENT_TIMESTAMP()) < 48 THEN '⚠️'
        ELSE '❌'
    END
FROM bronze.orders
UNION ALL
SELECT
    'Silver - Customers',
    MAX(dbt_updated_at),
    TIMESTAMPDIFF(HOUR, MAX(dbt_updated_at), CURRENT_TIMESTAMP()),
    COUNT(*),
    CASE
        WHEN TIMESTAMPDIFF(HOUR, MAX(dbt_updated_at), CURRENT_TIMESTAMP()) < 24 THEN '✅'
        WHEN TIMESTAMPDIFF(HOUR, MAX(dbt_updated_at), CURRENT_TIMESTAMP()) < 48 THEN '⚠️'
        ELSE '❌'
    END
FROM silver.dim_customers
UNION ALL
SELECT
    'Gold - Metrics',
    MAX(metric_updated_at),
    TIMESTAMPDIFF(HOUR, MAX(metric_updated_at), CURRENT_TIMESTAMP()),
    COUNT(*),
    CASE
        WHEN TIMESTAMPDIFF(HOUR, MAX(metric_updated_at), CURRENT_TIMESTAMP()) < 24 THEN '✅'
        WHEN TIMESTAMPDIFF(HOUR, MAX(metric_updated_at), CURRENT_TIMESTAMP()) < 48 THEN '⚠️'
        ELSE '❌'
    END
FROM gold.customer_metrics
ORDER BY table_name;

-- QUERY 3: Data Quality Summary
-- Current quality scores by layer
SELECT
    layer,
    COUNT(*) as table_count,
    ROUND(AVG(quality_score), 1) as avg_score,
    MIN(quality_score) as min_score,
    MAX(quality_score) as max_score,
    SUM(CASE WHEN quality_score < 85 THEN 1 ELSE 0 END) as tables_below_threshold,
    CASE
        WHEN MIN(quality_score) >= 90 THEN '✅ Excellent'
        WHEN MIN(quality_score) >= 85 THEN '✅ Good'
        WHEN MIN(quality_score) >= 70 THEN '⚠️ Fair'
        ELSE '❌ Poor'
    END as status
FROM gold.data_quality_metrics
GROUP BY layer
ORDER BY layer;

-- QUERY 4: Recent Anomalies
-- Anomalies detected in last 7 days
SELECT
    order_date,
    order_count,
    ROUND(revenue, 2) as revenue,
    ROUND(orders_z_score, 2) as orders_zscore,
    ROUND(revenue_z_score, 2) as revenue_zscore,
    status
FROM gold.data_anomalies
WHERE order_date >= DATE_SUB(CURRENT_DATE(), 7)
  AND status = 'Anomaly Detected'
ORDER BY order_date DESC;

-- QUERY 5: ML Model Health
-- Current state of ML predictions
SELECT
    'Customer Segmentation' as model,
    COUNT(DISTINCT customer_id) as predictions,
    COUNT(DISTINCT segment_name) as segments,
    '✅ Active' as status
FROM gold.customer_segments
UNION ALL
SELECT
    'Churn Prediction',
    COUNT(DISTINCT customer_id),
    COUNT(DISTINCT churn_risk),
    CASE
        WHEN SUM(CASE WHEN churn_risk = 'High' THEN 1 ELSE 0 END) > 1000 THEN '⚠️ High Risk'
        ELSE '✅ Active'
    END
FROM gold.customer_churn_predictions
UNION ALL
SELECT
    'Product Recommendations',
    COUNT(DISTINCT product_id),
    COUNT(DISTINCT recommended_product_id),
    '✅ Active'
FROM gold.product_recommendations;

-- QUERY 6: System Performance Metrics
-- Row counts and growth by layer
SELECT 'Bronze Layer' as layer, SUM(row_count) as total_rows, 'Raw ingestion' as description
FROM (
    SELECT COUNT(*) as row_count FROM bronze.customers
    UNION ALL SELECT COUNT(*) FROM bronze.products
    UNION ALL SELECT COUNT(*) FROM bronze.orders
    UNION ALL SELECT COUNT(*) FROM bronze.order_items
    UNION ALL SELECT COUNT(*) FROM bronze.web_events
) bronze_tables
UNION ALL
SELECT 'Silver Layer', SUM(row_count), 'Cleaned & validated'
FROM (
    SELECT COUNT(*) as row_count FROM silver.dim_customers
    UNION ALL SELECT COUNT(*) FROM silver.dim_products
    UNION ALL SELECT COUNT(*) FROM silver.fact_orders
    UNION ALL SELECT COUNT(*) FROM silver.fact_order_items
    UNION ALL SELECT COUNT(*) FROM silver.fact_web_events
) silver_tables
UNION ALL
SELECT 'Gold Layer', SUM(row_count), 'Business aggregations'
FROM (
    SELECT COUNT(*) as row_count FROM gold.customer_metrics
    UNION ALL SELECT COUNT(*) FROM gold.product_performance
    UNION ALL SELECT COUNT(*) FROM gold.daily_revenue
) gold_tables;

-- QUERY 7: Critical Alerts
-- Issues requiring immediate attention
-- (Databricks SQL has no GROUP_CONCAT; use array_join + collect_list instead)
SELECT
    'Stale Tables' as alert_type,
    'HIGH' as severity,
    COUNT(*) as count,
    array_join(collect_list(table_name), ', ') as details
FROM (
    SELECT 'bronze.customers' as table_name
    FROM bronze.customers
    GROUP BY 1
    HAVING TIMESTAMPDIFF(HOUR, MAX(ingestion_timestamp), CURRENT_TIMESTAMP()) > 48
) stale
UNION ALL
SELECT
    'Low Quality Tables',
    'HIGH',
    COUNT(*),
    array_join(collect_list(CONCAT(layer, '.', table_name)), ', ')
FROM gold.data_quality_metrics
WHERE quality_score < 70
UNION ALL
SELECT
    'Recent Anomalies',
    'MEDIUM',
    COUNT(*),
    CONCAT(COUNT(*), ' anomalies detected')
FROM gold.data_anomalies
WHERE order_date >= DATE_SUB(CURRENT_DATE(), 1)
  AND status = 'Anomaly Detected'
UNION ALL
SELECT
    'High Churn Risk',
    'MEDIUM',
    COUNT(*),
    CONCAT(COUNT(*), ' customers at high risk')
FROM gold.customer_churn_predictions
WHERE churn_risk = 'High';

-- QUERY 8: Daily Pipeline Summary
-- Summary for daily standup/reports
SELECT
    CURRENT_DATE() as report_date,
    (SELECT COUNT(*) FROM bronze.orders
     WHERE DATE(ingestion_timestamp) = CURRENT_DATE()) as orders_ingested_today,
    (SELECT COUNT(*) FROM silver.fact_orders
     WHERE DATE(dbt_updated_at) = CURRENT_DATE()) as orders_processed_today,
    (SELECT ROUND(AVG(quality_score), 1)
     FROM gold.data_quality_metrics) as avg_quality_score,
    (SELECT COUNT(*) FROM gold.data_anomalies
     WHERE order_date >= DATE_SUB(CURRENT_DATE(), 1)
       AND status = 'Anomaly Detected') as anomalies_24h,
    CASE
        WHEN (SELECT COUNT(*) FROM gold.data_quality_metrics WHERE quality_score < 85) = 0
        THEN '✅ All checks passed'
        ELSE '⚠️ Issues detected'
    END as overall_status;
```
  2. Test the queries in the Databricks SQL editor

✅ CHECKPOINT


STEP 8.5: Create Workflow Documentation (30 minutes)

Document the orchestration setup.

Actions:

  1. Create docs/orchestration_guide.md:
Orchestration & Monitoring Guide

Overview

This guide describes the automated workflows, monitoring, and alerting for the e-commerce analytics platform.

Job Schedule

Daily Pipeline (2:00 AM EST)

  1. Bronze Ingestion (2:00 AM - 2:30 AM)
    • Mounts Azure storage
    • Ingests all source data
    • Validates schema
    • Writes to Delta tables
  2. Silver Transformation (2:30 AM - 3:00 AM)
    • Runs dbt staging models
    • Runs dbt Silver models
    • Executes data quality tests
    • Updates dimensional tables
  3. Gold Aggregation (3:00 AM - 3:45 AM)
    • Runs dbt Gold models
    • Generates ML predictions (segmentation, churn)
    • Updates business metrics
    • Creates executive summaries
  4. Quality Checks (3:45 AM - 4:00 AM)
    • Runs data quality validations
    • Updates quality metrics
    • Checks for anomalies
    • Triggers alerts if needed

Weekly Jobs

  • Product Recommendations (Sunday 3:00 AM)
    • Rebuilds collaborative filtering model
    • Generates fresh recommendations
  • Model Retraining (Sunday 4:00 AM)
    • Retrains customer segmentation
    • Validates model performance
    • Updates MLflow registry

Monitoring

Automated Checks
  • Table Freshness: every hour
  • Data Quality: after each pipeline run
  • Anomaly Detection: daily
  • Model Performance: weekly

Dashboards
  • Operational Dashboard: real-time health status
  • Data Quality Dashboard: detailed quality metrics
  • ML Model Dashboard: model performance tracking

Alert Thresholds

Alert Type | Threshold | Severity | Action
Stale Data | > 24 hours | HIGH | Immediate investigation
Quality Score | < 85 | HIGH | Review and fix
Quality Score | < 70 | CRITICAL | Stop pipeline
Anomaly Detection | Any detected | MEDIUM | Investigate cause
High Churn Risk | > 500 customers | MEDIUM | Business review
Job Failure | Any failure | HIGH | Check logs immediately

Email Notifications

Recipients
  • Data Team: all alerts
  • Business Team: business metric alerts only
  • On-call: critical alerts only

Alert Types
  • Success: daily pipeline completion
  • Warning: quality issues, anomalies
  • Failure: job failures, critical issues

Job Management

Starting a Job Manually

```bash
# Via Databricks CLI
databricks jobs run-now --job-id <JOB_ID>
```

Via UI: Workflows → Jobs → select the job → Run Now

Viewing Job Logs

  1. Go to Databricks → Workflows → Jobs
  2. Click on job name
  3. Select run from history
  4. View logs for each task

Troubleshooting Failed Jobs

Step 1: Check Job Status

Step 2: Review Task Logs

Step 3: Common Issues

Step 4: Rerun Job

Performance Optimization

Cluster Settings

Job Optimization

Cost Management

Maintenance Windows

Weekly Maintenance (Sunday 1:00 AM - 2:00 AM)

Monthly Maintenance (First Sunday 12:00 AM - 2:00 AM)

Disaster Recovery

Backup Strategy

Recovery Procedures

Scenario 1: Corrupted Table

```sql
-- Restore a corrupted table from Delta time travel history
RESTORE TABLE silver.dim_customers
TO TIMESTAMP AS OF current_timestamp() - INTERVAL 24 HOURS;
```

Scenario 2: Failed Pipeline

Scenario 3: Data Quality Issues

Contacts

Data Engineering Team

Business Intelligence Team

DevOps/Infrastructure

  2. Create runbook: Create docs/runbook.md:

```markdown
# Operational Runbook

## Daily Operations

### Morning Check (9:00 AM)
- [ ] Review overnight job status
- [ ] Check operational dashboard
- [ ] Verify no critical alerts
- [ ] Review data quality scores
- [ ] Check for anomalies

### End of Day (5:00 PM)
- [ ] Verify all jobs completed
- [ ] Review any incidents
- [ ] Update ticket system
- [ ] Brief team on any issues

## Weekly Tasks

### Monday
- [ ] Review weekly summary
- [ ] Check model performance
- [ ] Analyze anomalies from previous week

### Friday
- [ ] Prepare weekend coverage
- [ ] Review upcoming changes
- [ ] Update documentation

## Incident Response

### P1: Critical (Pipeline Down)
1. Acknowledge alert
2. Check job status in Databricks
3. Review error logs
4. Notify team in Slack
5. Fix issue or escalate
6. Document in incident log
7. Post-mortem within 24 hours

### P2: High (Data Quality Issues)
1. Check quality dashboard
2. Identify affected tables
3. Stop downstream jobs if needed
4. Investigate root cause
5. Fix and validate
6. Resume pipeline
7. Document resolution

### P3: Medium (Anomalies Detected)
1. Review anomaly details
2. Verify data accuracy
3. Investigate business context
4. Document findings
5. Adjust thresholds if false positive

## Emergency Contacts

**Escalation Path:**
1. Data Engineer on-call
2. Data Engineering Manager
3. VP of Engineering

**24/7 Support:**
- PagerDuty: data-engineering
- Slack: #data-engineering-alerts
```

✅ CHECKPOINT


STEP 8.6: Commit Phase 8 to Git (15 minutes)

Actions:

```bash
# Check status
git status

# Add all orchestration files
git add databricks/jobs/
git add scripts/operational_queries.sql
git add docs/orchestration_guide.md
git add docs/runbook.md

# Commit
git commit -m "Phase 8 complete: Orchestration & Monitoring

- Created 4 Databricks job configurations
- Built operational monitoring dashboard
- Implemented alerting system with email notifications
- Created reusable operational SQL queries
- Documented complete orchestration workflow
- Created runbook for daily operations
- Scheduled daily pipeline execution (2 AM)
- Set up health checks and quality monitoring
- All jobs configured with error handling and retries"

# Push to GitHub
git push origin main
```

✅ CHECKPOINT


PHASE 8 COMPLETE! 🎉

What You Built:

✅ Automated Jobs (4 jobs)

✅ Monitoring System

✅ Alerting Infrastructure

✅ Operational Tools

✅ Documentation


Job Schedule Summary

Job | Time (EST) | Duration | Frequency
Bronze Ingestion | 2:00 AM | 30 min | Daily
Silver Transform | 2:30 AM | 30 min | Daily
Gold Aggregation | 3:00 AM | 45 min | Daily
Quality Checks | 3:45 AM | 15 min | Daily
Product Recommendations | 3:00 AM | 30 min | Weekly (Sun)
Model Retraining | 4:00 AM | 60 min | Weekly (Sun)

Total Daily Runtime: ~2 hours (30 + 30 + 45 + 15 minutes, 2:00 AM - 4:00 AM)
Total Weekly Runtime: ~15.5 hours (seven daily runs plus the Sunday jobs)
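As a quick sanity check, the totals follow directly from the durations in the schedule table:

```python
# Daily jobs run back-to-back, so daily wall-clock time is the sum of the
# four task durations from the schedule table above.
daily_minutes = 30 + 30 + 45 + 15            # bronze + silver + gold + quality
weekly_extra_minutes = 30 + 60               # Sunday recommendations + retraining
weekly_minutes = 7 * daily_minutes + weekly_extra_minutes

print(f"Daily:  {daily_minutes / 60:.1f} hours")   # → Daily:  2.0 hours
print(f"Weekly: {weekly_minutes / 60:.1f} hours")  # → Weekly: 15.5 hours
```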


Monitoring Coverage

Data Freshness:

Data Quality:

Anomaly Detection:

System Health:


What's Next: Phase 9

In Phase 9, you will:

Estimated Time: 4-6 hours over Days 23-24


Troubleshooting

Issue: Jobs not starting on schedule
Solution: Check job schedule settings, verify cluster is not terminated

Issue: Email alerts not sending
Solution: Verify SMTP settings, check secrets are configured correctly

Issue: Monitoring dashboard shows stale data
Solution: Check if jobs completed successfully, review job logs

Issue: Job running too long
Solution: Increase cluster size, optimize queries, check for data skew

Issue: Quality checks failing
Solution: Review specific test failures, check upstream data sources


Best Practices Applied

  1. Job Dependencies - Proper task ordering with depends_on
  2. Error Handling - Retries and timeouts configured
  3. Notifications - Email alerts for failures
  4. Monitoring - Comprehensive health checks
  5. Documentation - Clear runbooks and guides
  6. Scheduling - Off-hours execution to minimize impact
  7. Idempotency - Jobs can be safely rerun
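Idempotency (point 7) means a rerun after a partial failure cannot duplicate data. A minimal sketch of the keyed-upsert pattern behind it; this pure-Python stand-in is for illustration only, since the actual pipeline tables would use a Delta Lake MERGE INTO:

```python
def idempotent_upsert(table: dict, batch: list, key: str = 'order_id') -> dict:
    """Keyed upsert: applying the same batch twice leaves the table unchanged,
    so a failed job can be safely rerun from the top."""
    merged = dict(table)
    for row in batch:
        merged[row[key]] = row  # a rerun overwrites each key with identical data
    return merged

batch = [{'order_id': 1, 'amount': 50}, {'order_id': 2, 'amount': 75}]
once = idempotent_upsert({}, batch)
twice = idempotent_upsert(once, batch)
print(once == twice)  # → True
```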

Cost Optimization

Current Setup:

Optimization Tips:


Resources


Phase 8 Manual Version 1.0
Last Updated: 2025-01-01