Duration: Days 20-22 | 6-8 hours total
Goal: Automate workflows, implement monitoring, and create operational dashboards
In Phase 8, you will schedule the pipeline as Databricks Jobs, build an operational monitoring dashboard, implement email alerting for critical issues, create reusable monitoring SQL, and document the orchestration setup and runbook.
Orchestration Philosophy: Automate everything, monitor everything, alert intelligently.
Before starting Phase 8, make sure the Bronze/Silver/Gold notebooks, dbt models, and ML notebooks from the previous phases are in place; they are what the jobs below orchestrate. The pipeline you will automate looks like this:
Scheduler (Databricks Jobs)
↓
Data Ingestion (Bronze)
↓
dbt Transformations (Silver)
↓
Aggregations (Gold)
↓
ML Predictions
↓
Quality Checks → [Monitor] → [Alerts]
↓
Operational Dashboard
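Before building the jobs, it is worth a quick pre-flight check that the tables from earlier phases actually exist. A minimal notebook sketch; the table names mirror the ones used throughout this phase, so adjust if your schemas differ:

```python
# Pre-flight check: verify the prerequisite tables from earlier phases exist
required_tables = [
    "bronze.customers", "bronze.orders",
    "silver.dim_customers", "silver.fact_orders",
    "gold.customer_metrics", "gold.daily_revenue",
]

missing = [t for t in required_tables if not spark.catalog.tableExists(t)]
if missing:
    print(f"❌ Missing tables, finish the earlier phases first: {missing}")
else:
    print("✅ All prerequisite tables found")
```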
Set up automated job execution.
Create databricks/jobs/bronze_ingestion_job.json:
{
"name": "Bronze Layer Ingestion",
"tags": {
"environment": "production",
"layer": "bronze"
},
"tasks": [
{
"task_key": "mount_storage",
"notebook_task": {
"notebook_path": "/Workspace/Users/your_email@domain.com/01_setup/mount_storage",
"base_parameters": {}
},
"existing_cluster_id": "YOUR_CLUSTER_ID",
"timeout_seconds": 3600,
"max_retries": 2
},
{
"task_key": "ingest_customers",
"depends_on": [
{
"task_key": "mount_storage"
}
],
"notebook_task": {
"notebook_path": "/Workspace/Users/your_email@domain.com/02_bronze_ingestion/bronze_customers",
"base_parameters": {}
},
"existing_cluster_id": "YOUR_CLUSTER_ID",
"timeout_seconds": 3600,
"max_retries": 2
},
{
"task_key": "ingest_products",
"depends_on": [
{
"task_key": "mount_storage"
}
],
"notebook_task": {
"notebook_path": "/Workspace/Users/your_email@domain.com/02_bronze_ingestion/bronze_products",
"base_parameters": {}
},
"existing_cluster_id": "YOUR_CLUSTER_ID",
"timeout_seconds": 3600,
"max_retries": 2
},
{
"task_key": "ingest_orders",
"depends_on": [
{
"task_key": "mount_storage"
}
],
"notebook_task": {
"notebook_path": "/Workspace/Users/your_email@domain.com/02_bronze_ingestion/bronze_orders",
"base_parameters": {}
},
"existing_cluster_id": "YOUR_CLUSTER_ID",
"timeout_seconds": 3600,
"max_retries": 2
},
{
"task_key": "ingest_web_events",
"depends_on": [
{
"task_key": "mount_storage"
}
],
"notebook_task": {
"notebook_path": "/Workspace/Users/your_email@domain.com/02_bronze_ingestion/bronze_web_events",
"base_parameters": {}
},
"existing_cluster_id": "YOUR_CLUSTER_ID",
"timeout_seconds": 3600,
"max_retries": 2
}
],
"schedule": {
"quartz_cron_expression": "0 0 2 * * ?",
"timezone_id": "America/New_York",
"pause_status": "UNPAUSED"
},
"email_notifications": {
"on_failure": ["your_email@domain.com"]
},
"timeout_seconds": 14400,
"max_concurrent_runs": 1
}
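Before loading these JSON files into Databricks, a quick structural check can catch typos in task keys and dependency references. A small local sketch; the paths match the files created in this phase and nothing here calls the Databricks API:

```python
import json

def validate_job_config(path):
    """Basic structural checks for a Databricks job JSON config."""
    with open(path) as f:
        cfg = json.load(f)

    assert cfg.get("name"), f"{path}: job needs a name"
    tasks = cfg.get("tasks", [])
    assert tasks, f"{path}: job needs at least one task"

    task_keys = [t["task_key"] for t in tasks]
    assert len(task_keys) == len(set(task_keys)), f"{path}: duplicate task_key"

    # Every depends_on entry must reference an existing task_key
    for t in tasks:
        for dep in t.get("depends_on", []):
            assert dep["task_key"] in task_keys, (
                f"{path}: {t['task_key']} depends on unknown task {dep['task_key']}"
            )
    print(f"✅ {path} looks structurally valid")

for p in [
    "databricks/jobs/bronze_ingestion_job.json",
    "databricks/jobs/silver_transformation_job.json",
    "databricks/jobs/gold_aggregation_job.json",
    "databricks/jobs/master_pipeline_job.json",
]:
    validate_job_config(p)
```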
Create databricks/jobs/silver_transformation_job.json:
{
"name": "Silver Layer Transformations (dbt)",
"tags": {
"environment": "production",
"layer": "silver"
},
"tasks": [
{
"task_key": "dbt_run_staging",
"python_wheel_task": {
"package_name": "dbt",
"entry_point": "run",
"parameters": ["--models", "staging"]
},
"existing_cluster_id": "YOUR_CLUSTER_ID",
"libraries": [
{
"pypi": {
"package": "dbt-databricks==1.6.2"
}
}
],
"timeout_seconds": 3600,
"max_retries": 2
},
{
"task_key": "dbt_run_silver",
"depends_on": [
{
"task_key": "dbt_run_staging"
}
],
"python_wheel_task": {
"package_name": "dbt",
"entry_point": "run",
"parameters": ["--models", "silver"]
},
"existing_cluster_id": "YOUR_CLUSTER_ID",
"timeout_seconds": 3600,
"max_retries": 2
},
{
"task_key": "dbt_test",
"depends_on": [
{
"task_key": "dbt_run_silver"
}
],
"python_wheel_task": {
"package_name": "dbt",
"entry_point": "test"
},
"existing_cluster_id": "YOUR_CLUSTER_ID",
"timeout_seconds": 1800,
"max_retries": 1
}
],
"schedule": {
"quartz_cron_expression": "0 30 2 * * ?",
"timezone_id": "America/New_York",
"pause_status": "UNPAUSED"
},
"email_notifications": {
"on_failure": ["your_email@domain.com"]
},
"timeout_seconds": 7200,
"max_concurrent_runs": 1
}
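A note on the dbt tasks above: python_wheel_task assumes dbt-core (pulled in by the dbt-databricks library) is installed on the cluster, that it exposes its `dbt` console entry point, and that a dbt project and profiles are reachable from the task. If that setup proves fiddly, a common fallback is a plain notebook task that shells out to dbt. A sketch, with placeholder paths for wherever your dbt project is checked out (e.g., a Databricks Repos folder):

```python
# Databricks notebook source
# Fallback: invoke dbt from a notebook task instead of a python_wheel_task.
# Paths are placeholders -- point them at your dbt project checkout.
import subprocess

result = subprocess.run(
    [
        "dbt", "run", "--select", "silver",
        "--project-dir", "/Workspace/Repos/your_repo/dbt_project",
        "--profiles-dir", "/Workspace/Repos/your_repo/dbt_project",
    ],
    capture_output=True,
    text=True,
)
print(result.stdout)
if result.returncode != 0:
    print(result.stderr)
    raise RuntimeError("dbt run failed")
```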
Create databricks/jobs/gold_aggregation_job.json:
{
"name": "Gold Layer Aggregations",
"tags": {
"environment": "production",
"layer": "gold"
},
"tasks": [
{
"task_key": "dbt_run_gold",
"python_wheel_task": {
"package_name": "dbt",
"entry_point": "run",
"parameters": ["--models", "gold"]
},
"existing_cluster_id": "YOUR_CLUSTER_ID",
"timeout_seconds": 3600,
"max_retries": 2
},
{
"task_key": "run_ml_predictions",
"depends_on": [
{
"task_key": "dbt_run_gold"
}
],
"notebook_task": {
"notebook_path": "/Workspace/Users/your_email@domain.com/06_ml_models/customer_segmentation",
"base_parameters": {}
},
"existing_cluster_id": "YOUR_CLUSTER_ID",
"timeout_seconds": 3600,
"max_retries": 2
},
{
"task_key": "run_churn_prediction",
"depends_on": [
{
"task_key": "dbt_run_gold"
}
],
"notebook_task": {
"notebook_path": "/Workspace/Users/your_email@domain.com/06_ml_models/churn_prediction",
"base_parameters": {}
},
"existing_cluster_id": "YOUR_CLUSTER_ID",
"timeout_seconds": 3600,
"max_retries": 2
}
],
"schedule": {
"quartz_cron_expression": "0 0 3 * * ?",
"timezone_id": "America/New_York",
"pause_status": "UNPAUSED"
},
"email_notifications": {
"on_failure": ["your_email@domain.com"]
},
"timeout_seconds": 7200,
"max_concurrent_runs": 1
}
Create databricks/jobs/master_pipeline_job.json. This master job triggers the three layer jobs via run_job_task; if you schedule it, set pause_status to "PAUSED" on the individual jobs' schedules so they are not also triggered on their own and run twice.
{
"name": "Master Data Pipeline",
"tags": {
"environment": "production",
"type": "orchestration"
},
"tasks": [
{
"task_key": "bronze_ingestion",
"run_job_task": {
"job_id": "BRONZE_JOB_ID"
}
},
{
"task_key": "silver_transformation",
"depends_on": [
{
"task_key": "bronze_ingestion"
}
],
"run_job_task": {
"job_id": "SILVER_JOB_ID"
}
},
{
"task_key": "gold_aggregation",
"depends_on": [
{
"task_key": "silver_transformation"
}
],
"run_job_task": {
"job_id": "GOLD_JOB_ID"
}
},
{
"task_key": "quality_checks",
"depends_on": [
{
"task_key": "gold_aggregation"
}
],
"notebook_task": {
"notebook_path": "/Workspace/Users/your_email@domain.com/05_data_quality/data_quality_dashboard",
"base_parameters": {}
},
"existing_cluster_id": "YOUR_CLUSTER_ID"
}
],
"schedule": {
"quartz_cron_expression": "0 0 2 * * ?",
"timezone_id": "America/New_York",
"pause_status": "UNPAUSED"
},
"email_notifications": {
"on_success": ["your_email@domain.com"],
"on_failure": ["your_email@domain.com"]
},
"timeout_seconds": 14400,
"max_concurrent_runs": 1
}
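Once the master job exists, you can trigger and watch a run from anywhere with the Jobs REST API. A minimal sketch using `requests`; the host, token, and job ID are read from your environment, and the endpoints are the standard /api/2.1/jobs ones:

```python
import os
import time
import requests

host = os.environ["DATABRICKS_HOST"].rstrip("/")
headers = {"Authorization": f"Bearer {os.environ['DATABRICKS_TOKEN']}"}
job_id = int(os.environ["MASTER_JOB_ID"])  # the job ID you noted after creating the job

# Trigger a run of the master pipeline
resp = requests.post(f"{host}/api/2.1/jobs/run-now", headers=headers, json={"job_id": job_id})
resp.raise_for_status()
run_id = resp.json()["run_id"]
print(f"Started run {run_id}")

# Poll until the run reaches a terminal state
while True:
    run = requests.get(
        f"{host}/api/2.1/jobs/runs/get", headers=headers, params={"run_id": run_id}
    ).json()
    state = run["state"]
    if state["life_cycle_state"] in ("TERMINATED", "SKIPPED", "INTERNAL_ERROR"):
        print(f"Run finished: {state.get('result_state', state['life_cycle_state'])}")
        break
    time.sleep(60)
```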
Create scripts/create_databricks_jobs.py:
"""
Create Databricks Jobs from JSON configurations
"""
import json
import os
from databricks_cli.sdk import ApiClient
from databricks_cli.jobs.api import JobsApi
# Initialize Databricks client
api_client = ApiClient(
host=os.getenv('DATABRICKS_HOST'),
token=os.getenv('DATABRICKS_TOKEN')
)
jobs_api = JobsApi(api_client)
def create_job_from_file(filepath):
"""Create Databricks job from JSON config file"""
with open(filepath, 'r') as f:
job_config = json.load(f)
# Create job
response = jobs_api.create_job(job_config)
job_id = response['job_id']
print(f"✅ Created job: {job_config['name']}")
print(f" Job ID: {job_id}")
return job_id
def main():
"""Create all jobs"""
job_configs = [
'databricks/jobs/bronze_ingestion_job.json',
'databricks/jobs/silver_transformation_job.json',
'databricks/jobs/gold_aggregation_job.json'
]
print("=" * 70)
print("CREATING DATABRICKS JOBS")
print("=" * 70)
job_ids = {}
for config_file in job_configs:
if os.path.exists(config_file):
job_id = create_job_from_file(config_file)
job_name = config_file.split('/')[-1].replace('_job.json', '')
job_ids[job_name] = job_id
else:
print(f"⚠️ Config not found: {config_file}")
print("\n" + "=" * 70)
print("✅ JOB CREATION COMPLETE")
print("=" * 70)
print("\nJob IDs:")
for name, job_id in job_ids.items():
print(f" {name}: {job_id}")
if __name__ == "__main__":
main()
Note: The master pipeline job references the other jobs by ID, so create the bronze, silver, and gold jobs first (in the Databricks UI or with the script above), note the returned job IDs, and substitute them for BRONZE_JOB_ID, SILVER_JOB_ID, and GOLD_JOB_ID in master_pipeline_job.json before creating it.
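If you would rather not paste the IDs by hand, a small helper can substitute them into the master config before you create it. A sketch that assumes the job_ids dictionary returned by create_databricks_jobs.py above:

```python
def patch_master_config(job_ids, path="databricks/jobs/master_pipeline_job.json"):
    """Replace the job ID placeholders in the master pipeline config."""
    with open(path) as f:
        text = f.read()

    placeholders = {
        '"BRONZE_JOB_ID"': str(job_ids["bronze_ingestion"]),
        '"SILVER_JOB_ID"': str(job_ids["silver_transformation"]),
        '"GOLD_JOB_ID"': str(job_ids["gold_aggregation"]),
    }
    for placeholder, job_id in placeholders.items():
        text = text.replace(placeholder, job_id)

    with open(path, "w") as f:
        f.write(text)
    print(f"✅ Patched job IDs into {path}")
```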
✅ CHECKPOINT
Build operational monitoring dashboard.
Create Databricks notebook operational_monitoring:
# Databricks notebook source
# MAGIC %md
# MAGIC # Operational Monitoring Dashboard
# MAGIC
# MAGIC Real-time monitoring of pipeline health and performance
# COMMAND ----------
from datetime import datetime, timedelta
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
# COMMAND ----------
# MAGIC %md
# MAGIC ## Pipeline Status Overview
# COMMAND ----------
print("=" * 70)
print("OPERATIONAL HEALTH CHECK")
print(f"Report Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("=" * 70)
# COMMAND ----------
# MAGIC %md
# MAGIC ## Table Freshness
# COMMAND ----------
# Check when tables were last updated
table_freshness = spark.sql("""
SELECT
'bronze.customers' as table_name,
MAX(ingestion_timestamp) as last_updated,
COUNT(*) as row_count
FROM bronze.customers
UNION ALL
SELECT
'silver.dim_customers',
MAX(dbt_updated_at),
COUNT(*)
FROM silver.dim_customers
UNION ALL
SELECT
'gold.customer_metrics',
MAX(metric_updated_at),
COUNT(*)
FROM gold.customer_metrics
UNION ALL
SELECT
'gold.daily_revenue',
MAX(metric_updated_at),
COUNT(*)
FROM gold.daily_revenue
""").toPandas()
# Calculate freshness in hours
table_freshness['hours_since_update'] = (
pd.Timestamp.now() - pd.to_datetime(table_freshness['last_updated'])
).dt.total_seconds() / 3600
# Add status
table_freshness['status'] = table_freshness['hours_since_update'].apply(
lambda x: '✅ Fresh' if x < 24 else ('⚠️ Stale' if x < 48 else '❌ Very Stale')
)
print("\n📊 TABLE FRESHNESS:")
display(table_freshness[['table_name', 'last_updated', 'row_count', 'hours_since_update', 'status']])
# COMMAND ----------
# MAGIC %md
# MAGIC ## Data Quality Scores
# COMMAND ----------
quality_summary = spark.sql("""
SELECT
layer,
COUNT(*) as table_count,
ROUND(AVG(quality_score), 1) as avg_quality_score,
MIN(quality_score) as min_quality_score,
SUM(CASE WHEN quality_score < 85 THEN 1 ELSE 0 END) as tables_below_threshold
FROM gold.data_quality_metrics
GROUP BY layer
ORDER BY layer
""").toPandas()
print("\n📈 QUALITY SUMMARY BY LAYER:")
display(quality_summary)
# Alert if any layer below threshold
for _, row in quality_summary.iterrows():
if row['min_quality_score'] < 85:
print(f"⚠️ ALERT: {row['layer']} layer has quality score below 85!")
# COMMAND ----------
# MAGIC %md
# MAGIC ## Recent Anomalies
# COMMAND ----------
recent_anomalies = spark.sql("""
SELECT
order_date,
order_count,
revenue,
orders_z_score,
revenue_z_score,
status
FROM gold.data_anomalies
WHERE order_date >= date_sub(current_date(), 7)
AND status = 'Anomaly Detected'
ORDER BY order_date DESC
""").toPandas()
anomaly_count = len(recent_anomalies)
print(f"\n🚨 ANOMALIES (Last 7 Days): {anomaly_count}")
if anomaly_count > 0:
print("⚠️ ALERT: Anomalies detected!")
display(recent_anomalies)
else:
print("✅ No anomalies detected")
# COMMAND ----------
# MAGIC %md
# MAGIC ## Model Performance
# COMMAND ----------
# Churn model performance
churn_dist = spark.sql("""
SELECT
churn_risk,
COUNT(*) as customer_count,
ROUND(AVG(churn_probability), 3) as avg_probability
FROM gold.customer_churn_predictions
GROUP BY churn_risk
ORDER BY
CASE churn_risk
WHEN 'High' THEN 1
WHEN 'Medium' THEN 2
WHEN 'Low' THEN 3
END
""").toPandas()
print("\n🎯 CHURN PREDICTION DISTRIBUTION:")
display(churn_dist)
# Guard against the case where no customers fall in the 'High' bucket
high_risk = churn_dist.loc[churn_dist['churn_risk'] == 'High', 'customer_count']
high_risk_count = int(high_risk.iloc[0]) if len(high_risk) > 0 else 0
if high_risk_count > 500:
print(f"⚠️ ALERT: {high_risk_count} high-risk customers!")
# COMMAND ----------
# MAGIC %md
# MAGIC ## System Metrics
# COMMAND ----------
# Row counts by layer (one representative table per layer)
storage_usage = spark.sql("""
SELECT
'Bronze' as layer,
COUNT(*) as row_count
FROM bronze.customers
UNION ALL
SELECT 'Silver', COUNT(*) FROM silver.dim_customers
UNION ALL
SELECT 'Gold', COUNT(*) FROM gold.customer_metrics
""").toPandas()
print("\n💾 RECORD COUNTS BY LAYER:")
display(storage_usage)
# COMMAND ----------
# MAGIC %md
# MAGIC ## Revenue Trends
# COMMAND ----------
# Last 30 days revenue
revenue_trend = spark.sql("""
SELECT
order_date,
gross_revenue,
total_orders,
revenue_7d_avg
FROM gold.daily_revenue
WHERE order_date >= date_sub(current_date(), 30)
ORDER BY order_date
""").toPandas()
# Plot
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(14, 10))
# Revenue plot
ax1.plot(revenue_trend['order_date'], revenue_trend['gross_revenue'], label='Daily Revenue', marker='o')
ax1.plot(revenue_trend['order_date'], revenue_trend['revenue_7d_avg'], label='7-Day Average', linestyle='--')
ax1.set_title('Revenue Trend (Last 30 Days)', fontsize=14)
ax1.set_ylabel('Revenue ($)')
ax1.legend()
ax1.grid(True, alpha=0.3)
# Orders plot
ax2.plot(revenue_trend['order_date'], revenue_trend['total_orders'], marker='o', color='orange')
ax2.set_title('Order Volume (Last 30 Days)', fontsize=14)
ax2.set_ylabel('Orders')
ax2.set_xlabel('Date')
ax2.grid(True, alpha=0.3)
plt.tight_layout()
display(fig)
# COMMAND ----------
# MAGIC %md
# MAGIC ## Health Check Summary
# COMMAND ----------
# Calculate overall health score
health_checks = {
'Data Freshness': 'PASS' if all(table_freshness['hours_since_update'] < 24) else 'FAIL',
'Quality Scores': 'PASS' if all(quality_summary['min_quality_score'] >= 85) else 'FAIL',
'Anomaly Detection': 'PASS' if anomaly_count == 0 else 'WARNING',
'Model Predictions': 'PASS' if high_risk_count < 500 else 'WARNING',
'Data Volume': 'PASS' # Assuming volumes are normal
}
print("\n" + "=" * 70)
print("HEALTH CHECK SUMMARY")
print("=" * 70)
for check, status in health_checks.items():
emoji = '✅' if status == 'PASS' else ('⚠️' if status == 'WARNING' else '❌')
print(f"{emoji} {check}: {status}")
# Overall status
if all(v == 'PASS' for v in health_checks.values()):
print("\n✅ ALL SYSTEMS OPERATIONAL")
elif any(v == 'FAIL' for v in health_checks.values()):
print("\n❌ SYSTEM ISSUES DETECTED - IMMEDIATE ACTION REQUIRED")
else:
print("\n⚠️ SYSTEM WARNINGS - MONITOR CLOSELY")
# COMMAND ----------
# MAGIC %md
# MAGIC ## Alert Summary
# COMMAND ----------
alerts = []
# Freshness alerts
stale_tables = table_freshness[table_freshness['hours_since_update'] > 24]
for _, row in stale_tables.iterrows():
alerts.append({
'severity': 'HIGH' if row['hours_since_update'] > 48 else 'MEDIUM',
'type': 'Freshness',
'message': f"{row['table_name']} is {row['hours_since_update']:.1f} hours old"
})
# Quality alerts
low_quality = quality_summary[quality_summary['tables_below_threshold'] > 0]
for _, row in low_quality.iterrows():
alerts.append({
'severity': 'HIGH',
'type': 'Quality',
'message': f"{row['layer']} layer has {row['tables_below_threshold']} tables below quality threshold"
})
# Anomaly alerts
if anomaly_count > 0:
alerts.append({
'severity': 'MEDIUM',
'type': 'Anomaly',
'message': f"{anomaly_count} anomalies detected in last 7 days"
})
# High-risk customer alerts
if high_risk_count > 500:
alerts.append({
'severity': 'MEDIUM',
'type': 'Business',
'message': f"{high_risk_count} customers at high churn risk"
})
if alerts:
print("\n🚨 ACTIVE ALERTS:")
alerts_df = pd.DataFrame(alerts)
display(alerts_df)
else:
print("\n✅ No active alerts")
# COMMAND ----------
print("\n" + "=" * 70)
print("✅ OPERATIONAL MONITORING COMPLETE")
print("=" * 70)
print(f"Next Check: {(datetime.now() + timedelta(hours=1)).strftime('%Y-%m-%d %H:%M:%S')}")
✅ CHECKPOINT
Implement email alerting for critical issues.
Create Databricks notebook alerting_system:
# Databricks notebook source
# MAGIC %md
# MAGIC # Alerting System
# MAGIC
# MAGIC Checks for issues and sends email alerts
# COMMAND ----------
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from datetime import datetime
import pandas as pd
# COMMAND ----------
# Email configuration (use environment variables in production)
SMTP_SERVER = "smtp.gmail.com"
SMTP_PORT = 587
SENDER_EMAIL = "your_email@gmail.com"
SENDER_PASSWORD = dbutils.secrets.get(scope="email", key="password")
RECEIVER_EMAIL = "your_email@gmail.com"
# COMMAND ----------
def send_email(subject, body_html):
"""Send email alert"""
try:
msg = MIMEMultipart('alternative')
msg['Subject'] = subject
msg['From'] = SENDER_EMAIL
msg['To'] = RECEIVER_EMAIL
html_part = MIMEText(body_html, 'html')
msg.attach(html_part)
with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server:
server.starttls()
server.login(SENDER_EMAIL, SENDER_PASSWORD)
server.send_message(msg)
print(f"✅ Alert email sent: {subject}")
return True
except Exception as e:
print(f"❌ Failed to send email: {str(e)}")
return False
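# COMMAND ----------
# Optional alternative channel: post alerts to a Slack incoming webhook instead of
# (or alongside) email. A minimal sketch -- the secret scope "alerts" and key
# "slack_webhook" are assumptions; store the webhook URL wherever suits you.
import requests

def send_slack_alert(message):
    """Post an alert message to a Slack incoming webhook."""
    webhook_url = dbutils.secrets.get(scope="alerts", key="slack_webhook")
    resp = requests.post(webhook_url, json={"text": message})
    print("✅ Slack alert sent" if resp.ok else f"❌ Slack alert failed: {resp.status_code}")
    return resp.ok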
# COMMAND ----------
# MAGIC %md
# MAGIC ## Check for Issues
# COMMAND ----------
issues = []
# Check 1: Stale data
stale_tables = spark.sql("""
SELECT
'bronze.customers' as table_name,
MAX(ingestion_timestamp) as last_updated
FROM bronze.customers
HAVING TIMESTAMPDIFF(HOUR, MAX(ingestion_timestamp), CURRENT_TIMESTAMP()) > 24
""").collect()
if stale_tables:
issues.append({
'severity': 'HIGH',
'type': 'Stale Data',
'details': f"Tables not updated in 24+ hours: {len(stale_tables)}"
})
# Check 2: Quality issues
low_quality = spark.sql("""
SELECT COUNT(*) as count
FROM gold.data_quality_metrics
WHERE quality_score < 85
""").collect()[0][0]
if low_quality > 0:
issues.append({
'severity': 'HIGH',
'type': 'Data Quality',
'details': f"{low_quality} tables below quality threshold"
})
# Check 3: Anomalies
anomalies = spark.sql("""
SELECT COUNT(*) as count
FROM gold.data_anomalies
WHERE order_date >= date_sub(current_date(), 1)
AND status = 'Anomaly Detected'
""").collect()[0][0]
if anomalies > 0:
issues.append({
'severity': 'MEDIUM',
'type': 'Anomaly',
'details': f"{anomalies} anomalies in last 24 hours"
})
print(f"Issues found: {len(issues)}")
# COMMAND ----------
# MAGIC %md
# MAGIC ## Send Alerts
# COMMAND ----------
if issues:
# Create HTML email body
html = f"""
<html>
<head></head>
<body>
<h2>🚨 E-Commerce Analytics Platform - Alert</h2>
<p>Alert Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>
<h3>Issues Detected:</h3>
<table border="1" cellpadding="5" cellspacing="0">
<tr>
<th>Severity</th>
<th>Type</th>
<th>Details</th>
</tr>
"""
for issue in issues:
color = 'red' if issue['severity'] == 'HIGH' else 'orange'
html += f"""
<tr>
<td style="color: {color}"><strong>{issue['severity']}</strong></td>
<td>{issue['type']}</td>
<td>{issue['details']}</td>
</tr>
"""
html += """
</table>
<h3>Actions Required:</h3>
<ul>
<li>Review operational monitoring dashboard</li>
<li>Check Databricks job logs</li>
<li>Investigate data quality issues</li>
</ul>
<p>
<a href="https://your-databricks-workspace.azuredatabricks.net">
View Databricks Workspace
</a>
</p>
</body>
</html>
"""
# Send alert
subject = f"🚨 Alert: {len(issues)} Issues Detected - E-Commerce Platform"
send_email(subject, html)
else:
print("✅ No issues detected - no alert sent")
# COMMAND ----------
print("=" * 70)
print("✅ ALERTING SYSTEM CHECK COMPLETE")
print("=" * 70)
Replace the sender and receiver addresses with your own, and store your email app password in a Databricks secret scope named email under the key password (the dbutils.secrets.get(scope="email", key="password") call above reads it).
✅ CHECKPOINT
Create reusable SQL queries for monitoring.
Create scripts/operational_queries.sql:
-- ============================================================================
-- OPERATIONAL MONITORING QUERIES
-- Quick health checks for the e-commerce analytics platform
-- ============================================================================
-- QUERY 1: Pipeline Status Overview
-- Shows overall health of the data pipeline
SELECT
'Pipeline Health' as metric,
CASE
WHEN stale_count = 0 AND low_quality_count = 0 THEN '✅ Healthy'
WHEN stale_count > 0 OR low_quality_count > 2 THEN '❌ Critical'
ELSE '⚠️ Warning'
END as status,
CONCAT(stale_count, ' stale tables, ', low_quality_count, ' low quality') as details
FROM (
SELECT
CASE
WHEN TIMESTAMPDIFF(HOUR, MAX(ingestion_timestamp), CURRENT_TIMESTAMP()) > 24
THEN 1 ELSE 0 END as stale_count,
(SELECT COUNT(*) FROM gold.data_quality_metrics WHERE quality_score < 85) as low_quality_count
FROM bronze.customers
);
-- QUERY 2: Table Freshness Report
-- Last update time for each critical table
SELECT
'Bronze - Customers' as table_name,
MAX(ingestion_timestamp) as last_updated,
TIMESTAMPDIFF(HOUR, MAX(ingestion_timestamp), CURRENT_TIMESTAMP()) as hours_old,
COUNT(*) as row_count,
CASE
WHEN TIMESTAMPDIFF(HOUR, MAX(ingestion_timestamp), CURRENT_TIMESTAMP()) < 24 THEN '✅'
WHEN TIMESTAMPDIFF(HOUR, MAX(ingestion_timestamp), CURRENT_TIMESTAMP()) < 48 THEN '⚠️'
ELSE '❌'
END as status
FROM bronze.customers
UNION ALL
SELECT
'Bronze - Orders',
MAX(ingestion_timestamp),
TIMESTAMPDIFF(HOUR, MAX(ingestion_timestamp), CURRENT_TIMESTAMP()),
COUNT(*),
CASE
WHEN TIMESTAMPDIFF(HOUR, MAX(ingestion_timestamp), CURRENT_TIMESTAMP()) < 24 THEN '✅'
WHEN TIMESTAMPDIFF(HOUR, MAX(ingestion_timestamp), CURRENT_TIMESTAMP()) < 48 THEN '⚠️'
ELSE '❌'
END
FROM bronze.orders
UNION ALL
SELECT
'Silver - Customers',
MAX(dbt_updated_at),
TIMESTAMPDIFF(HOUR, MAX(dbt_updated_at), CURRENT_TIMESTAMP()),
COUNT(*),
CASE
WHEN TIMESTAMPDIFF(HOUR, MAX(dbt_updated_at), CURRENT_TIMESTAMP()) < 24 THEN '✅'
WHEN TIMESTAMPDIFF(HOUR, MAX(dbt_updated_at), CURRENT_TIMESTAMP()) < 48 THEN '⚠️'
ELSE '❌'
END
FROM silver.dim_customers
UNION ALL
SELECT
'Gold - Metrics',
MAX(metric_updated_at),
TIMESTAMPDIFF(HOUR, MAX(metric_updated_at), CURRENT_TIMESTAMP()),
COUNT(*),
CASE
WHEN TIMESTAMPDIFF(HOUR, MAX(metric_updated_at), CURRENT_TIMESTAMP()) < 24 THEN '✅'
WHEN TIMESTAMPDIFF(HOUR, MAX(metric_updated_at), CURRENT_TIMESTAMP()) < 48 THEN '⚠️'
ELSE '❌'
END
FROM gold.customer_metrics
ORDER BY table_name;
-- QUERY 3: Data Quality Summary
-- Current quality scores by layer
SELECT
layer,
COUNT(*) as table_count,
ROUND(AVG(quality_score), 1) as avg_score,
MIN(quality_score) as min_score,
MAX(quality_score) as max_score,
SUM(CASE WHEN quality_score < 85 THEN 1 ELSE 0 END) as tables_below_threshold,
CASE
WHEN MIN(quality_score) >= 90 THEN '✅ Excellent'
WHEN MIN(quality_score) >= 85 THEN '✅ Good'
WHEN MIN(quality_score) >= 70 THEN '⚠️ Fair'
ELSE '❌ Poor'
END as status
FROM gold.data_quality_metrics
GROUP BY layer
ORDER BY layer;
-- QUERY 4: Recent Anomalies
-- Anomalies detected in last 7 days
SELECT
order_date,
order_count,
ROUND(revenue, 2) as revenue,
ROUND(orders_z_score, 2) as orders_zscore,
ROUND(revenue_z_score, 2) as revenue_zscore,
status
FROM gold.data_anomalies
WHERE order_date >= DATE_SUB(CURRENT_DATE(), 7)
AND status = 'Anomaly Detected'
ORDER BY order_date DESC;
-- QUERY 5: ML Model Health
-- Current state of ML predictions
SELECT
'Customer Segmentation' as model,
COUNT(DISTINCT customer_id) as predictions,
COUNT(DISTINCT segment_name) as segments,
'✅ Active' as status
FROM gold.customer_segments
UNION ALL
SELECT
'Churn Prediction',
COUNT(DISTINCT customer_id),
COUNT(DISTINCT churn_risk),
CASE
WHEN SUM(CASE WHEN churn_risk = 'High' THEN 1 ELSE 0 END) > 1000 THEN '⚠️ High Risk'
ELSE '✅ Active'
END
FROM gold.customer_churn_predictions
UNION ALL
SELECT
'Product Recommendations',
COUNT(DISTINCT product_id),
COUNT(DISTINCT recommended_product_id),
'✅ Active'
FROM gold.product_recommendations;
-- QUERY 6: System Performance Metrics
-- Row counts and growth by layer
SELECT
'Bronze Layer' as layer,
SUM(row_count) as total_rows,
'Raw ingestion' as description
FROM (
SELECT COUNT(*) as row_count FROM bronze.customers
UNION ALL
SELECT COUNT(*) FROM bronze.products
UNION ALL
SELECT COUNT(*) FROM bronze.orders
UNION ALL
SELECT COUNT(*) FROM bronze.order_items
UNION ALL
SELECT COUNT(*) FROM bronze.web_events
) bronze_tables
UNION ALL
SELECT
'Silver Layer',
SUM(row_count),
'Cleaned & validated'
FROM (
SELECT COUNT(*) FROM silver.dim_customers
UNION ALL
SELECT COUNT(*) FROM silver.dim_products
UNION ALL
SELECT COUNT(*) FROM silver.fact_orders
UNION ALL
SELECT COUNT(*) FROM silver.fact_order_items
UNION ALL
SELECT COUNT(*) FROM silver.fact_web_events
) silver_tables
UNION ALL
SELECT
'Gold Layer',
SUM(row_count),
'Business aggregations'
FROM (
SELECT COUNT(*) FROM gold.customer_metrics
UNION ALL
SELECT COUNT(*) FROM gold.product_performance
UNION ALL
SELECT COUNT(*) FROM gold.daily_revenue
) gold_tables;
-- QUERY 7: Critical Alerts
-- Issues requiring immediate attention
SELECT
'Stale Tables' as alert_type,
'HIGH' as severity,
COUNT(*) as count,
CONCAT_WS(', ', COLLECT_LIST(table_name)) as details
FROM (
SELECT 'bronze.customers' as table_name
FROM bronze.customers
GROUP BY 1
HAVING TIMESTAMPDIFF(HOUR, MAX(ingestion_timestamp), CURRENT_TIMESTAMP()) > 48
) stale
UNION ALL
SELECT
'Low Quality Tables',
'HIGH',
COUNT(*),
CONCAT_WS(', ', COLLECT_LIST(CONCAT(layer, '.', table_name)))
FROM gold.data_quality_metrics
WHERE quality_score < 70
UNION ALL
SELECT
'Recent Anomalies',
'MEDIUM',
COUNT(*),
CONCAT(COUNT(*), ' anomalies detected')
FROM gold.data_anomalies
WHERE order_date >= DATE_SUB(CURRENT_DATE(), 1)
AND status = 'Anomaly Detected'
UNION ALL
SELECT
'High Churn Risk',
'MEDIUM',
COUNT(*),
CONCAT(COUNT(*), ' customers at high risk')
FROM gold.customer_churn_predictions
WHERE churn_risk = 'High';
-- QUERY 8: Daily Pipeline Summary
-- Summary for daily standup/reports
SELECT
CURRENT_DATE() as report_date,
(SELECT COUNT(*) FROM bronze.orders
WHERE DATE(ingestion_timestamp) = CURRENT_DATE()) as orders_ingested_today,
(SELECT COUNT(*) FROM silver.fact_orders
WHERE DATE(dbt_updated_at) = CURRENT_DATE()) as orders_processed_today,
(SELECT ROUND(AVG(quality_score), 1) FROM gold.data_quality_metrics) as avg_quality_score,
(SELECT COUNT(*) FROM gold.data_anomalies
WHERE order_date >= DATE_SUB(CURRENT_DATE(), 1)
AND status = 'Anomaly Detected') as anomalies_24h,
CASE
WHEN (SELECT COUNT(*) FROM gold.data_quality_metrics WHERE quality_score < 85) = 0
THEN '✅ All checks passed'
ELSE '⚠️ Issues detected'
END as overall_status;
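These queries are also easy to run as a batch from a notebook, for example as a morning health report. A minimal sketch, assuming the repo is checked out in Databricks Repos at the path below and that queries are separated by semicolons:

```python
# Run every query in operational_queries.sql from a notebook and display the results.
sql_path = "/Workspace/Repos/your_repo/scripts/operational_queries.sql"

with open(sql_path) as f:
    statements = [s.strip() for s in f.read().split(";")]

for i, stmt in enumerate(statements, start=1):
    # Skip fragments that are empty or contain only comments/blank lines
    lines = [ln.strip() for ln in stmt.splitlines()]
    if not any(ln and not ln.startswith("--") for ln in lines):
        continue
    print(f"\n--- Query {i} ---")
    display(spark.sql(stmt))
```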
✅ CHECKPOINT
Document the orchestration setup.
Create docs/orchestration_guide.md:
# Orchestration & Monitoring Guide
## Overview
This guide describes the automated workflows, monitoring, and alerting for the e-commerce analytics platform.
## Job Schedule
### Daily Pipeline (2:00 AM EST)
1. **Bronze Ingestion** (2:00 AM - 2:30 AM)
- Mounts Azure storage
- Ingests all source data
- Validates schema
- Writes to Delta tables
2. **Silver Transformation** (2:30 AM - 3:00 AM)
- Runs dbt staging models
- Runs dbt Silver models
- Executes data quality tests
- Updates dimensional tables
3. **Gold Aggregation** (3:00 AM - 3:45 AM)
- Runs dbt Gold models
- Generates ML predictions (segmentation, churn)
- Updates business metrics
- Creates executive summaries
4. **Quality Checks** (3:45 AM - 4:00 AM)
- Runs data quality validations
- Updates quality metrics
- Checks for anomalies
- Triggers alerts if needed
### Weekly Jobs
- **Product Recommendations** (Sunday 3:00 AM)
- Rebuilds collaborative filtering model
- Generates fresh recommendations
- **Model Retraining** (Sunday 4:00 AM)
- Retrains customer segmentation
- Validates model performance
- Updates MLflow registry
## Monitoring
### Automated Checks
- **Table Freshness:** Every hour
- **Data Quality:** After each pipeline run
- **Anomaly Detection:** Daily
- **Model Performance:** Weekly
### Dashboards
- **Operational Dashboard:** Real-time health status
- **Data Quality Dashboard:** Detailed quality metrics
- **ML Model Dashboard:** Model performance tracking
### Alert Thresholds
| Alert Type | Threshold | Severity | Action |
|------------|-----------|----------|--------|
| Stale Data | > 24 hours | HIGH | Immediate investigation |
| Quality Score | < 85 | HIGH | Review and fix |
| Quality Score | < 70 | CRITICAL | Stop pipeline |
| Anomaly Detection | Any detected | MEDIUM | Investigate cause |
| High Churn Risk | > 500 customers | MEDIUM | Business review |
| Job Failure | Any failure | HIGH | Check logs immediately |
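To keep the alerting notebook and this table in sync, it can help to encode the thresholds once as configuration. A minimal sketch; the module path and names are assumptions:

```python
# config/alert_thresholds.py (example): single source of truth for alert thresholds
ALERT_THRESHOLDS = {
    "stale_data_hours": 24,           # HIGH: data older than this is stale
    "quality_score_min": 85,          # HIGH: review and fix below this score
    "quality_score_critical": 70,     # CRITICAL: stop the pipeline below this score
    "high_churn_risk_customers": 500, # MEDIUM: trigger a business review above this count
}
```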
## Email Notifications
### Recipients
- Data Team: all alerts
- Business Team: business metric alerts only
- On-call: critical alerts only
### Alert Types
- **Success:** Daily pipeline completion
- **Warning:** Quality issues, anomalies
- **Failure:** Job failures, critical issues
## Job Management
### Starting a Job Manually
```bash
# Via Databricks CLI
databricks jobs run-now --job-id <JOB_ID>
# Via UI
# Go to Workflows → Jobs → Select Job → Run Now
```
### Troubleshooting a Failed Job
Step 1: Check Job Status
Step 2: Review Task Logs
Step 3: Common Issues
Step 4: Rerun Job
### Recovery Scenarios
Scenario 1: Corrupted Table
-- Restore from Delta history
RESTORE TABLE silver.dim_customers TO TIMESTAMP AS OF current_timestamp() - INTERVAL 24 HOURS;
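Before restoring, check the table's Delta history to confirm which version or timestamp you want to go back to, for example from a notebook:

```python
# Inspect recent Delta history before choosing a restore point
display(spark.sql("DESCRIBE HISTORY silver.dim_customers LIMIT 10"))
```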
Scenario 2: Failed Pipeline
Scenario 3: Data Quality Issues
Data Engineering Team
Business Intelligence Team
DevOps/Infrastructure
2. **Create runbook:**
Create `docs/runbook.md`:
```markdown
# Operational Runbook
## Daily Operations
### Morning Check (9:00 AM)
- [ ] Review overnight job status
- [ ] Check operational dashboard
- [ ] Verify no critical alerts
- [ ] Review data quality scores
- [ ] Check for anomalies
### End of Day (5:00 PM)
- [ ] Verify all jobs completed
- [ ] Review any incidents
- [ ] Update ticket system
- [ ] Brief team on any issues
## Weekly Tasks
### Monday
- [ ] Review weekly summary
- [ ] Check model performance
- [ ] Analyze anomalies from previous week
### Friday
- [ ] Prepare weekend coverage
- [ ] Review upcoming changes
- [ ] Update documentation
## Incident Response
### P1: Critical (Pipeline Down)
1. Acknowledge alert
2. Check job status in Databricks
3. Review error logs
4. Notify team in Slack
5. Fix issue or escalate
6. Document in incident log
7. Post-mortem within 24 hours
### P2: High (Data Quality Issues)
1. Check quality dashboard
2. Identify affected tables
3. Stop downstream jobs if needed
4. Investigate root cause
5. Fix and validate
6. Resume pipeline
7. Document resolution
### P3: Medium (Anomalies Detected)
1. Review anomaly details
2. Verify data accuracy
3. Investigate business context
4. Document findings
5. Adjust thresholds if false positive
## Emergency Contacts
**Escalation Path:**
1. Data Engineer on-call
2. Data Engineering Manager
3. VP of Engineering
**24/7 Support:**
- PagerDuty: data-engineering
- Slack: #data-engineering-alerts
```
✅ CHECKPOINT
# Check status
git status
# Add all orchestration files
git add databricks/jobs/
git add scripts/operational_queries.sql
git add docs/orchestration_guide.md
git add docs/runbook.md
# Commit
git commit -m "Phase 8 complete: Orchestration & Monitoring
- Created 4 Databricks job configurations
- Built operational monitoring dashboard
- Implemented alerting system with email notifications
- Created reusable operational SQL queries
- Documented complete orchestration workflow
- Created runbook for daily operations
- Scheduled daily pipeline execution (2 AM)
- Set up health checks and quality monitoring
- All jobs configured with error handling and retries"
# Push to GitHub
git push origin main
✅ CHECKPOINT
✅ Automated Jobs (4 jobs)
✅ Monitoring System
✅ Alerting Infrastructure
✅ Operational Tools
✅ Documentation
| Job | Time (EST) | Duration | Frequency |
|---|---|---|---|
| Bronze Ingestion | 2:00 AM | 30 min | Daily |
| Silver Transform | 2:30 AM | 30 min | Daily |
| Gold Aggregation | 3:00 AM | 45 min | Daily |
| Quality Checks | 3:45 AM | 15 min | Daily |
| Product Recommendations | 3:00 AM | 30 min | Weekly (Sun) |
| Model Retraining | 4:00 AM | 60 min | Weekly (Sun) |
Total Daily Runtime: ~2 hours
Total Weekly Runtime: ~15 hours (daily runs plus the Sunday jobs)
Data Freshness: every monitored table updated within the last 24 hours
Data Quality: all layers at or above the 85-point quality threshold
Anomaly Detection: no unexplained anomalies in the last 7 days
System Health: all checks in the operational dashboard passing
In Phase 9, you will:
Estimated Time: 4-6 hours over Days 23-24
Issue: Jobs not starting on schedule
Solution: Check job schedule settings, verify cluster is not terminated
Issue: Email alerts not sending
Solution: Verify SMTP settings, check secrets are configured correctly
Issue: Monitoring dashboard shows stale data
Solution: Check if jobs completed successfully, review job logs
Issue: Job running too long
Solution: Increase cluster size, optimize queries, check for data skew
Issue: Quality checks failing
Solution: Review specific test failures, check upstream data sources
Current Setup:
Optimization Tips:
Phase 8 Manual Version 1.0
Last Updated: 2025-01-01