IRC-Coding

Data Science Fundamentals: Big Data, ML & ETL

Master data science basics: Big Data analytics, ML pipelines, ETL processes, and visualization with Python, Pandas, and Scikit-learn.

schutzgeist

Data Science Basics: Big Data Analytics, ML Pipelines, ETL & Visualization

This article introduces the fundamentals of data science, covering Big Data analytics, machine learning pipelines, ETL processes, and data visualization, with practical examples.

In a Nutshell

Data Science combines statistics, programming, and domain knowledge to gain insights from data. Big Data analytics processes data volumes too large or too fast for a single machine, ML pipelines automate the path from raw data to a deployed model, ETL prepares data for analysis, and visualization makes the results understandable.

Compact Technical Description

Data Science is an interdisciplinary field that uses scientific methods, processes, algorithms and systems to extract knowledge and insights from structured and unstructured data.

Core Areas:

Big Data Analytics

  • Concept: Processing extremely large volumes of data
  • 3V Model: Volume, Velocity, Variety
  • Technologies: Hadoop, Spark, NoSQL databases
  • Applications: Real-time analysis, Predictive Analytics
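Hadoop MapReduce and Spark scale out the same basic pattern: map each record to key/value pairs, shuffle (group) the pairs by key, and reduce each group to a result. The sketch below runs that pattern in a single Python process using only the standard library; the function names and sample records are illustrative, and on a real cluster each phase would run in parallel across many machines.

```python
from collections import defaultdict
from collections.abc import Iterable

# Illustrative sales records: (region, amount)
records = [
    ("North", 120.0),
    ("South", 80.0),
    ("North", 30.0),
    ("East", 50.0),
    ("South", 20.0),
]

def map_phase(rows: Iterable[tuple[str, float]]) -> Iterable[tuple[str, float]]:
    """Emit one (key, value) pair per input record."""
    for region, amount in rows:
        yield region, amount

def shuffle_phase(pairs: Iterable[tuple[str, float]]) -> dict[str, list[float]]:
    """Group all values by key (on a cluster this step moves data between nodes)."""
    groups: dict[str, list[float]] = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups

def reduce_phase(groups: dict[str, list[float]]) -> dict[str, float]:
    """Combine each key's values into one result (here: total revenue per region)."""
    return {key: sum(values) for key, values in groups.items()}

totals = reduce_phase(shuffle_phase(map_phase(records)))
print(totals)  # {'North': 150.0, 'South': 100.0, 'East': 50.0}
```

Because each map call and each per-key reduce is independent, a framework can distribute them; only the shuffle requires coordination between workers.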

Machine Learning Pipelines

  • Concept: Automated ML workflow orchestration
  • Phases: Data Collection → Preprocessing → Training → Evaluation → Deployment
  • Tools: Scikit-learn, TensorFlow, MLflow, Airflow
  • MLOps: Versioning, Monitoring, Retraining
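The phases listed above can be sketched as plain functions chained together. This is a minimal, dependency-free illustration under simplifying assumptions (one numeric feature, ordinary least-squares linear regression, RMSE as the metric, synthetic data); in practice scikit-learn's `Pipeline` or an orchestrator such as Airflow would fill these roles. Note that the scaling parameters are learned on the training split only and then reused for test data and deployment, which keeps test information from leaking into training.

```python
from math import sqrt
from statistics import mean, pstdev

def collect():
    """Data collection: synthetic (x, y) pairs following y = 2x + 1."""
    xs = [float(x) for x in range(1, 11)]
    return xs, [2 * x + 1 for x in xs]

def fit_scaler(xs):
    """Preprocessing: learn mean and standard deviation on training data only."""
    return mean(xs), pstdev(xs)

def scale(xs, params):
    mu, sigma = params
    return [(x - mu) / sigma for x in xs]

def train(xs, ys):
    """Training: ordinary least squares for y = a + b*x."""
    x_bar, y_bar = mean(xs), mean(ys)
    b = sum((x - x_bar) * (y - y_bar) for x, y in zip(xs, ys)) / sum((x - x_bar) ** 2 for x in xs)
    return y_bar - b * x_bar, b

def evaluate(model, xs, ys):
    """Evaluation: root mean squared error on held-out data."""
    a, b = model
    return sqrt(mean((a + b * x - y) ** 2 for x, y in zip(xs, ys)))

def deploy(model, scaler_params):
    """Deployment: bundle scaler and model into a single prediction function."""
    a, b = model
    return lambda x: a + b * scale([x], scaler_params)[0]

xs, ys = collect()
train_x, test_x = xs[:8], xs[8:]  # simple ordered split for the demo
train_y, test_y = ys[:8], ys[8:]

params = fit_scaler(train_x)
model = train(scale(train_x, params), train_y)
rmse = evaluate(model, scale(test_x, params), test_y)
predict = deploy(model, params)
print(f"RMSE: {rmse:.4f}, prediction for x=12: {predict(12.0):.2f}")
```

The MLOps tasks from the list (versioning, monitoring, retraining) would wrap around this chain, for example by storing `params` and `model` together as one versioned artifact.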

ETL Processes

  • Extract: Read data from source systems such as databases, APIs, and files
  • Transform: Clean, validate, and reshape the data
  • Load: Load data into target systems
  • Tools: Apache NiFi, Talend, AWS Glue
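The three ETL steps can be shown end to end with Python's standard library alone. In this sketch the source is an in-memory CSV string standing in for a file or API export; the transform step trims whitespace, casts types, drops rows that fail validation, and removes duplicate IDs; the target is an in-memory SQLite table. The table and column names are illustrative, and tools like Apache NiFi, Talend, or AWS Glue run the same steps at much larger scale.

```python
import csv
import io
import sqlite3

RAW_CSV = """order_id,customer,amount
1001, Alice ,19.99
1002,Bob,abc
1001, Alice ,19.99
1003,Carol,5.50
"""

def extract(raw: str) -> list[dict[str, str]]:
    """Extract: read raw rows from the source (here a CSV string)."""
    return list(csv.DictReader(io.StringIO(raw)))

def transform(rows: list[dict[str, str]]) -> list[tuple[int, str, float]]:
    """Transform: clean, cast, validate, and de-duplicate."""
    seen: set[int] = set()
    clean = []
    for row in rows:
        try:
            order_id = int(row["order_id"])
            amount = float(row["amount"])
        except ValueError:
            continue  # drop rows whose values cannot be cast
        if order_id in seen:
            continue  # drop duplicate order IDs
        seen.add(order_id)
        clean.append((order_id, row["customer"].strip(), round(amount, 2)))
    return clean

def load(rows: list[tuple[int, str, float]], conn: sqlite3.Connection) -> None:
    """Load: write the cleaned rows into the target table."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS orders "
        "(order_id INTEGER PRIMARY KEY, customer TEXT, amount REAL)"
    )
    conn.executemany("INSERT OR REPLACE INTO orders VALUES (?, ?, ?)", rows)
    conn.commit()

conn = sqlite3.connect(":memory:")
load(transform(extract(RAW_CSV)), conn)
print(conn.execute("SELECT * FROM orders ORDER BY order_id").fetchall())
```

`INSERT OR REPLACE` makes the load idempotent: rerunning the job with the same input leaves the table unchanged, which matters when a scheduler retries a failed run.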

Data Visualization

  • Concept: Visual representation of data and insights
  • Chart Types: Bar, Line, Scatter, Heatmap, Treemap
  • Tools: Matplotlib, Seaborn, Plotly, Tableau
  • Principles: Clarity, Accuracy, Efficiency
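As a small illustration of these principles with Matplotlib, the sketch below draws a bar chart whose value axis starts at zero, so bar lengths stay proportional to the values (accuracy), and labels the axis, title, and bars (clarity). The product names and revenue figures are made-up sample values. The non-interactive Agg backend is selected so the sketch runs without a display and renders into an in-memory PNG.

```python
import io

import matplotlib
matplotlib.use("Agg")  # non-interactive backend: render without a display
import matplotlib.pyplot as plt

# Made-up sample data for illustration
products = ["Laptop", "Smartphone", "Tablet", "Headphones"]
revenue = [125_000, 98_000, 41_000, 17_500]

fig, ax = plt.subplots(figsize=(6, 4))
ax.bar(products, revenue, color="steelblue")
ax.set_ylim(bottom=0)  # bars must start at zero to stay proportional
ax.set_ylabel("Revenue (EUR)")
ax.set_title("Revenue by product (sample data)")
ax.bar_label(ax.containers[0], fmt="%.0f")  # print exact values on the bars
fig.tight_layout()

buffer = io.BytesIO()
fig.savefig(buffer, format="png")  # or fig.savefig("revenue.png") to write a file
plt.close(fig)
print(f"PNG size: {len(buffer.getvalue())} bytes")
```

A bar chart fits here because the data are categories compared by magnitude; a line chart would wrongly suggest an ordering or trend between products.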

Exam-Relevant Key Points

  • Data Science: Interdisciplinary field for data analysis
  • Big Data: Large, fast, diverse data volumes
  • Machine Learning: Automated pattern recognition in data
  • ETL: Extract-Transform-Load for data preparation
  • Visualization: Visual data representation
  • Pipelines: Automated data processing workflows
  • Analytics: Statistical and exploratory data analysis
  • IHK-relevant: Modern data analysis and Business Intelligence

Core Components

  1. Data Collection: Data sources and collection
  2. Data Storage: Storage and organization
  3. Data Processing: Cleaning and transformation
  4. Data Analysis: Statistical and exploratory analysis
  5. Machine Learning: Models and algorithms
  6. Data Visualization: Visual representation
  7. Model Deployment: Production deployment
  8. Monitoring: Performance and drift detection
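For the monitoring step, one widely used drift measure is the Population Stability Index (PSI). It compares the binned distribution of a feature at training time (expected) with the distribution seen in production (actual): PSI = sum over bins of (actual_i - expected_i) * ln(actual_i / expected_i). A commonly quoted rule of thumb treats values above roughly 0.2 as a significant shift, but that threshold is a convention, not a standard. The sketch below assumes equal-width bins derived from the training data and adds a small epsilon to avoid log(0); both are illustrative choices.

```python
from math import log

def bin_shares(values, edges):
    """Share of values falling into each bin defined by sorted edges."""
    counts = [0] * (len(edges) - 1)
    for v in values:
        # Values outside the training range are clamped into the first/last bin
        i = 0
        while i < len(counts) - 1 and v >= edges[i + 1]:
            i += 1
        counts[i] += 1
    return [c / len(values) for c in counts]

def psi(expected_values, actual_values, n_bins=5, eps=1e-6):
    """Population Stability Index between a reference and a live sample."""
    lo, hi = min(expected_values), max(expected_values)
    width = (hi - lo) / n_bins
    edges = [lo + k * width for k in range(n_bins + 1)]
    expected = bin_shares(expected_values, edges)
    actual = bin_shares(actual_values, edges)
    return sum(
        (a - e) * log((a + eps) / (e + eps))
        for e, a in zip(expected, actual)
    )

training = [float(x) for x in range(100)]  # reference distribution
same = list(training)                       # no drift
shifted = [x + 40 for x in training]        # mean shifted upward

print(f"PSI (no drift): {psi(training, same):.4f}")
print(f"PSI (shifted):  {psi(training, shifted):.4f}")
```

In a production pipeline, a PSI above the chosen threshold would typically raise an alert or trigger the retraining step mentioned under MLOps.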

Practical Examples

1. Big Data Processing with Apache Spark and Python

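from pyspark.sql import SparkSession
# Explicit imports instead of "import *": the wildcard import pulls in pyspark's
# round(), sum(), etc., which shadow Python's built-ins (create_sample_data calls
# the built-in round() on plain floats). Here, sum is Spark's aggregate function.
from pyspark.sql.functions import avg, col, count, countDistinct, percent_rank, stddev, substring, sum
from pyspark.sql.types import DoubleType, IntegerType, StringType, StructField, StructType
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator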

# Big Data Processing Demo
class BigDataAnalytics:
    
    def __init__(self, app_name="BigDataAnalytics"):
        """Initialize Spark Session"""
        self.spark = SparkSession.builder \
            .appName(app_name) \
            .config("spark.sql.adaptive.enabled", "true") \
            .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
            .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128MB") \
            .getOrCreate()
        
        print(f"Spark Session created: {self.spark.version}")
    
    def create_sample_data(self):
        """Create sample data for Big Data demo"""
        import random
        from datetime import datetime, timedelta
        
        # Simulated sales data
        data = []
        products = ["Laptop", "Smartphone", "Tablet", "Headphones", "Mouse", "Keyboard"]
        regions = ["North", "South", "East", "West", "Central"]
        
        start_date = datetime(2023, 1, 1)
        
        for i in range(1000000):  # 1M records, generated on the driver (slow, but fine for a demo)
            date = start_date + timedelta(days=random.randint(0, 364))  # 0..364 keeps all dates in 2023
            record = {
                "transaction_id": f"T{i:06d}",
                "date": date.strftime("%Y-%m-%d"),
                "product": random.choice(products),
                "region": random.choice(regions),
                "quantity": random.randint(1, 10),
                "unit_price": round(random.uniform(10, 1000), 2),
                "customer_id": f"C{random.randint(1, 50000):05d}",
                "sales_rep": f"SR{random.randint(1, 100):03d}"
            }
            record["total_amount"] = record["quantity"] * record["unit_price"]
            data.append(record)
        
        # Create Spark DataFrame
        schema = StructType([
            StructField("transaction_id", StringType(), True),
            StructField("date", StringType(), True),
            StructField("product", StringType(), True),
            StructField("region", StringType(), True),
            StructField("quantity", IntegerType(), True),
            StructField("unit_price", DoubleType(), True),
            StructField("customer_id", StringType(), True),
            StructField("sales_rep", StringType(), True),
            StructField("total_amount", DoubleType(), True)
        ])
        
        df = self.spark.createDataFrame(data, schema)
        print(f"Sample data created: {df.count()} rows")
        
        return df
    
    def basic_analytics(self, df):
        """Perform basic analytics"""
        print("=== Basic Analytics ===")
        
        # Display schema
        print("Schema:")
        df.printSchema()
        
        # Statistical summary
        print("\nStatistical Summary:")
        df.describe().show()
        
        # Top products by revenue
        print("\nTop Products by Revenue:")
        product_sales = df.groupBy("product") \
            .agg(sum("total_amount").alias("total_sales"),
                 count("transaction_id").alias("transaction_count")) \
            .orderBy(col("total_sales").desc())
        
        product_sales.show()
        
        # Regional analysis
        print("\nRegional Sales Analysis:")
        regional_sales = df.groupBy("region") \
            .agg(sum("total_amount").alias("total_sales"),
                 avg("total_amount").alias("avg_transaction"),
                 count("transaction_id").alias("transaction_count")) \
            .orderBy(col("total_sales").desc())
        
        regional_sales.show()
        
        # Time-based analysis
        print("\nMonthly Revenue Development:")
        monthly_sales = df.withColumn("month", substring(col("date"), 1, 7)) \
            .groupBy("month") \
            .agg(sum("total_amount").alias("monthly_sales"),
                 count("transaction_id").alias("transaction_count")) \
            .orderBy("month")
        
        monthly_sales.show()
        
        return product_sales, regional_sales, monthly_sales
    
    def advanced_analytics(self, df):
        """Advanced analytics with Window Functions"""
        print("\n=== Advanced Analytics ===")
        
        # Window Functions for running totals
        from pyspark.sql.window import Window
        
        # Monthly cumulative revenue
        window_spec = Window.partitionBy("month").orderBy("date") \
            .rowsBetween(Window.unboundedPreceding, Window.currentRow)
        
        monthly_cumulative = df.withColumn("month", substring(col("date"), 1, 7)) \
            .groupBy("month", "date") \
            .agg(sum("total_amount").alias("daily_sales")) \
            .withColumn("cumulative_sales", sum("daily_sales").over(window_spec)) \
            .orderBy("month", "date")
        
        print("Monthly cumulative revenue:")
        monthly_cumulative.show(20)
        
        # Best-Performing Sales Reps
        rep_performance = df.groupBy("sales_rep") \
            .agg(sum("total_amount").alias("total_sales"),
                 count("transaction_id").alias("transaction_count"),
                 avg("total_amount").alias("avg_transaction")) \
            .withColumn("performance_rank", percent_rank().over(Window.orderBy(col("total_sales").desc()))) \
            .filter(col("performance_rank") <= 0.1)  # Top 10%
        
        print("\nTop 10% Sales Reps:")
        rep_performance.show()
        
        # Customer x product matrix: one row per customer, one column per product
        # holding the purchased quantity (input for a product correlation analysis)
        product_correlation = df.groupBy("customer_id") \
            .pivot("product") \
            .agg(sum("quantity")) \
            .fillna(0)
        
        print("\nCustomer-Product Quantity Matrix (first 10 customers):")
        product_correlation.limit(10).show()
        
        return monthly_cumulative, rep_performance, product_correlation
    
    def machine_learning_pipeline(self, df):
        """Machine Learning Pipeline for revenue forecasting"""
        print("\n=== Machine Learning Pipeline ===")
        
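        # Prepare features for ML. df has no "month" column, so derive it here first.
        # Note: features and target come from the same month, so this demonstrates
        # regression on aggregated data rather than a true forecast.
        feature_data = df.withColumn("month", substring(col("date"), 1, 7)) \
            .groupBy("product", "region", "month") \
            .agg(sum("total_amount").alias("target_sales"),
                 count("transaction_id").alias("transaction_count"),
                 avg("unit_price").alias("avg_unit_price"),
                 sum("quantity").alias("total_quantity"))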
        
        # Encode categorical features
        from pyspark.ml.feature import StringIndexer, OneHotEncoder
        
        # Product Indexer
        product_indexer = StringIndexer(inputCol="product", outputCol="product_index")
        product_indexed = product_indexer.fit(feature_data).transform(feature_data)
        
        # Region Indexer
        region_indexer = StringIndexer(inputCol="region", outputCol="region_index")
        region_indexed = region_indexer.fit(product_indexed).transform(product_indexed)
        
        # One-Hot Encoding
        encoder = OneHotEncoder(inputCols=["product_index", "region_index"],
                               outputCols=["product_encoded", "region_encoded"])
        encoded_data = encoder.fit(region_indexed).transform(region_indexed)
        
        # Create feature vector
        assembler = VectorAssembler(
            inputCols=["transaction_count", "avg_unit_price", "total_quantity",
                       "product_encoded", "region_encoded"],
            outputCol="features"
        )
        
        assembled_data = assembler.transform(encoded_data)
        
        # Scale features
        scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
        scaler_model = scaler.fit(assembled_data)
        scaled_data = scaler_model.transform(assembled_data)
        
        # Train-Test Split
        train_data, test_data = scaled_data.randomSplit([0.8, 0.2], seed=42)
        
        print(f"Training data: {train_data.count()} rows")
        print(f"Test data: {test_data.count()} rows")
        
        # Linear Regression
        lr = LinearRegression(featuresCol="scaled_features", 
                            labelCol="target_sales",
                            predictionCol="predicted_sales")
        
        lr_model = lr.fit(train_data)
        
        # Make predictions on test data
        predictions = lr_model.transform(test_data)
        
        # Evaluate model
        evaluator = RegressionEvaluator(labelCol="target_sales",
                                     predictionCol="predicted_sales",
                                     metricName="rmse")
        
        rmse = evaluator.evaluate(predictions)
        r2 = evaluator.setMetricName("r2").evaluate(predictions)
        
        print(f"Model Evaluation:")
        print(f"RMSE: {rmse:.2f}")
        print(f"R²: {r2:.4f}")
        
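        # Display coefficients (they refer to the scaled features). The one-hot vectors
        # expand to several slots each, so only the three numeric features get names here.
        print("\nFeature Coefficients:")
        numeric_features = ["transaction_count", "avg_unit_price", "total_quantity"]
        for name, coef in zip(numeric_features, lr_model.coefficients):
            print(f"{name}: {coef:.4f}")
        print(f"(+ {len(lr_model.coefficients) - len(numeric_features)} one-hot coefficients for product and region)")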
        
        return lr_model, predictions, scaler_model
    
    def real_time_processing_simulation(self, df):
        """Simulate real-time processing"""
        print("\n=== Real-Time Processing Simulation ===")
        
        # Simulate streaming DataFrame
        # In practice, this would come from Kafka, Kinesis etc.
        
        # Aggregations for real-time monitoring
        real_time_metrics = df.groupBy("product", "region") \
            .agg(count("transaction_id").alias("transaction_count"),
                 sum("total_amount").alias("total_sales"),
                 avg("total_amount").alias("avg_transaction")) \
            .orderBy(col("total_sales").desc())
        
        print("Real-Time Metrics:")
        real_time_metrics.show()
        
        # Anomaly detection (simplified)
        from pyspark.sql.functions import stddev
        
        # Statistical bounds for anomaly detection
        stats = df.groupBy("product") \
            .agg(avg("total_amount").alias("avg_amount"),
                 stddev("total_amount").alias("std_amount")) \
            .withColumn("upper_bound", col("avg_amount") + 2 * col("std_amount")) \
            .withColumn("lower_bound", col("avg_amount") - 2 * col("std_amount"))
        
        print("Anomaly bounds per product:")
        stats.show()
        
        return real_time_metrics, stats
    
    def data_export(self, df, predictions):
        """Export data for further processing"""
        print("\n=== Data Export ===")
        
        # Export results in various formats
        output_path = "/tmp/big_data_results"
        
        # Analytics results
        product_sales = df.groupBy("product") \
            .agg(sum("total_amount").alias("total_sales")) \
            .orderBy(col("total_sales").desc())
        
        product_sales.write.mode("overwrite").parquet(f"{output_path}/product_sales")
        print(f"Product sales exported to: {output_path}/product_sales")
        
        # ML predictions
        predictions.select("product", "region", "target_sales", "predicted_sales") \
            .write.mode("overwrite").parquet(f"{output_path}/predictions")
        
        print(f"ML predictions exported to: {output_path}/predictions")
        
        # Summary statistics
        summary_stats = df.agg(
            count("transaction_id").alias("total_transactions"),
            sum("total_amount").alias("total_revenue"),
            avg("total_amount").alias("avg_transaction_value"),
            countDistinct("customer_id").alias("unique_customers"),
            countDistinct("product").alias("unique_products")
        )
        
        summary_stats.write.mode("overwrite").json(f"{output_path}/summary_stats")
        print(f"Summary statistics exported to: {output_path}/summary_stats")
    
    def cleanup(self):
        """Free resources"""
        self.spark.stop()
        print("Spark Session terminated")

# Run Big Data demo
def big_data_demo():
    analytics = BigDataAnalytics()
    
    try:
        # Create data
        df = analytics.create_sample_data()
        
        # Basic analytics
        product_sales, regional_sales, monthly_sales = analytics.basic_analytics(df)
        
        # Advanced analytics
        monthly_cumulative, rep_performance, product_correlation = analytics.advanced_analytics(df)
        
        # Machine Learning Pipeline
        lr_model, predictions, scaler_model = analytics.machine_learning_pipeline(df)
        
        # Real-time processing
        real_time_metrics, stats = analytics.real_time_processing_simulation(df)
        
        # Export data
        analytics.data_export(df, predictions)
        
        print("\nBig Data Analytics demo completed successfully!")
        
    except Exception as e:
        print(f"Error in Big Data demo: {e}")
    finally:
        analytics.cleanup()

if __name__ == "__main__":
    big_data_demo()
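The anomaly rule in `real_time_processing_simulation` flags amounts outside the mean ± 2 standard deviations of their product group. The same rule can be tried on a handful of values without a Spark cluster; this sketch uses Python's `statistics` module and made-up amounts. Spark's `stddev()` is the sample standard deviation, so `statistics.stdev` (not `pstdev`) is used to match it.

```python
from statistics import mean, stdev

def anomaly_bounds(amounts, k=2.0):
    """Return (lower, upper) = mean -/+ k * sample standard deviation."""
    avg, std = mean(amounts), stdev(amounts)
    return avg - k * std, avg + k * std

def find_anomalies(amounts, k=2.0):
    """Return all amounts that fall outside the bounds."""
    lower, upper = anomaly_bounds(amounts, k)
    return [a for a in amounts if a < lower or a > upper]

# Made-up transaction amounts for one product; 5000.0 is an injected outlier
amounts = [120.0, 135.0, 110.0, 128.0, 142.0, 118.0, 131.0, 125.0, 5000.0, 122.0]
print(find_anomalies(amounts))
```

Because the outlier itself inflates both the mean and the standard deviation, a ±2σ rule can miss outliers in small groups; robust statistics such as the median absolute deviation are a common alternative.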

2. ETL Pipeline with Apache Airflow

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.models import Variable
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import json
import logging

# ETL Pipeline Configuration
default_args = {
    'owner': 'data-science-team',
    'depends_on_past': False,
    # Fixed start date (example value); days_ago() from airflow.utils.dates is deprecated
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# DAG Definition
dag = DAG(
    'data_science_etl_pipeline',
    default_args=default_args,
    description='ETL Pipeline for Data Science Analytics',
    schedule='@daily',  # Airflow >= 2.4; older versions use schedule_interval
    catchup=False,
    tags=['data-science', 'etl', 'analytics'],
)

# Configure logging
logger = logging.getLogger(__name__)

def extract_customer_data(**context):
    """Extract customer data from various sources"""
    logger.info("Extracting customer data...")
    
    # PostgreSQL Hook
    postgres_hook = PostgresHook(postgres_conn_id='postgres_default')
    
    # Extract customer data from PostgreSQL
    customer_query = """
        SELECT 
            customer_id,
            first_name,
            last_name,
            email,
            phone,
            registration_date,
            last_login_date,
            total_orders,
            total_spent,
            customer_segment
        FROM customers 
        WHERE last_login_date >= CURRENT_DATE - INTERVAL '30 days'
    """
    
    customers_df = postgres_hook.get_pandas_df(customer_query)
    logger.info(f"{len(customers_df)} customer records extracted")
    
    # S3 Hook for additional data
    s3_hook = S3Hook(aws_conn_id='aws_default')
    
    # Product interaction data from S3
    bucket_name = Variable.get('data_bucket_name', default_var='analytics-data')
    
    try:
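        # Load CSV file from S3 (read_key returns the object's content as a string)
        from io import StringIO  # pandas has no pd.StringIO
        file_content = s3_hook.read_key(key='customer_interactions.csv', bucket_name=bucket_name)
        interactions_df = pd.read_csv(StringIO(file_content))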
        logger.info(f"{len(interactions_df)} interaction records loaded from S3")
    except Exception as e:
        logger.warning(f"No interaction data found in S3: {e}")
        interactions_df = pd.DataFrame()
    
    # Combine data
    if not interactions_df.empty:
        combined_df = customers_df.merge(
            interactions_df, 
            on='customer_id', 
            how='left'
        )
    else:
        combined_df = customers_df
    
    # Save data as JSON for next task
    combined_json = combined_df.to_json(date_format='iso')
    
    # Store in XCom
    task_instance = context['task_instance']
    task_instance.xcom_push(key='customer_data', value=combined_json)
    
    logger.info("Customer data extraction completed")
    return combined_json

def transform_customer_data(**context):
    """Transform and clean customer data"""
    logger.info("Transforming customer data...")
    
    # Get data from previous task
    task_instance = context['task_instance']
    customer_json = task_instance.xcom_pull(task_ids='extract_customer_data', key='customer_data')
    
    # JSON to DataFrame
    customers_df = pd.read_json(customer_json)
    
    # Data Cleaning
    logger.info("Performing data cleaning...")
    
    # 1. Remove duplicates
    original_count = len(customers_df)
    customers_df = customers_df.drop_duplicates(subset=['customer_id'])
    duplicates_removed = original_count - len(customers_df)
    logger.info(f"{duplicates_removed} duplicates removed")
    
    # 2. Handle missing values
    # Numeric columns: impute with median
    numeric_columns = ['total_orders', 'total_spent']
    for col in numeric_columns:
        if col in customers_df.columns:
            median_value = customers_df[col].median()
            # Assign back instead of fillna(inplace=True) on a column selection,
            # which does not update the DataFrame under pandas copy-on-write
            customers_df[col] = customers_df[col].fillna(median_value)
    
    # Categorical columns: impute with mode
    categorical_columns = ['customer_segment']
    for col in categorical_columns:
        if col in customers_df.columns:
            modes = customers_df[col].mode()
            mode_value = modes[0] if not modes.empty else 'Unknown'
            customers_df[col] = customers_df[col].fillna(mode_value)
    
    # 3. Normalize date columns
    date_columns = ['registration_date', 'last_login_date']
    for col in date_columns:
        if col in customers_df.columns:
            customers_df[col] = pd.to_datetime(customers_df[col], errors='coerce')
    
    # 4. Feature Engineering
    logger.info("Creating new features...")
    
    # Calculate a simplified Customer Lifetime Value (CLV) score
    if 'total_spent' in customers_df.columns and 'total_orders' in customers_df.columns:
        # Replace 0 orders with NaN so the division yields NaN instead of inf
        customers_df['avg_order_value'] = customers_df['total_spent'] / customers_df['total_orders'].replace(0, np.nan)
        customers_df['clv_score'] = customers_df['total_spent'] * np.log1p(customers_df['total_orders'])
    
    # Recency Score (days since last login)
    if 'last_login_date' in customers_df.columns:
        current_date = datetime.now()
        customers_df['days_since_last_login'] = (current_date - customers_df['last_login_date']).dt.days
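        # include_lowest=True keeps customers who logged in today (0 days) in the first bin
        customers_df['recency_score'] = pd.cut(customers_df['days_since_last_login'], 
                                              bins=[0, 7, 30, 90, float('inf')], 
                                              labels=[4, 3, 2, 1],
                                              include_lowest=True)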
    
    # Engagement Score based on various factors
    engagement_features = []
    if 'total_orders' in customers_df.columns:
        engagement_features.append('total_orders')
    if 'days_since_last_login' in customers_df.columns:
        engagement_features.append('days_since_last_login')
    
    if engagement_features:
        # Min-max normalize each feature to [0, 1]
        for feature in engagement_features:
            min_val = customers_df[feature].min()
            max_val = customers_df[feature].max()
            if max_val != min_val:
                normalized = (customers_df[feature] - min_val) / (max_val - min_val)
            else:
                normalized = 0.0
            # More days since the last login means LESS engagement, so invert that feature
            if feature == 'days_since_last_login':
                normalized = 1 - normalized
            customers_df[f'{feature}_normalized'] = normalized
        
        # Calculate engagement score as the mean of the normalized features
        normalized_features = [f'{f}_normalized' for f in engagement_features]
        customers_df['engagement_score'] = customers_df[normalized_features].mean(axis=1)
    
    # 5. Data Validation
    logger.info("Performing data validation...")
    
    validation_errors = []
    
    # Check business rules
    if 'total_orders' in customers_df.columns:
        if (customers_df['total_orders'] < 0).any():
            validation_errors.append("Negative order quantities found")
    
    if 'total_spent' in customers_df.columns:
        if (customers_df['total_spent'] < 0).any():
            validation_errors.append("Negative revenue found")
    
    if 'email' in customers_df.columns:
        invalid_emails = customers_df[~customers_df['email'].str.contains('@', na=False)]
        if not invalid_emails.empty:
            validation_errors.append(f"{len(invalid_emails)} invalid email addresses found")
    
    if validation_errors:
        logger.warning(f"Validation errors: {validation_errors}")
    
    # 6. Aggregate data for analytics
    logger.info("Aggregating data for analytics...")
    
    # Customer segment statistics
    if 'customer_segment' in customers_df.columns:
        segment_stats = customers_df.groupby('customer_segment').agg({
            'customer_id': 'count',
            'total_spent': ['sum', 'mean'],
            'total_orders': ['sum', 'mean'],
            'engagement_score': 'mean'
        }).round(2)
        
        logger.info(f"Customer segment statistics:\n{segment_stats}")
    
    # Save transformed data
    transformed_json = customers_df.to_json(date_format='iso')
    task_instance.xcom_push(key='transformed_data', value=transformed_json)
    
    logger.info(f"Data transformation completed: {len(customers_df)} records")
    return transformed_json

def load_data_to_warehouse(**context):
    """Load transformed data to Data Warehouse"""
    logger.info("Loading data to Data Warehouse...")
    
    # Get transformed data
    task_instance = context['task_instance']
    transformed_json = task_instance.xcom_pull(task_ids='transform_customer_data', key='transformed_data')
    
    # JSON to DataFrame
    customers_df = pd.read_json(transformed_json)
    
    # PostgreSQL Hook
    postgres_hook = PostgresHook(postgres_conn_id='postgres_default')
    
    # Create table if not exists
    create_table_sql = """
    CREATE TABLE IF NOT EXISTS analytics_customer_metrics (
        customer_id VARCHAR(50) PRIMARY KEY,
        first_name VARCHAR(100),
        last_name VARCHAR(100),
        email VARCHAR(255),
        phone VARCHAR(50),
        registration_date TIMESTAMP,
        last_login_date TIMESTAMP,
        total_orders INTEGER,
        total_spent DECIMAL(10,2),
        customer_segment VARCHAR(50),
        avg_order_value DECIMAL(10,2),
        clv_score DECIMAL(10,2),
        days_since_last_login INTEGER,
        recency_score INTEGER,
        engagement_score DECIMAL(5,2),
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    );
    """
    
    postgres_hook.run(create_table_sql)
    
    # Upsert Operation (Insert or Update)
    upsert_sql = """
    INSERT INTO analytics_customer_metrics (
        customer_id, first_name, last_name, email, phone, 
        registration_date, last_login_date, total_orders, total_spent,
        customer_segment, avg_order_value, clv_score, days_since_last_login,
        recency_score, engagement_score
    ) VALUES (
        %(customer_id)s, %(first_name)s, %(last_name)s, %(email)s, %(phone)s,
        %(registration_date)s, %(last_login_date)s, %(total_orders)s, %(total_spent)s,
        %(customer_segment)s, %(avg_order_value)s, %(clv_score)s, %(days_since_last_login)s,
        %(recency_score)s, %(engagement_score)s
    )
    ON CONFLICT (customer_id) 
    DO UPDATE SET
        first_name = EXCLUDED.first_name,
        last_name = EXCLUDED.last_name,
        email = EXCLUDED.email,
        phone = EXCLUDED.phone,
        registration_date = EXCLUDED.registration_date,
        last_login_date = EXCLUDED.last_login_date,
        total_orders = EXCLUDED.total_orders,
        total_spent = EXCLUDED.total_spent,
        customer_segment = EXCLUDED.customer_segment,
        avg_order_value = EXCLUDED.avg_order_value,
        clv_score = EXCLUDED.clv_score,
        days_since_last_login = EXCLUDED.days_since_last_login,
        recency_score = EXCLUDED.recency_score,
        engagement_score = EXCLUDED.engagement_score,
        updated_at = CURRENT_TIMESTAMP;
    """
    
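    # Load data in batches (performance optimization)
    batch_size = 1000
    total_rows = len(customers_df)
    num_batches = (total_rows + batch_size - 1) // batch_size
    
    conn = postgres_hook.get_conn()
    try:
        with conn.cursor() as cursor:
            for i in range(0, total_rows, batch_size):
                batch_df = customers_df.iloc[i:i+batch_size]
                
                # Prepare data for PostgreSQL: one dict per row, keyed by the %(name)s placeholders
                batch_records = batch_df.to_dict('records')
                
                # Replace NaN/NaT with None (SQL NULL) and timestamps with ISO strings
                for record in batch_records:
                    for key, value in record.items():
                        if pd.isna(value):
                            record[key] = None
                        elif isinstance(value, pd.Timestamp):
                            record[key] = value.isoformat()
                
                # hook.run() does not accept a list of parameter dicts;
                # executemany() runs the upsert once per record
                cursor.executemany(upsert_sql, batch_records)
                
                logger.info(f"Batch {i//batch_size + 1}/{num_batches} loaded: {len(batch_df)} rows")
        conn.commit()
    finally:
        conn.close()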
    
    # Also load to BigQuery for analytics
    try:
        bigquery_hook = BigQueryHook(gcp_conn_id='google_cloud_default', use_legacy_sql=False)
        
        # Configure dataset and table
        project_id = Variable.get('gcp_project_id', default_var='my-project')
        dataset_id = 'analytics'
        table_id = 'customer_metrics'
        
        # Load DataFrame to BigQuery
        customers_df.to_gbq(
            destination_table=f'{dataset_id}.{table_id}',
            project_id=project_id,
            if_exists='replace',
            progress_bar=False
        )
        
        logger.info("Data successfully loaded to BigQuery")
        
    except Exception as e:
        logger.warning(f"BigQuery load failed: {e}")
    
    logger.info("Data load operation completed")

def generate_analytics_report(**context):
    """Generate analytics report"""
    logger.info("Generating analytics report...")
    
    # PostgreSQL Hook for final data
    postgres_hook = PostgresHook(postgres_conn_id='postgres_default')
    
    # Execute analytics queries
    analytics_queries = {
        'customer_segments': """
            SELECT 
                customer_segment,
                COUNT(*) as customer_count,
                AVG(total_spent) as avg_total_spent,
                AVG(total_orders) as avg_total_orders,
                AVG(clv_score) as avg_clv_score,
                AVG(engagement_score) as avg_engagement_score
            FROM analytics_customer_metrics
            GROUP BY customer_segment
            ORDER BY avg_total_spent DESC
        """,
        
        'top_customers': """
            SELECT 
                customer_id,
                first_name,
                last_name,
                total_spent,
                total_orders,
                clv_score,
                engagement_score
            FROM analytics_customer_metrics
            ORDER BY clv_score DESC
            LIMIT 10
        """,
        
        'engagement_distribution': """
            SELECT 
                CASE 
                    WHEN engagement_score >= 0.8 THEN 'High'
                    WHEN engagement_score >= 0.6 THEN 'Medium'
                    WHEN engagement_score >= 0.4 THEN 'Low'
                    ELSE 'Very Low'
                END as engagement_level,
                COUNT(*) as customer_count,
                AVG(total_spent) as avg_total_spent
            FROM analytics_customer_metrics
            GROUP BY engagement_level
            ORDER BY avg_total_spent DESC
        """,
        
        'daily_trends': """
            SELECT 
                DATE(last_login_date) as login_date,
                COUNT(*) as active_customers,
                AVG(total_spent) as avg_spend_per_customer
            FROM analytics_customer_metrics
            WHERE last_login_date >= CURRENT_DATE - INTERVAL '30 days'
            GROUP BY DATE(last_login_date)
            ORDER BY login_date DESC
        """
    }
    
    # Collect results
    analytics_results = {}
    
    for query_name, query in analytics_queries.items():
        try:
            result_df = postgres_hook.get_pandas_df(query)
            analytics_results[query_name] = result_df
            logger.info(f"Analytics query '{query_name}' executed: {len(result_df)} rows")
        except Exception as e:
            logger.error(f"Error in query '{query_name}': {e}")
            analytics_results[query_name] = pd.DataFrame()
    
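    # Create report
    segments_df = analytics_results.get('customer_segments', pd.DataFrame())
    report_data = {
        'generated_at': datetime.now().isoformat(),
        # Sum the per-segment counts; len() would return the number of segments
        'total_customers': int(segments_df['customer_count'].sum()) if not segments_df.empty else 0,
        'analytics': {}
    }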
    
    for query_name, result_df in analytics_results.items():
        if not result_df.empty:
            report_data['analytics'][query_name] = result_df.to_dict('records')
    
    # Save report as JSON
    report_json = json.dumps(report_data, indent=2, default=str)
    
    # Save report to S3
    try:
        s3_hook = S3Hook(aws_conn_id='aws_default')
        bucket_name = Variable.get('reports_bucket_name', default_var='analytics-reports')
        
        report_filename = f"customer_analytics_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        s3_hook.load_string(
            report_json,
            key=report_filename,
            bucket_name=bucket_name,
            replace=True
        )
        
        logger.info(f"Analytics report saved: s3://{bucket_name}/{report_filename}")
        
    except Exception as e:
        logger.error(f"Error saving report: {e}")
    
    # Log report metrics
    if 'customer_segments' in analytics_results:
        segments_df = analytics_results['customer_segments']
        if not segments_df.empty:
            logger.info("Customer segment distribution:")
            for _, row in segments_df.iterrows():
                logger.info(f"  {row['customer_segment']}: {row['customer_count']} customers, "
                           f"avg revenue: €{row['avg_total_spent']:.2f}")
    
    logger.info("Analytics report generation completed")

# Define tasks
extract_task = PythonOperator(
    task_id='extract_customer_data',
    python_callable=extract_customer_data,
    dag=dag,
)

transform_task = PythonOperator(
    task_id='transform_customer_data',
    python_callable=transform_customer_data,
    dag=dag,
)

load_task = PythonOperator(
    task_id='load_data_to_warehouse',
    python_callable=load_data_to_warehouse,
    dag=dag,
)

report_task = PythonOperator(
    task_id='generate_analytics_report',
    python_callable=generate_analytics_report,
    dag=dag,
)

# Task Dependencies
extract_task >> transform_task >> load_task >> report_task

3. Data Visualization with Matplotlib, Seaborn and Plotly

import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')

# Data Visualization Demo
class DataVisualizationDemo:
    
    def __init__(self):
        """Set style and configuration"""
        # Matplotlib configuration
        plt.style.use('seaborn-v0_8')
        plt.rcParams['figure.figsize'] = (12, 8)
        plt.rcParams['font.size'] = 10
        plt.rcParams['axes.titlesize'] = 14
        plt.rcParams['axes.labelsize'] = 12
        plt.rcParams['xtick.labelsize'] = 10
        plt.rcParams['ytick.labelsize'] = 10
        plt.rcParams['legend.fontsize'] = 10
        plt.rcParams['figure.titlesize'] = 16
        
        # Seaborn configuration
        sns.set_palette("husl")
        
        # Define colors
        self.colors = {
            'primary': '#2E86AB',
            'secondary': '#A23B72',
            'accent': '#F18F01',
            'success': '#C73E1D',
            'warning': '#F4A261',
            'info': '#264653',
            'light': '#E9C46A',
            'dark': '#2A9D8F'
        }
        
        print("Data Visualization Demo initialized")
    
    def create_sample_data(self):
        """Create sample data for visualization"""
        np.random.seed(42)
        
        # Time series data (revenue development)
        dates = pd.date_range(start='2023-01-01', end='2023-12-31', freq='D')
        base_revenue = 10000
        trend = np.linspace(0, 5000, len(dates))
        seasonal = 2000 * np.sin(2 * np.pi * np.arange(len(dates)) / 365.25)
        noise = np.random.normal(0, 500, len(dates))
        
        revenue_data = pd.DataFrame({
            'date': dates,
            'revenue': base_revenue + trend + seasonal + noise,
            'orders': np.random.poisson(100, len(dates)) + 50,
            'customers': np.random.poisson(80, len(dates)) + 30
        })
        
        # Customer data
        customer_data = pd.DataFrame({
            'customer_id': range(1000),
            'age': np.random.normal(35, 10, 1000),
            'income': np.random.lognormal(10.5, 0.5, 1000),
            'total_spent': np.random.gamma(2, 100, 1000),
            'orders': np.random.poisson(5, 1000) + 1,
            'segment': np.random.choice(['Bronze', 'Silver', 'Gold', 'Platinum'], 1000, 
                                      p=[0.4, 0.3, 0.2, 0.1]),
            'registration_date': pd.date_range('2020-01-01', '2023-12-31', periods=1000)[np.random.permutation(1000)],
            'last_purchase': pd.date_range('2023-01-01', '2023-12-31', periods=1000)[np.random.permutation(1000)]
        })
        
        # Product data
        products = ['Laptop', 'Smartphone', 'Tablet', 'Headphones', 'Mouse', 'Keyboard', 'Monitor', 'Webcam']
        product_data = pd.DataFrame({
            'product': np.random.choice(products, 5000),
            'category': np.random.choice(['Electronics', 'Accessories'], 5000),
            'price': np.random.uniform(10, 1000, 5000),
            'quantity_sold': np.random.poisson(10, 5000) + 1,
            'rating': np.random.uniform(3, 5, 5000),
            'month': np.random.choice(range(1, 13), 5000)
        })
        
        # Geographic data
        regions = ['North', 'South', 'East', 'West', 'Central']
        # Draw the regions once so that every city name matches its region
        geo_regions = np.random.choice(regions, 1000)
        geo_data = pd.DataFrame({
            'region': geo_regions,
            'city': [f"{region} City {i}" for region, i in zip(
                geo_regions, 
                np.random.randint(1, 10, 1000)
            )],
            'population': np.random.lognormal(10, 1, 1000),
            'revenue': np.random.gamma(2, 50000, 1000),
            'stores': np.random.poisson(5, 1000) + 1
        })
        
        return revenue_data, customer_data, product_data, geo_data
    
    def create_time_series_visualizations(self, revenue_data):
        """Create time series visualizations"""
        print("Creating time series visualizations...")
        
        fig, axes = plt.subplots(2, 2, figsize=(16, 12))
        fig.suptitle('Revenue Analysis Time Series', fontsize=16, fontweight='bold')
        
        # 1. Revenue development
        axes[0, 0].plot(revenue_data['date'], revenue_data['revenue'], 
                       color=self.colors['primary'], linewidth=2)
        axes[0, 0].set_title('Daily Revenue Development')
        axes[0, 0].set_xlabel('Date')
        axes[0, 0].set_ylabel('Revenue (€)')
        axes[0, 0].grid(True, alpha=0.3)
        
        # Add moving average
        revenue_data['ma_7'] = revenue_data['revenue'].rolling(window=7).mean()
        revenue_data['ma_30'] = revenue_data['revenue'].rolling(window=30).mean()
        
        axes[0, 0].plot(revenue_data['date'], revenue_data['ma_7'], 
                       color=self.colors['accent'], linewidth=1, alpha=0.7, label='7-Day MA')
        axes[0, 0].plot(revenue_data['date'], revenue_data['ma_30'], 
                       color=self.colors['secondary'], linewidth=1, alpha=0.7, label='30-Day MA')
        axes[0, 0].legend()
        
        # 2. Monthly revenue
        monthly_revenue = revenue_data.set_index('date').resample('M')['revenue'].sum()
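        # With datetime x values the bar width is measured in days (default 0.8 would be nearly invisible)
        axes[0, 1].bar(monthly_revenue.index, monthly_revenue.values, width=20,
                      color=self.colors['info'], alpha=0.7)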
        axes[0, 1].set_title('Monthly Total Revenue')
        axes[0, 1].set_xlabel('Month')
        axes[0, 1].set_ylabel('Revenue (€)')
        axes[0, 1].tick_params(axis='x', rotation=45)
        
        # 3. Revenue vs Orders Scatter Plot
        axes[1, 0].scatter(revenue_data['orders'], revenue_data['revenue'], 
                          alpha=0.6, color=self.colors['primary'])
        axes[1, 0].set_title('Revenue vs. Orders')
        axes[1, 0].set_xlabel('Number of Orders')
        axes[1, 0].set_ylabel('Revenue (€)')
        
        # Add trend line
        z = np.polyfit(revenue_data['orders'], revenue_data['revenue'], 1)
        p = np.poly1d(z)
        axes[1, 0].plot(revenue_data['orders'], p(revenue_data['orders']), 
                       color=self.colors['accent'], linewidth=2)
        
        # 4. Distribution of daily revenue
        axes[1, 1].hist(revenue_data['revenue'], bins=30, color=self.colors['secondary'], 
                       alpha=0.7, edgecolor='black')
        axes[1, 1].set_title('Distribution of Daily Revenue')
        axes[1, 1].set_xlabel('Revenue (€)')
        axes[1, 1].set_ylabel('Frequency')
        axes[1, 1].axvline(revenue_data['revenue'].mean(), color=self.colors['accent'], 
                           linestyle='--', linewidth=2, label=f'Mean: €{revenue_data["revenue"].mean():.0f}')
        axes[1, 1].legend()
        
        plt.tight_layout()
        plt.savefig('time_series_analysis.png', dpi=300, bbox_inches='tight')
        plt.show()
        
        return fig
    
    def create_customer_analytics_visualizations(self, customer_data):
        """Create customer analytics visualizations"""
        print("Creating customer analytics visualizations...")
        
        fig, axes = plt.subplots(2, 3, figsize=(18, 12))
        fig.suptitle('Customer Analytics Dashboard', fontsize=16, fontweight='bold')
        
        # 1. Age distribution
        axes[0, 0].hist(customer_data['age'], bins=20, color=self.colors['primary'], 
                       alpha=0.7, edgecolor='black')
        axes[0, 0].set_title('Age Distribution')
        axes[0, 0].set_xlabel('Age')
        axes[0, 0].set_ylabel('Count')
        
        # 2. Income distribution
        axes[0, 1].hist(customer_data['income'], bins=30, color=self.colors['secondary'], 
                       alpha=0.7, edgecolor='black')
        axes[0, 1].set_title('Income Distribution')
        axes[0, 1].set_xlabel('Income (€)')
        axes[0, 1].set_ylabel('Count')
        
        # 3. Customer segment distribution
        segment_counts = customer_data['segment'].value_counts()
        axes[0, 2].pie(segment_counts.values, labels=segment_counts.index, 
                      autopct='%1.1f%%', colors=[self.colors['primary'], self.colors['secondary'], 
                                                self.colors['accent'], self.colors['success']])
        axes[0, 2].set_title('Customer Segment Distribution')
        
        # 4. Revenue by segment
        segment_revenue = customer_data.groupby('segment')['total_spent'].mean()
        bars = axes[1, 0].bar(segment_revenue.index, segment_revenue.values, 
                              color=[self.colors['primary'], self.colors['secondary'], 
                                     self.colors['accent'], self.colors['success']])
        axes[1, 0].set_title('Average Revenue by Segment')
        axes[1, 0].set_xlabel('Segment')
        axes[1, 0].set_ylabel('Avg. Revenue (€)')
        
        # Display values on bars
        for bar, value in zip(bars, segment_revenue.values):
            axes[1, 0].text(bar.get_x() + bar.get_width()/2, bar.get_height() + max(segment_revenue.values) * 0.01,
                           f'€{value:.0f}', ha='center', va='bottom')
        
        # 5. Age vs. Total spending
        scatter = axes[1, 1].scatter(customer_data['age'], customer_data['total_spent'], 
                                   alpha=0.6, c=customer_data['orders'], cmap='viridis')
        axes[1, 1].set_title('Age vs. Total Spending')
        axes[1, 1].set_xlabel('Age')
        axes[1, 1].set_ylabel('Total Spending (€)')
        plt.colorbar(scatter, ax=axes[1, 1], label='Number of Orders')
        
        # 6. Boxplot for revenue by segment
        segments = customer_data['segment'].unique()
        data_for_boxplot = [customer_data[customer_data['segment'] == seg]['total_spent'] 
                           for seg in segments]
        
        box_plot = axes[1, 2].boxplot(data_for_boxplot, labels=segments, patch_artist=True)
        colors = [self.colors['primary'], self.colors['secondary'], 
                 self.colors['accent'], self.colors['success']]
        
        for patch, color in zip(box_plot['boxes'], colors):
            patch.set_facecolor(color)
            patch.set_alpha(0.7)
        
        axes[1, 2].set_title('Revenue Distribution by Segment')
        axes[1, 2].set_xlabel('Segment')
        axes[1, 2].set_ylabel('Total Spending (€)')
        
        plt.tight_layout()
        plt.savefig('customer_analytics.png', dpi=300, bbox_inches='tight')
        plt.show()
        
        return fig
    
    def create_product_analytics_visualizations(self, product_data):
        """Create product analytics visualizations"""
        print("Creating product analytics visualizations...")
        
        fig, axes = plt.subplots(2, 2, figsize=(16, 12))
        fig.suptitle('Product Analytics Dashboard', fontsize=16, fontweight='bold')
        
        # 1. Top products by revenue
        product_revenue = (
            (product_data['price'] * product_data['quantity_sold'])
            .groupby(product_data['product']).sum()
            .sort_values(ascending=False)
        )
        
        bars = axes[0, 0].barh(product_revenue.index, product_revenue.values, 
                              color=self.colors['primary'])
        axes[0, 0].set_title('Top Products by Total Revenue')
        axes[0, 0].set_xlabel('Total Revenue (€)')
        
        # Display values on bars
        for i, (bar, value) in enumerate(zip(bars, product_revenue.values)):
            axes[0, 0].text(value + max(product_revenue.values) * 0.01, i,
                           f'€{value:,.0f}', va='center')
        
        # 2. Price distribution by category
        categories = product_data['category'].unique()
        for i, category in enumerate(categories):
            category_data = product_data[product_data['category'] == category]['price']
            axes[0, 1].hist(category_data, bins=20, alpha=0.7, label=category,
                           color=self.colors['primary'] if i == 0 else self.colors['secondary'])
        
        axes[0, 1].set_title('Price Distribution by Category')
        axes[0, 1].set_xlabel('Price (€)')
        axes[0, 1].set_ylabel('Frequency')
        axes[0, 1].legend()
        
        # 3. Rating vs. Price
        axes[1, 0].scatter(product_data['price'], product_data['rating'], 
                          alpha=0.6, color=self.colors['accent'])
        axes[1, 0].set_title('Rating vs. Price')
        axes[1, 0].set_xlabel('Price (€)')
        axes[1, 0].set_ylabel('Rating')
        
        # Correlation coefficient
        correlation = np.corrcoef(product_data['price'], product_data['rating'])[0, 1]
        axes[1, 0].text(0.05, 0.95, f'Correlation: {correlation:.3f}', 
                       transform=axes[1, 0].transAxes, va='top',
                       bbox=dict(boxstyle='round', facecolor='white', alpha=0.8))
        
        # 4. Monthly sales
        monthly_sales = product_data.groupby('month')['quantity_sold'].sum()
        month_names = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 
                      'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
        
        axes[1, 1].bar(monthly_sales.index, monthly_sales.values, 
                      color=self.colors['info'], alpha=0.7)
        axes[1, 1].set_title('Monthly Sales Quantities')
        axes[1, 1].set_xlabel('Month')
        axes[1, 1].set_ylabel('Sales Quantity')
        axes[1, 1].set_xticks(range(1, 13))
        axes[1, 1].set_xticklabels(month_names, rotation=45)
        
        plt.tight_layout()
        plt.savefig('product_analytics.png', dpi=300, bbox_inches='tight')
        plt.show()
        
        return fig
    
    def create_geographic_visualizations(self, geo_data):
        """Create geographic visualizations"""
        print("Creating geographic visualizations...")
        
        fig, axes = plt.subplots(2, 2, figsize=(16, 12))
        fig.suptitle('Geographic Analysis Dashboard', fontsize=16, fontweight='bold')
        
        # 1. Revenue by region
        region_revenue = geo_data.groupby('region')['revenue'].sum().sort_values(ascending=False)
        bars = axes[0, 0].bar(region_revenue.index, region_revenue.values, 
                              color=[self.colors['primary'], self.colors['secondary'], 
                                     self.colors['accent'], self.colors['success'], self.colors['info'])
        axes[0, 0].set_title('Total Revenue by Region')
        axes[0, 0].set_ylabel('Revenue (€)')
        
        # Display values on bars
        for bar, value in zip(bars, region_revenue.values):
            axes[0, 0].text(bar.get_x() + bar.get_width()/2, bar.get_height() + max(region_revenue.values) * 0.01,
                           f'€{value:,.0f}', ha='center', va='bottom')
        
        # 2. Population vs. Revenue
        axes[0, 1].scatter(geo_data['population'], geo_data['revenue'], 
                          alpha=0.6, s=geo_data['stores']*10, 
                          c=self.colors['primary'])
        axes[0, 1].set_title('Population vs. Revenue')
        axes[0, 1].set_xlabel('Population')
        axes[0, 1].set_ylabel('Revenue (€)')
        
        # 3. Store density by region
        region_stores = geo_data.groupby('region')['stores'].sum()
        axes[1, 0].pie(region_stores.values, labels=region_stores.index, 
                      autopct='%1.1f%%', colors=[self.colors['primary'], self.colors['secondary'], 
                                                self.colors['accent'], self.colors['success'], self.colors['info']])
        axes[1, 0].set_title('Store Distribution by Region')
        
        # 4. Revenue per store
        region_totals = geo_data.groupby('region')[['revenue', 'stores']].sum()
        region_avg_revenue = (region_totals['revenue'] / region_totals['stores']).sort_values(ascending=False)
        
        bars = axes[1, 1].bar(region_avg_revenue.index, region_avg_revenue.values, 
                              color=self.colors['secondary'])
        axes[1, 1].set_title('Average Revenue per Store')
        axes[1, 1].set_ylabel('Avg. Revenue per Store (€)')
        
        # Display values on bars
        for bar, value in zip(bars, region_avg_revenue.values):
            axes[1, 1].text(bar.get_x() + bar.get_width()/2, bar.get_height() + max(region_avg_revenue.values) * 0.01,
                           f'€{value:,.0f}', ha='center', va='bottom')
        
        plt.tight_layout()
        plt.savefig('geographic_analytics.png', dpi=300, bbox_inches='tight')
        plt.show()
        
        return fig
    
    def create_interactive_visualizations(self, revenue_data, customer_data, product_data):
        """Create interactive visualizations with Plotly"""
        print("Creating interactive visualizations...")
        
        # 1. Interactive time series
        fig1 = make_subplots(
            rows=2, cols=1,
            subplot_titles=('Daily Revenue', 'Monthly Revenue'),
            vertical_spacing=0.1
        )
        
        # Daily revenue with moving averages
        fig1.add_trace(
            go.Scatter(
                x=revenue_data['date'],
                y=revenue_data['revenue'],
                mode='lines',
                name='Daily Revenue',
                line=dict(color='blue', width=1)
            ),
            row=1, col=1
        )
        
        fig1.add_trace(
            go.Scatter(
                x=revenue_data['date'],
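                # Computed here: the 'ma_7' column only exists if create_time_series_visualizations() ran first
                y=revenue_data['revenue'].rolling(window=7).mean(),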
                mode='lines',
                name='7-Day MA',
                line=dict(color='orange', width=2)
            ),
            row=1, col=1
        )
        
        # Monthly revenue
        monthly_revenue = revenue_data.set_index('date').resample('M')['revenue'].sum()
        fig1.add_trace(
            go.Bar(
                x=monthly_revenue.index,
                y=monthly_revenue.values,
                name='Monthly Revenue',
                marker_color='lightblue'
            ),
            row=2, col=1
        )
        
        fig1.update_layout(
            title='Interactive Revenue Analysis',
            height=600,
            showlegend=True
        )
        
        fig1.show()
        
        # 2. Interactive customer analysis
        fig2 = make_subplots(
            rows=2, cols=2,
            subplot_titles=('Age Distribution', 'Income Distribution', 
                          'Revenue by Segment', 'Age vs. Revenue'),
            specs=[[{"type": "histogram"}, {"type": "histogram"}],
                   [{"type": "bar"}, {"type": "scatter"}]]
        )
        
        # Age distribution
        fig2.add_trace(
            go.Histogram(
                x=customer_data['age'],
                nbinsx=20,
                name='Age',
                marker_color='lightgreen'
            ),
            row=1, col=1
        )
        
        # Income distribution
        fig2.add_trace(
            go.Histogram(
                x=customer_data['income'],
                nbinsx=30,
                name='Income',
                marker_color='lightcoral'
            ),
            row=1, col=2
        )
        
        # Revenue by segment
        segment_revenue = customer_data.groupby('segment')['total_spent'].mean()
        fig2.add_trace(
            go.Bar(
                x=segment_revenue.index,
                y=segment_revenue.values,
                name='Avg. Revenue',
                marker_color='lightblue'
            ),
            row=2, col=1
        )
        
        # Age vs. Revenue
        fig2.add_trace(
            go.Scatter(
                x=customer_data['age'],
                y=customer_data['total_spent'],
                mode='markers',
                name='Customers',
                marker=dict(
                    size=8,
                    color=customer_data['orders'],
                    colorscale='Viridis',
                    showscale=True,
                    colorbar=dict(title="Orders")
                )
            ),
            row=2, col=2
        )
        
        fig2.update_layout(
            title='Interactive Customer Analysis',
            height=600,
            showlegend=False
        )
        
        fig2.show()
        
        # 3. Interactive 3D visualization
        fig3 = go.Figure(data=[go.Scatter3d(
            x=customer_data['age'],
            y=customer_data['income'],
            z=customer_data['total_spent'],
            mode='markers',
            marker=dict(
                size=5,
                color=customer_data['orders'],
                colorscale='Viridis',
                showscale=True,
                colorbar=dict(title="Orders")
            ),
            text=[f"Customer {i}<br>Age: {age:.0f}<br>Income: €{inc:,.0f}<br>Revenue: €{spent:,.0f}" 
                   for i, (age, inc, spent) in enumerate(zip(customer_data['age'], 
                                                          customer_data['income'], 
                                                          customer_data['total_spent']))],
            hovertemplate='%{text}<extra></extra>'
        )])
        
        fig3.update_layout(
            title='3D Customer Analysis (Age, Income, Revenue)',
            scene=dict(
                xaxis_title='Age',
                yaxis_title='Income (€)',
                zaxis_title='Revenue (€)'
            ),
            height=600
        )
        
        fig3.show()
        
        return fig1, fig2, fig3
    
    def create_dashboard_summary(self, revenue_data, customer_data, product_data, geo_data):
        """Create summary dashboard"""
        print("Creating summary dashboard...")
        
        # Calculate key metrics
        total_revenue = revenue_data['revenue'].sum()
        avg_daily_revenue = revenue_data['revenue'].mean()
        total_customers = len(customer_data)
        avg_customer_value = customer_data['total_spent'].mean()
        total_orders = customer_data['orders'].sum()
        top_product = (
            (product_data['price'] * product_data['quantity_sold'])
            .groupby(product_data['product']).sum()
            .idxmax()
        )
        
        # Create dashboard
        fig = plt.figure(figsize=(20, 16))
        
        # Title
        fig.suptitle('Data Science Analytics Dashboard', fontsize=20, fontweight='bold')
        
        # Key metrics
        metrics_text = f"""
        KEY METRICS
        ─────────────────────
        Total Revenue: €{total_revenue:,.0f}
        Avg. Daily Revenue: €{avg_daily_revenue:,.0f}
        Total Customers: {total_customers:,}
        Avg. Customer Value: €{avg_customer_value:,.0f}
        Total Orders: {total_orders:,}
        Top Product: {top_product}
        """
        
        plt.figtext(0.02, 0.95, metrics_text, fontsize=12, fontfamily='monospace',
                   verticalalignment='top', bbox=dict(boxstyle='round', facecolor='lightgray', alpha=0.8))
        
        # Subplots for various analyses
        gs = fig.add_gridspec(3, 3, hspace=0.3, wspace=0.3, 
                           left=0.15, right=0.95, top=0.85, bottom=0.05)
        
        # 1. Revenue trend (top left)
        ax1 = fig.add_subplot(gs[0, 0])
        monthly_revenue = revenue_data.set_index('date').resample('M')['revenue'].sum()
        ax1.plot(monthly_revenue.index, monthly_revenue.values, 
                color=self.colors['primary'], linewidth=2)
        ax1.set_title('Monthly Revenue Trend')
        ax1.tick_params(axis='x', rotation=45)
        
        # 2. Customer segments (top center)
        ax2 = fig.add_subplot(gs[0, 1])
        segment_counts = customer_data['segment'].value_counts()
        ax2.pie(segment_counts.values, labels=segment_counts.index, autopct='%1.1f%%',
               colors=[self.colors['primary'], self.colors['secondary'], 
                      self.colors['accent'], self.colors['success']])
        ax2.set_title('Customer Segments')
        
        # 3. Top products (top right)
        ax3 = fig.add_subplot(gs[0, 2])
        product_revenue = (
            (product_data['price'] * product_data['quantity_sold'])
            .groupby(product_data['product']).sum()
            .sort_values(ascending=False)
            .head(5)
        )
        
        bars = ax3.barh(range(len(product_revenue)), product_revenue.values, 
                       color=self.colors['info'])
        ax3.set_yticks(range(len(product_revenue)))
        ax3.set_yticklabels(product_revenue.index)
        ax3.set_title('Top 5 Products')
        ax3.set_xlabel('Revenue (€)')
        
        # 4. Age distribution (center left)
        ax4 = fig.add_subplot(gs[1, 0])
        ax4.hist(customer_data['age'], bins=15, color=self.colors['secondary'], alpha=0.7)
        ax4.set_title('Age Distribution')
        ax4.set_xlabel('Age')
        
        # 5. Regional distribution (center center)
        ax5 = fig.add_subplot(gs[1, 1])
        region_revenue = geo_data.groupby('region')['revenue'].sum()
        ax5.bar(region_revenue.index, region_revenue.values, 
               color=[self.colors['primary'], self.colors['secondary'], 
                      self.colors['accent'], self.colors['success'], self.colors['info']])
        ax5.set_title('Revenue by Region')
        ax5.set_ylabel('Revenue (€)')
        
        # 6. Rating vs. Price (center right)
        ax6 = fig.add_subplot(gs[1, 2])
        ax6.scatter(product_data['price'], product_data['rating'], 
                  alpha=0.6, color=self.colors['accent'])
        ax6.set_title('Rating vs. Price')
        ax6.set_xlabel('Price (€)')
        ax6.set_ylabel('Rating')
        
        # 7. Revenue distribution (bottom left-center)
        ax7 = fig.add_subplot(gs[2, :2])
        ax7.hist(revenue_data['revenue'], bins=30, color=self.colors['primary'], 
                alpha=0.7, edgecolor='black')
        ax7.set_title('Daily Revenue Distribution')
        ax7.set_xlabel('Revenue (€)')
        ax7.set_ylabel('Frequency')
        ax7.axvline(revenue_data['revenue'].mean(), color=self.colors['accent'], 
                   linestyle='--', linewidth=2, label=f'Mean: €{revenue_data["revenue"].mean():.0f}')
        ax7.legend()
        
        # 8. Correlation heatmap (bottom right)
        ax8 = fig.add_subplot(gs[2, 2])
        
        # Create correlation matrix
        corr_data = customer_data[['age', 'income', 'total_spent', 'orders']].corr()
        im = ax8.imshow(corr_data, cmap='coolwarm', aspect='auto', vmin=-1, vmax=1)
        
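        # Color legend
        fig.colorbar(im, ax=ax8)
        ax8.set_xticks(range(len(corr_data.columns)))
        ax8.set_xticklabels(corr_data.columns, rotation=45)
        ax8.set_yticks(range(len(corr_data.columns)))
        ax8.set_yticklabels(corr_data.columns)
        ax8.set_title('Correlation Matrix')
        
        plt.show()
        
        return fig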

Data Science Workflow

Typical Data Science Process

```mermaid
graph TD
    A[Problem Definition] --> B[Data Collection]
    B --> C[Data Cleaning]
    C --> D[Exploratory Analysis]
    D --> E[Feature Engineering]
    E --> F[Model Building]
    F --> G[Model Evaluation]
    G --> H[Deployment]
    H --> I[Monitoring]
    I --> A
```

CRISP-DM Methodology

  1. Business Understanding: Understand the business objectives
  2. Data Understanding: Collect and explore the data
  3. Data Preparation: Clean and prepare the data
  4. Modeling: Develop models
  5. Evaluation: Evaluate the models against the business objectives
  6. Deployment: Deploy the models; planning monitoring and maintenance is part of this phase, not a separate seventh phase

Big Data Technologies

Hadoop Ecosystem

| Component | Function | Alternatives |
|---|---|---|
| HDFS | Distributed Storage | Amazon S3, Google Cloud Storage |
| MapReduce | Batch Processing | Apache Spark, Apache Flink |
| Hive | SQL on Hadoop | Presto, Apache Impala |
| HBase | NoSQL Database | Cassandra, MongoDB |
| ZooKeeper | Coordination | Consul, etcd |

Spark vs. MapReduce

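| Feature | Spark | MapReduce |
|---|---|---|
| Processing | In-memory | Disk-based (intermediate results written to disk) |
| Speed | Typically faster, especially for iterative jobs | Slower |
| Ease of Use | High-level APIs (Scala, Java, Python, R, SQL) | Low-level map and reduce functions |
| Streaming | Supported (Structured Streaming, micro-batch) | Batch only |
| ML Support | Built-in (MLlib) | External libraries |
| Memory Requirements | Higher | Lower |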

Machine Learning Pipeline Components

Data Pipeline Stages

# ML Pipeline example with Scikit-learn
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split

# Preprocessing
numeric_features = ['age', 'income', 'orders']
categorical_features = ['segment', 'region']

numeric_transformer = Pipeline(steps=[
    ('scaler', StandardScaler())
])

categorical_transformer = Pipeline(steps=[
    ('onehot', OneHotEncoder(handle_unknown='ignore'))
])

preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features)
    ])

# Complete Pipeline
ml_pipeline = Pipeline(steps=[
    ('preprocessor', preprocessor),
    ('regressor', RandomForestRegressor(n_estimators=100))
])
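
As written, the pipeline is only defined, never trained. A minimal sketch of fitting and evaluating an equivalent pipeline follows; the synthetic DataFrame, the choice of `total_spent` as the target, and the formula that generates it are assumptions for illustration only. The scaler and encoder are passed directly to `ColumnTransformer` instead of being wrapped in single-step pipelines, which behaves the same.

```python
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler

# Hypothetical synthetic customer data
rng = np.random.default_rng(42)
n = 500
df = pd.DataFrame({
    "age": rng.normal(35, 10, n),
    "income": rng.lognormal(10.5, 0.5, n),
    "orders": rng.poisson(5, n) + 1,
    "segment": rng.choice(["Bronze", "Silver", "Gold", "Platinum"], n),
    "region": rng.choice(["North", "South", "East", "West"], n),
})
# Assumed target: spending grows with orders and income, plus noise
df["total_spent"] = 40 * df["orders"] + 0.002 * df["income"] + rng.normal(0, 20, n)

numeric_features = ["age", "income", "orders"]
categorical_features = ["segment", "region"]

preprocessor = ColumnTransformer(transformers=[
    ("num", StandardScaler(), numeric_features),
    ("cat", OneHotEncoder(handle_unknown="ignore"), categorical_features),
])

ml_pipeline = Pipeline(steps=[
    ("preprocessor", preprocessor),
    ("regressor", RandomForestRegressor(n_estimators=100, random_state=42)),
])

X = df[numeric_features + categorical_features]
y = df["total_spent"]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# fit() learns the scaler/encoder parameters on the training split only, then trains the model
ml_pipeline.fit(X_train, y_train)
r2 = ml_pipeline.score(X_test, y_test)  # R^2 on the held-out split
print(f"Test R^2: {r2:.3f}")
```

Keeping preprocessing inside the pipeline means scaling and encoding parameters are learned from training data only, which avoids leaking test-set statistics, also when the pipeline is handed to cross-validation utilities.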

MLOps Practices

  • Version Control: Git for code, DVC for data
  • Experiment Tracking: MLflow, Weights & Biases
  • Model Registry: MLflow Model Registry
  • CI/CD: GitHub Actions, Jenkins
  • Monitoring: Prometheus, Grafana
  • Drift Detection: Evidently AI, NannyML
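
Drift detection compares the distribution of incoming data with a reference sample. One widely used metric for this is the Population Stability Index (PSI); the sketch below computes it with NumPy, assuming equal-frequency bins taken from the reference sample. It is a swapped-in illustration, not necessarily the method Evidently AI or NannyML use, and the function name and data are hypothetical.

```python
import numpy as np


def population_stability_index(expected, actual, bins=10, eps=1e-6):
    """PSI between a reference sample and a new sample of one numeric feature."""
    expected = np.asarray(expected, dtype=float)
    actual = np.asarray(actual, dtype=float)
    # Interior bin edges at the reference sample's quantiles (equal-frequency bins)
    inner_edges = np.quantile(expected, np.linspace(0, 1, bins + 1)[1:-1])
    # Bin index 0..bins-1; values outside the reference range fall into the outer bins
    expected_bins = np.searchsorted(inner_edges, expected, side="right")
    actual_bins = np.searchsorted(inner_edges, actual, side="right")
    expected_share = np.bincount(expected_bins, minlength=bins) / len(expected)
    actual_share = np.bincount(actual_bins, minlength=bins) / len(actual)
    # Guard against log(0) and division by zero for empty bins
    expected_share = np.clip(expected_share, eps, None)
    actual_share = np.clip(actual_share, eps, None)
    return float(np.sum((actual_share - expected_share) * np.log(actual_share / expected_share)))


rng = np.random.default_rng(0)
reference = rng.normal(0, 1, 10_000)   # e.g. feature values seen at training time
same = rng.normal(0, 1, 10_000)        # new data from the same distribution
shifted = rng.normal(0.5, 1, 10_000)   # new data whose mean has drifted

psi_same = population_stability_index(reference, same)
psi_shifted = population_stability_index(reference, shifted)
print(f"PSI (no drift): {psi_same:.4f}, PSI (shifted): {psi_shifted:.4f}")
```

A commonly quoted rule of thumb reads a PSI below 0.1 as stable and above 0.25 as significant drift; treat these as conventions rather than hard limits.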

Data Visualization Principles

Chart Selection Guide

| Data Type | Visualization | Purpose |
|---|---|---|
| Time Series | Line Chart | Trends over time |
| Categorical | Bar Chart | Comparisons |
| Distribution | Histogram | Frequencies |
| Relationship | Scatter Plot | Correlations |
| Composition | Pie Chart | Proportions |
| Geographic | Map | Locations |

Color Theory for Data Viz

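  • Primary Colors: Red, Green, Blue (RGB) on screens; Red, Yellow, Blue in traditional (RYB) color theory
  • Secondary Colors: Green, Orange, Purple (in the RYB model)
  • Color Psychology: Blue = trust, Red = danger, Green = success (associations vary by culture)
  • Contrast: Pair light and dark tones so text and chart elements stay readable
  • Color Blindness: Roughly 8% of men and 0.5% of women have a red-green color vision deficiency, so do not encode meaning in red vs. green alone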
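
A concrete way to check contrast is the WCAG 2.x contrast ratio, which is derived from the relative luminance of two colors. The sketch below implements that formula for `#RRGGBB` strings; the function names are hypothetical, and the sample color `#2E86AB` is the `primary` color of the visualization demo earlier in this article.

```python
def relative_luminance(hex_color):
    """WCAG 2.x relative luminance of an sRGB color given as '#RRGGBB'."""
    hex_color = hex_color.lstrip("#")
    channels = [int(hex_color[i:i + 2], 16) / 255 for i in (0, 2, 4)]
    # Undo the sRGB gamma curve to get linear light values
    linear = [c / 12.92 if c <= 0.03928 else ((c + 0.055) / 1.055) ** 2.4 for c in channels]
    r, g, b = linear
    return 0.2126 * r + 0.7152 * g + 0.0722 * b


def contrast_ratio(color_a, color_b):
    """Contrast ratio from 1:1 (identical colors) to 21:1 (black on white)."""
    lighter, darker = sorted((relative_luminance(color_a), relative_luminance(color_b)), reverse=True)
    return (lighter + 0.05) / (darker + 0.05)


# Screen a palette color against a white background
print(f"{contrast_ratio('#2E86AB', '#FFFFFF'):.2f}:1")
```

WCAG 2.1 asks for at least 4.5:1 for normal text and 3:1 for large text and for graphical elements such as chart lines, so a palette can be screened programmatically before it is used.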

ETL Best Practices

Data Quality Checks

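def validate_data_quality(df):
    """Validate data quality and return a summary report"""
    numeric_columns = df.select_dtypes(include='number').columns
    quality_report = {
        'total_rows': len(df),
        'missing_values': df.isnull().sum().to_dict(),
        'duplicate_rows': int(df.duplicated().sum()),
        'data_types': df.dtypes.to_dict(),
        # detect_outliers() works on one column, so count outliers per numeric column
        'outliers': {col: detect_outliers(df, col) for col in numeric_columns}
    }
    return quality_report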

def detect_outliers(df, column):
    """Detect outliers using IQR method"""
    Q1 = df[column].quantile(0.25)
    Q3 = df[column].quantile(0.75)
    IQR = Q3 - Q1
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR
    outliers = df[(df[column] < lower_bound) | (df[column] > upper_bound)]
    return len(outliers)
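A quality report only describes problems. The Transform step then has to act on them. The sketch below shows one possible cleaning policy: drop exact duplicates, fill missing numeric values with the median, and cap values outside the same 1.5 x IQR fences used above (winsorizing) instead of deleting rows. The function name and this particular policy are assumptions. Whether to cap, drop or keep outliers depends on the data.

```python
import numpy as np
import pandas as pd

def clean_for_load(df, numeric_columns):
    """Transform-step sketch: deduplicate, impute medians, cap IQR outliers."""
    cleaned = df.drop_duplicates().copy()
    for col in numeric_columns:
        # Median imputation is less sensitive to outliers than the mean
        cleaned[col] = cleaned[col].fillna(cleaned[col].median())
        # Cap values outside the 1.5 x IQR fences instead of dropping the rows
        q1, q3 = cleaned[col].quantile([0.25, 0.75])
        iqr = q3 - q1
        cleaned[col] = cleaned[col].clip(q1 - 1.5 * iqr, q3 + 1.5 * iqr)
    return cleaned

# Hypothetical raw data with a duplicate row, a missing value and an outlier
raw = pd.DataFrame({
    "id": [1, 2, 3, 4, 5, 6, 2],
    "amount": [10.0, 12.0, 11.0, np.nan, 13.0, 500.0, 12.0],
})
clean = clean_for_load(raw, ["amount"])
print(clean)
```

Imputation runs before the quartiles are computed here, so the imputed medians slightly influence the fences. Reversing the order is an equally valid design choice.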

Performance Optimization

  • Parallel Processing: Multiprocessing, Dask
  • Memory Management: Chunking, Generators
  • Indexing: Database Indexes
  • Caching: Redis, Memcached
  • Batch Processing: Bulk Operations
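Chunking and bulk operations combine naturally in a pandas-based ETL job. The sketch reads the source in fixed-size chunks, transforms each chunk, and writes it with a single `to_sql` call, so memory use stays bounded regardless of file size. The CSV content, the `orders` table and the `run_etl` helper are hypothetical, and an in-memory SQLite database stands in for a real target system. Per-chunk transforms cannot see rows in other chunks, so cross-chunk steps such as global deduplication would have to happen in the target database.

```python
import io
import sqlite3
import pandas as pd

# Hypothetical raw export; in practice this would be a file path or URL
RAW_CSV = """order_id,customer,amount
1,alice,120.5
2,bob,
3,carol,80.0
4,dave,42.25
"""

def run_etl(csv_source, conn, chunksize=2):
    """Extract CSV in chunks, transform each chunk, batch-load it into SQLite."""
    total = 0
    for chunk in pd.read_csv(csv_source, chunksize=chunksize):   # Extract lazily
        transformed = (
            chunk.dropna(subset=["amount"])                       # Transform: drop incomplete rows
                 .assign(customer=lambda d: d["customer"].str.title())
        )
        transformed.to_sql("orders", conn, if_exists="append", index=False)  # Load one batch
        total += len(transformed)
    return total

conn = sqlite3.connect(":memory:")  # throwaway in-memory target database
loaded = run_etl(io.StringIO(RAW_CSV), conn)
print(loaded, "rows loaded")
```

For real workloads the chunk size would be much larger, for example tens of thousands of rows. Dask offers a similar chunked model with parallel execution.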

Analytics Techniques

Statistical Methods

  • Descriptive Statistics: Mean, Median, Mode, Std Dev
  • Inferential Statistics: Hypothesis Testing, Confidence Intervals
  • Correlation Analysis: Pearson, Spearman, Kendall
  • Regression Analysis: Linear, Logistic, Polynomial
  • Time Series: ARIMA, Prophet, LSTM
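The difference between Pearson and Spearman correlation is easiest to see on data that is monotonic but not linear. The sketch below uses a hypothetical cubic relationship and computes a few descriptive statistics plus both correlation coefficients with pandas.

```python
import numpy as np
import pandas as pd

# Hypothetical data: y grows monotonically but non-linearly with x
df = pd.DataFrame({"x": np.arange(1, 11, dtype=float)})
df["y"] = df["x"] ** 3

# Descriptive statistics for y (std uses the sample formula, ddof=1)
summary = df["y"].agg(["mean", "median", "std"])

# Pearson measures linear association; Spearman only looks at rank order
pearson = df.corr(method="pearson").loc["x", "y"]
spearman = df.corr(method="spearman").loc["x", "y"]
print(summary)
print(f"Pearson r = {pearson:.3f}, Spearman rho = {spearman:.3f}")
```

Spearman's rho is exactly 1 because the ranks of x and y agree perfectly. Pearson's r comes out at roughly 0.93 because the relationship is curved rather than a straight line. The large gap between mean (302.5) and median (170.5) also signals a right-skewed distribution.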

Advanced Analytics

  • Clustering: K-Means, Hierarchical, DBSCAN
  • Classification: Decision Trees, Random Forest, SVM
  • Anomaly Detection: Isolation Forest, One-Class SVM
  • Dimensionality Reduction: PCA, t-SNE, UMAP
  • Association Rules: Apriori, FP-Growth
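Three of these techniques share the same fit/predict style in scikit-learn. The sketch below runs K-Means clustering, Isolation Forest anomaly detection and PCA on synthetic blob data. The dataset and all parameter values (3 clusters, 5% contamination, 2 components) are illustrative assumptions, not recommendations.

```python
import numpy as np
from sklearn.cluster import KMeans
from sklearn.datasets import make_blobs
from sklearn.decomposition import PCA
from sklearn.ensemble import IsolationForest

# Synthetic data: 300 points in 5 dimensions around 3 centers
X, _ = make_blobs(n_samples=300, n_features=5, centers=3, random_state=0)

# Clustering: assign each point to one of 3 groups
kmeans = KMeans(n_clusters=3, n_init=10, random_state=0)
cluster_labels = kmeans.fit_predict(X)

# Anomaly detection: -1 marks points that are isolated quickly (suspected outliers)
iso = IsolationForest(contamination=0.05, random_state=0)
anomaly_flags = iso.fit_predict(X)

# Dimensionality reduction: project to 2 components, e.g. for a scatter plot
pca = PCA(n_components=2)
X_2d = pca.fit_transform(X)
explained = pca.explained_variance_ratio_.sum()

print("Cluster sizes:", np.bincount(cluster_labels))
print("Suspected anomalies:", int((anomaly_flags == -1).sum()))
print(f"Variance explained by 2 components: {explained:.1%}")
```

On real data, features usually need scaling first (for example with `StandardScaler`), because K-Means and PCA are both sensitive to feature magnitudes.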

Advantages and Disadvantages

Advantages of Data Science

  • Data-Driven Decisions: Better business decisions
  • Pattern Recognition: Discover hidden patterns
  • Predictive Analytics: Forecast future trends
  • Process Optimization: Increase efficiency
  • Competitive Advantage: Gain competitive edge

Disadvantages

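  • Data Quality: Results are only as reliable as the input data ("garbage in, garbage out")
  • Complexity: Complex algorithms and tools
  • Privacy Concerns: Data protection and ethics
  • Resource Intensive: Computing power and storage
  • Interpretability: Complex models often act as black boxes whose predictions are hard to explain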

Common Exam Questions

  1. What is the difference between Big Data and traditional data? Big Data is characterized by the 3Vs (Volume, Velocity, Variety) and requires specialized technologies for processing.

  2. Explain the ETL process! ETL stands for Extract (extract data), Transform (clean and transform data), and Load (load data into target system).

  3. When do you use which chart type? Line charts for trends over time, bar charts for comparisons, histograms for distributions, scatter plots for relationships.

  4. What is the purpose of Machine Learning Pipelines? ML pipelines automate and standardize the entire workflow from data preparation to model deployment.

Key Sources

  1. https://spark.apache.org/
  2. https://airflow.apache.org/
  3. https://matplotlib.org/
  4. https://plotly.com/python/