
Data Science Fundamentals: Big Data Analytics, ML Pipelines, ETL & Visualization

Data science fundamentals covering big data analytics, machine learning pipelines, ETL processes, and data visualization, with practical examples using Python, Pandas, Scikit-learn, and related tools.


schutzgeist



This post is a comprehensive introduction to data science fundamentals, including big data analytics, machine learning pipelines, ETL processes, and data visualization, with practical examples.

In a Nutshell

Data science combines statistics, programming, and domain knowledge to extract insights from data. Big data handles large data volumes, ML pipelines automate model workflows, ETL prepares the data, and visualization makes the results visible.

Compact Technical Overview

Data science is an interdisciplinary field that uses scientific methods, processes, algorithms, and systems to extract knowledge and insights from structured and unstructured data.

Core areas:

Big Data Analytics

  • Concept: processing extremely large data volumes
  • 3V model: Volume, Velocity, Variety
  • Technologies: Hadoop, Spark, NoSQL databases
  • Applications: real-time analysis, predictive analytics

Machine Learning Pipelines

  • Concept: automated orchestration of ML workflows
  • Phases: Data Collection → Preprocessing → Training → Evaluation → Deployment
  • Tools: Scikit-learn, TensorFlow, MLflow, Airflow
  • MLOps: versioning, monitoring, retraining
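The phases above map directly onto Scikit-learn's Pipeline API. A minimal sketch on synthetic data (the dataset, feature count, and model choice are illustrative, not prescriptive):

```python
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

# Data collection (simulated here with a synthetic dataset)
X, y = make_classification(n_samples=500, n_features=8, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Preprocessing and training bundled into one pipeline object
pipe = Pipeline([
    ("scaler", StandardScaler()),      # preprocessing step
    ("model", LogisticRegression()),   # training step
])
pipe.fit(X_train, y_train)

# Evaluation; deployment would then serialize `pipe` (e.g. with joblib)
accuracy = pipe.score(X_test, y_test)
print(f"Test accuracy: {accuracy:.2f}")
```

Because every step lives in one object, the identical preprocessing is applied at training and inference time, which is the main point of pipeline orchestration.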

ETL-Prozesse

  • Extract: pull data from multiple sources
  • Transform: clean and transform the data
  • Load: load the data into target systems
  • Tools: Apache NiFi, Talend, AWS Glue
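Whatever the tool, each ETL job reduces to the same three steps. A minimal sketch with pandas, using an in-memory CSV in place of a real source (the column names are made up for illustration):

```python
import io

import pandas as pd

# Extract: read raw data (a real job would pull from a database, API, or file store)
raw_csv = "order_id,amount,region\n1,100.0,North\n2,,South\n1,100.0,North\n3,250.5,East\n"
df = pd.read_csv(io.StringIO(raw_csv))

# Transform: drop duplicate orders and impute missing amounts with the median
df = df.drop_duplicates(subset=["order_id"])
df["amount"] = df["amount"].fillna(df["amount"].median())

# Load: hand the cleaned data to the target system (CSV string as a stand-in)
clean_csv = df.to_csv(index=False)
print(df)
```

In practice the load step would write to a warehouse table or Parquet file instead of a string.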

Data Visualization

  • Concept: visual representation of data and insights
  • Chart-Typen: Bar, Line, Scatter, Heatmap, Treemap
  • Tools: Matplotlib, Seaborn, Plotly, Tableau
  • Principles: clarity, accuracy, efficiency
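A minimal Matplotlib example of the first chart type from the list, a bar chart (the sales numbers are invented; the Agg backend keeps it runnable without a display):

```python
import matplotlib
matplotlib.use("Agg")  # headless backend, no display required
import matplotlib.pyplot as plt

products = ["Laptop", "Smartphone", "Tablet"]
units_sold = [120, 340, 90]

fig, ax = plt.subplots()
ax.bar(products, units_sold)
ax.set_xlabel("Product")             # accuracy: label the axes
ax.set_ylabel("Units sold")
ax.set_title("Sales by product")     # clarity: one message per chart
fig.savefig("sales_by_product.png")  # export instead of plt.show()
```

The same figure could be produced with Seaborn's `barplot` or as an interactive chart in Plotly; the principles of clarity and accuracy apply regardless of library.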

Exam-Relevant Key Points

  • Data Science: interdisciplinary field for data analysis
  • Big Data: large, fast, varied data volumes
  • Machine Learning: automated pattern recognition in data
  • ETL: extract-transform-load for data preparation
  • Visualization: visual representation of data
  • Pipelines: automated data processing workflows
  • Analytics: statistical and exploratory data analysis
  • IHK-relevant: modern data analysis and business intelligence

Core Components

  1. Data Collection: data sources and gathering
  2. Data Storage: storage and organization
  3. Data Processing: cleaning and transformation
  4. Data Analysis: statistical and exploratory analysis
  5. Machine Learning: models and algorithms
  6. Data Visualization: visual presentation
  7. Model Deployment: production rollout
  8. Monitoring: performance and drift detection
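For component 8, one of the simplest drift checks compares the mean of a live feature against its training-time mean. A sketch with NumPy, where the shifted distribution and the three-standard-error threshold are illustrative assumptions:

```python
import numpy as np

rng = np.random.default_rng(0)
reference = rng.normal(loc=0.0, scale=1.0, size=1000)   # training-time feature values
production = rng.normal(loc=0.8, scale=1.0, size=1000)  # live values with a mean shift

# Flag drift when the live mean deviates from the reference mean
# by more than three standard errors
std_err = reference.std(ddof=1) / np.sqrt(len(production))
z = abs(production.mean() - reference.mean()) / std_err
drift_detected = bool(z > 3)
print(f"z-score: {z:.1f}, drift detected: {drift_detected}")
```

A production monitor would rather apply a proper two-sample test (e.g. Kolmogorov-Smirnov) over sliding windows, but the principle is the same.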

Practical Examples

1. Big Data Processing with Apache Spark and Python

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
import matplotlib.pyplot as plt
import seaborn as sns

# Big data processing demo
class BigDataAnalytics:
    
    def __init__(self, app_name="BigDataAnalytics"):
        """Initialize the Spark session"""
        self.spark = SparkSession.builder \
            .appName(app_name) \
            .config("spark.sql.adaptive.enabled", "true") \
            .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
            .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128MB") \
            .getOrCreate()
        
        print(f"Spark session created: {self.spark.version}")
    
    def create_sample_data(self):
        """Create sample data for the big data demo"""
        import random
        from datetime import datetime, timedelta
        
        # Simulated sales data
        data = []
        products = ["Laptop", "Smartphone", "Tablet", "Headphones", "Mouse", "Keyboard"]
        regions = ["North", "South", "East", "West", "Central"]
        
        start_date = datetime(2023, 1, 1)
        
        for i in range(1000000):  # 1M records
            date = start_date + timedelta(days=random.randint(0, 365))
            record = {
                "transaction_id": f"T{i:06d}",
                "date": date.strftime("%Y-%m-%d"),
                "product": random.choice(products),
                "region": random.choice(regions),
                "quantity": random.randint(1, 10),
                "unit_price": round(random.uniform(10, 1000), 2),
                "customer_id": f"C{random.randint(1, 50000):05d}",
                "sales_rep": f"SR{random.randint(1, 100):03d}"
            }
            record["total_amount"] = record["quantity"] * record["unit_price"]
            data.append(record)
        
        # Create a Spark DataFrame
        schema = StructType([
            StructField("transaction_id", StringType(), True),
            StructField("date", StringType(), True),
            StructField("product", StringType(), True),
            StructField("region", StringType(), True),
            StructField("quantity", IntegerType(), True),
            StructField("unit_price", DoubleType(), True),
            StructField("customer_id", StringType(), True),
            StructField("sales_rep", StringType(), True),
            StructField("total_amount", DoubleType(), True)
        ])
        
        df = self.spark.createDataFrame(data, schema)
        print(f"Sample data created: {df.count()} rows")
        
        return df
    
    def basic_analytics(self, df):
        """Run basic analytics"""
        print("=== Basic Analytics ===")
        
        # Show the schema
        print("Schema:")
        df.printSchema()
        
        # Statistical summary
        print("\nStatistical summary:")
        df.describe().show()
        
        # Top products by revenue
        print("\nTop products by revenue:")
        product_sales = df.groupBy("product") \
            .agg(sum("total_amount").alias("total_sales"),
                 count("transaction_id").alias("transaction_count")) \
            .orderBy(col("total_sales").desc())
        
        product_sales.show()
        
        # Regional analysis
        print("\nRegional revenue analysis:")
        regional_sales = df.groupBy("region") \
            .agg(sum("total_amount").alias("total_sales"),
                 avg("total_amount").alias("avg_transaction"),
                 count("transaction_id").alias("transaction_count")) \
            .orderBy(col("total_sales").desc())
        
        regional_sales.show()
        
        # Time-based analysis
        print("\nMonthly revenue trend:")
        monthly_sales = df.withColumn("month", substring(col("date"), 1, 7)) \
            .groupBy("month") \
            .agg(sum("total_amount").alias("monthly_sales"),
                 count("transaction_id").alias("transaction_count")) \
            .orderBy("month")
        
        monthly_sales.show()
        
        return product_sales, regional_sales, monthly_sales
    
    def advanced_analytics(self, df):
        """Advanced analytics with window functions"""
        print("\n=== Advanced Analytics ===")
        
        # Window functions for running totals
        from pyspark.sql.window import Window
        
        # Cumulative revenue within each month
        window_spec = Window.partitionBy("month").orderBy("date") \
            .rowsBetween(Window.unboundedPreceding, Window.currentRow)
        
        monthly_cumulative = df.withColumn("month", substring(col("date"), 1, 7)) \
            .groupBy("month", "date") \
            .agg(sum("total_amount").alias("daily_sales")) \
            .withColumn("cumulative_sales", sum("daily_sales").over(window_spec)) \
            .orderBy("month", "date")
        
        print("Monthly cumulative revenue:")
        monthly_cumulative.show(20)
        
        # Best-Performing Sales Reps
        rep_performance = df.groupBy("sales_rep") \
            .agg(sum("total_amount").alias("total_sales"),
                 count("transaction_id").alias("transaction_count"),
                 avg("total_amount").alias("avg_transaction")) \
            .withColumn("performance_rank", percent_rank().over(Window.orderBy(col("total_sales").desc()))) \
            .filter(col("performance_rank") <= 0.1)  # Top 10%
        
        print("\nTop 10% Sales Reps:")
        rep_performance.show()
        
        # Customer × product quantity pivot
        product_correlation = df.groupBy("customer_id", "product") \
            .agg(sum("quantity").alias("product_quantity")) \
            .groupBy("customer_id") \
            .pivot("product") \
            .agg(sum("product_quantity")) \
            .fillna(0)
        
        print("\nCustomer-product quantity matrix (first 10 customers):")
        product_correlation.limit(10).show()
        
        return monthly_cumulative, rep_performance, product_correlation
    
    def machine_learning_pipeline(self, df):
        """Machine learning pipeline for revenue prediction"""
        print("\n=== Machine Learning Pipeline ===")
        
        # Prepare features for ML; derive a month column first, since the
        # raw DataFrame only carries the full date
        feature_data = df.withColumn("month", substring(col("date"), 1, 7)) \
            .groupBy("product", "region", "month") \
            .agg(sum("total_amount").alias("target_sales"),
                 count("transaction_id").alias("transaction_count"),
                 avg("unit_price").alias("avg_unit_price"),
                 sum("quantity").alias("total_quantity"))
        
        # Encode categorical features
        from pyspark.ml.feature import StringIndexer, OneHotEncoder
        
        # Product indexer
        product_indexer = StringIndexer(inputCol="product", outputCol="product_index")
        product_indexed = product_indexer.fit(feature_data).transform(feature_data)
        
        # Region Indexer
        region_indexer = StringIndexer(inputCol="region", outputCol="region_index")
        region_indexed = region_indexer.fit(product_indexed).transform(product_indexed)
        
        # One-Hot Encoding
        encoder = OneHotEncoder(inputCols=["product_index", "region_index"],
                               outputCols=["product_encoded", "region_encoded"])
        encoded_data = encoder.fit(region_indexed).transform(region_indexed)
        
        # Assemble the feature vector
        assembler = VectorAssembler(
            inputCols=["transaction_count", "avg_unit_price", "total_quantity",
                       "product_encoded", "region_encoded"],
            outputCol="features"
        )
        
        assembled_data = assembler.transform(encoded_data)
        
        # Scale the features
        scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
        scaler_model = scaler.fit(assembled_data)
        scaled_data = scaler_model.transform(assembled_data)
        
        # Train-Test Split
        train_data, test_data = scaled_data.randomSplit([0.8, 0.2], seed=42)
        
        print(f"Training data: {train_data.count()} rows")
        print(f"Test data: {test_data.count()} rows")
        
        # Linear regression
        lr = LinearRegression(featuresCol="scaled_features", 
                            labelCol="target_sales",
                            predictionCol="predicted_sales")
        
        lr_model = lr.fit(train_data)
        
        # Predictions on the test data
        predictions = lr_model.transform(test_data)
        
        # Evaluate the model
        evaluator = RegressionEvaluator(labelCol="target_sales",
                                     predictionCol="predicted_sales",
                                     metricName="rmse")
        
        rmse = evaluator.evaluate(predictions)
        r2 = evaluator.setMetricName("r2").evaluate(predictions)
        
        print("Model evaluation:")
        print(f"RMSE: {rmse:.2f}")
        print(f"R²: {r2:.4f}")
        
        # Show feature coefficients; each one-hot column expands into several
        # coefficients, so zip only pairs the leading entries with names
        print("\nFeature coefficients:")
        feature_names = ["transaction_count", "avg_unit_price", "total_quantity",
                       "product_encoded", "region_encoded"]
        
        for name, coef in zip(feature_names, lr_model.coefficients):
            print(f"{name}: {coef:.4f}")
        
        return lr_model, predictions, scaler_model
    
    def real_time_processing_simulation(self, df):
        """Simulate real-time processing"""
        print("\n=== Real-Time Processing Simulation ===")
        
        # Simulate a streaming DataFrame
        # In practice this would come from Kafka, Kinesis, etc.
        
        # Aggregations for real-time monitoring
        real_time_metrics = df.groupBy("product", "region") \
            .agg(count("transaction_id").alias("transaction_count"),
                 sum("total_amount").alias("total_sales"),
                 avg("total_amount").alias("avg_transaction")) \
            .orderBy(col("total_sales").desc())
        
        print("Real-time metrics:")
        real_time_metrics.show()
        
        # Anomaly detection (simplified)
        from pyspark.sql.functions import stddev
        
        # Statistical bounds for anomaly detection (mean ± 2 std)
        stats = df.groupBy("product") \
            .agg(avg("total_amount").alias("avg_amount"),
                 stddev("total_amount").alias("std_amount")) \
            .withColumn("upper_bound", col("avg_amount") + 2 * col("std_amount")) \
            .withColumn("lower_bound", col("avg_amount") - 2 * col("std_amount"))
        
        print("Anomaly bounds per product:")
        stats.show()
        
        return real_time_metrics, stats
    
    def data_export(self, df, predictions):
        """Export data for downstream processing"""
        print("\n=== Data Export ===")
        
        # Export results in various formats
        output_path = "/tmp/big_data_results"
        
        # Analytics results
        product_sales = df.groupBy("product") \
            .agg(sum("total_amount").alias("total_sales")) \
            .orderBy(col("total_sales").desc())
        
        product_sales.write.mode("overwrite").parquet(f"{output_path}/product_sales")
        print(f"Product revenue exported to: {output_path}/product_sales")
        
        # ML predictions
        predictions.select("product", "region", "target_sales", "predicted_sales") \
            .write.mode("overwrite").parquet(f"{output_path}/predictions")
        
        print(f"ML predictions exported to: {output_path}/predictions")
        
        # Summary statistics
        summary_stats = df.agg(
            count("transaction_id").alias("total_transactions"),
            sum("total_amount").alias("total_revenue"),
            avg("total_amount").alias("avg_transaction_value"),
            countDistinct("customer_id").alias("unique_customers"),
            countDistinct("product").alias("unique_products")
        )
        
        summary_stats.write.mode("overwrite").json(f"{output_path}/summary_stats")
        print(f"Summary statistics exported to: {output_path}/summary_stats")
    
    def cleanup(self):
        """Release resources"""
        self.spark.stop()
        print("Spark session stopped")

# Run the big data demo
def big_data_demo():
    analytics = BigDataAnalytics()
    
    try:
        # Create the data
        df = analytics.create_sample_data()
        
        # Basic analytics
        product_sales, regional_sales, monthly_sales = analytics.basic_analytics(df)
        
        # Advanced analytics
        monthly_cumulative, rep_performance, product_correlation = analytics.advanced_analytics(df)
        
        # Machine learning pipeline
        lr_model, predictions, scaler_model = analytics.machine_learning_pipeline(df)
        
        # Real-time processing
        real_time_metrics, stats = analytics.real_time_processing_simulation(df)
        
        # Export the data
        analytics.data_export(df, predictions)
        
        print("\nBig data analytics demo completed successfully!")
        
    except Exception as e:
        print(f"Error in big data demo: {e}")
    finally:
        analytics.cleanup()

if __name__ == "__main__":
    big_data_demo()

2. ETL Pipeline with Apache Airflow

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.models import Variable
from airflow.utils.dates import days_ago
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import json
import logging

# ETL Pipeline Configuration
default_args = {
    'owner': 'data-science-team',
    'depends_on_past': False,
    'start_date': days_ago(1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# DAG Definition
dag = DAG(
    'data_science_etl_pipeline',
    default_args=default_args,
    description='ETL pipeline for data science analytics',
    schedule_interval='@daily',
    catchup=False,
    tags=['data-science', 'etl', 'analytics'],
)

# Configure logging
logger = logging.getLogger(__name__)

def extract_customer_data(**context):
    """Extract customer data from multiple sources"""
    logger.info("Extracting customer data...")
    
    # PostgreSQL hook
    postgres_hook = PostgresHook(postgres_conn_id='postgres_default')
    
    # Extract customer data from PostgreSQL
    customer_query = """
        SELECT 
            customer_id,
            first_name,
            last_name,
            email,
            phone,
            registration_date,
            last_login_date,
            total_orders,
            total_spent,
            customer_segment
        FROM customers 
        WHERE last_login_date >= CURRENT_DATE - INTERVAL '30 days'
    """
    
    customers_df = postgres_hook.get_pandas_df(customer_query)
    logger.info(f"Extracted {len(customers_df)} customer records")
    
    # S3 hook for additional data
    s3_hook = S3Hook(aws_conn_id='aws_default')
    
    # Product interaction data from S3
    bucket_name = Variable.get('data_bucket_name', default_var='analytics-data')
    
    try:
        # Load the CSV file from S3; io.StringIO wraps the raw string
        # (pandas has no pd.StringIO)
        from io import StringIO
        file_content = s3_hook.read_key(key='customer_interactions.csv', bucket_name=bucket_name)
        interactions_df = pd.read_csv(StringIO(file_content))
        logger.info(f"Loaded {len(interactions_df)} interaction records from S3")
    except Exception as e:
        logger.warning(f"No interaction data found in S3: {e}")
        interactions_df = pd.DataFrame()
    
    # Combine the data
    if not interactions_df.empty:
        combined_df = customers_df.merge(
            interactions_df, 
            on='customer_id', 
            how='left'
        )
    else:
        combined_df = customers_df
    
    # Serialize the data as JSON for the next task
    combined_json = combined_df.to_json(date_format='iso')
    
    # Push to XCom
    task_instance = context['task_instance']
    task_instance.xcom_push(key='customer_data', value=combined_json)
    
    logger.info("Customer data extraction finished")
    return combined_json

def transform_customer_data(**context):
    """Transform and clean customer data"""
    logger.info("Transforming customer data...")
    
    # Pull the data from the previous task
    task_instance = context['task_instance']
    customer_json = task_instance.xcom_pull(task_ids='extract_customer_data', key='customer_data')
    
    # JSON to DataFrame
    customers_df = pd.read_json(customer_json)
    
    # Data cleaning
    logger.info("Running data cleaning...")
    
    # 1. Remove duplicates
    original_count = len(customers_df)
    customers_df = customers_df.drop_duplicates(subset=['customer_id'])
    duplicates_removed = original_count - len(customers_df)
    logger.info(f"Removed {duplicates_removed} duplicates")
    
    # 2. Handle missing values
    # Numeric columns: impute with the median
    numeric_columns = ['total_orders', 'total_spent']
    for col in numeric_columns:
        if col in customers_df.columns:
            median_value = customers_df[col].median()
            customers_df[col].fillna(median_value, inplace=True)
    
    # Categorical columns: impute with the mode
    categorical_columns = ['customer_segment']
    for col in categorical_columns:
        if col in customers_df.columns:
            mode_value = customers_df[col].mode()[0] if not customers_df[col].mode().empty else 'Unknown'
            customers_df[col].fillna(mode_value, inplace=True)
    
    # 3. Normalize date columns
    date_columns = ['registration_date', 'last_login_date']
    for col in date_columns:
        if col in customers_df.columns:
            customers_df[col] = pd.to_datetime(customers_df[col], errors='coerce')
    
    # 4. Feature engineering
    logger.info("Creating new features...")
    
    # Compute customer lifetime value (CLV)
    if 'total_spent' in customers_df.columns and 'total_orders' in customers_df.columns:
        customers_df['avg_order_value'] = customers_df['total_spent'] / customers_df['total_orders']
        customers_df['clv_score'] = customers_df['total_spent'] * np.log1p(customers_df['total_orders'])
    
    # Recency score (days since last login)
    if 'last_login_date' in customers_df.columns:
        current_date = datetime.now()
        customers_df['days_since_last_login'] = (current_date - customers_df['last_login_date']).dt.days
        customers_df['recency_score'] = pd.cut(customers_df['days_since_last_login'], 
                                              bins=[0, 7, 30, 90, float('inf')], 
                                              labels=[4, 3, 2, 1])
    
    # Engagement score based on several factors
    engagement_features = []
    if 'total_orders' in customers_df.columns:
        engagement_features.append('total_orders')
    if 'days_since_last_login' in customers_df.columns:
        engagement_features.append('days_since_last_login')
    
    if engagement_features:
        # Normalized features for the engagement score
        for feature in engagement_features:
            if feature in customers_df.columns:
                min_val = customers_df[feature].min()
                max_val = customers_df[feature].max()
                if max_val != min_val:
                    customers_df[f'{feature}_normalized'] = (customers_df[feature] - min_val) / (max_val - min_val)
                else:
                    customers_df[f'{feature}_normalized'] = 0
        
        # Compute the engagement score
        normalized_features = [f'{f}_normalized' for f in engagement_features]
        customers_df['engagement_score'] = customers_df[normalized_features].mean(axis=1)
    
    # 5. Data validation
    logger.info("Running data validation...")
    
    validation_errors = []
    
    # Check business rules
    if 'total_orders' in customers_df.columns:
        if (customers_df['total_orders'] < 0).any():
            validation_errors.append("Negative order counts found")
    
    if 'total_spent' in customers_df.columns:
        if (customers_df['total_spent'] < 0).any():
            validation_errors.append("Negative revenue found")
    
    if 'email' in customers_df.columns:
        invalid_emails = customers_df[~customers_df['email'].str.contains('@', na=False)]
        if not invalid_emails.empty:
            validation_errors.append(f"Found {len(invalid_emails)} invalid email addresses")
    
    if validation_errors:
        logger.warning(f"Validation errors: {validation_errors}")
    
    # 6. Aggregate data for analytics
    logger.info("Aggregating data for analytics...")
    
    # Customer segment statistics
    if 'customer_segment' in customers_df.columns:
        segment_stats = customers_df.groupby('customer_segment').agg({
            'customer_id': 'count',
            'total_spent': ['sum', 'mean'],
            'total_orders': ['sum', 'mean'],
            'engagement_score': 'mean'
        }).round(2)
        
        logger.info("Customer segment statistics created")
    
    # Persist the transformed data
    transformed_json = customers_df.to_json(date_format='iso')
    task_instance.xcom_push(key='transformed_data', value=transformed_json)
    
    logger.info(f"Data transformation finished: {len(customers_df)} records")
    return transformed_json

def load_data_to_warehouse(**context):
    """Load the transformed data into the data warehouse"""
    logger.info("Loading data into the data warehouse...")
    
    # Pull the transformed data
    task_instance = context['task_instance']
    transformed_json = task_instance.xcom_pull(task_ids='transform_customer_data', key='transformed_data')
    
    # JSON to DataFrame
    customers_df = pd.read_json(transformed_json)
    
    # PostgreSQL hook
    postgres_hook = PostgresHook(postgres_conn_id='postgres_default')
    
    # Create the table if it does not exist
    create_table_sql = """
    CREATE TABLE IF NOT EXISTS analytics_customer_metrics (
        customer_id VARCHAR(50) PRIMARY KEY,
        first_name VARCHAR(100),
        last_name VARCHAR(100),
        email VARCHAR(255),
        phone VARCHAR(50),
        registration_date TIMESTAMP,
        last_login_date TIMESTAMP,
        total_orders INTEGER,
        total_spent DECIMAL(10,2),
        customer_segment VARCHAR(50),
        avg_order_value DECIMAL(10,2),
        clv_score DECIMAL(10,2),
        days_since_last_login INTEGER,
        recency_score INTEGER,
        engagement_score DECIMAL(5,2),
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    );
    """
    
    postgres_hook.run(create_table_sql)
    
    # Upsert operation (insert or update)
    upsert_sql = """
    INSERT INTO analytics_customer_metrics (
        customer_id, first_name, last_name, email, phone, 
        registration_date, last_login_date, total_orders, total_spent,
        customer_segment, avg_order_value, clv_score, days_since_last_login,
        recency_score, engagement_score
    ) VALUES (
        %(customer_id)s, %(first_name)s, %(last_name)s, %(email)s, %(phone)s,
        %(registration_date)s, %(last_login_date)s, %(total_orders)s, %(total_spent)s,
        %(customer_segment)s, %(avg_order_value)s, %(clv_score)s, %(days_since_last_login)s,
        %(recency_score)s, %(engagement_score)s
    )
    ON CONFLICT (customer_id) 
    DO UPDATE SET
        first_name = EXCLUDED.first_name,
        last_name = EXCLUDED.last_name,
        email = EXCLUDED.email,
        phone = EXCLUDED.phone,
        registration_date = EXCLUDED.registration_date,
        last_login_date = EXCLUDED.last_login_date,
        total_orders = EXCLUDED.total_orders,
        total_spent = EXCLUDED.total_spent,
        customer_segment = EXCLUDED.customer_segment,
        avg_order_value = EXCLUDED.avg_order_value,
        clv_score = EXCLUDED.clv_score,
        days_since_last_login = EXCLUDED.days_since_last_login,
        recency_score = EXCLUDED.recency_score,
        engagement_score = EXCLUDED.engagement_score,
        updated_at = CURRENT_TIMESTAMP;
    """
    
    # Load the data in batches (performance optimization)
    batch_size = 1000
    total_rows = len(customers_df)
    
    for i in range(0, total_rows, batch_size):
        batch_df = customers_df.iloc[i:i+batch_size]
        
        # Prepare the data for PostgreSQL
        batch_records = batch_df.to_dict('records')
        
        # Handle NaN values
        for record in batch_records:
            for key, value in record.items():
                if pd.isna(value):
                    record[key] = None
                elif isinstance(value, pd.Timestamp):
                    record[key] = value.isoformat()
        
        # Execute the batch; PostgresHook.run takes one parameter mapping
        # per statement, so iterate over the records
        for record in batch_records:
            postgres_hook.run(upsert_sql, parameters=record)
        
        logger.info(f"Loaded batch {i//batch_size + 1}/{(total_rows-1)//batch_size + 1}: {len(batch_df)} rows")
    
    # Also load into BigQuery for analytics
    try:
        bigquery_hook = BigQueryHook(gcp_conn_id='google_cloud_default', use_legacy_sql=False)
        
        # Configure dataset and table
        project_id = Variable.get('gcp_project_id', default_var='my-project')
        dataset_id = 'analytics'
        table_id = 'customer_metrics'
        
        # Load the DataFrame into BigQuery
        customers_df.to_gbq(
            destination_table=f'{dataset_id}.{table_id}',
            project_id=project_id,
            if_exists='replace',
            progress_bar=False
        )
        
        logger.info("Data loaded into BigQuery successfully")
        
    except Exception as e:
        logger.warning(f"BigQuery load failed: {e}")
    
    logger.info("Data load finished")

def generate_analytics_report(**context):
    """Generate the analytics report"""
    logger.info("Generating analytics report...")
    
    # PostgreSQL hook for the final data
    postgres_hook = PostgresHook(postgres_conn_id='postgres_default')
    
    # Run the analytics queries
    analytics_queries = {
        'customer_segments': """
            SELECT 
                customer_segment,
                COUNT(*) as customer_count,
                AVG(total_spent) as avg_total_spent,
                AVG(total_orders) as avg_total_orders,
                AVG(clv_score) as avg_clv_score,
                AVG(engagement_score) as avg_engagement_score
            FROM analytics_customer_metrics
            GROUP BY customer_segment
            ORDER BY avg_total_spent DESC
        """,
        
        'top_customers': """
            SELECT 
                customer_id,
                first_name,
                last_name,
                total_spent,
                total_orders,
                clv_score,
                engagement_score
            FROM analytics_customer_metrics
            ORDER BY clv_score DESC
            LIMIT 10
        """,
        
        'engagement_distribution': """
            SELECT 
                CASE 
                    WHEN engagement_score >= 0.8 THEN 'High'
                    WHEN engagement_score >= 0.6 THEN 'Medium'
                    WHEN engagement_score >= 0.4 THEN 'Low'
                    ELSE 'Very Low'
                END as engagement_level,
                COUNT(*) as customer_count,
                AVG(total_spent) as avg_total_spent
            FROM analytics_customer_metrics
            GROUP BY engagement_level
            ORDER BY avg_total_spent DESC
        """,
        
        'daily_trends': """
            SELECT 
                DATE(last_login_date) as login_date,
                COUNT(*) as active_customers,
                AVG(total_spent) as avg_spend_per_customer
            FROM analytics_customer_metrics
            WHERE last_login_date >= CURRENT_DATE - INTERVAL '30 days'
            GROUP BY DATE(last_login_date)
            ORDER BY login_date DESC
        """
    }
    
    # Collect the results
    analytics_results = {}
    
    for query_name, query in analytics_queries.items():
        try:
            result_df = postgres_hook.get_pandas_df(query)
            analytics_results[query_name] = result_df
            logger.info(f"Analytics query '{query_name}' executed: {len(result_df)} rows")
        except Exception as e:
            logger.error(f"Error in query '{query_name}': {e}")
            analytics_results[query_name] = pd.DataFrame()
    
    # Build the report; total_customers is the sum of the per-segment counts,
    # not the number of segment rows
    segments_df = analytics_results.get('customer_segments', pd.DataFrame())
    report_data = {
        'generated_at': datetime.now().isoformat(),
        'total_customers': int(segments_df['customer_count'].sum()) if 'customer_count' in segments_df else 0,
        'analytics': {}
    }
    
    for query_name, result_df in analytics_results.items():
        if not result_df.empty:
            report_data['analytics'][query_name] = result_df.to_dict('records')
    
    # Serialize the report as JSON
    report_json = json.dumps(report_data, indent=2, default=str)
    
    # Persist the report to S3
    try:
        s3_hook = S3Hook(aws_conn_id='aws_default')
        bucket_name = Variable.get('reports_bucket_name', default_var='analytics-reports')
        
        report_filename = f"customer_analytics_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        s3_hook.load_string(
            report_json,
            key=report_filename,
            bucket_name=bucket_name,
            replace=True
        )
        
        logger.info(f"Analytics report saved: s3://{bucket_name}/{report_filename}")
        
    except Exception as e:
        logger.error(f"Error while saving the report: {e}")
    
    # Log report metrics
    if 'customer_segments' in analytics_results:
        segments_df = analytics_results['customer_segments']
        if not segments_df.empty:
            logger.info("Customer segment distribution:")
            for _, row in segments_df.iterrows():
                logger.info(f"  {row['customer_segment']}: {row['customer_count']} customers, "
                           f"avg revenue: €{row['avg_total_spent']:.2f}")
    
    logger.info("Analytics report generation finished")

# Define the tasks
extract_task = PythonOperator(
    task_id='extract_customer_data',
    python_callable=extract_customer_data,
    dag=dag,
)

transform_task = PythonOperator(
    task_id='transform_customer_data',
    python_callable=transform_customer_data,
    dag=dag,
)

load_task = PythonOperator(
    task_id='load_data_to_warehouse',
    python_callable=load_data_to_warehouse,
    dag=dag,
)

report_task = PythonOperator(
    task_id='generate_analytics_report',
    python_callable=generate_analytics_report,
    dag=dag,
)

# Task Dependencies
extract_task >> transform_task >> load_task >> report_task

3. Data Visualization with Matplotlib, Seaborn and Plotly

import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')

# Data visualization demo
class DataVisualizationDemo:
    
    def __init__(self):
        """Set up style and configuration"""
        # Matplotlib configuration
        plt.style.use('seaborn-v0_8')
        plt.rcParams['figure.figsize'] = (12, 8)
        plt.rcParams['font.size'] = 10
        plt.rcParams['axes.titlesize'] = 14
        plt.rcParams['axes.labelsize'] = 12
        plt.rcParams['xtick.labelsize'] = 10
        plt.rcParams['ytick.labelsize'] = 10
        plt.rcParams['legend.fontsize'] = 10
        plt.rcParams['figure.titlesize'] = 16
        
        # Seaborn configuration
        sns.set_palette("husl")
        
        # Define a color palette
        self.colors = {
            'primary': '#2E86AB',
            'secondary': '#A23B72',
            'accent': '#F18F01',
            'success': '#C73E1D',
            'warning': '#F4A261',
            'info': '#264653',
            'light': '#E9C46A',
            'dark': '#2A9D8F'
        }
        
        print("Data visualization demo initialized")
    
    def create_sample_data(self):
        """Create sample data for the visualizations"""
        np.random.seed(42)
        
        # Time-series data (revenue trend)
        dates = pd.date_range(start='2023-01-01', end='2023-12-31', freq='D')
        base_revenue = 10000
        trend = np.linspace(0, 5000, len(dates))
        seasonal = 2000 * np.sin(2 * np.pi * np.arange(len(dates)) / 365.25)
        noise = np.random.normal(0, 500, len(dates))
        
        revenue_data = pd.DataFrame({
            'date': dates,
            'revenue': base_revenue + trend + seasonal + noise,
            'orders': np.random.poisson(100, len(dates)) + 50,
            'customers': np.random.poisson(80, len(dates)) + 30
        })
        
        # Customer data
        customer_data = pd.DataFrame({
            'customer_id': range(1000),
            'age': np.random.normal(35, 10, 1000),
            'income': np.random.lognormal(10.5, 0.5, 1000),
            'total_spent': np.random.gamma(2, 100, 1000),
            'orders': np.random.poisson(5, 1000) + 1,
            'segment': np.random.choice(['Bronze', 'Silver', 'Gold', 'Platinum'], 1000, 
                                      p=[0.4, 0.3, 0.2, 0.1]),
            'registration_date': pd.date_range('2020-01-01', '2023-12-31', periods=1000)[np.random.permutation(1000)],
            'last_purchase': pd.date_range('2023-01-01', '2023-12-31', periods=1000)[np.random.permutation(1000)]
        })
        
        # Product data
        products = ['Laptop', 'Smartphone', 'Tablet', 'Headphones', 'Mouse', 'Keyboard', 'Monitor', 'Webcam']
        product_data = pd.DataFrame({
            'product': np.random.choice(products, 5000),
            'category': np.random.choice(['Electronics', 'Accessories'], 5000),
            'price': np.random.uniform(10, 1000, 5000),
            'quantity_sold': np.random.poisson(10, 5000) + 1,
            'rating': np.random.uniform(3, 5, 5000),
            'month': np.random.choice(range(1, 13), 5000)
        })
        
        # Geographic data
        regions = ['North', 'South', 'East', 'West', 'Central']
        geo_data = pd.DataFrame({
            'region': np.random.choice(regions, 1000),
            'city': [f"{region} City {i}" for region, i in zip(
                np.random.choice(regions, 1000), 
                np.random.randint(1, 10, 1000)
            )],
            'population': np.random.lognormal(10, 1, 1000),
            'revenue': np.random.gamma(2, 50000, 1000),
            'stores': np.random.poisson(5, 1000) + 1
        })
        
        return revenue_data, customer_data, product_data, geo_data
    
    def create_time_series_visualizations(self, revenue_data):
        """Create time-series visualizations"""
        print("Creating time-series visualizations...")
        
        fig, axes = plt.subplots(2, 2, figsize=(16, 12))
        fig.suptitle('Revenue Time-Series Analysis', fontsize=16, fontweight='bold')
        
        # 1. Revenue trend
        axes[0, 0].plot(revenue_data['date'], revenue_data['revenue'], 
                       color=self.colors['primary'], linewidth=2)
        axes[0, 0].set_title('Daily Revenue Trend')
        axes[0, 0].set_xlabel('Date')
        axes[0, 0].set_ylabel('Revenue (€)')
        axes[0, 0].grid(True, alpha=0.3)
        
        # Add moving averages
        revenue_data['ma_7'] = revenue_data['revenue'].rolling(window=7).mean()
        revenue_data['ma_30'] = revenue_data['revenue'].rolling(window=30).mean()
        
        axes[0, 0].plot(revenue_data['date'], revenue_data['ma_7'], 
                       color=self.colors['accent'], linewidth=1, alpha=0.7, label='7-day MA')
        axes[0, 0].plot(revenue_data['date'], revenue_data['ma_30'], 
                       color=self.colors['secondary'], linewidth=1, alpha=0.7, label='30-day MA')
        axes[0, 0].legend()
        
        # 2. Monthly revenue
        monthly_revenue = revenue_data.set_index('date').resample('M')['revenue'].sum()
        axes[0, 1].bar(monthly_revenue.index, monthly_revenue.values, 
                      color=self.colors['info'], alpha=0.7)
        axes[0, 1].set_title('Monthly Total Revenue')
        axes[0, 1].set_xlabel('Month')
        axes[0, 1].set_ylabel('Revenue (€)')
        axes[0, 1].tick_params(axis='x', rotation=45)
        
        # 3. Revenue vs. orders scatter plot
        axes[1, 0].scatter(revenue_data['orders'], revenue_data['revenue'], 
                          alpha=0.6, color=self.colors['primary'])
        axes[1, 0].set_title('Revenue vs. Orders')
        axes[1, 0].set_xlabel('Number of orders')
        axes[1, 0].set_ylabel('Revenue (€)')
        
        # Add a linear trend line
        z = np.polyfit(revenue_data['orders'], revenue_data['revenue'], 1)
        p = np.poly1d(z)
        axes[1, 0].plot(revenue_data['orders'], p(revenue_data['orders']), 
                       color=self.colors['accent'], linewidth=2)
        
        # 4. Distribution of daily revenue
        axes[1, 1].hist(revenue_data['revenue'], bins=30, color=self.colors['secondary'], 
                       alpha=0.7, edgecolor='black')
        axes[1, 1].set_title('Distribution of Daily Revenue')
        axes[1, 1].set_xlabel('Revenue (€)')
        axes[1, 1].set_ylabel('Frequency')
        axes[1, 1].axvline(revenue_data['revenue'].mean(), color=self.colors['accent'], 
                           linestyle='--', linewidth=2, label=f'Mean: €{revenue_data["revenue"].mean():.0f}')
        axes[1, 1].legend()
        
        plt.tight_layout()
        plt.savefig('time_series_analysis.png', dpi=300, bbox_inches='tight')
        plt.show()
        
        return fig
    
    def create_customer_analytics_visualizations(self, customer_data):
        """Create customer analytics visualizations"""
        print("Creating customer analytics visualizations...")
        
        fig, axes = plt.subplots(2, 3, figsize=(18, 12))
        fig.suptitle('Customer Analytics Dashboard', fontsize=16, fontweight='bold')
        
        # 1. Age distribution
        axes[0, 0].hist(customer_data['age'], bins=20, color=self.colors['primary'], 
                       alpha=0.7, edgecolor='black')
        axes[0, 0].set_title('Age Distribution')
        axes[0, 0].set_xlabel('Age')
        axes[0, 0].set_ylabel('Count')
        
        # 2. Income distribution
        axes[0, 1].hist(customer_data['income'], bins=30, color=self.colors['secondary'], 
                       alpha=0.7, edgecolor='black')
        axes[0, 1].set_title('Income Distribution')
        axes[0, 1].set_xlabel('Income (€)')
        axes[0, 1].set_ylabel('Count')
        
        # 3. Customer segment distribution
        segment_counts = customer_data['segment'].value_counts()
        axes[0, 2].pie(segment_counts.values, labels=segment_counts.index, 
                      autopct='%1.1f%%', colors=[self.colors['primary'], self.colors['secondary'], 
                                                self.colors['accent'], self.colors['success']])
        axes[0, 2].set_title('Customer Segment Distribution')
        
        # 4. Revenue by segment
        segment_revenue = customer_data.groupby('segment')['total_spent'].mean()
        bars = axes[1, 0].bar(segment_revenue.index, segment_revenue.values, 
                              color=[self.colors['primary'], self.colors['secondary'], 
                                     self.colors['accent'], self.colors['success']])
        axes[1, 0].set_title('Average Revenue by Segment')
        axes[1, 0].set_xlabel('Segment')
        axes[1, 0].set_ylabel('Avg revenue (€)')
        
        # Annotate the bars with their values
        for bar, value in zip(bars, segment_revenue.values):
            axes[1, 0].text(bar.get_x() + bar.get_width()/2, bar.get_height() + 100,
                           f'€{value:.0f}', ha='center', va='bottom')
        
        # 5. Age vs. total spend
        scatter = axes[1, 1].scatter(customer_data['age'], customer_data['total_spent'], 
                                   alpha=0.6, c=customer_data['orders'], cmap='viridis')
        axes[1, 1].set_title('Age vs. Total Spend')
        axes[1, 1].set_xlabel('Age')
        axes[1, 1].set_ylabel('Total spend (€)')
        plt.colorbar(scatter, ax=axes[1, 1], label='Number of orders')
        
        # 6. Boxplot of spend by segment
        segments = customer_data['segment'].unique()
        data_for_boxplot = [customer_data[customer_data['segment'] == seg]['total_spent'] 
                           for seg in segments]
        
        box_plot = axes[1, 2].boxplot(data_for_boxplot, labels=segments, patch_artist=True)
        colors = [self.colors['primary'], self.colors['secondary'], 
                 self.colors['accent'], self.colors['success']]
        
        for patch, color in zip(box_plot['boxes'], colors):
            patch.set_facecolor(color)
            patch.set_alpha(0.7)
        
        axes[1, 2].set_title('Spend Distribution by Segment')
        axes[1, 2].set_xlabel('Segment')
        axes[1, 2].set_ylabel('Total spend (€)')
        
        plt.tight_layout()
        plt.savefig('customer_analytics.png', dpi=300, bbox_inches='tight')
        plt.show()
        
        return fig
    
    def create_product_analytics_visualizations(self, product_data):
        """Create product analytics visualizations"""
        print("Creating product analytics visualizations...")
        
        fig, axes = plt.subplots(2, 2, figsize=(16, 12))
        fig.suptitle('Product Analytics Dashboard', fontsize=16, fontweight='bold')
        
        # 1. Top products by revenue
        product_revenue = product_data.groupby('product').apply(
            lambda x: (x['price'] * x['quantity_sold']).sum()
        ).sort_values(ascending=False)
        
        bars = axes[0, 0].barh(product_revenue.index, product_revenue.values, 
                              color=self.colors['primary'])
        axes[0, 0].set_title('Top Products by Total Revenue')
        axes[0, 0].set_xlabel('Total revenue (€)')
        
        # Annotate the bars with their values
        for i, (bar, value) in enumerate(zip(bars, product_revenue.values)):
            axes[0, 0].text(value + max(product_revenue.values) * 0.01, i,
                           f'€{value:,.0f}', va='center')
        
        # 2. Price distribution by category
        categories = product_data['category'].unique()
        for i, category in enumerate(categories):
            category_data = product_data[product_data['category'] == category]['price']
            axes[0, 1].hist(category_data, bins=20, alpha=0.7, label=category,
                           color=self.colors['primary'] if i == 0 else self.colors['secondary'])
        
        axes[0, 1].set_title('Price Distribution by Category')
        axes[0, 1].set_xlabel('Price (€)')
        axes[0, 1].set_ylabel('Frequency')
        axes[0, 1].legend()
        
        # 3. Rating vs. price
        axes[1, 0].scatter(product_data['price'], product_data['rating'], 
                          alpha=0.6, color=self.colors['accent'])
        axes[1, 0].set_title('Rating vs. Price')
        axes[1, 0].set_xlabel('Price (€)')
        axes[1, 0].set_ylabel('Rating')
        
        # Correlation coefficient
        correlation = np.corrcoef(product_data['price'], product_data['rating'])[0, 1]
        axes[1, 0].text(0.05, 0.95, f'Correlation: {correlation:.3f}', 
                       transform=axes[1, 0].transAxes, va='top',
                       bbox=dict(boxstyle='round', facecolor='white', alpha=0.8))
        
        # 4. Monthly sales
        monthly_sales = product_data.groupby('month')['quantity_sold'].sum()
        month_names = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 
                      'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
        
        axes[1, 1].bar(monthly_sales.index, monthly_sales.values, 
                      color=self.colors['info'], alpha=0.7)
        axes[1, 1].set_title('Monthly Sales Volume')
        axes[1, 1].set_xlabel('Month')
        axes[1, 1].set_ylabel('Units sold')
        axes[1, 1].set_xticks(range(1, 13))
        axes[1, 1].set_xticklabels(month_names, rotation=45)
        
        plt.tight_layout()
        plt.savefig('product_analytics.png', dpi=300, bbox_inches='tight')
        plt.show()
        
        return fig
    
    def create_geographic_visualizations(self, geo_data):
        """Create geographic visualizations"""
        print("Creating geographic visualizations...")
        
        fig, axes = plt.subplots(2, 2, figsize=(16, 12))
        fig.suptitle('Geographic Analysis Dashboard', fontsize=16, fontweight='bold')
        
        # 1. Revenue by region
        region_revenue = geo_data.groupby('region')['revenue'].sum().sort_values(ascending=False)
        bars = axes[0, 0].bar(region_revenue.index, region_revenue.values, 
                              color=[self.colors['primary'], self.colors['secondary'], 
                                     self.colors['accent'], self.colors['success'], self.colors['info']])
        axes[0, 0].set_title('Total Revenue by Region')
        axes[0, 0].set_ylabel('Revenue (€)')
        
        # Annotate the bars with their values
        for bar, value in zip(bars, region_revenue.values):
            axes[0, 0].text(bar.get_x() + bar.get_width()/2, bar.get_height() + max(region_revenue.values) * 0.01,
                           f'€{value:,.0f}', ha='center', va='bottom')
        
        # 2. Population vs. revenue
        axes[0, 1].scatter(geo_data['population'], geo_data['revenue'], 
                          alpha=0.6, s=geo_data['stores']*10, 
                          c=self.colors['primary'])
        axes[0, 1].set_title('Population vs. Revenue')
        axes[0, 1].set_xlabel('Population')
        axes[0, 1].set_ylabel('Revenue (€)')
        
        # 3. Store density by region
        region_stores = geo_data.groupby('region')['stores'].sum()
        axes[1, 0].pie(region_stores.values, labels=region_stores.index, 
                      autopct='%1.1f%%', colors=[self.colors['primary'], self.colors['secondary'], 
                                                self.colors['accent'], self.colors['success'], self.colors['info']])
        axes[1, 0].set_title('Store Distribution by Region')
        
        # 4. Revenue per store
        region_avg_revenue = geo_data.groupby('region').apply(
            lambda x: x['revenue'].sum() / x['stores'].sum()
        ).sort_values(ascending=False)
        
        bars = axes[1, 1].bar(region_avg_revenue.index, region_avg_revenue.values, 
                              color=self.colors['secondary'])
        axes[1, 1].set_title('Average Revenue per Store')
        axes[1, 1].set_ylabel('Avg revenue per store (€)')
        
        # Annotate the bars with their values
        for bar, value in zip(bars, region_avg_revenue.values):
            axes[1, 1].text(bar.get_x() + bar.get_width()/2, bar.get_height() + max(region_avg_revenue.values) * 0.01,
                           f'€{value:,.0f}', ha='center', va='bottom')
        
        plt.tight_layout()
        plt.savefig('geographic_analytics.png', dpi=300, bbox_inches='tight')
        plt.show()
        
        return fig
    
    def create_interactive_visualizations(self, revenue_data, customer_data, product_data):
        """Create interactive visualizations with Plotly"""
        print("Creating interactive visualizations...")
        
        # The 7-day moving average is added in create_time_series_visualizations;
        # recompute it here so this method also works on its own
        if 'ma_7' not in revenue_data.columns:
            revenue_data['ma_7'] = revenue_data['revenue'].rolling(window=7).mean()
        
        # 1. Interactive time series
        fig1 = make_subplots(
            rows=2, cols=1,
            subplot_titles=('Daily Revenue', 'Monthly Revenue'),
            vertical_spacing=0.1
        )
        
        # Daily revenue with moving average
        fig1.add_trace(
            go.Scatter(
                x=revenue_data['date'],
                y=revenue_data['revenue'],
                mode='lines',
                name='Daily revenue',
                line=dict(color='blue', width=1)
            ),
            row=1, col=1
        )
        
        fig1.add_trace(
            go.Scatter(
                x=revenue_data['date'],
                y=revenue_data['ma_7'],
                mode='lines',
                name='7-day MA',
                line=dict(color='orange', width=2)
            ),
            row=1, col=1
        )
        
        # Monthly revenue
        monthly_revenue = revenue_data.set_index('date').resample('M')['revenue'].sum()
        fig1.add_trace(
            go.Bar(
                x=monthly_revenue.index,
                y=monthly_revenue.values,
                name='Monthly revenue',
                marker_color='lightblue'
            ),
            row=2, col=1
        )
        
        fig1.update_layout(
            title='Interactive Revenue Analysis',
            height=600,
            showlegend=True
        )
        
        fig1.show()
        
        # 2. Interactive customer analysis
        fig2 = make_subplots(
            rows=2, cols=2,
            subplot_titles=('Age Distribution', 'Income Distribution', 
                          'Revenue by Segment', 'Age vs. Spend'),
            specs=[[{"type": "histogram"}, {"type": "histogram"}],
                   [{"type": "bar"}, {"type": "scatter"}]]
        )
        
        # Age distribution
        fig2.add_trace(
            go.Histogram(
                x=customer_data['age'],
                nbinsx=20,
                name='Age',
                marker_color='lightgreen'
            ),
            row=1, col=1
        )
        
        # Income distribution
        fig2.add_trace(
            go.Histogram(
                x=customer_data['income'],
                nbinsx=30,
                name='Income',
                marker_color='lightcoral'
            ),
            row=1, col=2
        )
        
        # Revenue by segment
        segment_revenue = customer_data.groupby('segment')['total_spent'].mean()
        fig2.add_trace(
            go.Bar(
                x=segment_revenue.index,
                y=segment_revenue.values,
                name='Avg revenue',
                marker_color='lightblue'
            ),
            row=2, col=1
        )
        
        # Age vs. spend
        fig2.add_trace(
            go.Scatter(
                x=customer_data['age'],
                y=customer_data['total_spent'],
                mode='markers',
                name='Customers',
                marker=dict(
                    size=8,
                    color=customer_data['orders'],
                    colorscale='Viridis',
                    showscale=True,
                    colorbar=dict(title="Orders")
                )
            ),
            row=2, col=2
        )
        
        fig2.update_layout(
            title='Interactive Customer Analysis',
            height=600,
            showlegend=False
        )
        
        fig2.show()
        
        # 3. Interactive 3D visualization
        fig3 = go.Figure(data=[go.Scatter3d(
            x=customer_data['age'],
            y=customer_data['income'],
            z=customer_data['total_spent'],
            mode='markers',
            marker=dict(
                size=5,
                color=customer_data['orders'],
                colorscale='Viridis',
                showscale=True,
                colorbar=dict(title="Orders")
            ),
            text=[f"Customer {i}<br>Age: {age:.0f}<br>Income: €{inc:,.0f}<br>Spend: €{spent:,.0f}" 
                   for i, (age, inc, spent) in enumerate(zip(customer_data['age'], 
                                                          customer_data['income'], 
                                                          customer_data['total_spent']))],
            hovertemplate='%{text}<extra></extra>'
        )])
        
        fig3.update_layout(
            title='3D Customer Analysis (Age, Income, Spend)',
            scene=dict(
                xaxis_title='Age',
                yaxis_title='Income (€)',
                zaxis_title='Spend (€)'
            ),
            height=600
        )
        
        fig3.show()
        
        return fig1, fig2, fig3
    
    def create_dashboard_summary(self, revenue_data, customer_data, product_data, geo_data):
        """Create a summary dashboard"""
        print("Creating summary dashboard...")
        
        # Compute key metrics
        total_revenue = revenue_data['revenue'].sum()
        avg_daily_revenue = revenue_data['revenue'].mean()
        total_customers = len(customer_data)
        avg_customer_value = customer_data['total_spent'].mean()
        total_orders = customer_data['orders'].sum()
        top_product = product_data.groupby('product').apply(
            lambda x: (x['price'] * x['quantity_sold']).sum()
        ).idxmax()
        
        # Build the dashboard figure
        fig = plt.figure(figsize=(20, 16))
        
        # Title
        fig.suptitle('Data Science Analytics Dashboard', fontsize=20, fontweight='bold')
        
        # Key metrics
        metrics_text = f"""
        KEY METRICS
        ─────────────────────
        Total revenue: €{total_revenue:,.0f}
        Avg daily revenue: €{avg_daily_revenue:,.0f}
        Total customers: {total_customers:,}
        Avg customer value: €{avg_customer_value:,.0f}
        Total orders: {total_orders:,}
        Top product: {top_product}
        """
        
        plt.figtext(0.02, 0.95, metrics_text, fontsize=12, fontfamily='monospace',
                   verticalalignment='top', bbox=dict(boxstyle='round', facecolor='lightgray', alpha=0.8))
        
        # Grid of subplots for the individual analyses
        gs = fig.add_gridspec(3, 3, hspace=0.3, wspace=0.3, 
                           left=0.15, right=0.95, top=0.85, bottom=0.05)
        
        # 1. Revenue trend (top left)
        ax1 = fig.add_subplot(gs[0, 0])
        monthly_revenue = revenue_data.set_index('date').resample('M')['revenue'].sum()
        ax1.plot(monthly_revenue.index, monthly_revenue.values, 
                color=self.colors['primary'], linewidth=2)
        ax1.set_title('Monthly Revenue Trend')
        ax1.tick_params(axis='x', rotation=45)
        
        # 2. Customer segments (top center)
        ax2 = fig.add_subplot(gs[0, 1])
        segment_counts = customer_data['segment'].value_counts()
        ax2.pie(segment_counts.values, labels=segment_counts.index, autopct='%1.1f%%',
               colors=[self.colors['primary'], self.colors['secondary'], 
                      self.colors['accent'], self.colors['success']])
        ax2.set_title('Customer Segments')
        
        # 3. Top products (top right)
        ax3 = fig.add_subplot(gs[0, 2])
        product_revenue = product_data.groupby('product').apply(
            lambda x: (x['price'] * x['quantity_sold']).sum()
        ).sort_values(ascending=False).head(5)
        
        bars = ax3.barh(range(len(product_revenue)), product_revenue.values, 
                       color=self.colors['info'])
        ax3.set_yticks(range(len(product_revenue)))
        ax3.set_yticklabels(product_revenue.index)
        ax3.set_title('Top 5 Products')
        ax3.set_xlabel('Revenue (€)')
        
        # 4. Age distribution (middle left)
        ax4 = fig.add_subplot(gs[1, 0])
        ax4.hist(customer_data['age'], bins=15, color=self.colors['secondary'], alpha=0.7)
        ax4.set_title('Age Distribution')
        ax4.set_xlabel('Age')
        
        # 5. Regional distribution (middle center)
        ax5 = fig.add_subplot(gs[1, 1])
        region_revenue = geo_data.groupby('region')['revenue'].sum()
        ax5.bar(region_revenue.index, region_revenue.values, 
               color=[self.colors['primary'], self.colors['secondary'], 
                      self.colors['accent'], self.colors['success'], self.colors['info']])
        ax5.set_title('Revenue by Region')
        ax5.set_ylabel('Revenue (€)')
        
        # 6. Rating vs. price (middle right)
        ax6 = fig.add_subplot(gs[1, 2])
        ax6.scatter(product_data['price'], product_data['rating'], 
                  alpha=0.6, color=self.colors['accent'])
        ax6.set_title('Rating vs. Price')
        ax6.set_xlabel('Price (€)')
        ax6.set_ylabel('Rating')
        
        # 7. Revenue distribution (bottom left)
        ax7 = fig.add_subplot(gs[2, :2])
        ax7.hist(revenue_data['revenue'], bins=30, color=self.colors['primary'], 
                alpha=0.7, edgecolor='black')
        ax7.set_title('Daily Revenue Distribution')
        ax7.set_xlabel('Revenue (€)')
        ax7.set_ylabel('Frequency')
        ax7.axvline(revenue_data['revenue'].mean(), color=self.colors['accent'], 
                   linestyle='--', linewidth=2, label=f'Mean: €{revenue_data["revenue"].mean():.0f}')
        ax7.legend()
        
        # 8. Correlation heatmap (bottom right)
        ax8 = fig.add_subplot(gs[2, 2])
        
        # Build the correlation matrix
        corr_data = customer_data[['age', 'income', 'total_spent', 'orders']].corr()
        im = ax8.imshow(corr_data, cmap='coolwarm', aspect='auto', vmin=-1, vmax=1)
        
        # Color legend
        cbar = plt.colorbar(im, ax=ax8)
        cbar.set_label('Correlation')
        
        # Labels and values
        ax8.set_xticks(range(len(corr_data.columns)))
        ax8.set_yticks(range(len(corr_data.columns)))
        ax8.set_xticklabels(corr_data.columns, rotation=45)
        ax8.set_yticklabels(corr_data.columns)
        
        # Write the values into the cells
        for i in range(len(corr_data.columns)):
            for j in range(len(corr_data.columns)):
                text = ax8.text(j, i, f'{corr_data.iloc[i, j]:.2f}',
                               ha="center", va="center", color="black")
        
        ax8.set_title('Correlation Matrix')
        
        plt.savefig('comprehensive_dashboard.png', dpi=300, bbox_inches='tight')
        plt.show()
        
        return fig

# Run the demo
def data_visualization_demo():
    viz = DataVisualizationDemo()
    
    try:
        # Create sample data
        revenue_data, customer_data, product_data, geo_data = viz.create_sample_data()
        
        # Create the static visualizations
        viz.create_time_series_visualizations(revenue_data)
        viz.create_customer_analytics_visualizations(customer_data)
        viz.create_product_analytics_visualizations(product_data)
        viz.create_geographic_visualizations(geo_data)
        
        # Interactive visualizations
        viz.create_interactive_visualizations(revenue_data, customer_data, product_data)
        
        # Summary dashboard
        viz.create_dashboard_summary(revenue_data, customer_data, product_data, geo_data)
        
        print("Data visualization demo finished successfully!")
        
    except Exception as e:
        print(f"Error in data visualization demo: {e}")

if __name__ == "__main__":
    data_visualization_demo()

Data Science Workflow

A Typical Data Science Process

graph TD
    A[Problem Definition] --> B[Data Collection]
    B --> C[Data Cleaning]
    C --> D[Exploratory Analysis]
    D --> E[Feature Engineering]
    E --> F[Model Building]
    F --> G[Model Evaluation]
    G --> H[Deployment]
    H --> I[Monitoring]
    I --> A

CRISP-DM Methodology

  1. Business Understanding: understand the business goals
  2. Data Understanding: explore and profile the data
  3. Data Preparation: clean and prepare the data
  4. Modeling: build and tune models
  5. Evaluation: assess the models against the business goals
  6. Deployment: deploy the models to production
  7. Monitoring: monitor model performance over time

Big Data Technologies

Hadoop Ecosystem

| Component | Function            | Alternatives                    |
|-----------|---------------------|---------------------------------|
| HDFS      | Distributed storage | Amazon S3, Google Cloud Storage |
| MapReduce | Batch processing    | Apache Spark, Apache Flink      |
| Hive      | SQL on Hadoop       | Presto, Apache Impala           |
| HBase     | NoSQL database      | Cassandra, MongoDB              |
| Zookeeper | Coordination        | Consul, etcd                    |

Spark vs. MapReduce

| Feature        | Spark           | MapReduce  |
|----------------|-----------------|------------|
| Speed          | In-memory       | Disk-based |
| Ease of use    | High-level APIs | Low-level  |
| Streaming      | Native          | Batch only |
| ML support     | MLlib           | External   |
| Resource usage | Higher          | Lower      |

Machine Learning Pipeline Components

Data Pipeline Stages

# Example ML pipeline with scikit-learn
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split

# Preprocessing
numeric_features = ['age', 'income', 'orders']
categorical_features = ['segment', 'region']

numeric_transformer = Pipeline(steps=[
    ('scaler', StandardScaler())
])

categorical_transformer = Pipeline(steps=[
    ('onehot', OneHotEncoder(handle_unknown='ignore'))
])

preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features)
    ])

# Complete Pipeline
ml_pipeline = Pipeline(steps=[
    ('preprocessor', preprocessor),
    ('regressor', RandomForestRegressor(n_estimators=100))
])
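A quick end-to-end sanity check shows how the pipeline is used. The customer data below is entirely made up (only the column names match the feature lists above), and the pipeline is repeated compactly so the snippet runs on its own:

```python
import numpy as np
import pandas as pd
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split

# Same pipeline as above, written compactly
preprocessor = ColumnTransformer(transformers=[
    ('num', Pipeline([('scaler', StandardScaler())]), ['age', 'income', 'orders']),
    ('cat', Pipeline([('onehot', OneHotEncoder(handle_unknown='ignore'))]), ['segment', 'region']),
])
ml_pipeline = Pipeline(steps=[
    ('preprocessor', preprocessor),
    ('regressor', RandomForestRegressor(n_estimators=100, random_state=42)),
])

# Hypothetical customer data (column names match the feature lists above)
rng = np.random.default_rng(42)
n = 200
df = pd.DataFrame({
    'age': rng.integers(18, 70, n),
    'income': rng.normal(50_000, 15_000, n),
    'orders': rng.integers(0, 50, n),
    'segment': rng.choice(['A', 'B', 'C'], n),
    'region': rng.choice(['North', 'South'], n),
})
y = df['income'] * 0.001 + df['orders']  # synthetic target

X_train, X_test, y_train, y_test = train_test_split(df, y, test_size=0.2, random_state=42)
ml_pipeline.fit(X_train, y_train)           # preprocessing + training in one call
predictions = ml_pipeline.predict(X_test)   # raw DataFrames go straight in
print(predictions.shape)  # (40,)
```

The payoff of the pipeline object: scaling and one-hot encoding are fitted only on the training split, which avoids data leakage into the test set.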

MLOps Practices

  • Version Control: Git for code, DVC for data
  • Experiment Tracking: MLflow, Weights & Biases
  • Model Registry: MLflow Model Registry
  • CI/CD: GitHub Actions, Jenkins
  • Monitoring: Prometheus, Grafana
  • Drift Detection: Evidently AI, NannyML
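At its core, drift detection compares the distribution of live data against the training data. A minimal sketch with synthetic numbers, using a plain two-sample Kolmogorov-Smirnov test from SciPy rather than the APIs of Evidently AI or NannyML:

```python
import numpy as np
from scipy.stats import ks_2samp

rng = np.random.default_rng(0)
train_feature = rng.normal(loc=0.0, scale=1.0, size=500)  # reference (training) data
live_same = rng.normal(loc=0.0, scale=1.0, size=500)      # same distribution
live_drifted = rng.normal(loc=1.0, scale=1.0, size=500)   # mean shifted by 1

p_same = ks_2samp(train_feature, live_same).pvalue
p_drifted = ks_2samp(train_feature, live_drifted).pvalue

print(f"no drift: p = {p_same:.4f}")
print(f"drifted:  p = {p_drifted:.2e}")
# p < 0.05 would flag drift; only the shifted sample should trigger it
```

Dedicated tools add value on top of this idea: per-feature reports, categorical metrics, and scheduled monitoring.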

Data Visualization Principles

Chart Selection Guide

| Data type    | Visualization | Purpose          |
|--------------|---------------|------------------|
| Time series  | Line chart    | Trends over time |
| Categorical  | Bar chart     | Comparisons      |
| Distribution | Histogram     | Frequencies      |
| Relationship | Scatter plot  | Correlations     |
| Composition  | Pie chart     | Proportions      |
| Geographic   | Map           | Locations        |
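The first four rows of the selection guide map directly onto matplotlib calls; a compact sketch with made-up data:

```python
import matplotlib
matplotlib.use('Agg')  # headless backend so the script runs without a display
import matplotlib.pyplot as plt
import numpy as np

rng = np.random.default_rng(1)
fig, axes = plt.subplots(2, 2, figsize=(10, 8))

# Time series -> line chart
axes[0, 0].plot(np.arange(30), np.cumsum(rng.normal(size=30)))
axes[0, 0].set_title('Line: trend over time')

# Categorical -> bar chart
axes[0, 1].bar(['A', 'B', 'C'], [23, 17, 35])
axes[0, 1].set_title('Bar: category comparison')

# Distribution -> histogram
axes[1, 0].hist(rng.normal(size=500), bins=30)
axes[1, 0].set_title('Histogram: frequencies')

# Relationship -> scatter plot
x = rng.normal(size=100)
axes[1, 1].scatter(x, 2 * x + rng.normal(size=100))
axes[1, 1].set_title('Scatter: correlation')

fig.tight_layout()
fig.savefig('chart_selection.png', dpi=150)
```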

Color Theory for Data Viz

  • Primary colors: blue, red, yellow
  • Secondary colors: green, orange, purple
  • Color psychology: blue = trust, red = danger, green = success
  • Contrast: light/dark contrast for readability
  • Color blindness: red-green deficiency affects about 8% of men and 0.5% of women, so avoid red/green-only encodings
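Given those numbers, a colorblind-safe palette is worth hard-coding. The Okabe-Ito palette is a widely used choice; here it is set as matplotlib's default color cycle:

```python
import matplotlib
matplotlib.use('Agg')  # headless backend for scripted use
import matplotlib.pyplot as plt

# Okabe-Ito colorblind-safe palette (8 colors)
OKABE_ITO = ['#000000', '#E69F00', '#56B4E9', '#009E73',
             '#F0E442', '#0072B2', '#D55E00', '#CC79A7']

# Make it the default for all subsequent plots
plt.rcParams['axes.prop_cycle'] = plt.cycler(color=OKABE_ITO)

fig, ax = plt.subplots()
for i, color in enumerate(OKABE_ITO):
    ax.bar(i, 1, color=color)  # one bar per palette color
ax.set_title('Okabe-Ito palette')
fig.savefig('palette.png')
```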

ETL Best Practices

Data Quality Checks

def validate_data_quality(df):
    """Validate basic data quality metrics."""
    quality_report = {
        'total_rows': len(df),
        'missing_values': df.isnull().sum().to_dict(),
        'duplicate_rows': int(df.duplicated().sum()),
        'data_types': df.dtypes.astype(str).to_dict(),
        # detect_outliers expects a column name, so run it per numeric column
        'outliers': {col: detect_outliers(df, col)
                     for col in df.select_dtypes(include='number').columns}
    }
    return quality_report

def detect_outliers(df, column):
    """Count outliers in a column using the IQR method."""
    Q1 = df[column].quantile(0.25)
    Q3 = df[column].quantile(0.75)
    IQR = Q3 - Q1
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR
    outliers = df[(df[column] < lower_bound) | (df[column] > upper_bound)]
    return len(outliers)
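A quick check of the IQR logic on hypothetical data (a self-contained variant that works directly on a Series):

```python
import pandas as pd

def count_iqr_outliers(series: pd.Series) -> int:
    """Count values outside [Q1 - 1.5*IQR, Q3 + 1.5*IQR]."""
    q1, q3 = series.quantile(0.25), series.quantile(0.75)
    iqr = q3 - q1
    return int(((series < q1 - 1.5 * iqr) | (series > q3 + 1.5 * iqr)).sum())

values = pd.Series([10, 11, 12, 11, 10, 12, 11, 100])  # 100 is an obvious outlier
print(count_iqr_outliers(values))  # 1
```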

Performance Optimization

  • Parallel Processing: Multiprocessing, Dask
  • Memory Management: Chunking, Generators
  • Indexing: Database Indexes
  • Caching: Redis, Memcached
  • Batch Processing: Bulk Operations
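Of these, chunking is the cheapest to adopt with pandas: process a large CSV in fixed-size pieces instead of loading it at once. A minimal sketch (the file here is a small stand-in generated on the fly):

```python
import pandas as pd

# Create a small demo CSV standing in for a file too large for memory
pd.DataFrame({'value': range(1000)}).to_csv('big_file.csv', index=False)

total = 0
row_count = 0
# Read 250 rows at a time; each chunk is an ordinary DataFrame
for chunk in pd.read_csv('big_file.csv', chunksize=250):
    total += chunk['value'].sum()
    row_count += len(chunk)

print(row_count, total)  # 1000 499500
```

Peak memory stays bounded by the chunk size, at the cost of only supporting aggregations that can be accumulated incrementally.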

Analytics Techniques

Statistical Methods

  • Descriptive Statistics: Mean, Median, Mode, Std Dev
  • Inferential Statistics: Hypothesis Testing, Confidence Intervals
  • Correlation Analysis: Pearson, Spearman, Kendall
  • Regression Analysis: Linear, Logistic, Polynomial
  • Time Series: ARIMA, Prophet, LSTM

Advanced Analytics

  • Clustering: K-Means, Hierarchical, DBSCAN
  • Classification: Decision Trees, Random Forest, SVM
  • Anomaly Detection: Isolation Forest, One-Class SVM
  • Dimensionality Reduction: PCA, t-SNE, UMAP
  • Association Rules: Apriori, FP-Growth
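As a taste of these techniques, a minimal K-Means run on synthetic blobs (the cluster count is chosen to match the generated data, which in practice would require methods like the elbow criterion):

```python
from sklearn.cluster import KMeans
from sklearn.datasets import make_blobs

# Three well-separated synthetic clusters in 2D
X, _ = make_blobs(n_samples=300, centers=3, cluster_std=0.8, random_state=42)

kmeans = KMeans(n_clusters=3, n_init=10, random_state=42)
labels = kmeans.fit_predict(X)

print(len(set(labels)))               # 3
print(kmeans.cluster_centers_.shape)  # (3, 2)
```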

Advantages and Disadvantages

Advantages of Data Science

  • Data-Driven Decisions: better business decisions
  • Pattern Recognition: uncover hidden patterns
  • Predictive Analytics: forecast future trends
  • Process Optimization: increase efficiency
  • Competitive Advantage: an edge over competitors

Disadvantages

  • Data Quality: results are only as good as the input data
  • Complexity: complex algorithms and tools
  • Privacy Concerns: data protection and ethics
  • Resource Intensive: heavy compute and storage demands
  • Interpretability: black-box models are hard to explain

Common Exam Questions

  1. What distinguishes Big Data from traditional data? Big Data is characterized by the 3Vs (Volume, Velocity, Variety) and requires specialized technologies for processing.

  2. Explain the ETL process! ETL stands for Extract (extract data from sources), Transform (clean and reshape the data), and Load (load the data into the target system).

  3. When should which chart type be used? Line charts for time series and trends, bar charts for comparisons, histograms for distributions, scatter plots for relationships.

  4. What is the purpose of machine learning pipelines? ML pipelines automate and standardize the entire workflow from data preparation to model deployment.

Key Sources

  1. https://spark.apache.org/
  2. https://airflow.apache.org/
  3. https://matplotlib.org/
  4. https://plotly.com/python/