Last Updated: November 2, 2025 | Estimated Time: 8-12 hours | Budget: $30-35 (under $100 credits)
An enterprise-grade AWS data engineering pipeline that unifies sales, customer, and marketing data. You'll implement ETL pipelines, data warehousing, machine learning predictions, and interactive dashboards, all following production-ready patterns.
Technologies: S3, AWS Glue (PySpark), Athena, Redshift, SageMaker, QuickSight, Lambda, EventBridge
This project is designed to familiarize you with the AWS cloud console. Throughout this guide, you'll see CLI commands in gray boxes; these are completely optional and shown only as alternatives for developers who prefer typing commands. If you see a CLI command, just look for the "Console Method" section above it, which is the click-through approach.
When you see this: 💻 CLI Alternative (Optional) → You can safely skip it!
You're building a data platform for OmniRetail, a hybrid retail company whose sales, customer, and marketing data is scattered across separate systems.
We'll use the Medallion Architecture (Bronze → Silver → Gold), a proven data engineering pattern:
| Layer | AWS Service | Purpose | Data Quality |
|---|---|---|---|
| Bronze | S3 | Raw, unprocessed data (CSV, JSON) | As-is from source systems |
| Silver | S3 + Glue | Cleaned, normalized, deduplicated | Validated and typed |
| Gold | S3 + Athena | Business-level aggregations | Analytics-ready |
| Warehouse | Redshift | Fast SQL queries for BI tools | Optimized for reporting |
| ML | SageMaker | Predictive models | Feature-engineered |
| Visualization | QuickSight | Executive dashboards | Business KPIs |
This guide assumes you're new to AWS but have basic programming knowledge. We'll walk through every step in detail.
This guide is designed for the AWS Console web interface. You can complete 100% of this project by clicking through your browser; no command line needed! I'll show CLI commands as optional alternatives for those interested.
AWS Identity and Access Management (IAM) controls who can access your AWS resources and what they can do. We'll create a dedicated user with only the permissions we need; this is a security best practice called the Principle of Least Privilege.
Name the user `omni-admin` and attach the following managed policies:
- `AmazonS3FullAccess` – for storing data
- `AWSGlueServiceRole` – for running ETL jobs
- `AmazonAthenaFullAccess` – for querying data
- `AmazonRedshiftFullAccess` – for data warehousing
- `AmazonSageMakerFullAccess` – for machine learning
- `AWSLambdaFullAccess` – for automation
- `CloudWatchLogsFullAccess` – for monitoring

After creating the user, you'll see a success screen. Click "Download .csv file" to save your access keys. You'll need these for the CLI setup. This is your only chance to download the secret access key!
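The broad `*FullAccess` policies keep setup simple for a learning project. If you want to practice the Principle of Least Privilege more strictly, here is an optional, hedged boto3 sketch that creates a customer-managed policy scoped to a single bucket and attaches it to `omni-admin`. The bucket and policy names are illustrative placeholders, and the script must be run by an identity that has IAM permissions.

```python
import json
import boto3

iam = boto3.client("iam")

# Hypothetical bucket name -- replace with your own
BUCKET = "omniretail-data-jd"

# Policy scoped to a single bucket instead of AmazonS3FullAccess
policy_document = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "ListBucket",
            "Effect": "Allow",
            "Action": ["s3:ListBucket", "s3:GetBucketLocation"],
            "Resource": f"arn:aws:s3:::{BUCKET}",
        },
        {
            "Sid": "ReadWriteObjects",
            "Effect": "Allow",
            "Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject"],
            "Resource": f"arn:aws:s3:::{BUCKET}/*",
        },
    ],
}

# Create the policy and attach it to the omni-admin user
policy = iam.create_policy(
    PolicyName="OmniRetailS3Access",  # illustrative name
    PolicyDocument=json.dumps(policy_document),
)
iam.attach_user_policy(
    UserName="omni-admin",
    PolicyArn=policy["Policy"]["Arn"],
)
print("Attached scoped S3 policy to omni-admin")
```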
Access keys are only needed if you want to use the AWS CLI or SDK. For this guide, you can skip this and use the console for everything. If you want CLI access later, you can create keys anytime from: IAM → Users → Your User → Security Credentials → Create Access Key.
You can complete this entire project through the AWS Console web interface. The CLI is just a faster way to run commands, but everything shown with CLI commands can be done by clicking through the console.
Install the AWS CLI and configure it with the `omni-admin` access keys you downloaded:
# On macOS/Linux
curl "https://awscli.amazonaws.com/AWSCLIV2.pkg" -o "AWSCLIV2.pkg"
sudo installer -pkg AWSCLIV2.pkg -target /
# On Windows: Download installer from aws.amazon.com/cli
# Or via pip
pip install awscli
aws configure
# Enter your Access Key ID, Secret Key, region (us-east-1), and format (json)
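To confirm the credentials are configured correctly, an optional sanity check with boto3 mirrors `aws sts get-caller-identity`:

```python
import boto3

# Verify that the configured credentials are valid and belong to omni-admin
sts = boto3.client("sts")
identity = sts.get_caller_identity()

print("Account:", identity["Account"])   # 12-digit AWS account ID
print("User ARN:", identity["Arn"])      # should end with user/omni-admin
```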
You now have an IAM user with appropriate permissions and a configured AWS CLI. You're ready to start building!
Amazon S3 (Simple Storage Service) is object storage that acts as our data lake. Think of it as an infinitely scalable file system in the cloud. We'll organize our data using a "folder" structure (technically called prefixes) to separate Bronze, Silver, and Gold layers.
omniretail-data-[YOUR-INITIALS]
For example: `omniretail-data-jd` (if that's taken, try something like `omniretail-data-jd-2024`). Choose the region US East (N. Virginia) `us-east-1`, or your preferred region.
# Create bucket via CLI
aws s3 mb s3://omniretail-data-[YOUR-INITIALS]
# Verify it was created
aws s3 ls
aws s3api put-bucket-encryption \
--bucket omniretail-data-[YOUR-INITIALS] \
--server-side-encryption-configuration '{
"Rules": [{
"ApplyServerSideEncryptionByDefault": {
"SSEAlgorithm": "AES256"
}
}]
}'
aws s3api put-bucket-versioning \
--bucket omniretail-data-[YOUR-INITIALS] \
--versioning-configuration Status=Enabled
Click Create folder and create a folder named `bronze`. Repeat for `silver`, `gold`, `athena-results`, `models`, and `scripts`. Your bucket should now have this structure:
# Create folder structure via CLI
aws s3api put-object --bucket omniretail-data-[YOUR-INITIALS] --key bronze/
aws s3api put-object --bucket omniretail-data-[YOUR-INITIALS] --key silver/
aws s3api put-object --bucket omniretail-data-[YOUR-INITIALS] --key gold/
aws s3api put-object --bucket omniretail-data-[YOUR-INITIALS] --key athena-results/
aws s3api put-object --bucket omniretail-data-[YOUR-INITIALS] --key models/
aws s3api put-object --bucket omniretail-data-[YOUR-INITIALS] --key scripts/
# Verify structure
aws s3 ls s3://omniretail-data-[YOUR-INITIALS]/ --recursive
| Prefix (Folder) | Purpose | File Format | Data Quality |
|---|---|---|---|
| `bronze/` | Raw data from source systems | CSV, JSON | Unvalidated, as-is |
| `silver/` | Cleaned and normalized data | Parquet | Schema-enforced |
| `gold/` | Business-level aggregations | Parquet | Analytics-ready |
| `athena-results/` | Query output storage | Various | - |
| `models/` | Trained ML models | .joblib, .pkl | - |
Your S3 data lake is configured with encryption, versioning, and the medallion architecture folder structure.
For this learning project, we'll generate realistic fake data using Python's faker library. In a real project, this Bronze layer would receive data from live source systems such as a CRM export, an e-commerce order database, a marketing platform, and web clickstream events.
Option A: Generate data locally with Python (more realistic)
Option B: Download pre-made sample files and upload via console (easier)
pip install pandas faker
Create a new file: data_gen/generate_bronze_data.py
import pandas as pd
import random
from faker import Faker
from datetime import datetime, timedelta
import json
import os
fake = Faker()
Faker.seed(42) # For reproducibility
random.seed(42)
# Create output directory
os.makedirs('data_gen/sample_outputs', exist_ok=True)
# ==================== CRM CUSTOMERS ====================
print("Generating CRM customer data...")
customers = []
for i in range(1, 5001): # 5,000 customers
customers.append({
'customer_id': i,
'first_name': fake.first_name(),
'last_name': fake.last_name(),
'email': fake.email(),
'phone': fake.phone_number(),
'signup_date': fake.date_between(start_date='-2y', end_date='today'),
'region': random.choice(['North', 'South', 'East', 'West', 'Central']),
'signup_source': random.choice(['Website', 'Mobile App', 'Referral', 'Social Media']),
'customer_segment': random.choice(['Bronze', 'Silver', 'Gold', 'Platinum'])
})
df_customers = pd.DataFrame(customers)
df_customers.to_csv('data_gen/sample_outputs/crm_customers.csv', index=False)
print(f"β
Created crm_customers.csv with {len(df_customers)} records")
# ==================== MARKETING CAMPAIGNS ====================
print("Generating marketing campaign data...")
campaigns = []
campaign_channels = ['Email', 'SMS', 'Social Media', 'Display Ads', 'Search Ads']
for i in range(1, 51): # 50 campaigns
campaigns.append({
'campaign_id': i,
'campaign_name': f"Campaign_{i}_{random.choice(['Spring', 'Summer', 'Fall', 'Winter'])}",
'campaign_channel': random.choice(campaign_channels),
'start_date': fake.date_between(start_date='-1y', end_date='-30d'),
'end_date': fake.date_between(start_date='-30d', end_date='today'),
'budget': round(random.uniform(5000, 50000), 2),
'target_segment': random.choice(['Bronze', 'Silver', 'Gold', 'Platinum', 'All'])
})
df_campaigns = pd.DataFrame(campaigns)
df_campaigns.to_csv('data_gen/sample_outputs/marketing_campaigns.csv', index=False)
print(f"β
Created marketing_campaigns.csv with {len(df_campaigns)} records")
# ==================== SALES ORDERS ====================
print("Generating sales order data...")
orders = []
order_id = 1
for customer in customers:
# Each customer makes 1-10 orders
num_orders = random.randint(1, 10)
for _ in range(num_orders):
campaign_id = random.choice([c['campaign_id'] for c in campaigns] + [None, None]) # Some organic
orders.append({
'order_id': order_id,
'customer_id': customer['customer_id'],
'order_date': fake.date_between(start_date='-1y', end_date='today'),
'product_category': random.choice(['Electronics', 'Clothing', 'Home', 'Beauty', 'Sports']),
'product_name': fake.catch_phrase(),
'quantity': random.randint(1, 5),
'unit_price': round(random.uniform(10, 500), 2),
'discount': round(random.uniform(0, 0.3), 2) if random.random() > 0.7 else 0,
'campaign_id': campaign_id,
'order_status': random.choice(['Completed', 'Completed', 'Completed', 'Cancelled', 'Returned'])
})
order_id += 1
df_orders = pd.DataFrame(orders)
df_orders['amount'] = df_orders['quantity'] * df_orders['unit_price'] * (1 - df_orders['discount'])
df_orders.to_csv('data_gen/sample_outputs/sales_orders.csv', index=False)
print(f"β
Created sales_orders.csv with {len(df_orders)} records")
# ==================== CUSTOMER INTERACTIONS (JSON) ====================
print("Generating interaction events...")
interactions = []
for i in range(10000): # 10,000 events
interactions.append({
'event_id': i + 1,
'customer_id': random.randint(1, 5000),
'event_type': random.choice(['page_view', 'add_to_cart', 'checkout_start', 'search', 'product_view']),
'timestamp': fake.date_time_between(start_date='-1y', end_date='now').isoformat(),
'page_url': fake.uri(),
'device': random.choice(['mobile', 'desktop', 'tablet']),
'session_id': fake.uuid4()
})
with open('data_gen/sample_outputs/interactions.json', 'w') as f:
json.dump(interactions, f, indent=2)
print(f"β
Created interactions.json with {len(interactions)} events")
print("\nπ All Bronze layer data generated successfully!")
print("Files created in: data_gen/sample_outputs/")
# Run the generator
python data_gen/generate_bronze_data.py
# Expected output:
# Generating CRM customer data...
# ✅ Created crm_customers.csv with 5000 records
# Generating marketing campaign data...
# ✅ Created marketing_campaigns.csv with 50 records
# Generating sales order data...
# ✅ Created sales_orders.csv with 35000 records
# Generating interaction events...
# ✅ Created interactions.json with 10000 events
# 🎉 All Bronze layer data generated successfully!
You should now have four files in data_gen/sample_outputs/: crm_customers.csv, marketing_campaigns.csv, sales_orders.csv, and interactions.json.
Create scripts/extract_to_s3.py:
import boto3
import os
s3 = boto3.client('s3')
BUCKET_NAME = 'omniretail-data-[YOUR-INITIALS]'
BRONZE_PREFIX = 'bronze/'
files_to_upload = [
'data_gen/sample_outputs/crm_customers.csv',
'data_gen/sample_outputs/marketing_campaigns.csv',
'data_gen/sample_outputs/sales_orders.csv',
'data_gen/sample_outputs/interactions.json'
]
for file_path in files_to_upload:
file_name = os.path.basename(file_path)
s3_key = f"{BRONZE_PREFIX}{file_name}"
print(f"Uploading {file_name}...")
s3.upload_file(file_path, BUCKET_NAME, s3_key)
print(f"β
Uploaded {file_name}")
print("π All files uploaded!")
Then run: python scripts/extract_to_s3.py
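As an optional check, a small boto3 sketch can confirm the Bronze uploads landed, mirroring what you'd see in the S3 console:

```python
import boto3

BUCKET_NAME = 'omniretail-data-[YOUR-INITIALS]'

s3 = boto3.client('s3')

# List all objects currently in the Bronze layer
response = s3.list_objects_v2(Bucket=BUCKET_NAME, Prefix='bronze/')

for obj in response.get('Contents', []):
    print(f"{obj['Key']:50s} {obj['Size']:>12,d} bytes")
```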
Don't want to run Python? I can provide you with pre-generated sample CSV files that you can download and upload directly through the console!
I can generate these for you if you'd like. Otherwise, here's a minimal dataset you can create manually:
Create: crm_customers.csv (in Excel or text editor)
customer_id,first_name,last_name,email,region,signup_source,customer_segment
1,John,Doe,john@example.com,North,Website,Gold
2,Jane,Smith,jane@example.com,South,Mobile App,Platinum
3,Bob,Johnson,bob@example.com,East,Referral,Silver
4,Alice,Williams,alice@example.com,West,Website,Bronze
5,Charlie,Brown,charlie@example.com,Central,Social Media,Gold
Create: sales_orders.csv
order_id,customer_id,order_date,product_category,quantity,unit_price,amount,order_status,campaign_id
1,1,2024-10-15,Electronics,2,299.99,599.98,Completed,1
2,2,2024-10-16,Clothing,1,49.99,49.99,Completed,2
3,3,2024-10-17,Home,3,89.99,269.97,Completed,1
4,1,2024-10-20,Beauty,1,24.99,24.99,Completed,3
5,4,2024-10-22,Sports,2,149.99,299.98,Completed,2
Create: marketing_campaigns.csv
campaign_id,campaign_name,campaign_channel,budget
1,Fall_Email_Blast,Email,10000
2,Summer_SMS_Promo,SMS,5000
3,Social_Media_Oct,Social Media,15000
# Install required libraries
pip install pandas faker
# Run the generator
python data_gen/generate_bronze_data.py
# Expected output:
# Generating CRM customer data...
# ✅ Created crm_customers.csv with 5000 records
# Generating marketing campaign data...
# ✅ Created marketing_campaigns.csv with 50 records
# Generating sales order data...
# ✅ Created sales_orders.csv with 35000 records
# Generating interaction events...
# ✅ Created interactions.json with 10000 events
# 🎉 All Bronze layer data generated successfully!
# List files in Bronze layer
aws s3 ls s3://omniretail-data-[YOUR-INITIALS]/bronze/
# Expected output:
# 2025-11-02 10:30:15 245678 crm_customers.csv
# 2025-11-02 10:30:16 12345 marketing_campaigns.csv
# 2025-11-02 10:30:17 1234567 sales_orders.csv
# 2025-11-02 10:30:18 890123 interactions.json
Your Bronze layer is populated with realistic synthetic data representing customers, orders, campaigns, and web interactions.
All remaining steps (Glue jobs, Redshift, SageMaker, QuickSight) can be completed entirely through the AWS Console web interface. No command line required! The CLI commands shown are just optional alternatives for those who prefer typing commands.
AWS Glue is a serverless ETL (Extract, Transform, Load) service. It runs Apache Spark jobs without you managing servers. In this step, we'll create an IAM role for Glue, author a PySpark job, and transform the raw Bronze data into the cleaned Silver layer.
Create an IAM role named `AWSGlueServiceRole-OmniRetail` with two policies: `AWSGlueServiceRole` (pre-attached) and `AmazonS3FullAccess` (for reading/writing data). Glue jobs run under an IAM role, not your user account. This role gives Glue permission to read from S3, write back to S3, and log to CloudWatch.
Create scripts/glue_silver_job.py:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import lower, trim, col, when, regexp_replace
from pyspark.sql.types import DoubleType, IntegerType, DateType
## Get job parameters
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'BUCKET'])
BUCKET = args['BUCKET']
## Initialize Glue context
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
print("=== Starting Bronze to Silver Transformation ===")
# ==================== CLEAN CRM CUSTOMERS ====================
print("Processing CRM customers...")
df_customers_raw = spark.read.csv(
f"s3://{BUCKET}/bronze/crm_customers.csv",
header=True,
inferSchema=True
)
df_customers_clean = df_customers_raw \
.dropna(subset=['customer_id', 'email']) \
.withColumn('email', lower(trim(col('email')))) \
.withColumn('phone', regexp_replace(col('phone'), '[^0-9]', '')) \
.dropDuplicates(['customer_id'])
# Convert to Parquet and write to Silver
df_customers_clean.write.mode('overwrite').parquet(
f"s3://{BUCKET}/silver/crm_customers/"
)
print(f"β
Wrote {df_customers_clean.count()} cleaned customer records to Silver")
# ==================== CLEAN SALES ORDERS ====================
print("Processing sales orders...")
df_orders_raw = spark.read.csv(
f"s3://{BUCKET}/bronze/sales_orders.csv",
header=True,
inferSchema=True
)
df_orders_clean = df_orders_raw \
.dropna(subset=['order_id', 'customer_id']) \
.filter(col('amount') > 0) \
.filter(col('order_status') == 'Completed') \
.withColumn('amount', col('amount').cast(DoubleType())) \
.dropDuplicates(['order_id'])
df_orders_clean.write.mode('overwrite').parquet(
f"s3://{BUCKET}/silver/sales_orders/"
)
print(f"β
Wrote {df_orders_clean.count()} cleaned order records to Silver")
# ==================== CLEAN MARKETING CAMPAIGNS ====================
print("Processing marketing campaigns...")
df_campaigns_raw = spark.read.csv(
f"s3://{BUCKET}/bronze/marketing_campaigns.csv",
header=True,
inferSchema=True
)
df_campaigns_clean = df_campaigns_raw \
.dropna(subset=['campaign_id']) \
.withColumn('budget', col('budget').cast(DoubleType())) \
.dropDuplicates(['campaign_id'])
df_campaigns_clean.write.mode('overwrite').parquet(
f"s3://{BUCKET}/silver/marketing_campaigns/"
)
print(f"β
Wrote {df_campaigns_clean.count()} cleaned campaign records to Silver")
# ==================== CLEAN INTERACTIONS (JSON) ====================
print("Processing interaction events...")
df_interactions_raw = spark.read.json(f"s3://{BUCKET}/bronze/interactions.json")
df_interactions_clean = df_interactions_raw \
.dropna(subset=['event_id', 'customer_id']) \
.dropDuplicates(['event_id'])
df_interactions_clean.write.mode('overwrite').parquet(
f"s3://{BUCKET}/silver/interactions/"
)
print(f"β
Wrote {df_interactions_clean.count()} cleaned interaction records to Silver")
print("=== Bronze to Silver Transformation Complete ===")
job.commit()
Name the job `bronze_to_silver`, select the IAM role `AWSGlueServiceRole-OmniRetail`, and add a job parameter with Key `--BUCKET` and Value `omniretail-data-[YOUR-INITIALS]`. You can paste code directly into the Glue script editor; no need to save a .py file and upload it to S3. The console method is actually easier!
If you prefer to manage scripts in S3:
Upload `glue_silver_job.py` to `s3://omniretail-data-[YOUR-INITIALS]/scripts/glue_silver_job.py` and point the job at that script path instead. After the job succeeds, the Silver folders will contain Parquet files (named like part-00000-xxx.snappy.parquet).
# Check Silver layer
aws s3 ls s3://omniretail-data-[YOUR-INITIALS]/silver/ --recursive
Your Silver layer now contains cleaned, validated data in Parquet format, ready for analytics!
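If you want to sanity-check the cleaning rules before spending Glue runtime, here is an optional local sketch using plain PySpark (assuming `pip install pyspark` and the generated CSVs on disk). It mirrors only the customer-cleaning step, not the full Glue job:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, trim, regexp_replace

spark = SparkSession.builder.appName("local-silver-check").getOrCreate()

# Read the locally generated Bronze file
df_raw = spark.read.csv(
    "data_gen/sample_outputs/crm_customers.csv",
    header=True,
    inferSchema=True,
)

# Same cleaning rules as the Glue job: drop incomplete rows, normalize
# email and phone, and deduplicate on customer_id
df_clean = (
    df_raw
    .dropna(subset=["customer_id", "email"])
    .withColumn("email", lower(trim(col("email"))))
    .withColumn("phone", regexp_replace(col("phone"), "[^0-9]", ""))
    .dropDuplicates(["customer_id"])
)

print(f"Raw rows: {df_raw.count()}, cleaned rows: {df_clean.count()}")
df_clean.show(5, truncate=False)
spark.stop()
```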
The Gold layer contains business-ready aggregations. Instead of row-level transactional data, we create summary tables that answer specific business questions: sales by region and campaign channel, customer lifetime value, campaign ROI, and daily sales trends.
Create scripts/glue_gold_job.py:
import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import sum, count, avg, max, min, col, countDistinct
## Get job parameters
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'BUCKET'])
BUCKET = args['BUCKET']
## Initialize
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
print("=== Starting Silver to Gold Aggregation ===")
# Load Silver tables
df_customers = spark.read.parquet(f"s3://{BUCKET}/silver/crm_customers/")
df_orders = spark.read.parquet(f"s3://{BUCKET}/silver/sales_orders/")
df_campaigns = spark.read.parquet(f"s3://{BUCKET}/silver/marketing_campaigns/")
# ==================== AGGREGATION 1: Sales Summary by Region & Campaign ====================
print("Creating sales_summary_by_region_campaign...")
df_sales_summary = df_orders \
.join(df_customers, 'customer_id', 'left') \
.join(df_campaigns, 'campaign_id', 'left') \
.groupBy('region', 'campaign_channel') \
.agg(
sum('amount').alias('total_sales'),
count('order_id').alias('total_orders'),
countDistinct('customer_id').alias('unique_customers'),
avg('amount').alias('avg_order_value')
)
df_sales_summary.write.mode('overwrite').parquet(
f"s3://{BUCKET}/gold/sales_summary_by_region_campaign/"
)
print(f"β
Created sales_summary with {df_sales_summary.count()} rows")
# ==================== AGGREGATION 2: Customer Lifetime Value ====================
print("Creating customer_lifetime_value...")
df_customer_ltv = df_orders \
.groupBy('customer_id') \
.agg(
sum('amount').alias('total_spent'),
count('order_id').alias('total_orders'),
avg('amount').alias('avg_order_value'),
max('order_date').alias('last_purchase_date'),
min('order_date').alias('first_purchase_date')
) \
.join(df_customers[['customer_id', 'region', 'customer_segment']], 'customer_id')
df_customer_ltv.write.mode('overwrite').parquet(
f"s3://{BUCKET}/gold/customer_lifetime_value/"
)
print(f"β
Created customer_ltv with {df_customer_ltv.count()} rows")
# ==================== AGGREGATION 3: Campaign Performance ====================
print("Creating campaign_performance...")
df_campaign_perf = df_orders \
.filter(col('campaign_id').isNotNull()) \
.join(df_campaigns, 'campaign_id') \
.groupBy('campaign_id', 'campaign_name', 'campaign_channel', 'budget') \
.agg(
sum('amount').alias('revenue_generated'),
count('order_id').alias('conversions'),
countDistinct('customer_id').alias('customers_acquired')
) \
.withColumn('roi', (col('revenue_generated') / col('budget') - 1) * 100)
df_campaign_perf.write.mode('overwrite').parquet(
f"s3://{BUCKET}/gold/campaign_performance/"
)
print(f"β
Created campaign_performance with {df_campaign_perf.count()} rows")
# ==================== AGGREGATION 4: Daily Sales Trend ====================
print("Creating daily_sales_trend...")
df_daily_sales = df_orders \
.groupBy('order_date') \
.agg(
sum('amount').alias('daily_revenue'),
count('order_id').alias('daily_orders')
) \
.orderBy('order_date')
df_daily_sales.write.mode('overwrite').parquet(
f"s3://{BUCKET}/gold/daily_sales_trend/"
)
print(f"β
Created daily_sales_trend with {df_daily_sales.count()} rows")
print("=== Silver to Gold Aggregation Complete ===")
job.commit()
aws s3 cp scripts/glue_gold_job.py s3://omniretail-data-[YOUR-INITIALS]/scripts/
Create a second Glue job named `silver_to_gold` with the same settings as Step 4: script location `s3://omniretail-data-[YOUR-INITIALS]/scripts/glue_gold_job.py` (or paste the code into the editor) and job parameter `--BUCKET` = `omniretail-data-[YOUR-INITIALS]`. After it runs, verify the Gold layer:
aws s3 ls s3://omniretail-data-[YOUR-INITIALS]/gold/ --recursive
# Expected folders:
# gold/sales_summary_by_region_campaign/
# gold/customer_lifetime_value/
# gold/campaign_performance/
# gold/daily_sales_trend/
Your Gold layer contains business-level aggregations perfect for dashboards and ML!
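As an optional spot-check, you can pull a Gold table straight into pandas with the AWS SDK for pandas (`pip install awswrangler`); this sketch assumes your local credentials can read the bucket:

```python
import awswrangler as wr

BUCKET = "omniretail-data-[YOUR-INITIALS]"

# Read the campaign performance aggregate straight from S3 into pandas
df_campaigns = wr.s3.read_parquet(f"s3://{BUCKET}/gold/campaign_performance/")

# Quick sanity checks on the aggregation output
print(df_campaigns.shape)
print(df_campaigns.sort_values("roi", ascending=False).head(10))
```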
Amazon Athena is a serverless query service that lets you run SQL directly on S3 data. No database servers to manage; you just point it at your Parquet files and query!
Set the query result location to `s3://omniretail-data-[YOUR-INITIALS]/athena-results/`. In the Athena query editor, run:
CREATE DATABASE omniretail;
These tables "point" to your S3 data without copying it:
-- Silver: CRM Customers
CREATE EXTERNAL TABLE omniretail.silver_customers (
customer_id INT,
first_name STRING,
last_name STRING,
email STRING,
phone STRING,
signup_date DATE,
region STRING,
signup_source STRING,
customer_segment STRING
)
STORED AS PARQUET
LOCATION 's3://omniretail-data-[YOUR-INITIALS]/silver/crm_customers/';
-- Silver: Sales Orders
CREATE EXTERNAL TABLE omniretail.silver_orders (
order_id INT,
customer_id INT,
order_date DATE,
product_category STRING,
product_name STRING,
quantity INT,
unit_price DOUBLE,
discount DOUBLE,
campaign_id INT,
order_status STRING,
amount DOUBLE
)
STORED AS PARQUET
LOCATION 's3://omniretail-data-[YOUR-INITIALS]/silver/sales_orders/';
-- Gold: Sales Summary
CREATE EXTERNAL TABLE omniretail.gold_sales_summary (
region STRING,
campaign_channel STRING,
total_sales DOUBLE,
total_orders BIGINT,
unique_customers BIGINT,
avg_order_value DOUBLE
)
STORED AS PARQUET
LOCATION 's3://omniretail-data-[YOUR-INITIALS]/gold/sales_summary_by_region_campaign/';
-- Gold: Customer LTV
CREATE EXTERNAL TABLE omniretail.gold_customer_ltv (
customer_id INT,
total_spent DOUBLE,
total_orders BIGINT,
avg_order_value DOUBLE,
last_purchase_date DATE,
first_purchase_date DATE,
region STRING,
customer_segment STRING
)
STORED AS PARQUET
LOCATION 's3://omniretail-data-[YOUR-INITIALS]/gold/customer_lifetime_value/';
SELECT
customer_id,
region,
customer_segment,
total_spent,
total_orders,
ROUND(avg_order_value, 2) as avg_order_value
FROM omniretail.gold_customer_ltv
ORDER BY total_spent DESC
LIMIT 10;
SELECT
campaign_channel,
SUM(total_sales) as total_revenue,
SUM(total_orders) as total_orders,
ROUND(SUM(total_sales) / SUM(total_orders), 2) as avg_order_value
FROM omniretail.gold_sales_summary
GROUP BY campaign_channel
ORDER BY total_revenue DESC;
SELECT
region,
SUM(total_sales) as revenue,
SUM(unique_customers) as customers,
ROUND(SUM(total_sales) / SUM(unique_customers), 2) as revenue_per_customer
FROM omniretail.gold_sales_summary
GROUP BY region
ORDER BY revenue DESC;
The SageMaker notebook in the next step will read this data as CSV, so export the Gold customer table with a CTAS (CREATE TABLE AS SELECT) query:
CREATE TABLE omniretail.customer_ltv_for_ml
WITH (
format = 'TEXTFILE',
field_delimiter = ',',
external_location = 's3://omniretail-data-[YOUR-INITIALS]/gold/customer_ltv_csv/'
) AS
SELECT
customer_id,
total_spent,
total_orders,
avg_order_value,
last_purchase_date,
first_purchase_date,
CASE
WHEN customer_segment = 'Platinum' THEN 4
WHEN customer_segment = 'Gold' THEN 3
WHEN customer_segment = 'Silver' THEN 2
ELSE 1
END as segment_score
FROM omniretail.gold_customer_ltv;
You can now query your data lake with SQL! Athena makes exploring millions of rows feel instant.
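The console editor is all this project needs, but if you're curious how to run the same queries programmatically, here is a hedged boto3 sketch that starts an Athena query, polls until it finishes, and prints the rows (it reuses the omniretail database and the athena-results/ location from this step):

```python
import time
import boto3

athena = boto3.client("athena")

BUCKET = "omniretail-data-[YOUR-INITIALS]"
QUERY = """
    SELECT region, SUM(total_sales) AS revenue
    FROM omniretail.gold_sales_summary
    GROUP BY region
    ORDER BY revenue DESC
"""

# Start the query, writing results to the athena-results/ prefix
execution = athena.start_query_execution(
    QueryString=QUERY,
    QueryExecutionContext={"Database": "omniretail"},
    ResultConfiguration={"OutputLocation": f"s3://{BUCKET}/athena-results/"},
)
query_id = execution["QueryExecutionId"]

# Poll until the query reaches a terminal state
while True:
    state = athena.get_query_execution(
        QueryExecutionId=query_id
    )["QueryExecution"]["Status"]["State"]
    if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
        break
    time.sleep(1)

print("Query state:", state)
if state == "SUCCEEDED":
    rows = athena.get_query_results(QueryExecutionId=query_id)["ResultSet"]["Rows"]
    for row in rows:  # the first row is the header
        print([field.get("VarCharValue") for field in row["Data"]])
```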
While Athena is great for ad-hoc queries, Redshift provides consistently fast performance for repeated BI workloads, support for concurrent dashboard users, and persistent tables optimized for reporting.
Create a Redshift Serverless workgroup named `omniretail-workgroup`. Then create an IAM role named `RedshiftS3AccessRole` with the `AmazonS3ReadOnlyAccess` policy, note its ARN (`arn:aws:iam::YOUR-ACCOUNT-ID:role/RedshiftS3AccessRole`), and associate `RedshiftS3AccessRole` with the workgroup. Connect to the `dev` database (default) as the `admin` user (default) and run the following in the query editor:
-- Create schema
CREATE SCHEMA analytics;
-- Sales Summary Table
CREATE TABLE analytics.sales_summary (
region VARCHAR(50),
campaign_channel VARCHAR(50),
total_sales DECIMAL(18,2),
total_orders BIGINT,
unique_customers BIGINT,
avg_order_value DECIMAL(18,2)
);
-- Customer LTV Table
CREATE TABLE analytics.customer_ltv (
customer_id INT,
total_spent DECIMAL(18,2),
total_orders BIGINT,
avg_order_value DECIMAL(18,2),
last_purchase_date DATE,
first_purchase_date DATE,
region VARCHAR(50),
customer_segment VARCHAR(20)
);
-- Campaign Performance Table
CREATE TABLE analytics.campaign_performance (
campaign_id INT,
campaign_name VARCHAR(100),
campaign_channel VARCHAR(50),
budget DECIMAL(18,2),
revenue_generated DECIMAL(18,2),
conversions BIGINT,
customers_acquired BIGINT,
roi DECIMAL(10,2)
);
-- Load Sales Summary
COPY analytics.sales_summary
FROM 's3://omniretail-data-[YOUR-INITIALS]/gold/sales_summary_by_region_campaign/'
IAM_ROLE 'arn:aws:iam::YOUR-ACCOUNT-ID:role/RedshiftS3AccessRole'
FORMAT AS PARQUET;
-- Load Customer LTV
COPY analytics.customer_ltv
FROM 's3://omniretail-data-[YOUR-INITIALS]/gold/customer_lifetime_value/'
IAM_ROLE 'arn:aws:iam::YOUR-ACCOUNT-ID:role/RedshiftS3AccessRole'
FORMAT AS PARQUET;
-- Load Campaign Performance
COPY analytics.campaign_performance
FROM 's3://omniretail-data-[YOUR-INITIALS]/gold/campaign_performance/'
IAM_ROLE 'arn:aws:iam::YOUR-ACCOUNT-ID:role/RedshiftS3AccessRole'
FORMAT AS PARQUET;
-- Verify load
SELECT COUNT(*) FROM analytics.sales_summary;
SELECT COUNT(*) FROM analytics.customer_ltv;
SELECT COUNT(*) FROM analytics.campaign_performance;
Find your AWS Account ID in the top-right of the console (click your username). It's a 12-digit number.
-- Top regions by revenue
SELECT
region,
SUM(total_sales) as revenue
FROM analytics.sales_summary
GROUP BY region
ORDER BY revenue DESC;
-- Campaign ROI Analysis
SELECT
campaign_channel,
SUM(revenue_generated) as total_revenue,
SUM(budget) as total_budget,
ROUND((SUM(revenue_generated) / SUM(budget) - 1) * 100, 2) as avg_roi
FROM analytics.campaign_performance
GROUP BY campaign_channel
ORDER BY avg_roi DESC;
Your data warehouse is operational! Redshift now serves as the single source of truth for BI dashboards.
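If you later want to query the warehouse from code (for example, from a Lambda function), the Redshift Data API avoids managing drivers and connections. A hedged sketch against the serverless workgroup created above:

```python
import time
import boto3

client = boto3.client("redshift-data")

# Run a query against the serverless workgroup without any JDBC driver
response = client.execute_statement(
    WorkgroupName="omniretail-workgroup",
    Database="dev",
    Sql="SELECT region, SUM(total_sales) AS revenue "
        "FROM analytics.sales_summary GROUP BY region ORDER BY revenue DESC;",
)
statement_id = response["Id"]

# Wait for the statement to reach a terminal state
while True:
    status = client.describe_statement(Id=statement_id)["Status"]
    if status in ("FINISHED", "FAILED", "ABORTED"):
        break
    time.sleep(1)

print("Status:", status)
if status == "FINISHED":
    result = client.get_statement_result(Id=statement_id)
    for record in result["Records"]:
        # Each field is a dict like {"stringValue": "North"} or {"doubleValue": 1234.5}
        print([list(field.values())[0] for field in record])
```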
We'll build a binary classification model to predict: "Will this customer make another purchase next month?"
Why this matters: You can target high-probability repeat buyers with retention campaigns and win-back low-probability customers.
Create a SageMaker notebook instance named `omniretail-ml-notebook` (the cost estimate later assumes ml.t3.medium) with an IAM role that can access `omniretail-data-[YOUR-INITIALS]`. In JupyterLab, create a new notebook: customer_repeat_purchase_model.ipynb
import boto3
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from xgboost import XGBClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
import joblib
import os
# Configuration
BUCKET_NAME = 'omniretail-data-[YOUR-INITIALS]'
s3 = boto3.client('s3')
print("π₯ Downloading data from S3...")
# Download the customer LTV export created by the Athena CTAS query.
# NOTE: the exact object name under gold/customer_ltv_csv/ depends on the CTAS run;
# list the folder first and substitute the actual file name if it differs.
s3.download_file(
    BUCKET_NAME,
    'gold/customer_ltv_csv/000000_0',  # Athena CTAS output file (name may differ)
    'customer_ltv.csv'
)
# Load data (the CTAS export is comma-delimited with no header row)
df = pd.read_csv(
    'customer_ltv.csv',
    header=None,
    names=['customer_id', 'total_spent', 'total_orders', 'avg_order_value',
           'last_purchase_date', 'first_purchase_date', 'segment_score']
)
print(f"Loaded {len(df)} customer records")
print(df.head())
# ==================== FEATURE ENGINEERING ====================
print("\nπ§ Engineering features...")
# Convert dates
df['last_purchase_date'] = pd.to_datetime(df['last_purchase_date'])
df['first_purchase_date'] = pd.to_datetime(df['first_purchase_date'])
# Calculate days since last purchase
df['days_since_last_purchase'] = (pd.Timestamp.now() - df['last_purchase_date']).dt.days
# Calculate customer tenure (days as customer)
df['customer_tenure_days'] = (df['last_purchase_date'] - df['first_purchase_date']).dt.days
# Average days between purchases
df['avg_days_between_purchases'] = df['customer_tenure_days'] / (df['total_orders'] - 1).replace(0, 1)
# Create label: Did they purchase in the last 30 days? (proxy for "will purchase again")
df['will_repurchase'] = (df['days_since_last_purchase'] <= 30).astype(int)
# Select features.
# NOTE: days_since_last_purchase is deliberately excluded -- the label above is
# derived from it, so including it would leak the answer into the model.
feature_cols = [
    'total_spent',
    'total_orders',
    'avg_order_value',
    'customer_tenure_days',
    'avg_days_between_purchases',
    'segment_score'
]
X = df[feature_cols]
y = df['will_repurchase']
print(f"Features: {X.shape}")
print(f"Label distribution: {y.value_counts().to_dict()}")
# ==================== TRAIN-TEST SPLIT ====================
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y
)
# Scale features
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
print(f"\nTraining set: {X_train.shape[0]} samples")
print(f"Test set: {X_test.shape[0]} samples")
# ==================== MODEL TRAINING ====================
print("\nπ€ Training XGBoost model...")
model = XGBClassifier(
n_estimators=100,
max_depth=5,
learning_rate=0.1,
random_state=42,
eval_metric='logloss'
)
model.fit(X_train_scaled, y_train)
# ==================== EVALUATION ====================
print("\nπ Evaluating model...")
y_pred = model.predict(X_test_scaled)
y_pred_proba = model.predict_proba(X_test_scaled)[:, 1]
accuracy = accuracy_score(y_test, y_pred)
precision = precision_score(y_test, y_pred)
recall = recall_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred)
roc_auc = roc_auc_score(y_test, y_pred_proba)
print(f"""
Model Performance:
------------------
Accuracy: {accuracy:.3f}
Precision: {precision:.3f}
Recall: {recall:.3f}
F1 Score: {f1:.3f}
ROC AUC: {roc_auc:.3f}
""")
# Feature importance
importance_df = pd.DataFrame({
'feature': feature_cols,
'importance': model.feature_importances_
}).sort_values('importance', ascending=False)
print("\nFeature Importance:")
print(importance_df)
# ==================== SAVE MODEL ====================
print("\nπΎ Saving model...")
os.makedirs('models', exist_ok=True)
joblib.dump(model, 'models/xgb_repurchase_model.joblib')
joblib.dump(scaler, 'models/scaler.joblib')
# Upload to S3
s3.upload_file('models/xgb_repurchase_model.joblib', BUCKET_NAME, 'models/xgb_repurchase_model.joblib')
s3.upload_file('models/scaler.joblib', BUCKET_NAME, 'models/scaler.joblib')
print("β
Model saved to S3!")
print(f"s3://{BUCKET_NAME}/models/xgb_repurchase_model.joblib")
# ==================== MAKE PREDICTIONS ====================
# Create predictions for all customers
df['repurchase_probability'] = model.predict_proba(scaler.transform(X))[:, 1]
# Save predictions
df[['customer_id', 'repurchase_probability']].to_csv('customer_predictions.csv', index=False)
s3.upload_file('customer_predictions.csv', BUCKET_NAME, 'ml-outputs/customer_predictions.csv')
print("\nπ― Predictions saved!")
print("Top 10 customers most likely to repurchase:")
print(df.nlargest(10, 'repurchase_probability')[['customer_id', 'total_spent', 'repurchase_probability']])
You've trained an ML model that predicts customer behavior! This model can now inform marketing campaigns.
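Any Python environment with the same library versions (xgboost, scikit-learn, joblib) can now reuse the saved artifacts. A minimal, hedged sketch that downloads the model and scaler from S3 and scores one made-up customer (the feature values are purely illustrative):

```python
import boto3
import joblib
import pandas as pd

BUCKET_NAME = 'omniretail-data-[YOUR-INITIALS]'
s3 = boto3.client('s3')

# Pull the trained artifacts back down from S3
s3.download_file(BUCKET_NAME, 'models/xgb_repurchase_model.joblib', 'xgb_repurchase_model.joblib')
s3.download_file(BUCKET_NAME, 'models/scaler.joblib', 'scaler.joblib')

model = joblib.load('xgb_repurchase_model.joblib')
scaler = joblib.load('scaler.joblib')

# One illustrative customer, using the same feature columns the model was trained on
new_customer = pd.DataFrame([{
    'total_spent': 1250.00,
    'total_orders': 8,
    'avg_order_value': 156.25,
    'customer_tenure_days': 320,
    'avg_days_between_purchases': 45.7,
    'segment_score': 3,
}])

probability = model.predict_proba(scaler.transform(new_customer))[0, 1]
print(f"Repurchase probability: {probability:.2%}")
```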
When done, go to SageMaker Console → Notebook instances → Select your instance → Stop. This prevents unnecessary charges.
QuickSight is AWS's native BI tool: serverless, fast, and integrates seamlessly with Redshift. We'll build an executive dashboard showing total revenue, revenue by region, campaign channel performance, campaign ROI, and ML-driven customer segments.
Sign up for QuickSight with the account name `omniretail-bi` and allow it to access the S3 bucket `omniretail-data-[YOUR-INITIALS]`. Create a new dataset from Redshift: data source name `omniretail-redshift`, connected to the `omniretail-workgroup` workgroup's endpoint and the `dev` database, then choose the `analytics` schema and the `sales_summary` table. SPICE (Super-fast, Parallel, In-memory Calculation Engine) is QuickSight's in-memory engine. It caches your data for instant dashboard loads.
Visual 1: Total Revenue KPI
- Value: total_sales (Sum)

Visual 2: Revenue by Region (Bar Chart)
- X-axis: region
- Value: total_sales (Sum)

Visual 3: Campaign Channel Performance (Pie Chart)
- Group/Color: campaign_channel
- Value: total_sales (Sum)

For Visual 4, add a second dataset from analytics.campaign_performance.

Visual 4: Top Campaigns by ROI (Table)
- Columns: campaign_name, revenue_generated, budget, roi
- Sort by: roi (Descending)
- Conditional formatting: roi green if > 50%, red if < 0%

For Visual 5, create an S3 dataset from the ML predictions using this manifest file:
{
"fileLocations": [
{
"URIPrefixes": [
"s3://omniretail-data-[YOUR-INITIALS]/ml-outputs/customer_predictions.csv"
]
}
],
"globalUploadSettings": {
"format": "CSV",
"delimiter": ",",
"textqualifier": "\"",
"containsHeader": "true"
}
}
Visual 5: High-Value Customer Segments (Scatter Plot)
- Dataset: the predictions joined with customer_ltv
- X-axis: total_spent
- Y-axis: repurchase_probability
- Size: total_orders
- Color: customer_segment

Publish the analysis as "OmniRetail Executive Dashboard". Your executive dashboard is live! Stakeholders can now track KPIs in real time.
Right now, you manually run Glue jobs. Let's automate this to run every night at 2 AM:
Create a Lambda function named `omniretail-etl-trigger` (Python runtime). Replace the default code with:
import boto3
import os
glue = boto3.client('glue')
def lambda_handler(event, context):
"""
Triggers Glue ETL jobs sequentially:
1. Bronze β Silver
2. Silver β Gold
"""
try:
print("π Starting ETL pipeline automation...")
# Trigger Bronze β Silver
print("Triggering bronze_to_silver job...")
silver_response = glue.start_job_run(JobName='bronze_to_silver')
silver_run_id = silver_response['JobRunId']
print(f"β
bronze_to_silver started: {silver_run_id}")
# Wait for Silver job to complete (simplified - production would poll status)
print("Waiting for Silver job to complete...")
# Trigger Silver β Gold
print("Triggering silver_to_gold job...")
gold_response = glue.start_job_run(JobName='silver_to_gold')
gold_run_id = gold_response['JobRunId']
print(f"β
silver_to_gold started: {gold_run_id}")
return {
'statusCode': 200,
'body': f'ETL jobs triggered: Silver={silver_run_id}, Gold={gold_run_id}'
}
except Exception as e:
print(f"β Error: {str(e)}")
return {
'statusCode': 500,
'body': f'Error triggering jobs: {str(e)}'
}
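The function above intentionally skips the wait between jobs. A hedged sketch of the "poll the status" approach referenced in the comment uses `glue.get_job_run`; note the Lambda timeout would need to cover the full Glue runtime, and for longer jobs a Glue workflow or Step Functions state machine is the more robust choice:

```python
import time
import boto3

glue = boto3.client('glue')

def run_job_and_wait(job_name, poll_seconds=30):
    """Start a Glue job and block until it reaches a terminal state."""
    run_id = glue.start_job_run(JobName=job_name)['JobRunId']
    while True:
        state = glue.get_job_run(JobName=job_name, RunId=run_id)['JobRun']['JobRunState']
        if state in ('SUCCEEDED', 'FAILED', 'STOPPED', 'TIMEOUT', 'ERROR'):
            return run_id, state
        time.sleep(poll_seconds)

def lambda_handler(event, context):
    # Run the jobs strictly in sequence: Silver must finish before Gold starts
    silver_id, silver_state = run_job_and_wait('bronze_to_silver')
    if silver_state != 'SUCCEEDED':
        raise RuntimeError(f"bronze_to_silver ended in state {silver_state} ({silver_id})")

    gold_id, gold_state = run_job_and_wait('silver_to_gold')
    return {
        'statusCode': 200,
        'body': f"Silver={silver_id}:{silver_state}, Gold={gold_id}:{gold_state}",
    }
```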
Attach the `AWSGlueConsoleFullAccess` policy to the Lambda function's execution role, then run a quick test with a `test-event`. Next, create an EventBridge rule named `omniretail-nightly-etl` with the cron expression `0 2 * * ? *` (2 AM UTC daily) and set its target to the `omniretail-etl-trigger` Lambda function. Cron examples:
- `0 2 * * ? *` = Every day at 2:00 AM UTC
- `0 */6 * * ? *` = Every 6 hours
- `0 0 ? * MON *` = Every Monday at midnight
# View Lambda logs
aws logs tail /aws/lambda/omniretail-etl-trigger --follow
# Check Glue job runs
aws glue get-job-runs --job-name bronze_to_silver --max-results 5
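If you'd rather script the schedule than click through EventBridge, an optional boto3 sketch that creates the same nightly rule, grants EventBridge permission to invoke the function, and attaches the target:

```python
import boto3

events = boto3.client('events')
lambda_client = boto3.client('lambda')

FUNCTION_NAME = 'omniretail-etl-trigger'
RULE_NAME = 'omniretail-nightly-etl'

# 1. Create (or update) the scheduled rule: every day at 2 AM UTC
rule = events.put_rule(
    Name=RULE_NAME,
    ScheduleExpression='cron(0 2 * * ? *)',
    State='ENABLED',
)

# 2. Allow EventBridge to invoke the Lambda function
function_arn = lambda_client.get_function(FunctionName=FUNCTION_NAME)['Configuration']['FunctionArn']
lambda_client.add_permission(
    FunctionName=FUNCTION_NAME,
    StatementId='allow-eventbridge-nightly-etl',
    Action='lambda:InvokeFunction',
    Principal='events.amazonaws.com',
    SourceArn=rule['RuleArn'],
)

# 3. Point the rule at the function
events.put_targets(
    Rule=RULE_NAME,
    Targets=[{'Id': 'omniretail-etl-lambda', 'Arn': function_arn}],
)
print(f"Scheduled {FUNCTION_NAME} via rule {RULE_NAME}")
```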
Your pipeline is now fully automated! Fresh data flows through Bronze → Silver → Gold every night without manual intervention.
| Service | Usage | Cost/Month |
|---|---|---|
| S3 | 5 GB storage + requests | $0.12 |
| AWS Glue | 2 jobs × 5 min/day × 30 days | ~$10 |
| Redshift Serverless | 8 RPUs × 10 hours/month | $2.50 |
| SageMaker | ml.t3.medium × 5 hours | $0.25 |
| Athena | 100 queries × 10 MB scanned | $0.05 |
| Lambda | 30 invocations | $0 (free tier) |
| QuickSight | 1 user | $9 |
| CloudWatch | Log storage | $0.50 |
| TOTAL | | ~$22-30/month |
With $100 credits: You can run this project for 3-4 months risk-free!
Set up an AWS Budget of $50 with alerts at 80% ($40) and 100% ($50) so you're notified before your credits run out.

Cause: The Glue job's IAM role lacks S3 permissions
Solution:
# Check role has S3 access
aws iam list-attached-role-policies --role-name AWSGlueServiceRole-OmniRetail
# If missing, attach S3 policy
aws iam attach-role-policy \
--role-name AWSGlueServiceRole-OmniRetail \
--policy-arn arn:aws:iam::aws:policy/AmazonS3FullAccess
Cause: The Redshift IAM role is not associated with the workgroup, or the ARN in the COPY command is incorrect
Solution:
Cause: Schema mismatch between the Athena table definition and the Parquet files, or corrupt files
Solution:
-- Drop and recreate table
DROP TABLE omniretail.silver_customers;
-- Create with explicit schema (don't rely on inference)
CREATE EXTERNAL TABLE omniretail.silver_customers (...)
STORED AS PARQUET
LOCATION 's3://omniretail-data-XXX/silver/crm_customers/';
Cause: The Lambda function's default 3-second timeout is too short
Solution:
Increase the timeout to 30 seconds (and the memory to 256 MB if needed)

Cause: Network/VPC configuration blocks QuickSight from reaching Redshift
Solution:
Cause: The notebook instance's default 5 GB volume is too small
Solution:
Increase the notebook volume size to 20 GB. Also double-check: did you replace [YOUR-INITIALS] in the scripts? You now have hands-on experience with S3 data lakes, Glue ETL (PySpark), Athena, Redshift Serverless, SageMaker, QuickSight, and pipeline automation with Lambda and EventBridge.
This guide was created to help you master AWS data engineering.
If you found it helpful, consider starring this project or sharing it with others!
Built by Vincent Forca | 2025