Data Science Basics: Big Data Analytics, ML Pipelines, ETL & Visualization
This article introduces the basics of data science: Big Data analytics, machine learning pipelines, ETL processes and data visualization, each with practical examples.
In a Nutshell
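Data Science combines statistics, programming and domain knowledge to gain insights from data. Big Data analytics handles data volumes that are too large, too fast or too varied for conventional tools; ML pipelines automate model training, evaluation and deployment; ETL prepares data for analysis; visualization makes results understandable.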
Compact Technical Description
Data Science is an interdisciplinary field that uses scientific methods, processes, algorithms and systems to extract knowledge and insights from structured and unstructured data.
Core Areas:
Big Data Analytics
- Concept: Processing extremely large volumes of data
- 3V Model: Volume, Velocity, Variety
- Technologies: Hadoop, Spark, NoSQL databases
- Applications: Real-time analysis, Predictive Analytics
Machine Learning Pipelines
- Concept: Automated ML workflow orchestration
- Phases: Data Collection → Preprocessing → Training → Evaluation → Deployment
- Tools: Scikit-learn, TensorFlow, MLflow, Airflow
- MLOps: Versioning, Monitoring, Retraining
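The phases listed above can be sketched in plain Python. This is a minimal illustration, not a production pipeline: the synthetic data, the function names (collect, split, fit_scaler, train, evaluate, run_pipeline) and the single-feature linear model are all assumptions made for the example, and the deployment phase is left out.

```python
import math
import random


def collect(n=200, seed=42):
    """Data collection: synthetic (x, y) pairs following y = 3x + 5 plus noise."""
    rng = random.Random(seed)
    rows = []
    for _ in range(n):
        x = rng.uniform(0, 10)
        rows.append((x, 3 * x + 5 + rng.gauss(0, 0.5)))
    return rows


def split(rows, train_ratio=0.8):
    """Hold out the last 20% of the rows for evaluation."""
    cut = int(len(rows) * train_ratio)
    return rows[:cut], rows[cut:]


def fit_scaler(train_rows):
    """Preprocessing: learn mean and standard deviation on the training set only."""
    xs = [x for x, _ in train_rows]
    mean = sum(xs) / len(xs)
    std = math.sqrt(sum((x - mean) ** 2 for x in xs) / len(xs))
    return mean, std


def train(train_rows, scaler):
    """Training: closed-form least squares for y = a * z + b on the scaled input z."""
    mean, std = scaler
    zs = [(x - mean) / std for x, _ in train_rows]
    ys = [y for _, y in train_rows]
    z_bar, y_bar = sum(zs) / len(zs), sum(ys) / len(ys)
    a = sum((z - z_bar) * (y - y_bar) for z, y in zip(zs, ys)) / sum((z - z_bar) ** 2 for z in zs)
    return a, y_bar - a * z_bar


def evaluate(test_rows, scaler, model):
    """Evaluation: root mean squared error on the held-out rows."""
    (mean, std), (a, b) = scaler, model
    squared_errors = [(a * (x - mean) / std + b - y) ** 2 for x, y in test_rows]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


def run_pipeline():
    """Chain the phases: collection, preprocessing, training, evaluation."""
    train_rows, test_rows = split(collect())
    scaler = fit_scaler(train_rows)
    model = train(train_rows, scaler)
    return evaluate(test_rows, scaler, model)


print(f"Test RMSE: {run_pipeline():.3f}")
```

Fitting the scaler on the training rows only keeps information from the test rows out of the model, which is the same reason real pipelines bundle preprocessing and model into one versioned artifact.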
ETL Processes
- Extract: Extract data from various sources
- Transform: Clean and transform data
- Load: Load data into target systems
- Tools: Apache NiFi, Talend, AWS Glue
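The three ETL steps can be illustrated with the Python standard library alone. The sketch below is an assumption-laden toy: the inline CSV text, the customers table and the function names are invented for the example, and SQLite stands in for a real target system.

```python
import csv
import io
import sqlite3

# Hypothetical raw input: one duplicate row and one unparseable amount
RAW_CSV = """customer_id,email,total_spent
1,anna@example.com,120.50
2,,80.00
2,,80.00
3,carl@example.com,not_a_number
"""


def extract(text):
    """Extract: parse the CSV text into a list of dicts."""
    return list(csv.DictReader(io.StringIO(text)))


def transform(rows):
    """Transform: drop duplicate IDs, skip invalid amounts, convert types."""
    seen, clean = set(), []
    for row in rows:
        if row["customer_id"] in seen:
            continue
        seen.add(row["customer_id"])
        try:
            spent = float(row["total_spent"])
        except ValueError:
            continue  # skip rows whose amount cannot be parsed
        clean.append((int(row["customer_id"]), row["email"] or None, spent))
    return clean


def load(rows, conn):
    """Load: write the cleaned rows into the target table."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS customers "
        "(customer_id INTEGER PRIMARY KEY, email TEXT, total_spent REAL)"
    )
    conn.executemany("INSERT INTO customers VALUES (?, ?, ?)", rows)
    conn.commit()


def run_etl(conn):
    """Run all three steps and return (row count, total revenue)."""
    load(transform(extract(RAW_CSV)), conn)
    return conn.execute("SELECT COUNT(*), SUM(total_spent) FROM customers").fetchone()


connection = sqlite3.connect(":memory:")
print(run_etl(connection))
```

Keeping the three steps as separate functions mirrors how orchestration tools schedule them as separate tasks, so a failed load can be retried without repeating the extraction.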
Data Visualization
- Concept: Visual representation of data and insights
- Chart Types: Bar, Line, Scatter, Heatmap, Treemap
- Tools: Matplotlib, Seaborn, Plotly, Tableau
- Principles: Clarity, Accuracy, Efficiency
Exam-Relevant Key Points
- Data Science: Interdisciplinary field for data analysis
- Big Data: Large, fast, diverse data volumes
- Machine Learning: Automated pattern recognition in data
- ETL: Extract-Transform-Load for data preparation
- Visualization: Visual data representation
- Pipelines: Automated data processing workflows
- Analytics: Statistical and exploratory data analysis
- IHK-relevant: Modern data analysis and Business Intelligence
Core Components
- Data Collection: Data sources and collection
- Data Storage: Storage and organization
- Data Processing: Cleaning and transformation
- Data Analysis: Statistical and exploratory analysis
- Machine Learning: Models and algorithms
- Data Visualization: Visual representation
- Model Deployment: Production deployment
- Monitoring: Performance and drift detection
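Drift detection, the last component in the list, can be approximated by comparing the mean of recent data against a reference sample. The following sketch assumes a simple standardized mean-shift score with a threshold of 3; the function names and the threshold are illustrative choices, and production systems often use richer tests over whole distributions.

```python
import math
import statistics


def mean_drift_score(reference, current):
    """Distance between the current mean and the reference mean, in standard errors."""
    ref_mean = statistics.fmean(reference)
    ref_std = statistics.stdev(reference)  # sample standard deviation
    standard_error = ref_std / math.sqrt(len(current))
    return abs(statistics.fmean(current) - ref_mean) / standard_error


def has_drifted(reference, current, threshold=3.0):
    """Flag drift when the mean moved more than `threshold` standard errors."""
    return mean_drift_score(reference, current) > threshold


reference_window = [10, 11, 9, 10, 12, 8, 10, 11, 9, 10]
print(has_drifted(reference_window, [10, 9, 11, 10]))
print(has_drifted(reference_window, [15, 16, 14, 15]))
```

A check like this would typically run on a schedule against each monitored feature or prediction column, and a positive result would trigger an alert or a retraining job.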
Practical Examples
1. Big Data Processing with Apache Spark and Python
from pyspark.sql import SparkSession
# Import functions explicitly: a star import would shadow Python built-ins such as round()
from pyspark.sql.functions import avg, col, count, countDistinct, percent_rank, substring
from pyspark.sql.functions import sum  # deliberately shadows built-in sum(); only used on columns below
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Big Data Processing Demo
class BigDataAnalytics:
    def __init__(self, app_name="BigDataAnalytics"):
        """Initialize Spark Session"""
        self.spark = SparkSession.builder \
            .appName(app_name) \
            .config("spark.sql.adaptive.enabled", "true") \
            .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
            .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128MB") \
            .getOrCreate()
        print(f"Spark Session created: {self.spark.version}")

    def create_sample_data(self):
        """Create sample data for Big Data demo"""
        import random
        from datetime import datetime, timedelta
        # Simulated sales data
        data = []
        products = ["Laptop", "Smartphone", "Tablet", "Headphones", "Mouse", "Keyboard"]
        regions = ["North", "South", "East", "West", "Central"]
        start_date = datetime(2023, 1, 1)
        for i in range(1000000):  # 1M records, built on the driver
            # Offsets 0..364 keep every date inside 2023
            date = start_date + timedelta(days=random.randint(0, 364))
            record = {
                "transaction_id": f"T{i:06d}",
                "date": date.strftime("%Y-%m-%d"),
                "product": random.choice(products),
                "region": random.choice(regions),
                "quantity": random.randint(1, 10),
                "unit_price": round(random.uniform(10, 1000), 2),
                "customer_id": f"C{random.randint(1, 50000):05d}",
                "sales_rep": f"SR{random.randint(1, 100):03d}"
            }
            record["total_amount"] = record["quantity"] * record["unit_price"]
            data.append(record)
        # Create Spark DataFrame
        schema = StructType([
            StructField("transaction_id", StringType(), True),
            StructField("date", StringType(), True),
            StructField("product", StringType(), True),
            StructField("region", StringType(), True),
            StructField("quantity", IntegerType(), True),
            StructField("unit_price", DoubleType(), True),
            StructField("customer_id", StringType(), True),
            StructField("sales_rep", StringType(), True),
            StructField("total_amount", DoubleType(), True)
        ])
        df = self.spark.createDataFrame(data, schema)
        print(f"Sample data created: {df.count()} rows")
        return df
    def basic_analytics(self, df):
        """Perform basic analytics"""
        print("=== Basic Analytics ===")
        # Display schema
        print("Schema:")
        df.printSchema()
        # Statistical summary
        print("\nStatistical Summary:")
        df.describe().show()
        # Top products by revenue
        print("\nTop Products by Revenue:")
        product_sales = df.groupBy("product") \
            .agg(sum("total_amount").alias("total_sales"),
                 count("transaction_id").alias("transaction_count")) \
            .orderBy(col("total_sales").desc())
        product_sales.show()
        # Regional analysis
        print("\nRegional Sales Analysis:")
        regional_sales = df.groupBy("region") \
            .agg(sum("total_amount").alias("total_sales"),
                 avg("total_amount").alias("avg_transaction"),
                 count("transaction_id").alias("transaction_count")) \
            .orderBy(col("total_sales").desc())
        regional_sales.show()
        # Time-based analysis (the first 7 characters of the date are "YYYY-MM")
        print("\nMonthly Revenue Development:")
        monthly_sales = df.withColumn("month", substring(col("date"), 1, 7)) \
            .groupBy("month") \
            .agg(sum("total_amount").alias("monthly_sales"),
                 count("transaction_id").alias("transaction_count")) \
            .orderBy("month")
        monthly_sales.show()
        return product_sales, regional_sales, monthly_sales
    def advanced_analytics(self, df):
        """Advanced analytics with Window Functions"""
        print("\n=== Advanced Analytics ===")
        # Window functions for running totals
        from pyspark.sql.window import Window
        # Cumulative revenue within each month (the running total restarts every month)
        window_spec = Window.partitionBy("month").orderBy("date") \
            .rowsBetween(Window.unboundedPreceding, Window.currentRow)
        monthly_cumulative = df.withColumn("month", substring(col("date"), 1, 7)) \
            .groupBy("month", "date") \
            .agg(sum("total_amount").alias("daily_sales")) \
            .withColumn("cumulative_sales", sum("daily_sales").over(window_spec)) \
            .orderBy("month", "date")
        print("Monthly cumulative revenue:")
        monthly_cumulative.show(20)
        # Best-performing sales reps (percent_rank is 0.0 for the highest revenue)
        rep_performance = df.groupBy("sales_rep") \
            .agg(sum("total_amount").alias("total_sales"),
                 count("transaction_id").alias("transaction_count"),
                 avg("total_amount").alias("avg_transaction")) \
            .withColumn("performance_rank", percent_rank().over(Window.orderBy(col("total_sales").desc()))) \
            .filter(col("performance_rank") <= 0.1)  # Top 10%
        print("\nTop 10% Sales Reps:")
        rep_performance.show()
        # Customer-by-product quantity matrix (a starting point for correlation analysis,
        # not yet a correlation matrix)
        product_correlation = df.groupBy("customer_id", "product") \
            .agg(sum("quantity").alias("product_quantity")) \
            .groupBy("customer_id") \
            .pivot("product") \
            .agg(sum("product_quantity")) \
            .fillna(0)
        print("\nCustomer-Product Quantity Matrix (first 10 customers):")
        product_correlation.limit(10).show()
        return monthly_cumulative, rep_performance, product_correlation
    def machine_learning_pipeline(self, df):
        """Machine Learning Pipeline for revenue forecasting"""
        print("\n=== Machine Learning Pipeline ===")
        # Prepare features for ML (derive the "month" column first; df only has "date")
        feature_data = df.withColumn("month", substring(col("date"), 1, 7)) \
            .groupBy("product", "region", "month") \
            .agg(sum("total_amount").alias("target_sales"),
                 count("transaction_id").alias("transaction_count"),
                 avg("unit_price").alias("avg_unit_price"),
                 sum("quantity").alias("total_quantity"))
        # Encode categorical features
        from pyspark.ml.feature import StringIndexer, OneHotEncoder
        # Product Indexer
        product_indexer = StringIndexer(inputCol="product", outputCol="product_index")
        product_indexed = product_indexer.fit(feature_data).transform(feature_data)
        # Region Indexer
        region_indexer = StringIndexer(inputCol="region", outputCol="region_index")
        region_indexed = region_indexer.fit(product_indexed).transform(product_indexed)
        # One-Hot Encoding
        encoder = OneHotEncoder(inputCols=["product_index", "region_index"],
                                outputCols=["product_encoded", "region_encoded"])
        encoded_data = encoder.fit(region_indexed).transform(region_indexed)
        # Create feature vector
        assembler = VectorAssembler(
            inputCols=["transaction_count", "avg_unit_price", "total_quantity",
                       "product_encoded", "region_encoded"],
            outputCol="features"
        )
        assembled_data = assembler.transform(encoded_data)
        # Train-test split (split before scaling so test rows do not influence the scaler)
        train_raw, test_raw = assembled_data.randomSplit([0.8, 0.2], seed=42)
        # Scale features: fit on the training split only, then apply to both splits
        scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
        scaler_model = scaler.fit(train_raw)
        train_data = scaler_model.transform(train_raw)
        test_data = scaler_model.transform(test_raw)
        print(f"Training data: {train_data.count()} rows")
        print(f"Test data: {test_data.count()} rows")
        # Linear Regression
        lr = LinearRegression(featuresCol="scaled_features",
                              labelCol="target_sales",
                              predictionCol="predicted_sales")
        lr_model = lr.fit(train_data)
        # Make predictions on test data
        predictions = lr_model.transform(test_data)
        # Evaluate model
        evaluator = RegressionEvaluator(labelCol="target_sales",
                                        predictionCol="predicted_sales",
                                        metricName="rmse")
        rmse = evaluator.evaluate(predictions)
        r2 = evaluator.setMetricName("r2").evaluate(predictions)
        print("Model Evaluation:")
        print(f"RMSE: {rmse:.2f}")
        print(f"R²: {r2:.4f}")
        # Display coefficients (linear-model weights, not feature importances)
        print("\nFeature Coefficients:")
        numeric_features = ["transaction_count", "avg_unit_price", "total_quantity"]
        coefficients = lr_model.coefficients.toArray()
        # The first three coefficients belong to the numeric features; the remaining
        # ones belong to the one-hot encoded product and region columns
        for name, coef in zip(numeric_features, coefficients):
            print(f"{name}: {coef:.4f}")
        print(f"One-hot coefficients (product, region): {coefficients[len(numeric_features):]}")
        return lr_model, predictions, scaler_model
    def real_time_processing_simulation(self, df):
        """Simulate real-time processing"""
        print("\n=== Real-Time Processing Simulation ===")
        # Simulate streaming DataFrame
        # In practice, this would come from Kafka, Kinesis etc.
        # Aggregations for real-time monitoring
        real_time_metrics = df.groupBy("product", "region") \
            .agg(count("transaction_id").alias("transaction_count"),
                 sum("total_amount").alias("total_sales"),
                 avg("total_amount").alias("avg_transaction")) \
            .orderBy(col("total_sales").desc())
        print("Real-Time Metrics:")
        real_time_metrics.show()
        # Anomaly detection (simplified)
        from pyspark.sql.functions import stddev
        # Statistical bounds for anomaly detection: mean ± 2 standard deviations
        stats = df.groupBy("product") \
            .agg(avg("total_amount").alias("avg_amount"),
                 stddev("total_amount").alias("std_amount")) \
            .withColumn("upper_bound", col("avg_amount") + 2 * col("std_amount")) \
            .withColumn("lower_bound", col("avg_amount") - 2 * col("std_amount"))
        print("Anomaly bounds per product:")
        stats.show()
        return real_time_metrics, stats
    def data_export(self, df, predictions):
        """Export data for further processing"""
        print("\n=== Data Export ===")
        # Export results in various formats
        output_path = "/tmp/big_data_results"
        # Analytics results
        product_sales = df.groupBy("product") \
            .agg(sum("total_amount").alias("total_sales")) \
            .orderBy(col("total_sales").desc())
        product_sales.write.mode("overwrite").parquet(f"{output_path}/product_sales")
        print(f"Product sales exported to: {output_path}/product_sales")
        # ML predictions
        predictions.select("product", "region", "target_sales", "predicted_sales") \
            .write.mode("overwrite").parquet(f"{output_path}/predictions")
        print(f"ML predictions exported to: {output_path}/predictions")
        # Summary statistics
        summary_stats = df.agg(
            count("transaction_id").alias("total_transactions"),
            sum("total_amount").alias("total_revenue"),
            avg("total_amount").alias("avg_transaction_value"),
            countDistinct("customer_id").alias("unique_customers"),
            countDistinct("product").alias("unique_products")
        )
        summary_stats.write.mode("overwrite").json(f"{output_path}/summary_stats")
        print(f"Summary statistics exported to: {output_path}/summary_stats")

    def cleanup(self):
        """Free resources"""
        self.spark.stop()
        print("Spark Session terminated")
# Run Big Data demo
def big_data_demo():
    analytics = BigDataAnalytics()
    try:
        # Create data
        df = analytics.create_sample_data()
        # Basic analytics
        product_sales, regional_sales, monthly_sales = analytics.basic_analytics(df)
        # Advanced analytics
        monthly_cumulative, rep_performance, product_correlation = analytics.advanced_analytics(df)
        # Machine Learning Pipeline
        lr_model, predictions, scaler_model = analytics.machine_learning_pipeline(df)
        # Real-time processing
        real_time_metrics, stats = analytics.real_time_processing_simulation(df)
        # Export data
        analytics.data_export(df, predictions)
        print("\nBig Data Analytics demo completed successfully!")
    except Exception as e:
        print(f"Error in Big Data demo: {e}")
    finally:
        analytics.cleanup()

if __name__ == "__main__":
    big_data_demo()
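The simplified anomaly detection in the listing above computes per-product bounds of mean ± 2 standard deviations. The same rule can be tried out without a Spark cluster; the sketch below uses only the standard library, and its function names and tiny data set are assumptions made for illustration. Like Spark's stddev, statistics.stdev returns the sample standard deviation.

```python
import statistics
from collections import defaultdict


def anomaly_bounds(transactions, k=2.0):
    """Return {product: (lower, upper)} using mean ± k sample standard deviations."""
    amounts_by_product = defaultdict(list)
    for product, amount in transactions:
        amounts_by_product[product].append(amount)
    bounds = {}
    for product, amounts in amounts_by_product.items():
        mean = statistics.fmean(amounts)
        std = statistics.stdev(amounts)  # needs at least two amounts per product
        bounds[product] = (mean - k * std, mean + k * std)
    return bounds


def find_anomalies(transactions, bounds):
    """Return the transactions whose amount falls outside its product's bounds."""
    return [
        (product, amount)
        for product, amount in transactions
        if not bounds[product][0] <= amount <= bounds[product][1]
    ]


# Hypothetical sample: one Mouse transaction is far above the usual range
sample = [("Mouse", a) for a in (20.0, 22.0, 19.0, 21.0, 20.0, 23.0, 18.0, 21.0, 20.0, 500.0)]
sample += [("Keyboard", a) for a in (50.0, 52.0, 48.0, 51.0, 49.0)]
print(find_anomalies(sample, anomaly_bounds(sample)))
```

Because the outlier itself inflates the standard deviation, this rule becomes less sensitive on small samples; computing the bounds on a clean reference period is a common refinement.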
2. ETL Pipeline with Apache Airflow
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.models import Variable
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import json
import logging

# ETL Pipeline Configuration
default_args = {
    'owner': 'data-science-team',
    'depends_on_past': False,
    # Static start date (example value); days_ago() is deprecated and
    # dynamic start dates are discouraged
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# DAG Definition
dag = DAG(
    'data_science_etl_pipeline',
    default_args=default_args,
    description='ETL Pipeline for Data Science Analytics',
    schedule_interval='@daily',
    catchup=False,
    tags=['data-science', 'etl', 'analytics'],
)

# Configure logging
logger = logging.getLogger(__name__)
def extract_customer_data(**context):
    """Extract customer data from various sources"""
    from io import StringIO
    logger.info("Extracting customer data...")
    # PostgreSQL Hook
    postgres_hook = PostgresHook(postgres_conn_id='postgres_default')
    # Extract customer data from PostgreSQL (customers active in the last 30 days)
    customer_query = """
        SELECT
            customer_id,
            first_name,
            last_name,
            email,
            phone,
            registration_date,
            last_login_date,
            total_orders,
            total_spent,
            customer_segment
        FROM customers
        WHERE last_login_date >= CURRENT_DATE - INTERVAL '30 days'
    """
    customers_df = postgres_hook.get_pandas_df(customer_query)
    logger.info(f"{len(customers_df)} customer records extracted")
    # S3 Hook for additional data
    s3_hook = S3Hook(aws_conn_id='aws_default')
    # Product interaction data from S3
    bucket_name = Variable.get('data_bucket_name', default_var='analytics-data')
    try:
        # Load CSV file from S3 (read_key returns the object body as a string)
        file_content = s3_hook.read_key(key='customer_interactions.csv', bucket_name=bucket_name)
        interactions_df = pd.read_csv(StringIO(file_content))
        logger.info(f"{len(interactions_df)} interaction records loaded from S3")
    except Exception as e:
        logger.warning(f"No interaction data found in S3: {e}")
        interactions_df = pd.DataFrame()
    # Combine data
    if not interactions_df.empty:
        combined_df = customers_df.merge(
            interactions_df,
            on='customer_id',
            how='left'
        )
    else:
        combined_df = customers_df
    # Save data as JSON for next task
    combined_json = combined_df.to_json(date_format='iso')
    # Store in XCom
    task_instance = context['task_instance']
    task_instance.xcom_push(key='customer_data', value=combined_json)
    logger.info("Customer data extraction completed")
    return combined_json
def transform_customer_data(**context):
    """Transform and clean customer data"""
    from io import StringIO
    logger.info("Transforming customer data...")
    # Get data from previous task
    task_instance = context['task_instance']
    customer_json = task_instance.xcom_pull(task_ids='extract_customer_data', key='customer_data')
    # JSON to DataFrame (wrap the string: passing literal JSON to read_json is deprecated)
    customers_df = pd.read_json(StringIO(customer_json))
    # Data Cleaning
    logger.info("Performing data cleaning...")
    # 1. Remove duplicates
    original_count = len(customers_df)
    customers_df = customers_df.drop_duplicates(subset=['customer_id'])
    duplicates_removed = original_count - len(customers_df)
    logger.info(f"{duplicates_removed} duplicates removed")
    # 2. Handle missing values
    # Numeric columns: impute with median
    numeric_columns = ['total_orders', 'total_spent']
    for col in numeric_columns:
        if col in customers_df.columns:
            median_value = customers_df[col].median()
            # Assign the result back; inplace fillna on a column selection is unreliable
            customers_df[col] = customers_df[col].fillna(median_value)
    # Categorical columns: impute with mode
    categorical_columns = ['customer_segment']
    for col in categorical_columns:
        if col in customers_df.columns:
            mode_value = customers_df[col].mode()[0] if not customers_df[col].mode().empty else 'Unknown'
            customers_df[col] = customers_df[col].fillna(mode_value)
    # 3. Normalize date columns
    date_columns = ['registration_date', 'last_login_date']
    for col in date_columns:
        if col in customers_df.columns:
            customers_df[col] = pd.to_datetime(customers_df[col], errors='coerce')
    # 4. Feature Engineering
    logger.info("Creating new features...")
    # Average order value and a heuristic Customer Lifetime Value (CLV) score
    if 'total_spent' in customers_df.columns and 'total_orders' in customers_df.columns:
        # Replace 0 orders with NaN to avoid division by zero (inf values)
        customers_df['avg_order_value'] = customers_df['total_spent'] / customers_df['total_orders'].replace(0, np.nan)
        customers_df['clv_score'] = customers_df['total_spent'] * np.log1p(customers_df['total_orders'])
    # Recency Score (days since last login)
    if 'last_login_date' in customers_df.columns:
        current_date = datetime.now()
        customers_df['days_since_last_login'] = (current_date - customers_df['last_login_date']).dt.days
        # include_lowest=True keeps customers who logged in today (0 days) in the first bin
        customers_df['recency_score'] = pd.cut(customers_df['days_since_last_login'],
                                               bins=[0, 7, 30, 90, float('inf')],
                                               labels=[4, 3, 2, 1],
                                               include_lowest=True)
    # Engagement Score based on various factors
    engagement_features = []
    if 'total_orders' in customers_df.columns:
        engagement_features.append('total_orders')
    if 'days_since_last_login' in customers_df.columns:
        engagement_features.append('days_since_last_login')
    if engagement_features:
        # Min-max normalize each feature to the range [0, 1]
        for feature in engagement_features:
            min_val = customers_df[feature].min()
            max_val = customers_df[feature].max()
            if max_val != min_val:
                customers_df[f'{feature}_normalized'] = (customers_df[feature] - min_val) / (max_val - min_val)
            else:
                customers_df[f'{feature}_normalized'] = 0.0
        # More days since the last login means less engagement, so invert that feature
        if 'days_since_last_login' in engagement_features:
            customers_df['days_since_last_login_normalized'] = 1 - customers_df['days_since_last_login_normalized']
        # Calculate engagement score
        normalized_features = [f'{f}_normalized' for f in engagement_features]
        customers_df['engagement_score'] = customers_df[normalized_features].mean(axis=1)
    # 5. Data Validation
    logger.info("Performing data validation...")
    validation_errors = []
    # Check business rules
    if 'total_orders' in customers_df.columns:
        if (customers_df['total_orders'] < 0).any():
            validation_errors.append("Negative order quantities found")
    if 'total_spent' in customers_df.columns:
        if (customers_df['total_spent'] < 0).any():
            validation_errors.append("Negative revenue found")
    if 'email' in customers_df.columns:
        invalid_emails = customers_df[~customers_df['email'].str.contains('@', na=False)]
        if not invalid_emails.empty:
            validation_errors.append(f"{len(invalid_emails)} invalid email addresses found")
    if validation_errors:
        logger.warning(f"Validation errors: {validation_errors}")
    # 6. Aggregate data for analytics
    logger.info("Aggregating data for analytics...")
    # Customer segment statistics
    if 'customer_segment' in customers_df.columns:
        segment_stats = customers_df.groupby('customer_segment').agg({
            'customer_id': 'count',
            'total_spent': ['sum', 'mean'],
            'total_orders': ['sum', 'mean'],
            'engagement_score': 'mean'
        }).round(2)
        logger.info("Customer segment statistics created")
    # Save transformed data
    transformed_json = customers_df.to_json(date_format='iso')
    task_instance.xcom_push(key='transformed_data', value=transformed_json)
    logger.info(f"Data transformation completed: {len(customers_df)} records")
    return transformed_json
def load_data_to_warehouse(**context):
    """Load transformed data to Data Warehouse"""
    from io import StringIO
    logger.info("Loading data to Data Warehouse...")
    # Get transformed data
    task_instance = context['task_instance']
    transformed_json = task_instance.xcom_pull(task_ids='transform_customer_data', key='transformed_data')
    # JSON to DataFrame
    customers_df = pd.read_json(StringIO(transformed_json))
    # PostgreSQL Hook
    postgres_hook = PostgresHook(postgres_conn_id='postgres_default')
    # Create table if not exists
    create_table_sql = """
        CREATE TABLE IF NOT EXISTS analytics_customer_metrics (
            customer_id VARCHAR(50) PRIMARY KEY,
            first_name VARCHAR(100),
            last_name VARCHAR(100),
            email VARCHAR(255),
            phone VARCHAR(50),
            registration_date TIMESTAMP,
            last_login_date TIMESTAMP,
            total_orders INTEGER,
            total_spent DECIMAL(10,2),
            customer_segment VARCHAR(50),
            avg_order_value DECIMAL(10,2),
            clv_score DECIMAL(10,2),
            days_since_last_login INTEGER,
            recency_score INTEGER,
            engagement_score DECIMAL(5,2),
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
    """
    postgres_hook.run(create_table_sql)
    # Upsert Operation (Insert or Update)
    upsert_sql = """
        INSERT INTO analytics_customer_metrics (
            customer_id, first_name, last_name, email, phone,
            registration_date, last_login_date, total_orders, total_spent,
            customer_segment, avg_order_value, clv_score, days_since_last_login,
            recency_score, engagement_score
        ) VALUES (
            %(customer_id)s, %(first_name)s, %(last_name)s, %(email)s, %(phone)s,
            %(registration_date)s, %(last_login_date)s, %(total_orders)s, %(total_spent)s,
            %(customer_segment)s, %(avg_order_value)s, %(clv_score)s, %(days_since_last_login)s,
            %(recency_score)s, %(engagement_score)s
        )
        ON CONFLICT (customer_id)
        DO UPDATE SET
            first_name = EXCLUDED.first_name,
            last_name = EXCLUDED.last_name,
            email = EXCLUDED.email,
            phone = EXCLUDED.phone,
            registration_date = EXCLUDED.registration_date,
            last_login_date = EXCLUDED.last_login_date,
            total_orders = EXCLUDED.total_orders,
            total_spent = EXCLUDED.total_spent,
            customer_segment = EXCLUDED.customer_segment,
            avg_order_value = EXCLUDED.avg_order_value,
            clv_score = EXCLUDED.clv_score,
            days_since_last_login = EXCLUDED.days_since_last_login,
            recency_score = EXCLUDED.recency_score,
            engagement_score = EXCLUDED.engagement_score,
            updated_at = CURRENT_TIMESTAMP;
    """
    # Load data in batches (performance optimization)
    batch_size = 1000
    total_rows = len(customers_df)
    conn = postgres_hook.get_conn()
    for i in range(0, total_rows, batch_size):
        batch_df = customers_df.iloc[i:i+batch_size]
        # Prepare data for PostgreSQL
        batch_records = batch_df.to_dict('records')
        # Convert NaN/NaT to None and timestamps to ISO strings
        for record in batch_records:
            for key, value in record.items():
                if pd.isna(value):
                    record[key] = None
                elif isinstance(value, pd.Timestamp):
                    record[key] = value.isoformat()
        # Execute batch: executemany runs the upsert once per record dict
        # (a list of dicts cannot be passed as the parameters of a single statement)
        with conn.cursor() as cursor:
            cursor.executemany(upsert_sql, batch_records)
        conn.commit()
        logger.info(f"Batch {i//batch_size + 1}/{(total_rows-1)//batch_size + 1} loaded: {len(batch_df)} rows")
    conn.close()
    # Also load to BigQuery for analytics
    try:
        import pandas_gbq  # DataFrame.to_gbq is deprecated in favor of pandas_gbq.to_gbq
        bigquery_hook = BigQueryHook(gcp_conn_id='google_cloud_default', use_legacy_sql=False)
        # Configure dataset and table
        project_id = Variable.get('gcp_project_id', default_var='my-project')
        dataset_id = 'analytics'
        table_id = 'customer_metrics'
        # Load DataFrame to BigQuery using the credentials of the Airflow connection
        pandas_gbq.to_gbq(
            customers_df,
            destination_table=f'{dataset_id}.{table_id}',
            project_id=project_id,
            if_exists='replace',
            progress_bar=False,
            credentials=bigquery_hook.get_credentials()
        )
        logger.info("Data successfully loaded to BigQuery")
    except Exception as e:
        logger.warning(f"BigQuery load failed: {e}")
    logger.info("Data load operation completed")
def generate_analytics_report(**context):
    """Generate analytics report"""
    logger.info("Generating analytics report...")
    # PostgreSQL Hook for final data
    postgres_hook = PostgresHook(postgres_conn_id='postgres_default')
    # Execute analytics queries
    analytics_queries = {
        'customer_segments': """
            SELECT
                customer_segment,
                COUNT(*) as customer_count,
                AVG(total_spent) as avg_total_spent,
                AVG(total_orders) as avg_total_orders,
                AVG(clv_score) as avg_clv_score,
                AVG(engagement_score) as avg_engagement_score
            FROM analytics_customer_metrics
            GROUP BY customer_segment
            ORDER BY avg_total_spent DESC
        """,
        'top_customers': """
            SELECT
                customer_id,
                first_name,
                last_name,
                total_spent,
                total_orders,
                clv_score,
                engagement_score
            FROM analytics_customer_metrics
            ORDER BY clv_score DESC
            LIMIT 10
        """,
        'engagement_distribution': """
            SELECT
                CASE
                    WHEN engagement_score >= 0.8 THEN 'High'
                    WHEN engagement_score >= 0.6 THEN 'Medium'
                    WHEN engagement_score >= 0.4 THEN 'Low'
                    ELSE 'Very Low'
                END as engagement_level,
                COUNT(*) as customer_count,
                AVG(total_spent) as avg_total_spent
            FROM analytics_customer_metrics
            GROUP BY engagement_level
            ORDER BY avg_total_spent DESC
        """,
        'daily_trends': """
            SELECT
                DATE(last_login_date) as login_date,
                COUNT(*) as active_customers,
                AVG(total_spent) as avg_spend_per_customer
            FROM analytics_customer_metrics
            WHERE last_login_date >= CURRENT_DATE - INTERVAL '30 days'
            GROUP BY DATE(last_login_date)
            ORDER BY login_date DESC
        """
    }
    # Collect results
    analytics_results = {}
    for query_name, query in analytics_queries.items():
        try:
            result_df = postgres_hook.get_pandas_df(query)
            analytics_results[query_name] = result_df
            logger.info(f"Analytics query '{query_name}' executed: {len(result_df)} rows")
        except Exception as e:
            logger.error(f"Error in query '{query_name}': {e}")
            analytics_results[query_name] = pd.DataFrame()
    # Total customers = sum of the per-segment counts (not the number of segments)
    segments_result = analytics_results.get('customer_segments', pd.DataFrame())
    total_customers = int(segments_result['customer_count'].sum()) if not segments_result.empty else 0
    # Create report
    report_data = {
        'generated_at': datetime.now().isoformat(),
        'total_customers': total_customers,
        'analytics': {}
    }
    for query_name, result_df in analytics_results.items():
        if not result_df.empty:
            report_data['analytics'][query_name] = result_df.to_dict('records')
    # Save report as JSON
    report_json = json.dumps(report_data, indent=2, default=str)
    # Save report to S3
    try:
        s3_hook = S3Hook(aws_conn_id='aws_default')
        bucket_name = Variable.get('reports_bucket_name', default_var='analytics-reports')
        report_filename = f"customer_analytics_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        s3_hook.load_string(
            report_json,
            key=report_filename,
            bucket_name=bucket_name,
            replace=True
        )
        logger.info(f"Analytics report saved: s3://{bucket_name}/{report_filename}")
    except Exception as e:
        logger.error(f"Error saving report: {e}")
    # Log report metrics
    if 'customer_segments' in analytics_results:
        segments_df = analytics_results['customer_segments']
        if not segments_df.empty:
            logger.info("Customer segment distribution:")
            for _, row in segments_df.iterrows():
                logger.info(f"  {row['customer_segment']}: {row['customer_count']} customers, "
                            f"avg revenue: €{row['avg_total_spent']:.2f}")
    logger.info("Analytics report generation completed")
# Define tasks
extract_task = PythonOperator(
    task_id='extract_customer_data',
    python_callable=extract_customer_data,
    dag=dag,
)
transform_task = PythonOperator(
    task_id='transform_customer_data',
    python_callable=transform_customer_data,
    dag=dag,
)
load_task = PythonOperator(
    task_id='load_data_to_warehouse',
    python_callable=load_data_to_warehouse,
    dag=dag,
)
report_task = PythonOperator(
    task_id='generate_analytics_report',
    python_callable=generate_analytics_report,
    dag=dag,
)

# Task Dependencies
extract_task >> transform_task >> load_task >> report_task
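The load step above writes rows in batches with an upsert (insert, or update on a key conflict). The pattern can be tried locally with SQLite, which supports a similar ON CONFLICT clause in version 3.24 and later. The table, column names and batch size below are assumptions chosen for a compact example; the PostgreSQL statement in the DAG differs in placeholder syntax and column list.

```python
import sqlite3

# Hypothetical two-column table; the DAG's real table has many more columns
UPSERT_SQL = """
INSERT INTO customer_metrics (customer_id, total_spent)
VALUES (:customer_id, :total_spent)
ON CONFLICT (customer_id)
DO UPDATE SET total_spent = excluded.total_spent
"""


def create_table(conn):
    """Create the target table with customer_id as the conflict key."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS customer_metrics "
        "(customer_id TEXT PRIMARY KEY, total_spent REAL)"
    )


def upsert_in_batches(conn, records, batch_size=2):
    """Upsert the record dicts batch by batch; return the number of batches."""
    batches = 0
    for start in range(0, len(records), batch_size):
        # executemany runs the statement once per dict in the batch
        conn.executemany(UPSERT_SQL, records[start:start + batch_size])
        conn.commit()
        batches += 1
    return batches


connection = sqlite3.connect(":memory:")
create_table(connection)
rows = [
    {"customer_id": "C1", "total_spent": 10.0},
    {"customer_id": "C2", "total_spent": 20.0},
    {"customer_id": "C1", "total_spent": 15.0},  # same key: updates the first row
]
print(upsert_in_batches(connection, rows))
print(connection.execute("SELECT * FROM customer_metrics ORDER BY customer_id").fetchall())
```

Committing once per batch bounds the amount of work lost when a batch fails, and the upsert makes a retried batch idempotent, which matters because Airflow may rerun a failed task.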
3. Data Visualization with Matplotlib, Seaborn and Plotly
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')

# Data Visualization Demo
class DataVisualizationDemo:
    def __init__(self):
        """Set style and configuration"""
        # Matplotlib configuration
        plt.style.use('seaborn-v0_8')
        plt.rcParams['figure.figsize'] = (12, 8)
        plt.rcParams['font.size'] = 10
        plt.rcParams['axes.titlesize'] = 14
        plt.rcParams['axes.labelsize'] = 12
        plt.rcParams['xtick.labelsize'] = 10
        plt.rcParams['ytick.labelsize'] = 10
        plt.rcParams['legend.fontsize'] = 10
        plt.rcParams['figure.titlesize'] = 16
        # Seaborn configuration
        sns.set_palette("husl")
        # Define colors
        self.colors = {
            'primary': '#2E86AB',
            'secondary': '#A23B72',
            'accent': '#F18F01',
            'success': '#C73E1D',
            'warning': '#F4A261',
            'info': '#264653',
            'light': '#E9C46A',
            'dark': '#2A9D8F'
        }
        print("Data Visualization Demo initialized")
    def create_sample_data(self):
        """Create sample data for visualization"""
        np.random.seed(42)
        # Time series data (revenue development): base level + trend + seasonality + noise
        dates = pd.date_range(start='2023-01-01', end='2023-12-31', freq='D')
        base_revenue = 10000
        trend = np.linspace(0, 5000, len(dates))
        seasonal = 2000 * np.sin(2 * np.pi * np.arange(len(dates)) / 365.25)
        noise = np.random.normal(0, 500, len(dates))
        revenue_data = pd.DataFrame({
            'date': dates,
            'revenue': base_revenue + trend + seasonal + noise,
            'orders': np.random.poisson(100, len(dates)) + 50,
            'customers': np.random.poisson(80, len(dates)) + 30
        })
        # Customer data
        customer_data = pd.DataFrame({
            'customer_id': range(1000),
            'age': np.random.normal(35, 10, 1000),
            'income': np.random.lognormal(10.5, 0.5, 1000),
            'total_spent': np.random.gamma(2, 100, 1000),
            'orders': np.random.poisson(5, 1000) + 1,
            'segment': np.random.choice(['Bronze', 'Silver', 'Gold', 'Platinum'], 1000,
                                        p=[0.4, 0.3, 0.2, 0.1]),
            'registration_date': pd.date_range('2020-01-01', '2023-12-31', periods=1000)[np.random.permutation(1000)],
            'last_purchase': pd.date_range('2023-01-01', '2023-12-31', periods=1000)[np.random.permutation(1000)]
        })
        # Product data
        products = ['Laptop', 'Smartphone', 'Tablet', 'Headphones', 'Mouse', 'Keyboard', 'Monitor', 'Webcam']
        product_data = pd.DataFrame({
            'product': np.random.choice(products, 5000),
            'category': np.random.choice(['Electronics', 'Accessories'], 5000),
            'price': np.random.uniform(10, 1000, 5000),
            'quantity_sold': np.random.poisson(10, 5000) + 1,
            'rating': np.random.uniform(3, 5, 5000),
            'month': np.random.choice(range(1, 13), 5000)
        })
        # Geographic data
        regions = ['North', 'South', 'East', 'West', 'Central']
        # Draw each row's region once so that the city name matches its region
        geo_regions = np.random.choice(regions, 1000)
        geo_data = pd.DataFrame({
            'region': geo_regions,
            'city': [f"{region} City {i}" for region, i in zip(
                geo_regions,
                np.random.randint(1, 10, 1000)
            )],
            'population': np.random.lognormal(10, 1, 1000),
            'revenue': np.random.gamma(2, 50000, 1000),
            'stores': np.random.poisson(5, 1000) + 1
        })
        return revenue_data, customer_data, product_data, geo_data
    def create_time_series_visualizations(self, revenue_data):
        """Create time series visualizations"""
        print("Creating time series visualizations...")
        fig, axes = plt.subplots(2, 2, figsize=(16, 12))
        fig.suptitle('Revenue Analysis Time Series', fontsize=16, fontweight='bold')
        # 1. Revenue development
        axes[0, 0].plot(revenue_data['date'], revenue_data['revenue'],
                        color=self.colors['primary'], linewidth=2)
        axes[0, 0].set_title('Daily Revenue Development')
        axes[0, 0].set_xlabel('Date')
        axes[0, 0].set_ylabel('Revenue (€)')
        axes[0, 0].grid(True, alpha=0.3)
        # Add moving averages (note: this adds two columns to the caller's DataFrame)
        revenue_data['ma_7'] = revenue_data['revenue'].rolling(window=7).mean()
        revenue_data['ma_30'] = revenue_data['revenue'].rolling(window=30).mean()
        axes[0, 0].plot(revenue_data['date'], revenue_data['ma_7'],
                        color=self.colors['accent'], linewidth=1, alpha=0.7, label='7-Day MA')
        axes[0, 0].plot(revenue_data['date'], revenue_data['ma_30'],
                        color=self.colors['secondary'], linewidth=1, alpha=0.7, label='30-Day MA')
        axes[0, 0].legend()
        # 2. Monthly revenue ('M' = month end; pandas 2.2+ deprecates it in favor of 'ME')
        monthly_revenue = revenue_data.set_index('date').resample('M')['revenue'].sum()
        # On a date axis the bar width is measured in days; the default of 0.8 would be barely visible
        axes[0, 1].bar(monthly_revenue.index, monthly_revenue.values, width=20,
                       color=self.colors['info'], alpha=0.7)
        axes[0, 1].set_title('Monthly Total Revenue')
        axes[0, 1].set_xlabel('Month')
        axes[0, 1].set_ylabel('Revenue (€)')
        axes[0, 1].tick_params(axis='x', rotation=45)
        # 3. Revenue vs Orders Scatter Plot
        axes[1, 0].scatter(revenue_data['orders'], revenue_data['revenue'],
                           alpha=0.6, color=self.colors['primary'])
        axes[1, 0].set_title('Revenue vs. Orders')
        axes[1, 0].set_xlabel('Number of Orders')
        axes[1, 0].set_ylabel('Revenue (€)')
        # Add trend line (degree-1 least-squares fit)
        z = np.polyfit(revenue_data['orders'], revenue_data['revenue'], 1)
        p = np.poly1d(z)
        axes[1, 0].plot(revenue_data['orders'], p(revenue_data['orders']),
                        color=self.colors['accent'], linewidth=2)
        # 4. Distribution of daily revenue
        axes[1, 1].hist(revenue_data['revenue'], bins=30, color=self.colors['secondary'],
                        alpha=0.7, edgecolor='black')
        axes[1, 1].set_title('Distribution of Daily Revenue')
        axes[1, 1].set_xlabel('Revenue (€)')
        axes[1, 1].set_ylabel('Frequency')
        axes[1, 1].axvline(revenue_data['revenue'].mean(), color=self.colors['accent'],
                           linestyle='--', linewidth=2, label=f'Mean: €{revenue_data["revenue"].mean():.0f}')
        axes[1, 1].legend()
        plt.tight_layout()
        plt.savefig('time_series_analysis.png', dpi=300, bbox_inches='tight')
        plt.show()
        return fig
    def create_customer_analytics_visualizations(self, customer_data):
        """Create customer analytics visualizations"""
        print("Creating customer analytics visualizations...")
        fig, axes = plt.subplots(2, 3, figsize=(18, 12))
        fig.suptitle('Customer Analytics Dashboard', fontsize=16, fontweight='bold')

        # 1. Age distribution
        axes[0, 0].hist(customer_data['age'], bins=20, color=self.colors['primary'],
                        alpha=0.7, edgecolor='black')
        axes[0, 0].set_title('Age Distribution')
        axes[0, 0].set_xlabel('Age')
        axes[0, 0].set_ylabel('Count')

        # 2. Income distribution
        axes[0, 1].hist(customer_data['income'], bins=30, color=self.colors['secondary'],
                        alpha=0.7, edgecolor='black')
        axes[0, 1].set_title('Income Distribution')
        axes[0, 1].set_xlabel('Income (€)')
        axes[0, 1].set_ylabel('Count')

        # 3. Customer segment distribution
        segment_counts = customer_data['segment'].value_counts()
        axes[0, 2].pie(segment_counts.values, labels=segment_counts.index,
                       autopct='%1.1f%%',
                       colors=[self.colors['primary'], self.colors['secondary'],
                               self.colors['accent'], self.colors['success']])
        axes[0, 2].set_title('Customer Segment Distribution')

        # 4. Revenue by segment
        segment_revenue = customer_data.groupby('segment')['total_spent'].mean()
        bars = axes[1, 0].bar(segment_revenue.index, segment_revenue.values,
                              color=[self.colors['primary'], self.colors['secondary'],
                                     self.colors['accent'], self.colors['success']])
        axes[1, 0].set_title('Average Revenue by Segment')
        axes[1, 0].set_xlabel('Segment')
        axes[1, 0].set_ylabel('Ø Revenue (€)')
        # Display values on bars
        for bar, value in zip(bars, segment_revenue.values):
            axes[1, 0].text(bar.get_x() + bar.get_width()/2, bar.get_height() + 100,
                            f'€{value:.0f}', ha='center', va='bottom')

        # 5. Age vs. Total spending
        scatter = axes[1, 1].scatter(customer_data['age'], customer_data['total_spent'],
                                     alpha=0.6, c=customer_data['orders'], cmap='viridis')
        axes[1, 1].set_title('Age vs. Total Spending')
        axes[1, 1].set_xlabel('Age')
        axes[1, 1].set_ylabel('Total Spending (€)')
        plt.colorbar(scatter, ax=axes[1, 1], label='Number of Orders')

        # 6. Boxplot for revenue by segment
        segments = customer_data['segment'].unique()
        data_for_boxplot = [customer_data[customer_data['segment'] == seg]['total_spent']
                            for seg in segments]
        box_plot = axes[1, 2].boxplot(data_for_boxplot, patch_artist=True)
        # Label the boxes via the axis: the boxplot 'labels' keyword is
        # deprecated since Matplotlib 3.9 (renamed to 'tick_labels')
        axes[1, 2].set_xticklabels(segments)
        colors = [self.colors['primary'], self.colors['secondary'],
                  self.colors['accent'], self.colors['success']]
        for patch, color in zip(box_plot['boxes'], colors):
            patch.set_facecolor(color)
            patch.set_alpha(0.7)
        axes[1, 2].set_title('Revenue Distribution by Segment')
        axes[1, 2].set_xlabel('Segment')
        axes[1, 2].set_ylabel('Total Spending (€)')

        plt.tight_layout()
        plt.savefig('customer_analytics.png', dpi=300, bbox_inches='tight')
        plt.show()
        return fig
    def create_product_analytics_visualizations(self, product_data):
        """Create product analytics visualizations"""
        print("Creating product analytics visualizations...")
        fig, axes = plt.subplots(2, 2, figsize=(16, 12))
        fig.suptitle('Product Analytics Dashboard', fontsize=16, fontweight='bold')

        # 1. Top products by revenue (revenue = price * quantity, summed per product)
        product_revenue = product_data.groupby('product').apply(
            lambda x: (x['price'] * x['quantity_sold']).sum()
        ).sort_values(ascending=False)
        axes[0, 0].barh(product_revenue.index, product_revenue.values,
                        color=self.colors['primary'])
        axes[0, 0].set_title('Top Products by Total Revenue')
        axes[0, 0].set_xlabel('Total Revenue (€)')
        # Display values next to the bars (bar i sits at y position i)
        for i, value in enumerate(product_revenue.values):
            axes[0, 0].text(value + max(product_revenue.values) * 0.01, i,
                            f'€{value:,.0f}', va='center')

        # 2. Price distribution by category
        categories = product_data['category'].unique()
        # Cycle through the whole palette so that each category gets its own color
        palette = list(self.colors.values())
        for i, category in enumerate(categories):
            category_data = product_data[product_data['category'] == category]['price']
            axes[0, 1].hist(category_data, bins=20, alpha=0.7, label=category,
                            color=palette[i % len(palette)])
        axes[0, 1].set_title('Price Distribution by Category')
        axes[0, 1].set_xlabel('Price (€)')
        axes[0, 1].set_ylabel('Frequency')
        axes[0, 1].legend()

        # 3. Rating vs. Price
        axes[1, 0].scatter(product_data['price'], product_data['rating'],
                           alpha=0.6, color=self.colors['accent'])
        axes[1, 0].set_title('Rating vs. Price')
        axes[1, 0].set_xlabel('Price (€)')
        axes[1, 0].set_ylabel('Rating')
        # Pearson correlation coefficient
        correlation = np.corrcoef(product_data['price'], product_data['rating'])[0, 1]
        axes[1, 0].text(0.05, 0.95, f'Correlation: {correlation:.3f}',
                        transform=axes[1, 0].transAxes, va='top',
                        bbox=dict(boxstyle='round', facecolor='white', alpha=0.8))

        # 4. Monthly sales (the 'month' column is expected to hold the numbers 1 to 12)
        monthly_sales = product_data.groupby('month')['quantity_sold'].sum()
        month_names = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
                       'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
        axes[1, 1].bar(monthly_sales.index, monthly_sales.values,
                       color=self.colors['info'], alpha=0.7)
        axes[1, 1].set_title('Monthly Sales Quantities')
        axes[1, 1].set_xlabel('Month')
        axes[1, 1].set_ylabel('Sales Quantity')
        axes[1, 1].set_xticks(range(1, 13))
        axes[1, 1].set_xticklabels(month_names, rotation=45)

        plt.tight_layout()
        plt.savefig('product_analytics.png', dpi=300, bbox_inches='tight')
        plt.show()
        return fig
    def create_geographic_visualizations(self, geo_data):
        """Create geographic visualizations"""
        print("Creating geographic visualizations...")
        fig, axes = plt.subplots(2, 2, figsize=(16, 12))
        fig.suptitle('Geographic Analysis Dashboard', fontsize=16, fontweight='bold')

        # 1. Revenue by region
        region_revenue = geo_data.groupby('region')['revenue'].sum().sort_values(ascending=False)
        bars = axes[0, 0].bar(region_revenue.index, region_revenue.values,
                              color=[self.colors['primary'], self.colors['secondary'],
                                     self.colors['accent'], self.colors['success'],
                                     self.colors['info']])
        axes[0, 0].set_title('Total Revenue by Region')
        axes[0, 0].set_ylabel('Revenue (€)')
        # Display values on bars
        for bar, value in zip(bars, region_revenue.values):
            axes[0, 0].text(bar.get_x() + bar.get_width()/2,
                            bar.get_height() + max(region_revenue.values) * 0.01,
                            f'€{value:,.0f}', ha='center', va='bottom')

        # 2. Population vs. Revenue (marker size scales with the number of stores)
        axes[0, 1].scatter(geo_data['population'], geo_data['revenue'],
                           alpha=0.6, s=geo_data['stores']*10,
                           c=self.colors['primary'])
        axes[0, 1].set_title('Population vs. Revenue')
        axes[0, 1].set_xlabel('Population')
        axes[0, 1].set_ylabel('Revenue (€)')

        # 3. Store density by region
        region_stores = geo_data.groupby('region')['stores'].sum()
        axes[1, 0].pie(region_stores.values, labels=region_stores.index,
                       autopct='%1.1f%%',
                       colors=[self.colors['primary'], self.colors['secondary'],
                               self.colors['accent'], self.colors['success'],
                               self.colors['info']])
        axes[1, 0].set_title('Store Distribution by Region')

        # 4. Revenue per store
        region_avg_revenue = geo_data.groupby('region').apply(
            lambda x: x['revenue'].sum() / x['stores'].sum()
        ).sort_values(ascending=False)
        bars = axes[1, 1].bar(region_avg_revenue.index, region_avg_revenue.values,
                              color=self.colors['secondary'])
        axes[1, 1].set_title('Average Revenue per Store')
        axes[1, 1].set_ylabel('Ø Revenue per Store (€)')
        # Display values on bars
        for bar, value in zip(bars, region_avg_revenue.values):
            axes[1, 1].text(bar.get_x() + bar.get_width()/2,
                            bar.get_height() + max(region_avg_revenue.values) * 0.01,
                            f'€{value:,.0f}', ha='center', va='bottom')

        plt.tight_layout()
        plt.savefig('geographic_analytics.png', dpi=300, bbox_inches='tight')
        plt.show()
        return fig
    def create_interactive_visualizations(self, revenue_data, customer_data, product_data):
        """Create interactive visualizations with Plotly"""
        print("Creating interactive visualizations...")

        # 1. Interactive time series
        fig1 = make_subplots(
            rows=2, cols=1,
            subplot_titles=('Daily Revenue', 'Monthly Revenue'),
            vertical_spacing=0.1
        )
        # Daily revenue with moving average
        fig1.add_trace(
            go.Scatter(
                x=revenue_data['date'],
                y=revenue_data['revenue'],
                mode='lines',
                name='Daily Revenue',
                line=dict(color='blue', width=1)
            ),
            row=1, col=1
        )
        # Compute the 7-day moving average here so this method does not depend on
        # create_time_series_visualizations() having added an 'ma_7' column first
        ma_7 = revenue_data['revenue'].rolling(window=7).mean()
        fig1.add_trace(
            go.Scatter(
                x=revenue_data['date'],
                y=ma_7,
                mode='lines',
                name='7-Day MA',
                line=dict(color='orange', width=2)
            ),
            row=1, col=1
        )
        # Monthly revenue
        monthly_revenue = revenue_data.set_index('date').resample('M')['revenue'].sum()
        fig1.add_trace(
            go.Bar(
                x=monthly_revenue.index,
                y=monthly_revenue.values,
                name='Monthly Revenue',
                marker_color='lightblue'
            ),
            row=2, col=1
        )
        fig1.update_layout(
            title='Interactive Revenue Analysis',
            height=600,
            showlegend=True
        )
        fig1.show()

        # 2. Interactive customer analysis
        fig2 = make_subplots(
            rows=2, cols=2,
            subplot_titles=('Age Distribution', 'Income Distribution',
                            'Revenue by Segment', 'Age vs. Revenue'),
            specs=[[{"type": "histogram"}, {"type": "histogram"}],
                   [{"type": "bar"}, {"type": "scatter"}]]
        )
        # Age distribution
        fig2.add_trace(
            go.Histogram(
                x=customer_data['age'],
                nbinsx=20,
                name='Age',
                marker_color='lightgreen'
            ),
            row=1, col=1
        )
        # Income distribution
        fig2.add_trace(
            go.Histogram(
                x=customer_data['income'],
                nbinsx=30,
                name='Income',
                marker_color='lightcoral'
            ),
            row=1, col=2
        )
        # Revenue by segment
        segment_revenue = customer_data.groupby('segment')['total_spent'].mean()
        fig2.add_trace(
            go.Bar(
                x=segment_revenue.index,
                y=segment_revenue.values,
                name='Ø Revenue',
                marker_color='lightblue'
            ),
            row=2, col=1
        )
        # Age vs. Revenue (marker color encodes the number of orders)
        fig2.add_trace(
            go.Scatter(
                x=customer_data['age'],
                y=customer_data['total_spent'],
                mode='markers',
                name='Customers',
                marker=dict(
                    size=8,
                    color=customer_data['orders'],
                    colorscale='Viridis',
                    showscale=True,
                    colorbar=dict(title="Orders")
                )
            ),
            row=2, col=2
        )
        fig2.update_layout(
            title='Interactive Customer Analysis',
            height=600,
            showlegend=False
        )
        fig2.show()

        # 3. Interactive 3D visualization
        fig3 = go.Figure(data=[go.Scatter3d(
            x=customer_data['age'],
            y=customer_data['income'],
            z=customer_data['total_spent'],
            mode='markers',
            marker=dict(
                size=5,
                color=customer_data['orders'],
                colorscale='Viridis',
                showscale=True,
                colorbar=dict(title="Orders")
            ),
            # One hover label per customer
            text=[f"Customer {i}<br>Age: {age}<br>Income: {inc:,.0f}€<br>Revenue: {spent:,.0f}€"
                  for i, (age, inc, spent) in enumerate(zip(customer_data['age'],
                                                            customer_data['income'],
                                                            customer_data['total_spent']))],
            hovertemplate='%{text}<extra></extra>'
        )])
        fig3.update_layout(
            title='3D Customer Analysis (Age, Income, Revenue)',
            scene=dict(
                xaxis_title='Age',
                yaxis_title='Income (€)',
                zaxis_title='Revenue (€)'
            ),
            height=600
        )
        fig3.show()
        return fig1, fig2, fig3
    def create_dashboard_summary(self, revenue_data, customer_data, product_data, geo_data):
        """Create summary dashboard"""
        print("Creating summary dashboard...")
        # Calculate key metrics
        total_revenue = revenue_data['revenue'].sum()
        avg_daily_revenue = revenue_data['revenue'].mean()
        total_customers = len(customer_data)
        avg_customer_value = customer_data['total_spent'].mean()
        total_orders = customer_data['orders'].sum()
        top_product = product_data.groupby('product').apply(
            lambda x: (x['price'] * x['quantity_sold']).sum()
        ).idxmax()

        # Create dashboard
        fig = plt.figure(figsize=(20, 16))
        # Title
        fig.suptitle('Data Science Analytics Dashboard', fontsize=20, fontweight='bold')
        # Key metrics
        metrics_text = (
            "KEY METRICS\n"
            "─────────────────────\n"
            f"Total Revenue: €{total_revenue:,.0f}\n"
            f"Ø Daily Revenue: €{avg_daily_revenue:,.0f}\n"
            f"Total Customers: {total_customers:,}\n"
            f"Ø Customer Value: €{avg_customer_value:,.0f}\n"
            f"Total Orders: {total_orders:,}\n"
            f"Top Product: {top_product}"
        )
        plt.figtext(0.02, 0.95, metrics_text, fontsize=12, fontfamily='monospace',
                    verticalalignment='top',
                    bbox=dict(boxstyle='round', facecolor='lightgray', alpha=0.8))

        # Subplots for various analyses
        gs = fig.add_gridspec(3, 3, hspace=0.3, wspace=0.3,
                              left=0.15, right=0.95, top=0.85, bottom=0.05)

        # 1. Revenue trend (top left)
        ax1 = fig.add_subplot(gs[0, 0])
        monthly_revenue = revenue_data.set_index('date').resample('M')['revenue'].sum()
        ax1.plot(monthly_revenue.index, monthly_revenue.values,
                 color=self.colors['primary'], linewidth=2)
        ax1.set_title('Monthly Revenue Trend')
        ax1.tick_params(axis='x', rotation=45)

        # 2. Customer segments (top center)
        ax2 = fig.add_subplot(gs[0, 1])
        segment_counts = customer_data['segment'].value_counts()
        ax2.pie(segment_counts.values, labels=segment_counts.index, autopct='%1.1f%%',
                colors=[self.colors['primary'], self.colors['secondary'],
                        self.colors['accent'], self.colors['success']])
        ax2.set_title('Customer Segments')

        # 3. Top products (top right)
        ax3 = fig.add_subplot(gs[0, 2])
        product_revenue = product_data.groupby('product').apply(
            lambda x: (x['price'] * x['quantity_sold']).sum()
        ).sort_values(ascending=False).head(5)
        bars = ax3.barh(range(len(product_revenue)), product_revenue.values,
                        color=self.colors['info'])
        ax3.set_yticks(range(len(product_revenue)))
        ax3.set_yticklabels(product_revenue.index)
        ax3.set_title('Top 5 Products')
        ax3.set_xlabel('Revenue (€)')

        # 4. Age distribution (center left)
        ax4 = fig.add_subplot(gs[1, 0])
        ax4.hist(customer_data['age'], bins=15, color=self.colors['secondary'], alpha=0.7)
        ax4.set_title('Age Distribution')
        ax4.set_xlabel('Age')

        # 5. Regional distribution (center center)
        ax5 = fig.add_subplot(gs[1, 1])
        region_revenue = geo_data.groupby('region')['revenue'].sum()
        ax5.bar(region_revenue.index, region_revenue.values,
                color=[self.colors['primary'], self.colors['secondary'],
                       self.colors['accent'], self.colors['success'], self.colors['info']])
        ax5.set_title('Revenue by Region')
        ax5.set_ylabel('Revenue (€)')

        # 6. Rating vs. Price (center right)
        ax6 = fig.add_subplot(gs[1, 2])
        ax6.scatter(product_data['price'], product_data['rating'],
                    alpha=0.6, color=self.colors['accent'])
        ax6.set_title('Rating vs. Price')
        ax6.set_xlabel('Price (€)')
        ax6.set_ylabel('Rating')

        # 7. Revenue distribution (bottom left-center)
        ax7 = fig.add_subplot(gs[2, :2])
        ax7.hist(revenue_data['revenue'], bins=30, color=self.colors['primary'],
                 alpha=0.7, edgecolor='black')
        ax7.set_title('Daily Revenue Distribution')
        ax7.set_xlabel('Revenue (€)')
        ax7.set_ylabel('Frequency')
        ax7.axvline(revenue_data['revenue'].mean(), color=self.colors['accent'],
                    linestyle='--', linewidth=2,
                    label=f'Mean: €{revenue_data["revenue"].mean():.0f}')
        ax7.legend()

        # 8. Correlation heatmap (bottom right)
        ax8 = fig.add_subplot(gs[2, 2])
        # Create correlation matrix
        corr_data = customer_data[['age', 'income', 'total_spent', 'orders']].corr()
        im = ax8.imshow(corr_data, cmap='coolwarm', aspect='auto', vmin=-1, vmax=1)
        # Color legend
## Data Science Workflow
### Typical Data Science Process
```mermaid
graph TD
    A[Problem Definition] --> B[Data Collection]
    B --> C[Data Cleaning]
    C --> D[Exploratory Analysis]
    D --> E[Feature Engineering]
    E --> F[Model Building]
    F --> G[Model Evaluation]
    G --> H[Deployment]
    H --> I[Monitoring]
    I --> A
```
CRISP-DM Methodology
- Business Understanding: Understand business objectives
- Data Understanding: Explore data
- Data Preparation: Clean and prepare data
- Modeling: Develop models
- Evaluation: Evaluate models
- Deployment: Deploy models
- Monitoring: Monitor performance (a common follow-up to Deployment; the CRISP-DM standard itself defines only the six phases above)
Big Data Technologies
Hadoop Ecosystem
| Component | Function | Alternatives |
|---|---|---|
| HDFS | Distributed Storage | Amazon S3, Google Cloud Storage |
| MapReduce | Batch Processing | Apache Spark, Apache Flink |
| Hive | SQL on Hadoop | Presto, Apache Impala |
| HBase | NoSQL Database | Cassandra, MongoDB |
| ZooKeeper | Coordination | Consul, etcd |
Spark vs. MapReduce
| Feature | Spark | MapReduce |
|---|---|---|
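| Speed | In-memory processing, typically faster for iterative jobs | Disk-based, intermediate results are written to disk |
| Ease of Use | High-level APIs (Python, Scala, Java, R, SQL) | Low-level map and reduce functions |
| Streaming | Supported (Structured Streaming, micro-batches) | Batch only |
| ML Support | MLlib (built in) | External libraries |
| Resource Usage | Higher (memory) | Lower (memory) |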
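The programming model behind this comparison can be illustrated without a cluster. The following is a minimal single-process sketch of the map, shuffle and reduce phases using a word count. The function names `map_phase`, `shuffle_phase` and `reduce_phase` are illustrative and are not part of the Hadoop or Spark APIs.

```python
from collections import defaultdict


def map_phase(lines):
    """Map: emit a (word, 1) pair for every word in every line."""
    for line in lines:
        for word in line.lower().split():
            yield word, 1


def shuffle_phase(pairs):
    """Shuffle: group all values that share the same key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups


def reduce_phase(groups):
    """Reduce: combine the values of each key into a single result."""
    return {key: sum(values) for key, values in groups.items()}


lines = ["big data needs big tools", "spark keeps data in memory"]
word_counts = reduce_phase(shuffle_phase(map_phase(lines)))
print(word_counts["big"], word_counts["data"])
```

MapReduce writes the output of each phase to disk, whereas Spark can keep such intermediate results in memory, which is the main reason for the speed difference in the table.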
Machine Learning Pipeline Components
Data Pipeline Stages
# ML Pipeline example with Scikit-learn
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split

# Preprocessing: scale numeric columns, one-hot encode categorical columns
numeric_features = ['age', 'income', 'orders']
categorical_features = ['segment', 'region']
numeric_transformer = Pipeline(steps=[
    ('scaler', StandardScaler())
])
categorical_transformer = Pipeline(steps=[
    # Categories unseen during training are encoded as all zeros instead of raising
    ('onehot', OneHotEncoder(handle_unknown='ignore'))
])
preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features)
    ])

# Complete Pipeline: preprocessing followed by the model
ml_pipeline = Pipeline(steps=[
    ('preprocessor', preprocessor),
    ('regressor', RandomForestRegressor(n_estimators=100))
])

# Typical usage (X and y are placeholders for a feature DataFrame and a target):
# X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# ml_pipeline.fit(X_train, y_train)
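To show how such a pipeline is trained and evaluated end to end, here is a small sketch on synthetic data. The data, the category values and the target column `total_spent` are assumptions made for illustration only; the column names mirror the feature lists of the example.

```python
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler

rng = np.random.default_rng(42)
n = 300
# Synthetic stand-in data (assumption, not real customer data)
df = pd.DataFrame({
    "age": rng.integers(18, 80, n),
    "income": rng.normal(50_000, 15_000, n),
    "orders": rng.poisson(5, n),
    "segment": rng.choice(["Premium", "Standard", "Basic"], n),
    "region": rng.choice(["North", "South", "East", "West"], n),
})
# Assumed target: spending grows with the number of orders, plus noise
df["total_spent"] = 100 * df["orders"] + rng.normal(0, 50, n)

numeric_features = ["age", "income", "orders"]
categorical_features = ["segment", "region"]
preprocessor = ColumnTransformer(transformers=[
    ("num", StandardScaler(), numeric_features),
    ("cat", OneHotEncoder(handle_unknown="ignore"), categorical_features),
])
ml_pipeline = Pipeline(steps=[
    ("preprocessor", preprocessor),
    ("regressor", RandomForestRegressor(n_estimators=100, random_state=42)),
])

X = df[numeric_features + categorical_features]
y = df["total_spent"]
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# fit() learns scaling and encoding on the training split only, then trains the model
ml_pipeline.fit(X_train, y_train)
r2 = ml_pipeline.score(X_test, y_test)  # R^2 on the held-out split
print(f"Test R^2: {r2:.2f}")
```

Because preprocessing lives inside the pipeline, the test split is transformed with statistics learned from the training split, which avoids data leakage.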
MLOps Practices
- Version Control: Git for code, DVC for data
- Experiment Tracking: MLflow, Weights & Biases
- Model Registry: MLflow Model Registry
- CI/CD: GitHub Actions, Jenkins
- Monitoring: Prometheus, Grafana
- Drift Detection: Evidently AI, NannyML
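Drift detection can be illustrated with a simple statistic. The sketch below computes the Population Stability Index (PSI) for one feature with NumPy. It is a hand-written illustration, not the method used by Evidently AI or NannyML, and the function name and the synthetic income data are assumptions.

```python
import numpy as np


def population_stability_index(reference, current, bins=10):
    """Compare two samples of one feature; larger values mean more drift."""
    reference = np.asarray(reference, dtype=float)
    current = np.asarray(current, dtype=float)
    # Bin edges come from the reference (training-time) distribution
    edges = np.quantile(reference, np.linspace(0, 1, bins + 1))
    # Clip so values outside the reference range land in the outer bins
    current = np.clip(current, edges[0], edges[-1])
    ref_share = np.histogram(reference, bins=edges)[0] / len(reference)
    cur_share = np.histogram(current, bins=edges)[0] / len(current)
    # A small floor avoids division by zero and log(0) for empty bins
    ref_share = np.clip(ref_share, 1e-6, None)
    cur_share = np.clip(cur_share, 1e-6, None)
    return float(np.sum((cur_share - ref_share) * np.log(cur_share / ref_share)))


rng = np.random.default_rng(0)
training_income = rng.normal(50_000, 10_000, 5_000)
same_distribution = rng.normal(50_000, 10_000, 5_000)
shifted_income = rng.normal(60_000, 10_000, 5_000)  # mean shifted by one std dev

psi_stable = population_stability_index(training_income, same_distribution)
psi_drifted = population_stability_index(training_income, shifted_income)
print(f"stable: {psi_stable:.3f}, drifted: {psi_drifted:.3f}")
```

Rule-of-thumb thresholds such as 0.1 and 0.25 are often quoted for PSI, but they are conventions and should be checked against the specific use case before they trigger retraining.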
Data Visualization Principles
Chart Selection Guide
| Data Type | Visualization | Purpose |
|---|---|---|
| Time Series | Line Chart | Trends over time |
| Categorical | Bar Chart | Comparisons |
| Distribution | Histogram | Frequencies |
| Relationship | Scatter Plot | Correlations |
| Composition | Pie Chart | Proportions |
| Geographic | Map | Locations |
Color Theory for Data Viz
- Primary Colors: Blue, Red, Yellow
- Secondary Colors: Green, Orange, Purple
- Color Psychology: Blue=Trust, Red=Danger, Green=Success
- Contrast: Light/Dark for readability
- Color Blindness: Roughly 8% of men and 0.5% of women have a color vision deficiency, so do not rely on red/green differences alone
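The contrast point can be checked numerically. The sketch below computes the contrast ratio defined in the WCAG 2.x accessibility guidelines and applies it to the Okabe-Ito palette, a widely used colorblind-friendly color set. The helper names are illustrative.

```python
def relative_luminance(hex_color):
    """Relative luminance of an sRGB color given as '#RRGGBB' (WCAG 2.x formula)."""
    hex_color = hex_color.lstrip("#")
    channels = [int(hex_color[i:i + 2], 16) / 255 for i in (0, 2, 4)]
    # Convert gamma-encoded sRGB channels to linear light
    linear = [
        c / 12.92 if c <= 0.03928 else ((c + 0.055) / 1.055) ** 2.4
        for c in channels
    ]
    r, g, b = linear
    return 0.2126 * r + 0.7152 * g + 0.0722 * b


def contrast_ratio(color_a, color_b):
    """Contrast ratio between two colors, from 1 (identical) to 21 (black on white)."""
    lum_a, lum_b = relative_luminance(color_a), relative_luminance(color_b)
    lighter, darker = max(lum_a, lum_b), min(lum_a, lum_b)
    return (lighter + 0.05) / (darker + 0.05)


# Okabe-Ito palette: black, orange, sky blue, bluish green,
# yellow, blue, vermillion, reddish purple
OKABE_ITO = ["#000000", "#E69F00", "#56B4E9", "#009E73",
             "#F0E442", "#0072B2", "#D55E00", "#CC79A7"]

for color in OKABE_ITO:
    print(color, round(contrast_ratio(color, "#FFFFFF"), 2))
```

Colors with a low ratio against the chart background are better suited to filled areas than to thin lines or text.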
ETL Best Practices
Data Quality Checks
def validate_data_quality(df):
    """Validate data quality"""
    numeric_columns = df.select_dtypes(include='number').columns
    quality_report = {
        'total_rows': len(df),
        'missing_values': df.isnull().sum().to_dict(),
        'duplicate_rows': int(df.duplicated().sum()),
        'data_types': df.dtypes.to_dict(),
        # Outlier count per numeric column (detect_outliers needs a column name)
        'outliers': {col: detect_outliers(df, col) for col in numeric_columns}
    }
    return quality_report


def detect_outliers(df, column):
    """Count outliers in one numeric column using the IQR method"""
    Q1 = df[column].quantile(0.25)
    Q3 = df[column].quantile(0.75)
    IQR = Q3 - Q1
    # Values more than 1.5 * IQR outside the quartiles count as outliers
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR
    outliers = df[(df[column] < lower_bound) | (df[column] > upper_bound)]
    return len(outliers)
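To show where such checks sit in a complete ETL run, here is a minimal sketch that uses only the Python standard library. The CSV content, the table name `orders` and the cleaning rules are hypothetical.

```python
import csv
import io
import sqlite3

# Hypothetical raw export: untrimmed name, one missing amount, one duplicate row
RAW_CSV = """order_id,customer,amount
1, Alice ,120.50
2,Bob,
3,Carol,80.00
3,Carol,80.00
"""


def extract(source):
    """Extract: read rows from a CSV source into dictionaries."""
    return list(csv.DictReader(io.StringIO(source)))


def transform(rows):
    """Transform: trim text, drop rows without an amount, remove duplicates."""
    seen, clean = set(), []
    for row in rows:
        if not row["amount"].strip():
            continue  # missing value: skip (or route to an error table)
        order_id = int(row["order_id"])
        if order_id in seen:
            continue  # duplicate key
        seen.add(order_id)
        clean.append((order_id, row["customer"].strip(), float(row["amount"])))
    return clean


def load(rows, connection):
    """Load: write the cleaned rows into the target table."""
    connection.execute(
        "CREATE TABLE IF NOT EXISTS orders "
        "(order_id INTEGER PRIMARY KEY, customer TEXT, amount REAL)"
    )
    connection.executemany("INSERT INTO orders VALUES (?, ?, ?)", rows)
    connection.commit()


connection = sqlite3.connect(":memory:")
load(transform(extract(RAW_CSV)), connection)
loaded = connection.execute(
    "SELECT order_id, customer, amount FROM orders ORDER BY order_id"
).fetchall()
print(loaded)
```

Keeping the three steps as separate functions makes each one testable on its own and lets the transform step be reused for other sources.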
Performance Optimization
- Parallel Processing: Multiprocessing, Dask
- Memory Management: Chunking, Generators
- Indexing: Database Indexes
- Caching: Redis, Memcached
- Batch Processing: Bulk Operations
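Chunking and generators can be combined so that only a small part of a large file is held in memory at a time. The following sketch assumes a CSV source with a `revenue` column; the helper name `read_in_chunks` and the sample data are illustrative.

```python
import csv
import io
from itertools import islice


def read_in_chunks(file_obj, chunk_size):
    """Yield lists of at most chunk_size rows without loading the whole file."""
    reader = csv.DictReader(file_obj)
    while True:
        chunk = list(islice(reader, chunk_size))
        if not chunk:
            return
        yield chunk


# Stand-in for a large file: 10 rows with revenue 100, 200, ..., 1000
data = "id,revenue\n" + "\n".join(f"{i},{i * 100}" for i in range(1, 11))

total_revenue = 0.0
chunk_sizes = []
for chunk in read_in_chunks(io.StringIO(data), chunk_size=4):
    chunk_sizes.append(len(chunk))
    total_revenue += sum(float(row["revenue"]) for row in chunk)

print(chunk_sizes, total_revenue)
```

pandas offers the same pattern through the `chunksize` parameter of `read_csv`, which returns an iterator of DataFrames instead of one large DataFrame.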
Analytics Techniques
Statistical Methods
- Descriptive Statistics: Mean, Median, Mode, Std Dev
- Inferential Statistics: Hypothesis Testing, Confidence Intervals
- Correlation Analysis: Pearson, Spearman, Kendall
- Regression Analysis: Linear, Logistic, Polynomial
- Time Series: ARIMA, Prophet, LSTM
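The difference between Pearson and Spearman correlation shows up on data that is monotonic but not linear. The sketch below implements Spearman as the Pearson correlation of the ranks; the simple ranking helper assumes there are no tied values, and the sample data is made up.

```python
import numpy as np


def pearson(x, y):
    """Pearson correlation: strength of the linear relationship."""
    return float(np.corrcoef(x, y)[0, 1])


def ranks(values):
    """Rank positions 1..n (assumes no tied values)."""
    order = np.argsort(values)
    result = np.empty(len(values))
    result[order] = np.arange(1, len(values) + 1)
    return result


def spearman(x, y):
    """Spearman correlation: Pearson correlation of the ranks."""
    return pearson(ranks(x), ranks(y))


x = np.arange(1, 21, dtype=float)
y = np.exp(x / 3)  # strictly increasing but strongly non-linear

print(f"Pearson:  {pearson(x, y):.3f}")
print(f"Spearman: {spearman(x, y):.3f}")
```

Spearman reaches its maximum here because the order of the values is perfectly preserved, while Pearson stays noticeably below 1 because the relationship is not a straight line.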
Advanced Analytics
- Clustering: K-Means, Hierarchical, DBSCAN
- Classification: Decision Trees, Random Forest, SVM
- Anomaly Detection: Isolation Forest, One-Class SVM
- Dimensionality Reduction: PCA, t-SNE, UMAP
- Association Rules: Apriori, FP-Growth
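As an example of dimensionality reduction, PCA can be written in a few lines with a singular value decomposition. This is a minimal NumPy sketch on synthetic data; the relationship between income and spending is an assumption chosen so that one component captures most of the variance.

```python
import numpy as np


def pca(data, n_components):
    """Project data onto its first n_components principal components."""
    centered = data - data.mean(axis=0)
    # Rows of vt are the principal axes, ordered by decreasing singular value
    _, singular_values, vt = np.linalg.svd(centered, full_matrices=False)
    explained_variance_ratio = singular_values**2 / np.sum(singular_values**2)
    projected = centered @ vt[:n_components].T
    return projected, explained_variance_ratio[:n_components]


rng = np.random.default_rng(1)
income = rng.normal(50_000, 10_000, 500)
# Assumed relationship: spending is roughly 10% of income plus noise
spending = 0.1 * income + rng.normal(0, 200, 500)
features = np.column_stack([income, spending])
# Standardize so both columns contribute on the same scale
features = (features - features.mean(axis=0)) / features.std(axis=0)

projected, ratio = pca(features, n_components=1)
print(projected.shape, ratio)
```

Because the two features are almost perfectly correlated, a single component retains nearly all of the variance, which is the situation in which dropping dimensions loses little information.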
Advantages and Disadvantages
Advantages of Data Science
- Data-Driven Decisions: Better business decisions
- Pattern Recognition: Discover hidden patterns
- Predictive Analytics: Forecast future trends
- Process Optimization: Increase efficiency
- Competitive Advantage: Gain competitive edge
Disadvantages
- Data Quality: Dependent on data quality
- Complexity: Complex algorithms and tools
- Privacy Concerns: Data protection and ethics
- Resource Intensive: Computing power and storage
- Interpretability: Black-box models
Common Exam Questions
- What is the difference between Big Data and traditional data? Big Data is characterized by the 3Vs (Volume, Velocity, Variety) and requires specialized technologies for processing.
- Explain the ETL process! ETL stands for Extract (read data from the source systems), Transform (clean and transform the data), and Load (write the data into the target system).
- When do you use which chart type? Line charts for trends over time, bar charts for comparisons, histograms for distributions, scatter plots for relationships.
- What is the purpose of Machine Learning Pipelines? ML pipelines automate and standardize the entire workflow from data preparation to model deployment.
Key Sources
- https://spark.apache.org/
- https://airflow.apache.org/
- https://matplotlib.org/
- https://plotly.com/python/