Data Science Fundamentals: Big Data Analytics, ML Pipelines, ETL & Visualization
This post is a comprehensive introduction to the fundamentals of data science, covering big data analytics, machine learning pipelines, ETL processes, and data visualization, with practical examples.
In a Nutshell
Data science combines statistics, programming, and domain knowledge to extract insights from data. Big data handles very large data volumes, ML pipelines automate model workflows, ETL prepares data, and visualization makes results visible.
Compact Technical Description
Data science is an interdisciplinary field that uses scientific methods, processes, algorithms, and systems to extract knowledge and insights from structured and unstructured data.
Core areas:
Big Data Analytics
- Concept: processing extremely large volumes of data
- 3V model: Volume, Velocity, Variety
- Technologies: Hadoop, Spark, NoSQL databases
- Applications: real-time analysis, predictive analytics
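The aggregation pattern that engines like Hadoop and Spark parallelize can be illustrated with a tiny map/shuffle/reduce sketch in plain Python. This is a toy, single-machine illustration of the idea only; the data and names are made up.

```python
from collections import defaultdict

# Toy map/shuffle/reduce: total revenue per region -- the pattern
# a big data engine distributes across many machines.
transactions = [
    ("North", 120.0), ("South", 80.0), ("North", 200.0),
    ("West", 50.0), ("South", 70.0),
]

# Map phase: emit (key, value) pairs
mapped = [(region, amount) for region, amount in transactions]

# Shuffle phase: group all values by key
grouped = defaultdict(list)
for region, amount in mapped:
    grouped[region].append(amount)

# Reduce phase: aggregate each group
totals = {region: sum(amounts) for region, amounts in grouped.items()}
print(totals)
```

The Spark example in section 1 performs the same kind of keyed aggregation with `groupBy(...).agg(...)`, just over partitioned, distributed data.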
Machine Learning Pipelines
- Concept: automated orchestration of ML workflows
- Phases: Data Collection → Preprocessing → Training → Evaluation → Deployment
- Tools: Scikit-learn, TensorFlow, MLflow, Airflow
- MLOps: versioning, monitoring, retraining
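Since Scikit-learn is listed among the pipeline tools, here is a minimal sketch of the Preprocessing → Training → Evaluation phases chained into a single sklearn `Pipeline`. The synthetic dataset is a stand-in for the data collection phase; it is illustrative, not a production setup.

```python
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression

# Synthetic data stands in for the "Data Collection" phase
X, y = make_classification(n_samples=500, n_features=10, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Preprocessing and training chained into one reproducible object
pipe = Pipeline([
    ("scaler", StandardScaler()),    # preprocessing
    ("model", LogisticRegression()), # training
])
pipe.fit(X_train, y_train)

# Evaluation; deployment would serialize `pipe` (e.g. with joblib or MLflow)
accuracy = pipe.score(X_test, y_test)
print(f"Accuracy: {accuracy:.2f}")
```

Bundling the scaler and the model in one pipeline prevents a common bug: fitting the scaler on test data and leaking information into evaluation.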
ETL Processes
- Extract: pull data from various sources
- Transform: clean and transform the data
- Load: load the data into target systems
- Tools: Apache NiFi, Talend, AWS Glue
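The three ETL steps can be sketched in a few lines of standard-library Python. An in-memory SQLite database stands in for the target system; the CSV content and field names are made up for illustration.

```python
import csv
import io
import sqlite3

# Extract: read raw records (a CSV string stands in for a source system)
raw_csv = "customer_id,total_spent\nC001,120.50\nC002,\nC001,120.50\nC003,80.00\n"
rows = list(csv.DictReader(io.StringIO(raw_csv)))

# Transform: drop duplicates and rows with missing amounts, cast types
seen = set()
clean = []
for row in rows:
    key = (row["customer_id"], row["total_spent"])
    if row["total_spent"] and key not in seen:
        seen.add(key)
        clean.append((row["customer_id"], float(row["total_spent"])))

# Load: write the cleaned rows into the target table
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customers (customer_id TEXT, total_spent REAL)")
conn.executemany("INSERT INTO customers VALUES (?, ?)", clean)
count = conn.execute("SELECT COUNT(*) FROM customers").fetchone()[0]
print(count)
```

Tools like NiFi, Talend, or the Airflow DAG in section 2 orchestrate exactly these steps at scale, with scheduling, retries, and monitoring on top.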
Data Visualization
- Concept: visual representation of data and insights
- Chart types: bar, line, scatter, heatmap, treemap
- Tools: Matplotlib, Seaborn, Plotly, Tableau
- Principles: clarity, accuracy, efficiency
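A minimal Matplotlib example of the bar chart type listed above, following the clarity and accuracy principles (a descriptive title and labeled units). The Agg backend is used so the sketch runs headless; the revenue figures are made up.

```python
import matplotlib
matplotlib.use("Agg")  # headless backend, no display needed
import matplotlib.pyplot as plt

# Made-up revenue per product
products = ["Laptop", "Smartphone", "Tablet"]
revenue = [120_000, 95_000, 40_000]

fig, ax = plt.subplots(figsize=(6, 4))
ax.bar(products, revenue, color="#2E86AB")
ax.set_title("Revenue per Product")  # clarity: say what is shown
ax.set_ylabel("Revenue (€)")         # accuracy: label the units
fig.tight_layout()
fig.savefig("revenue_per_product.png", dpi=150)
```

Section 3 builds on the same API to assemble full multi-panel dashboards.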
Exam-Relevant Key Points
- Data science: interdisciplinary field for data analysis
- Big data: large, fast, varied data volumes
- Machine learning: automated pattern recognition in data
- ETL: extract-transform-load for data preparation
- Visualization: visual representation of data
- Pipelines: automated data processing workflows
- Analytics: statistical and exploratory data analysis
- IHK-relevant: modern data analysis and business intelligence
Core Components
- Data Collection: data sources and gathering
- Data Storage: storage and organization
- Data Processing: cleaning and transformation
- Data Analysis: statistical and exploratory analysis
- Machine Learning: models and algorithms
- Data Visualization: visual presentation
- Model Deployment: running models in production
- Monitoring: performance and drift detection
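One simple form of the drift detection mentioned under monitoring is comparing the mean of incoming data against the training baseline. A hedged numpy sketch follows; the function name, the z-score style check, and the threshold of 3 standard errors are illustrative choices, not a standard API.

```python
import numpy as np

def mean_shift_drift(baseline, current, threshold=3.0):
    """Flag drift when the current mean deviates from the baseline mean
    by more than `threshold` standard errors (a simple z-test style check)."""
    baseline = np.asarray(baseline, dtype=float)
    current = np.asarray(current, dtype=float)
    se = baseline.std(ddof=1) / np.sqrt(len(current))
    z = abs(current.mean() - baseline.mean()) / se
    return bool(z > threshold)

rng = np.random.default_rng(42)
train_feature = rng.normal(loc=0.0, scale=1.0, size=5000)   # training baseline
live_same = rng.normal(loc=0.0, scale=1.0, size=500)        # same distribution
live_shifted = rng.normal(loc=0.5, scale=1.0, size=500)     # shifted distribution

print(mean_shift_drift(train_feature, live_same))
print(mean_shift_drift(train_feature, live_shifted))
```

Production systems typically use richer tests (e.g. population stability index or Kolmogorov-Smirnov), but the retrain-when-drift-detected loop is the same.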
Practical Examples
1. Big Data Processing with Apache Spark and Python
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
import matplotlib.pyplot as plt
import seaborn as sns
# Big data processing demo
class BigDataAnalytics:
def __init__(self, app_name="BigDataAnalytics"):
"""Initialize the Spark session"""
self.spark = SparkSession.builder \
.appName(app_name) \
.config("spark.sql.adaptive.enabled", "true") \
.config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
.config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128MB") \
.getOrCreate()
print(f"Spark session created: {self.spark.version}")
def create_sample_data(self):
"""Create sample data for the big data demo"""
import random
from datetime import datetime, timedelta
# Simulated sales data
data = []
products = ["Laptop", "Smartphone", "Tablet", "Headphones", "Mouse", "Keyboard"]
regions = ["North", "South", "East", "West", "Central"]
start_date = datetime(2023, 1, 1)
for i in range(1000000): # 1M records
date = start_date + timedelta(days=random.randint(0, 365))
record = {
"transaction_id": f"T{i:06d}",
"date": date.strftime("%Y-%m-%d"),
"product": random.choice(products),
"region": random.choice(regions),
"quantity": random.randint(1, 10),
"unit_price": round(random.uniform(10, 1000), 2),
"customer_id": f"C{random.randint(1, 50000):05d}",
"sales_rep": f"SR{random.randint(1, 100):03d}"
}
record["total_amount"] = record["quantity"] * record["unit_price"]
data.append(record)
# Create a Spark DataFrame
schema = StructType([
StructField("transaction_id", StringType(), True),
StructField("date", StringType(), True),
StructField("product", StringType(), True),
StructField("region", StringType(), True),
StructField("quantity", IntegerType(), True),
StructField("unit_price", DoubleType(), True),
StructField("customer_id", StringType(), True),
StructField("sales_rep", StringType(), True),
StructField("total_amount", DoubleType(), True)
])
df = self.spark.createDataFrame(data, schema)
print(f"Sample data created: {df.count()} rows")
return df
def basic_analytics(self, df):
"""Run basic analytics"""
print("=== Basic Analytics ===")
# Show the schema
print("Schema:")
df.printSchema()
# Statistical summary
print("\nStatistical summary:")
df.describe().show()
# Top products by revenue
print("\nTop products by revenue:")
product_sales = df.groupBy("product") \
.agg(sum("total_amount").alias("total_sales"),
count("transaction_id").alias("transaction_count")) \
.orderBy(col("total_sales").desc())
product_sales.show()
# Regional analysis
print("\nRegional revenue analysis:")
regional_sales = df.groupBy("region") \
.agg(sum("total_amount").alias("total_sales"),
avg("total_amount").alias("avg_transaction"),
count("transaction_id").alias("transaction_count")) \
.orderBy(col("total_sales").desc())
regional_sales.show()
# Time-based analysis
print("\nMonthly revenue trend:")
monthly_sales = df.withColumn("month", substring(col("date"), 1, 7)) \
.groupBy("month") \
.agg(sum("total_amount").alias("monthly_sales"),
count("transaction_id").alias("transaction_count")) \
.orderBy("month")
monthly_sales.show()
return product_sales, regional_sales, monthly_sales
def advanced_analytics(self, df):
"""Advanced analytics with window functions"""
print("\n=== Advanced Analytics ===")
# Window functions for running totals
from pyspark.sql.window import Window
# Monthly cumulative revenue
window_spec = Window.partitionBy("month").orderBy("date") \
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
monthly_cumulative = df.withColumn("month", substring(col("date"), 1, 7)) \
.groupBy("month", "date") \
.agg(sum("total_amount").alias("daily_sales")) \
.withColumn("cumulative_sales", sum("daily_sales").over(window_spec)) \
.orderBy("month", "date")
print("Monthly cumulative revenue:")
monthly_cumulative.show(20)
# Best-performing sales reps
rep_performance = df.groupBy("sales_rep") \
.agg(sum("total_amount").alias("total_sales"),
count("transaction_id").alias("transaction_count"),
avg("total_amount").alias("avg_transaction")) \
.withColumn("performance_rank", percent_rank().over(Window.orderBy(col("total_sales").desc()))) \
.filter(col("performance_rank") <= 0.1) # top 10%
print("\nTop 10% sales reps:")
rep_performance.show()
# Product purchase matrix per customer (a pivot, the input for correlation analysis)
product_correlation = df.groupBy("customer_id", "product") \
.agg(sum("quantity").alias("product_quantity")) \
.groupBy("customer_id") \
.pivot("product") \
.agg(sum("product_quantity")) \
.fillna(0)
print("\nProduct purchase matrix (top 10 customers):")
product_correlation.limit(10).show()
return monthly_cumulative, rep_performance, product_correlation
def machine_learning_pipeline(self, df):
"""Machine learning pipeline for revenue prediction"""
print("\n=== Machine Learning Pipeline ===")
# Prepare features for ML; the month column must be derived from the date first
feature_data = df.withColumn("month", substring(col("date"), 1, 7)) \
.groupBy("product", "region", "month") \
.agg(sum("total_amount").alias("target_sales"),
count("transaction_id").alias("transaction_count"),
avg("unit_price").alias("avg_unit_price"),
sum("quantity").alias("total_quantity"))
# Encode categorical features
from pyspark.ml.feature import StringIndexer, OneHotEncoder
# Product indexer
product_indexer = StringIndexer(inputCol="product", outputCol="product_index")
product_indexed = product_indexer.fit(feature_data).transform(feature_data)
# Region indexer
region_indexer = StringIndexer(inputCol="region", outputCol="region_index")
region_indexed = region_indexer.fit(product_indexed).transform(product_indexed)
# One-hot encoding
encoder = OneHotEncoder(inputCols=["product_index", "region_index"],
outputCols=["product_encoded", "region_encoded"])
encoded_data = encoder.fit(region_indexed).transform(region_indexed)
# Create the feature vector
assembler = VectorAssembler(
inputCols=["transaction_count", "avg_unit_price", "total_quantity",
"product_encoded", "region_encoded"],
outputCol="features"
)
assembled_data = assembler.transform(encoded_data)
# Scale the features
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
scaler_model = scaler.fit(assembled_data)
scaled_data = scaler_model.transform(assembled_data)
# Train/test split
train_data, test_data = scaled_data.randomSplit([0.8, 0.2], seed=42)
print(f"Training data: {train_data.count()} rows")
print(f"Test data: {test_data.count()} rows")
# Linear regression
lr = LinearRegression(featuresCol="scaled_features",
labelCol="target_sales",
predictionCol="predicted_sales")
lr_model = lr.fit(train_data)
# Predictions on the test data
predictions = lr_model.transform(test_data)
# Evaluate the model
evaluator = RegressionEvaluator(labelCol="target_sales",
predictionCol="predicted_sales",
metricName="rmse")
rmse = evaluator.evaluate(predictions)
r2 = evaluator.setMetricName("r2").evaluate(predictions)
print("Model evaluation:")
print(f"RMSE: {rmse:.2f}")
print(f"R²: {r2:.4f}")
# Show model coefficients
# Note: each one-hot encoded feature expands into several vector slots, so the
# name/coefficient pairing below is only exact for the three numeric features.
print("\nFeature coefficients:")
feature_names = ["transaction_count", "avg_unit_price", "total_quantity",
"product_encoded", "region_encoded"]
for name, coef in zip(feature_names, lr_model.coefficients):
print(f"{name}: {coef:.4f}")
return lr_model, predictions, scaler_model
def real_time_processing_simulation(self, df):
"""Simulate real-time processing"""
print("\n=== Real-Time Processing Simulation ===")
# Simulate a streaming DataFrame
# In practice this would come from Kafka, Kinesis, etc.
# Aggregations for real-time monitoring
real_time_metrics = df.groupBy("product", "region") \
.agg(count("transaction_id").alias("transaction_count"),
sum("total_amount").alias("total_sales"),
avg("total_amount").alias("avg_transaction")) \
.orderBy(col("total_sales").desc())
print("Real-time metrics:")
real_time_metrics.show()
# Anomaly detection (simplified)
from pyspark.sql.functions import stddev
# Statistical bounds for anomaly detection
stats = df.groupBy("product") \
.agg(avg("total_amount").alias("avg_amount"),
stddev("total_amount").alias("std_amount")) \
.withColumn("upper_bound", col("avg_amount") + 2 * col("std_amount")) \
.withColumn("lower_bound", col("avg_amount") - 2 * col("std_amount"))
print("Anomaly bounds per product:")
stats.show()
return real_time_metrics, stats
def data_export(self, df, predictions):
"""Export data for further processing"""
print("\n=== Data Export ===")
# Export results in various formats
output_path = "/tmp/big_data_results"
# Analytics results
product_sales = df.groupBy("product") \
.agg(sum("total_amount").alias("total_sales")) \
.orderBy(col("total_sales").desc())
product_sales.write.mode("overwrite").parquet(f"{output_path}/product_sales")
print(f"Product revenue exported to: {output_path}/product_sales")
# ML predictions
predictions.select("product", "region", "target_sales", "predicted_sales") \
.write.mode("overwrite").parquet(f"{output_path}/predictions")
print(f"ML predictions exported to: {output_path}/predictions")
# Summary statistics
summary_stats = df.agg(
count("transaction_id").alias("total_transactions"),
sum("total_amount").alias("total_revenue"),
avg("total_amount").alias("avg_transaction_value"),
countDistinct("customer_id").alias("unique_customers"),
countDistinct("product").alias("unique_products")
)
summary_stats.write.mode("overwrite").json(f"{output_path}/summary_stats")
print(f"Summary statistics exported to: {output_path}/summary_stats")
def cleanup(self):
"""Release resources"""
self.spark.stop()
print("Spark session stopped")
# Run the big data demo
def big_data_demo():
analytics = BigDataAnalytics()
try:
# Create sample data
df = analytics.create_sample_data()
# Basic analytics
product_sales, regional_sales, monthly_sales = analytics.basic_analytics(df)
# Advanced analytics
monthly_cumulative, rep_performance, product_correlation = analytics.advanced_analytics(df)
# Machine learning pipeline
lr_model, predictions, scaler_model = analytics.machine_learning_pipeline(df)
# Real-time processing
real_time_metrics, stats = analytics.real_time_processing_simulation(df)
# Export data
analytics.data_export(df, predictions)
print("\nBig Data Analytics demo completed successfully!")
except Exception as e:
print(f"Error in big data demo: {e}")
finally:
analytics.cleanup()
if __name__ == "__main__":
big_data_demo()
2. ETL Pipeline with Apache Airflow
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.models import Variable
from airflow.utils.dates import days_ago
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import json
import logging
# ETL Pipeline Configuration
default_args = {
'owner': 'data-science-team',
'depends_on_past': False,
'start_date': days_ago(1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
# DAG Definition
dag = DAG(
'data_science_etl_pipeline',
default_args=default_args,
description='ETL pipeline for data science analytics',
schedule_interval='@daily',
catchup=False,
tags=['data-science', 'etl', 'analytics'],
)
# Configure logging
logger = logging.getLogger(__name__)
def extract_customer_data(**context):
"""Extract customer data from various sources"""
logger.info("Extracting customer data...")
# PostgreSQL hook
postgres_hook = PostgresHook(postgres_conn_id='postgres_default')
# Extract customer data from PostgreSQL
customer_query = """
SELECT
customer_id,
first_name,
last_name,
email,
phone,
registration_date,
last_login_date,
total_orders,
total_spent,
customer_segment
FROM customers
WHERE last_login_date >= CURRENT_DATE - INTERVAL '30 days'
"""
customers_df = postgres_hook.get_pandas_df(customer_query)
logger.info(f"Extracted {len(customers_df)} customer records")
# S3 hook for additional data
s3_hook = S3Hook(aws_conn_id='aws_default')
# Product interaction data from S3
bucket_name = Variable.get('data_bucket_name', default_var='analytics-data')
try:
# Load the CSV file from S3
from io import StringIO  # local import so this snippet stays self-contained
file_content = s3_hook.read_key(key='customer_interactions.csv', bucket_name=bucket_name)
interactions_df = pd.read_csv(StringIO(file_content))
logger.info(f"Loaded {len(interactions_df)} interaction records from S3")
except Exception as e:
logger.warning(f"No interaction data found in S3: {e}")
interactions_df = pd.DataFrame()
# Combine the data
if not interactions_df.empty:
combined_df = customers_df.merge(
interactions_df,
on='customer_id',
how='left'
)
else:
combined_df = customers_df
# Serialize as JSON for the next task
combined_json = combined_df.to_json(date_format='iso')
# Store in XCom
task_instance = context['task_instance']
task_instance.xcom_push(key='customer_data', value=combined_json)
logger.info("Customer data extraction finished")
return combined_json
def transform_customer_data(**context):
"""Transform and clean the customer data"""
logger.info("Transforming customer data...")
# Fetch the data from the previous task
task_instance = context['task_instance']
customer_json = task_instance.xcom_pull(task_ids='extract_customer_data', key='customer_data')
# JSON to DataFrame
customers_df = pd.read_json(customer_json)
# Data cleaning
logger.info("Running data cleaning...")
# 1. Remove duplicates
original_count = len(customers_df)
customers_df = customers_df.drop_duplicates(subset=['customer_id'])
duplicates_removed = original_count - len(customers_df)
logger.info(f"Removed {duplicates_removed} duplicates")
# 2. Handle missing values
# Numeric columns: impute the median
numeric_columns = ['total_orders', 'total_spent']
for col in numeric_columns:
if col in customers_df.columns:
median_value = customers_df[col].median()
customers_df[col] = customers_df[col].fillna(median_value)
# Categorical columns: impute the mode
categorical_columns = ['customer_segment']
for col in categorical_columns:
if col in customers_df.columns:
mode_value = customers_df[col].mode()[0] if not customers_df[col].mode().empty else 'Unknown'
customers_df[col] = customers_df[col].fillna(mode_value)
# 3. Normalize date columns
date_columns = ['registration_date', 'last_login_date']
for col in date_columns:
if col in customers_df.columns:
customers_df[col] = pd.to_datetime(customers_df[col], errors='coerce')
# 4. Feature engineering
logger.info("Creating new features...")
# Compute customer lifetime value (CLV)
if 'total_spent' in customers_df.columns and 'total_orders' in customers_df.columns:
customers_df['avg_order_value'] = customers_df['total_spent'] / customers_df['total_orders']
customers_df['clv_score'] = customers_df['total_spent'] * np.log1p(customers_df['total_orders'])
# Recency score (days since last login); bins start at -1 so day 0 is included
if 'last_login_date' in customers_df.columns:
current_date = datetime.now()
customers_df['days_since_last_login'] = (current_date - customers_df['last_login_date']).dt.days
customers_df['recency_score'] = pd.cut(customers_df['days_since_last_login'],
bins=[-1, 7, 30, 90, float('inf')],
labels=[4, 3, 2, 1])
# Engagement score based on several factors
engagement_features = []
if 'total_orders' in customers_df.columns:
engagement_features.append('total_orders')
if 'days_since_last_login' in customers_df.columns:
engagement_features.append('days_since_last_login')
if engagement_features:
# Min-max normalized features for the engagement score
for feature in engagement_features:
if feature in customers_df.columns:
min_val = customers_df[feature].min()
max_val = customers_df[feature].max()
if max_val != min_val:
customers_df[f'{feature}_normalized'] = (customers_df[feature] - min_val) / (max_val - min_val)
else:
customers_df[f'{feature}_normalized'] = 0
# Compute the engagement score
normalized_features = [f'{f}_normalized' for f in engagement_features]
customers_df['engagement_score'] = customers_df[normalized_features].mean(axis=1)
# 5. Data validation
logger.info("Running data validation...")
validation_errors = []
# Check business rules
if 'total_orders' in customers_df.columns:
if (customers_df['total_orders'] < 0).any():
validation_errors.append("Negative order counts found")
if 'total_spent' in customers_df.columns:
if (customers_df['total_spent'] < 0).any():
validation_errors.append("Negative revenue values found")
if 'email' in customers_df.columns:
invalid_emails = customers_df[~customers_df['email'].str.contains('@', na=False)]
if not invalid_emails.empty:
validation_errors.append(f"Found {len(invalid_emails)} invalid email addresses")
if validation_errors:
logger.warning(f"Validation errors: {validation_errors}")
# 6. Aggregate data for analytics
logger.info("Aggregating data for analytics...")
# Customer segment statistics
if 'customer_segment' in customers_df.columns:
segment_stats = customers_df.groupby('customer_segment').agg({
'customer_id': 'count',
'total_spent': ['sum', 'mean'],
'total_orders': ['sum', 'mean'],
'engagement_score': 'mean'
}).round(2)
logger.info("Customer segment statistics created")
# Store the transformed data
transformed_json = customers_df.to_json(date_format='iso')
task_instance.xcom_push(key='transformed_data', value=transformed_json)
logger.info(f"Data transformation finished: {len(customers_df)} records")
return transformed_json
def load_data_to_warehouse(**context):
"""Load the transformed data into the data warehouse"""
logger.info("Loading data into the data warehouse...")
# Fetch the transformed data
task_instance = context['task_instance']
transformed_json = task_instance.xcom_pull(task_ids='transform_customer_data', key='transformed_data')
# JSON to DataFrame
customers_df = pd.read_json(transformed_json)
# PostgreSQL hook
postgres_hook = PostgresHook(postgres_conn_id='postgres_default')
# Create the table if it does not exist
create_table_sql = """
CREATE TABLE IF NOT EXISTS analytics_customer_metrics (
customer_id VARCHAR(50) PRIMARY KEY,
first_name VARCHAR(100),
last_name VARCHAR(100),
email VARCHAR(255),
phone VARCHAR(50),
registration_date TIMESTAMP,
last_login_date TIMESTAMP,
total_orders INTEGER,
total_spent DECIMAL(10,2),
customer_segment VARCHAR(50),
avg_order_value DECIMAL(10,2),
clv_score DECIMAL(10,2),
days_since_last_login INTEGER,
recency_score INTEGER,
engagement_score DECIMAL(5,2),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
"""
postgres_hook.run(create_table_sql)
# Upsert operation (insert or update)
upsert_sql = """
INSERT INTO analytics_customer_metrics (
customer_id, first_name, last_name, email, phone,
registration_date, last_login_date, total_orders, total_spent,
customer_segment, avg_order_value, clv_score, days_since_last_login,
recency_score, engagement_score
) VALUES (
%(customer_id)s, %(first_name)s, %(last_name)s, %(email)s, %(phone)s,
%(registration_date)s, %(last_login_date)s, %(total_orders)s, %(total_spent)s,
%(customer_segment)s, %(avg_order_value)s, %(clv_score)s, %(days_since_last_login)s,
%(recency_score)s, %(engagement_score)s
)
ON CONFLICT (customer_id)
DO UPDATE SET
first_name = EXCLUDED.first_name,
last_name = EXCLUDED.last_name,
email = EXCLUDED.email,
phone = EXCLUDED.phone,
registration_date = EXCLUDED.registration_date,
last_login_date = EXCLUDED.last_login_date,
total_orders = EXCLUDED.total_orders,
total_spent = EXCLUDED.total_spent,
customer_segment = EXCLUDED.customer_segment,
avg_order_value = EXCLUDED.avg_order_value,
clv_score = EXCLUDED.clv_score,
days_since_last_login = EXCLUDED.days_since_last_login,
recency_score = EXCLUDED.recency_score,
engagement_score = EXCLUDED.engagement_score,
updated_at = CURRENT_TIMESTAMP;
"""
# Load the data in batches (performance optimization)
batch_size = 1000
total_rows = len(customers_df)
for i in range(0, total_rows, batch_size):
batch_df = customers_df.iloc[i:i+batch_size]
# Prepare the rows for PostgreSQL
batch_records = batch_df.to_dict('records')
# Handle NaN values
for record in batch_records:
for key, value in record.items():
if pd.isna(value):
record[key] = None
elif isinstance(value, pd.Timestamp):
record[key] = value.isoformat()
# Execute the batch (PostgresHook.run expects one parameter mapping per call)
for record in batch_records:
    postgres_hook.run(upsert_sql, parameters=record)
logger.info(f"Loaded batch {i//batch_size + 1}/{(total_rows-1)//batch_size + 1}: {len(batch_df)} rows")
# Also load into BigQuery for analytics
try:
# Note: DataFrame.to_gbq requires the pandas-gbq package
# Configure dataset and table
project_id = Variable.get('gcp_project_id', default_var='my-project')
dataset_id = 'analytics'
table_id = 'customer_metrics'
# Load the DataFrame into BigQuery
customers_df.to_gbq(
destination_table=f'{dataset_id}.{table_id}',
project_id=project_id,
if_exists='replace',
progress_bar=False
)
logger.info("Data loaded into BigQuery")
except Exception as e:
logger.warning(f"BigQuery load failed: {e}")
logger.info("Data load finished")
def generate_analytics_report(**context):
"""Generate the analytics report"""
logger.info("Generating analytics report...")
# PostgreSQL hook for the final data
postgres_hook = PostgresHook(postgres_conn_id='postgres_default')
# Run the analytics queries
analytics_queries = {
'customer_segments': """
SELECT
customer_segment,
COUNT(*) as customer_count,
AVG(total_spent) as avg_total_spent,
AVG(total_orders) as avg_total_orders,
AVG(clv_score) as avg_clv_score,
AVG(engagement_score) as avg_engagement_score
FROM analytics_customer_metrics
GROUP BY customer_segment
ORDER BY avg_total_spent DESC
""",
'top_customers': """
SELECT
customer_id,
first_name,
last_name,
total_spent,
total_orders,
clv_score,
engagement_score
FROM analytics_customer_metrics
ORDER BY clv_score DESC
LIMIT 10
""",
'engagement_distribution': """
SELECT
CASE
WHEN engagement_score >= 0.8 THEN 'High'
WHEN engagement_score >= 0.6 THEN 'Medium'
WHEN engagement_score >= 0.4 THEN 'Low'
ELSE 'Very Low'
END as engagement_level,
COUNT(*) as customer_count,
AVG(total_spent) as avg_total_spent
FROM analytics_customer_metrics
GROUP BY engagement_level
ORDER BY avg_total_spent DESC
""",
'daily_trends': """
SELECT
DATE(last_login_date) as login_date,
COUNT(*) as active_customers,
AVG(total_spent) as avg_spend_per_customer
FROM analytics_customer_metrics
WHERE last_login_date >= CURRENT_DATE - INTERVAL '30 days'
GROUP BY DATE(last_login_date)
ORDER BY login_date DESC
"""
}
# Collect the results
analytics_results = {}
for query_name, query in analytics_queries.items():
try:
result_df = postgres_hook.get_pandas_df(query)
analytics_results[query_name] = result_df
logger.info(f"Ran analytics query '{query_name}': {len(result_df)} rows")
except Exception as e:
logger.error(f"Error in query '{query_name}': {e}")
analytics_results[query_name] = pd.DataFrame()
# Build the report; total_customers sums the per-segment counts
report_data = {
'generated_at': datetime.now().isoformat(),
'total_customers': (int(analytics_results['customer_segments']['customer_count'].sum())
if not analytics_results.get('customer_segments', pd.DataFrame()).empty else 0),
'analytics': {}
}
for query_name, result_df in analytics_results.items():
if not result_df.empty:
report_data['analytics'][query_name] = result_df.to_dict('records')
# Save the report as JSON
report_json = json.dumps(report_data, indent=2, default=str)
# Store the report in S3
try:
s3_hook = S3Hook(aws_conn_id='aws_default')
bucket_name = Variable.get('reports_bucket_name', default_var='analytics-reports')
report_filename = f"customer_analytics_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
s3_hook.load_string(
report_json,
key=report_filename,
bucket_name=bucket_name,
replace=True
)
logger.info(f"Analytics report stored: s3://{bucket_name}/{report_filename}")
except Exception as e:
logger.error(f"Error storing the report: {e}")
# Log report metrics
if 'customer_segments' in analytics_results:
segments_df = analytics_results['customer_segments']
if not segments_df.empty:
logger.info("Customer segment distribution:")
for _, row in segments_df.iterrows():
logger.info(f" {row['customer_segment']}: {row['customer_count']} customers, "
f"avg revenue: €{row['avg_total_spent']:.2f}")
logger.info("Analytics report generation finished")
# Define the tasks
extract_task = PythonOperator(
task_id='extract_customer_data',
python_callable=extract_customer_data,
dag=dag,
)
transform_task = PythonOperator(
task_id='transform_customer_data',
python_callable=transform_customer_data,
dag=dag,
)
load_task = PythonOperator(
task_id='load_data_to_warehouse',
python_callable=load_data_to_warehouse,
dag=dag,
)
report_task = PythonOperator(
task_id='generate_analytics_report',
python_callable=generate_analytics_report,
dag=dag,
)
# Task Dependencies
extract_task >> transform_task >> load_task >> report_task
3. Data Visualization with Matplotlib, Seaborn, and Plotly
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')
# Data Visualization Demo
class DataVisualizationDemo:
def __init__(self):
"""Set up style and configuration"""
# Matplotlib configuration
plt.style.use('seaborn-v0_8')
plt.rcParams['figure.figsize'] = (12, 8)
plt.rcParams['font.size'] = 10
plt.rcParams['axes.titlesize'] = 14
plt.rcParams['axes.labelsize'] = 12
plt.rcParams['xtick.labelsize'] = 10
plt.rcParams['ytick.labelsize'] = 10
plt.rcParams['legend.fontsize'] = 10
plt.rcParams['figure.titlesize'] = 16
# Seaborn configuration
sns.set_palette("husl")
# Define the color scheme
self.colors = {
'primary': '#2E86AB',
'secondary': '#A23B72',
'accent': '#F18F01',
'success': '#C73E1D',
'warning': '#F4A261',
'info': '#264653',
'light': '#E9C46A',
'dark': '#2A9D8F'
}
print("Data visualization demo initialized")
def create_sample_data(self):
"""Create sample data for visualization"""
np.random.seed(42)
# Time series data (revenue trend)
dates = pd.date_range(start='2023-01-01', end='2023-12-31', freq='D')
base_revenue = 10000
trend = np.linspace(0, 5000, len(dates))
seasonal = 2000 * np.sin(2 * np.pi * np.arange(len(dates)) / 365.25)
noise = np.random.normal(0, 500, len(dates))
revenue_data = pd.DataFrame({
'date': dates,
'revenue': base_revenue + trend + seasonal + noise,
'orders': np.random.poisson(100, len(dates)) + 50,
'customers': np.random.poisson(80, len(dates)) + 30
})
# Customer data
customer_data = pd.DataFrame({
'customer_id': range(1000),
'age': np.random.normal(35, 10, 1000),
'income': np.random.lognormal(10.5, 0.5, 1000),
'total_spent': np.random.gamma(2, 100, 1000),
'orders': np.random.poisson(5, 1000) + 1,
'segment': np.random.choice(['Bronze', 'Silver', 'Gold', 'Platinum'], 1000,
p=[0.4, 0.3, 0.2, 0.1]),
'registration_date': pd.date_range('2020-01-01', '2023-12-31', periods=1000)[np.random.permutation(1000)],
'last_purchase': pd.date_range('2023-01-01', '2023-12-31', periods=1000)[np.random.permutation(1000)]
})
# Product data
products = ['Laptop', 'Smartphone', 'Tablet', 'Headphones', 'Mouse', 'Keyboard', 'Monitor', 'Webcam']
product_data = pd.DataFrame({
'product': np.random.choice(products, 5000),
'category': np.random.choice(['Electronics', 'Accessories'], 5000),
'price': np.random.uniform(10, 1000, 5000),
'quantity_sold': np.random.poisson(10, 5000) + 1,
'rating': np.random.uniform(3, 5, 5000),
'month': np.random.choice(range(1, 13), 5000)
})
# Geographic data
regions = ['North', 'South', 'East', 'West', 'Central']
geo_data = pd.DataFrame({
'region': np.random.choice(regions, 1000),
'city': [f"{region} City {i}" for region, i in zip(
np.random.choice(regions, 1000),
np.random.randint(1, 10, 1000)
)],
'population': np.random.lognormal(10, 1, 1000),
'revenue': np.random.gamma(2, 50000, 1000),
'stores': np.random.poisson(5, 1000) + 1
})
return revenue_data, customer_data, product_data, geo_data
def create_time_series_visualizations(self, revenue_data):
"""Create the time series visualizations"""
print("Creating time series visualizations...")
fig, axes = plt.subplots(2, 2, figsize=(16, 12))
fig.suptitle('Revenue Time Series Analysis', fontsize=16, fontweight='bold')
# 1. Revenue trend
axes[0, 0].plot(revenue_data['date'], revenue_data['revenue'],
color=self.colors['primary'], linewidth=2)
axes[0, 0].set_title('Daily Revenue Trend')
axes[0, 0].set_xlabel('Date')
axes[0, 0].set_ylabel('Revenue (€)')
axes[0, 0].grid(True, alpha=0.3)
# Add moving averages
revenue_data['ma_7'] = revenue_data['revenue'].rolling(window=7).mean()
revenue_data['ma_30'] = revenue_data['revenue'].rolling(window=30).mean()
axes[0, 0].plot(revenue_data['date'], revenue_data['ma_7'],
color=self.colors['accent'], linewidth=1, alpha=0.7, label='7-day MA')
axes[0, 0].plot(revenue_data['date'], revenue_data['ma_30'],
color=self.colors['secondary'], linewidth=1, alpha=0.7, label='30-day MA')
axes[0, 0].legend()
# 2. Monthly revenue
monthly_revenue = revenue_data.set_index('date').resample('M')['revenue'].sum()
axes[0, 1].bar(monthly_revenue.index, monthly_revenue.values,
color=self.colors['info'], alpha=0.7)
axes[0, 1].set_title('Total Monthly Revenue')
axes[0, 1].set_xlabel('Month')
axes[0, 1].set_ylabel('Revenue (€)')
axes[0, 1].tick_params(axis='x', rotation=45)
# 3. Revenue vs. orders scatter plot
axes[1, 0].scatter(revenue_data['orders'], revenue_data['revenue'],
alpha=0.6, color=self.colors['primary'])
axes[1, 0].set_title('Revenue vs. Orders')
axes[1, 0].set_xlabel('Number of Orders')
axes[1, 0].set_ylabel('Revenue (€)')
# Add a trend line
z = np.polyfit(revenue_data['orders'], revenue_data['revenue'], 1)
p = np.poly1d(z)
axes[1, 0].plot(revenue_data['orders'], p(revenue_data['orders']),
color=self.colors['accent'], linewidth=2)
# 4. Distribution of daily revenue
axes[1, 1].hist(revenue_data['revenue'], bins=30, color=self.colors['secondary'],
alpha=0.7, edgecolor='black')
axes[1, 1].set_title('Distribution of Daily Revenue')
axes[1, 1].set_xlabel('Revenue (€)')
axes[1, 1].set_ylabel('Frequency')
axes[1, 1].axvline(revenue_data['revenue'].mean(), color=self.colors['accent'],
linestyle='--', linewidth=2, label=f'Mean: €{revenue_data["revenue"].mean():.0f}')
axes[1, 1].legend()
plt.tight_layout()
plt.savefig('time_series_analysis.png', dpi=300, bbox_inches='tight')
plt.show()
return fig
def create_customer_analytics_visualizations(self, customer_data):
"""Kunden-Analytics Visualisierungen erstellen"""
print("Erstelle Kunden-Analytics Visualisierungen...")
fig, axes = plt.subplots(2, 3, figsize=(18, 12))
fig.suptitle('Kunden-Analytics Dashboard', fontsize=16, fontweight='bold')
# 1. Altersverteilung
axes[0, 0].hist(customer_data['age'], bins=20, color=self.colors['primary'],
alpha=0.7, edgecolor='black')
axes[0, 0].set_title('Altersverteilung')
axes[0, 0].set_xlabel('Alter')
axes[0, 0].set_ylabel('Anzahl')
# 2. Einkommensverteilung
axes[0, 1].hist(customer_data['income'], bins=30, color=self.colors['secondary'],
alpha=0.7, edgecolor='black')
axes[0, 1].set_title('Einkommensverteilung')
axes[0, 1].set_xlabel('Einkommen (€)')
axes[0, 1].set_ylabel('Anzahl')
# 3. Kunden-Segment Verteilung
segment_counts = customer_data['segment'].value_counts()
axes[0, 2].pie(segment_counts.values, labels=segment_counts.index,
autopct='%1.1f%%', colors=[self.colors['primary'], self.colors['secondary'],
self.colors['accent'], self.colors['success']])
axes[0, 2].set_title('Kunden-Segment Verteilung')
# 4. Umsatz nach Segment
segment_revenue = customer_data.groupby('segment')['total_spent'].mean()
bars = axes[1, 0].bar(segment_revenue.index, segment_revenue.values,
color=[self.colors['primary'], self.colors['secondary'],
self.colors['accent'], self.colors['success']])
axes[1, 0].set_title('Durchschnittlicher Umsatz nach Segment')
axes[1, 0].set_xlabel('Segment')
axes[1, 0].set_ylabel('Ø Umsatz (€)')
# Werte auf Bars anzeigen
for bar, value in zip(bars, segment_revenue.values):
axes[1, 0].text(bar.get_x() + bar.get_width()/2, bar.get_height() + 100,
f'€{value:.0f}', ha='center', va='bottom')
# 5. Alter vs. Gesamtausgaben
scatter = axes[1, 1].scatter(customer_data['age'], customer_data['total_spent'],
alpha=0.6, c=customer_data['orders'], cmap='viridis')
axes[1, 1].set_title('Alter vs. Gesamtausgaben')
axes[1, 1].set_xlabel('Alter')
axes[1, 1].set_ylabel('Gesamtausgaben (€)')
plt.colorbar(scatter, ax=axes[1, 1], label='Anzahl Bestellungen')
# 6. Boxplot für Umsatz nach Segment
segments = customer_data['segment'].unique()
data_for_boxplot = [customer_data[customer_data['segment'] == seg]['total_spent']
for seg in segments]
box_plot = axes[1, 2].boxplot(data_for_boxplot, labels=segments, patch_artist=True)
colors = [self.colors['primary'], self.colors['secondary'],
self.colors['accent'], self.colors['success']]
for patch, color in zip(box_plot['boxes'], colors):
patch.set_facecolor(color)
patch.set_alpha(0.7)
axes[1, 2].set_title('Umsatz-Verteilung nach Segment')
axes[1, 2].set_xlabel('Segment')
axes[1, 2].set_ylabel('Gesamtausgaben (€)')
plt.tight_layout()
plt.savefig('customer_analytics.png', dpi=300, bbox_inches='tight')
plt.show()
return fig
def create_product_analytics_visualizations(self, product_data):
"""Produkt-Analytics Visualisierungen erstellen"""
print("Erstelle Produkt-Analytics Visualisierungen...")
fig, axes = plt.subplots(2, 2, figsize=(16, 12))
fig.suptitle('Produkt-Analytics Dashboard', fontsize=16, fontweight='bold')
# 1. Top-Produkte nach Umsatz
product_revenue = product_data.groupby('product').apply(
lambda x: (x['price'] * x['quantity_sold']).sum()
).sort_values(ascending=False)
bars = axes[0, 0].barh(product_revenue.index, product_revenue.values,
color=self.colors['primary'])
axes[0, 0].set_title('Top-Produkte nach Gesamtumsatz')
axes[0, 0].set_xlabel('Gesamtumsatz (€)')
# Werte auf Bars anzeigen
for i, (bar, value) in enumerate(zip(bars, product_revenue.values)):
axes[0, 0].text(value + max(product_revenue.values) * 0.01, i,
f'€{value:,.0f}', va='center')
# 2. Preisverteilung nach Kategorie
categories = product_data['category'].unique()
for i, category in enumerate(categories):
category_data = product_data[product_data['category'] == category]['price']
axes[0, 1].hist(category_data, bins=20, alpha=0.7, label=category,
color=self.colors['primary'] if i == 0 else self.colors['secondary'])
axes[0, 1].set_title('Preisverteilung nach Kategorie')
axes[0, 1].set_xlabel('Preis (€)')
axes[0, 1].set_ylabel('Häufigkeit')
axes[0, 1].legend()
# 3. Bewertung vs. Preis
axes[1, 0].scatter(product_data['price'], product_data['rating'],
alpha=0.6, color=self.colors['accent'])
axes[1, 0].set_title('Bewertung vs. Preis')
axes[1, 0].set_xlabel('Preis (€)')
axes[1, 0].set_ylabel('Bewertung')
# Korrelationskoeffizient
correlation = np.corrcoef(product_data['price'], product_data['rating'])[0, 1]
axes[1, 0].text(0.05, 0.95, f'Korrelation: {correlation:.3f}',
transform=axes[1, 0].transAxes, va='top',
bbox=dict(boxstyle='round', facecolor='white', alpha=0.8))
# 4. Monatliche Verkäufe
monthly_sales = product_data.groupby('month')['quantity_sold'].sum()
month_names = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
axes[1, 1].bar(monthly_sales.index, monthly_sales.values,
color=self.colors['info'], alpha=0.7)
axes[1, 1].set_title('Monatliche Verkaufsmengen')
axes[1, 1].set_xlabel('Monat')
axes[1, 1].set_ylabel('Verkaufsmenge')
axes[1, 1].set_xticks(range(1, 13))
axes[1, 1].set_xticklabels(month_names, rotation=45)
plt.tight_layout()
plt.savefig('product_analytics.png', dpi=300, bbox_inches='tight')
plt.show()
return fig
def create_geographic_visualizations(self, geo_data):
"""Geografische Visualisierungen erstellen"""
print("Erstelle geografische Visualisierungen...")
fig, axes = plt.subplots(2, 2, figsize=(16, 12))
fig.suptitle('Geografische Analyse Dashboard', fontsize=16, fontweight='bold')
# 1. Umsatz nach Region
region_revenue = geo_data.groupby('region')['revenue'].sum().sort_values(ascending=False)
bars = axes[0, 0].bar(region_revenue.index, region_revenue.values,
color=[self.colors['primary'], self.colors['secondary'],
self.colors['accent'], self.colors['success'], self.colors['info']])
axes[0, 0].set_title('Gesamtumsatz nach Region')
axes[0, 0].set_ylabel('Umsatz (€)')
# Werte auf Bars anzeigen
for bar, value in zip(bars, region_revenue.values):
axes[0, 0].text(bar.get_x() + bar.get_width()/2, bar.get_height() + max(region_revenue.values) * 0.01,
f'€{value:,.0f}', ha='center', va='bottom')
# 2. Bevölkerung vs. Umsatz
axes[0, 1].scatter(geo_data['population'], geo_data['revenue'],
alpha=0.6, s=geo_data['stores']*10,
c=self.colors['primary'])
axes[0, 1].set_title('Bevölkerung vs. Umsatz')
axes[0, 1].set_xlabel('Bevölkerung')
axes[0, 1].set_ylabel('Umsatz (€)')
# 3. Store-Dichte nach Region
region_stores = geo_data.groupby('region')['stores'].sum()
axes[1, 0].pie(region_stores.values, labels=region_stores.index,
autopct='%1.1f%%', colors=[self.colors['primary'], self.colors['secondary'],
self.colors['accent'], self.colors['success'], self.colors['info']])
axes[1, 0].set_title('Store-Verteilung nach Region')
# 4. Umsatz pro Store
region_avg_revenue = geo_data.groupby('region').apply(
lambda x: x['revenue'].sum() / x['stores'].sum()
).sort_values(ascending=False)
bars = axes[1, 1].bar(region_avg_revenue.index, region_avg_revenue.values,
color=self.colors['secondary'])
axes[1, 1].set_title('Durchschnittlicher Umsatz pro Store')
axes[1, 1].set_ylabel('Ø Umsatz pro Store (€)')
# Werte auf Bars anzeigen
for bar, value in zip(bars, region_avg_revenue.values):
axes[1, 1].text(bar.get_x() + bar.get_width()/2, bar.get_height() + max(region_avg_revenue.values) * 0.01,
f'€{value:,.0f}', ha='center', va='bottom')
plt.tight_layout()
plt.savefig('geographic_analytics.png', dpi=300, bbox_inches='tight')
plt.show()
return fig
def create_interactive_visualizations(self, revenue_data, customer_data, product_data):
"""Interaktive Visualisierungen mit Plotly erstellen"""
print("Erstelle interaktive Visualisierungen...")
# 1. Interaktive Zeitreihe
fig1 = make_subplots(
rows=2, cols=1,
subplot_titles=('Täglicher Umsatz', 'Monatlicher Umsatz'),
vertical_spacing=0.1
)
# Täglicher Umsatz mit Moving Averages
fig1.add_trace(
go.Scatter(
x=revenue_data['date'],
y=revenue_data['revenue'],
mode='lines',
name='Täglicher Umsatz',
line=dict(color='blue', width=1)
),
row=1, col=1
)
fig1.add_trace(
go.Scatter(
x=revenue_data['date'],
y=revenue_data['ma_7'],
mode='lines',
name='7-Tage MA',
line=dict(color='orange', width=2)
),
row=1, col=1
)
# Monatlicher Umsatz
monthly_revenue = revenue_data.set_index('date').resample('M')['revenue'].sum()
fig1.add_trace(
go.Bar(
x=monthly_revenue.index,
y=monthly_revenue.values,
name='Monatlicher Umsatz',
marker_color='lightblue'
),
row=2, col=1
)
fig1.update_layout(
title='Interaktive Umsatz-Analyse',
height=600,
showlegend=True
)
fig1.show()
# 2. Interaktive Kunden-Analyse
fig2 = make_subplots(
rows=2, cols=2,
subplot_titles=('Altersverteilung', 'Einkommensverteilung',
'Umsatz nach Segment', 'Alter vs. Umsatz'),
specs=[[{"type": "histogram"}, {"type": "histogram"}],
[{"type": "bar"}, {"type": "scatter"}]]
)
# Altersverteilung
fig2.add_trace(
go.Histogram(
x=customer_data['age'],
nbinsx=20,
name='Alter',
marker_color='lightgreen'
),
row=1, col=1
)
# Einkommensverteilung
fig2.add_trace(
go.Histogram(
x=customer_data['income'],
nbinsx=30,
name='Einkommen',
marker_color='lightcoral'
),
row=1, col=2
)
# Umsatz nach Segment
segment_revenue = customer_data.groupby('segment')['total_spent'].mean()
fig2.add_trace(
go.Bar(
x=segment_revenue.index,
y=segment_revenue.values,
name='Ø Umsatz',
marker_color='lightblue'
),
row=2, col=1
)
# Alter vs. Umsatz
fig2.add_trace(
go.Scatter(
x=customer_data['age'],
y=customer_data['total_spent'],
mode='markers',
name='Kunden',
marker=dict(
size=8,
color=customer_data['orders'],
colorscale='Viridis',
showscale=True,
colorbar=dict(title="Bestellungen")
)
),
row=2, col=2
)
fig2.update_layout(
title='Interaktive Kunden-Analyse',
height=600,
showlegend=False
)
fig2.show()
# 3. Interaktive 3D-Visualisierung
fig3 = go.Figure(data=[go.Scatter3d(
x=customer_data['age'],
y=customer_data['income'],
z=customer_data['total_spent'],
mode='markers',
marker=dict(
size=5,
color=customer_data['orders'],
colorscale='Viridis',
showscale=True,
colorbar=dict(title="Bestellungen")
),
text=[f"Kunde {i}<br>Alter: {age}<br>Einkommen: {inc:,.0f}€<br>Umsatz: {spent:,.0f}€"
for i, (age, inc, spent) in enumerate(zip(customer_data['age'],
customer_data['income'],
customer_data['total_spent']))],
hovertemplate='%{text}<extra></extra>'
)])
fig3.update_layout(
title='3D Kunden-Analyse (Alter, Einkommen, Umsatz)',
scene=dict(
xaxis_title='Alter',
yaxis_title='Einkommen (€)',
zaxis_title='Umsatz (€)'
),
height=600
)
fig3.show()
return fig1, fig2, fig3
def create_dashboard_summary(self, revenue_data, customer_data, product_data, geo_data):
"""Zusammenfassendes Dashboard erstellen"""
print("Erstelle zusammenfassendes Dashboard...")
# Berechne Key Metrics
total_revenue = revenue_data['revenue'].sum()
avg_daily_revenue = revenue_data['revenue'].mean()
total_customers = len(customer_data)
avg_customer_value = customer_data['total_spent'].mean()
total_orders = customer_data['orders'].sum()
top_product = product_data.groupby('product').apply(
lambda x: (x['price'] * x['quantity_sold']).sum()
).idxmax()
# Dashboard erstellen
fig = plt.figure(figsize=(20, 16))
# Titel
fig.suptitle('Data Science Analytics Dashboard', fontsize=20, fontweight='bold')
# Key Metrics
metrics_text = f"""
KEY METRICS
─────────────────────
Gesamtumsatz: €{total_revenue:,.0f}
Ø Tagesumsatz: €{avg_daily_revenue:,.0f}
Gesamt-Kunden: {total_customers:,}
Ø Kundenwert: €{avg_customer_value:,.0f}
Gesamt-Bestellungen: {total_orders:,}
Top-Produkt: {top_product}
"""
plt.figtext(0.02, 0.95, metrics_text, fontsize=12, fontfamily='monospace',
verticalalignment='top', bbox=dict(boxstyle='round', facecolor='lightgray', alpha=0.8))
# Subplots für verschiedene Analysen
gs = fig.add_gridspec(3, 3, hspace=0.3, wspace=0.3,
left=0.15, right=0.95, top=0.85, bottom=0.05)
# 1. Umsatz-Trend (oben links)
ax1 = fig.add_subplot(gs[0, 0])
monthly_revenue = revenue_data.set_index('date').resample('M')['revenue'].sum()
ax1.plot(monthly_revenue.index, monthly_revenue.values,
color=self.colors['primary'], linewidth=2)
ax1.set_title('Monatlicher Umsatz-Trend')
ax1.tick_params(axis='x', rotation=45)
# 2. Kunden-Segmente (oben mitte)
ax2 = fig.add_subplot(gs[0, 1])
segment_counts = customer_data['segment'].value_counts()
ax2.pie(segment_counts.values, labels=segment_counts.index, autopct='%1.1f%%',
colors=[self.colors['primary'], self.colors['secondary'],
self.colors['accent'], self.colors['success']])
ax2.set_title('Kunden-Segmente')
# 3. Top-Produkte (oben rechts)
ax3 = fig.add_subplot(gs[0, 2])
product_revenue = product_data.groupby('product').apply(
lambda x: (x['price'] * x['quantity_sold']).sum()
).sort_values(ascending=False).head(5)
bars = ax3.barh(range(len(product_revenue)), product_revenue.values,
color=self.colors['info'])
ax3.set_yticks(range(len(product_revenue)))
ax3.set_yticklabels(product_revenue.index)
ax3.set_title('Top 5 Produkte')
ax3.set_xlabel('Umsatz (€)')
# 4. Altersverteilung (mitte links)
ax4 = fig.add_subplot(gs[1, 0])
ax4.hist(customer_data['age'], bins=15, color=self.colors['secondary'], alpha=0.7)
ax4.set_title('Altersverteilung')
ax4.set_xlabel('Alter')
# 5. Regionale Verteilung (mitte mitte)
ax5 = fig.add_subplot(gs[1, 1])
region_revenue = geo_data.groupby('region')['revenue'].sum()
ax5.bar(region_revenue.index, region_revenue.values,
color=[self.colors['primary'], self.colors['secondary'],
self.colors['accent'], self.colors['success'], self.colors['info']])
ax5.set_title('Umsatz nach Region')
ax5.set_ylabel('Umsatz (€)')
# 6. Bewertung vs. Preis (mitte rechts)
ax6 = fig.add_subplot(gs[1, 2])
ax6.scatter(product_data['price'], product_data['rating'],
alpha=0.6, color=self.colors['accent'])
ax6.set_title('Bewertung vs. Preis')
ax6.set_xlabel('Preis (€)')
ax6.set_ylabel('Bewertung')
# 7. Umsatz-Verteilung (unten links)
ax7 = fig.add_subplot(gs[2, :2])
ax7.hist(revenue_data['revenue'], bins=30, color=self.colors['primary'],
alpha=0.7, edgecolor='black')
ax7.set_title('Tägliche Umsatz-Verteilung')
ax7.set_xlabel('Umsatz (€)')
ax7.set_ylabel('Häufigkeit')
ax7.axvline(revenue_data['revenue'].mean(), color=self.colors['accent'],
linestyle='--', linewidth=2, label=f'Mittelwert: €{revenue_data["revenue"].mean():.0f}')
ax7.legend()
# 8. Korrelations-Heatmap (unten rechts)
ax8 = fig.add_subplot(gs[2, 2])
# Korrelationsmatrix erstellen
corr_data = customer_data[['age', 'income', 'total_spent', 'orders']].corr()
im = ax8.imshow(corr_data, cmap='coolwarm', aspect='auto', vmin=-1, vmax=1)
# Farblegende
cbar = plt.colorbar(im, ax=ax8)
cbar.set_label('Korrelation')
# Labels und Werte
ax8.set_xticks(range(len(corr_data.columns)))
ax8.set_yticks(range(len(corr_data.columns)))
ax8.set_xticklabels(corr_data.columns, rotation=45)
ax8.set_yticklabels(corr_data.columns)
# Werte in die Zellen schreiben
for i in range(len(corr_data.columns)):
for j in range(len(corr_data.columns)):
text = ax8.text(j, i, f'{corr_data.iloc[i, j]:.2f}',
ha="center", va="center", color="black")
ax8.set_title('Korrelationsmatrix')
plt.savefig('comprehensive_dashboard.png', dpi=300, bbox_inches='tight')
plt.show()
return fig
# Demo ausführen
def data_visualization_demo():
viz = DataVisualizationDemo()
try:
# Beispieldaten erstellen
revenue_data, customer_data, product_data, geo_data = viz.create_sample_data()
# Verschiedene Visualisierungen erstellen
viz.create_time_series_visualizations(revenue_data)
viz.create_customer_analytics_visualizations(customer_data)
viz.create_product_analytics_visualizations(product_data)
viz.create_geographic_visualizations(geo_data)
# Interaktive Visualisierungen
viz.create_interactive_visualizations(revenue_data, customer_data, product_data)
# Zusammenfassendes Dashboard
viz.create_dashboard_summary(revenue_data, customer_data, product_data, geo_data)
print("Data Visualization Demo erfolgreich abgeschlossen!")
except Exception as e:
print(f"Fehler in Data Visualization Demo: {e}")
if __name__ == "__main__":
data_visualization_demo()
Data Science Workflow
Typischer Data Science Process
graph TD
A[Problem Definition] --> B[Data Collection]
B --> C[Data Cleaning]
C --> D[Exploratory Analysis]
D --> E[Feature Engineering]
E --> F[Model Building]
F --> G[Model Evaluation]
G --> H[Deployment]
H --> I[Monitoring]
I --> A
CRISP-DM Methodology
- Business Understanding: Geschäftsziele verstehen
- Data Understanding: Daten explorieren
- Data Preparation: Daten bereinigen und vorbereiten
- Modeling: Modelle entwickeln
- Evaluation: Modelle bewerten
- Deployment: Modelle bereitstellen
- Monitoring: Performance überwachen
Big Data Technologies
Hadoop Ecosystem
| Komponente | Funktion | Alternativen |
|---|---|---|
| HDFS | Distributed Storage | Amazon S3, Google Cloud Storage |
| MapReduce | Batch Processing | Apache Spark, Apache Flink |
| Hive | SQL on Hadoop | Presto, Apache Impala |
| HBase | NoSQL Database | Cassandra, MongoDB |
| Zookeeper | Coordination | Consul, etcd |
Spark vs. MapReduce
| Feature | Spark | MapReduce |
|---|---|---|
| Speed | In-Memory | Disk-based |
| Ease of Use | High APIs | Low-level |
| Streaming | Native | Batch only |
| ML Support | MLlib | External |
| Resource Usage | Higher | Lower |
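Das MapReduce-Grundprinzip (Map → Shuffle → Reduce) lässt sich auch ohne Hadoop-Cluster nachvollziehen. Die folgende rein illustrative Python-Skizze zählt Wörter; die Funktionsnamen `map_phase`, `shuffle_phase` und `reduce_phase` sind frei gewählt:

```python
from collections import defaultdict

def map_phase(documents):
    """Map: jedes Dokument in (wort, 1)-Paare zerlegen."""
    for doc in documents:
        for word in doc.lower().split():
            yield (word, 1)

def shuffle_phase(pairs):
    """Shuffle: Paare nach Schlüssel gruppieren."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return grouped

def reduce_phase(grouped):
    """Reduce: Werte pro Schlüssel aggregieren."""
    return {key: sum(values) for key, values in grouped.items()}

docs = ["big data big analytics", "big data pipelines"]
counts = reduce_phase(shuffle_phase(map_phase(docs)))
print(counts)  # {'big': 3, 'data': 2, 'analytics': 1, 'pipelines': 1}
```

In Hadoop übernimmt das Framework Shuffle und Verteilung über den Cluster automatisch; Spark hält Zwischenergebnisse dabei im Speicher statt auf der Festplatte.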
Machine Learning Pipeline Components
Data Pipeline Stages
# ML Pipeline Beispiel mit Scikit-learn
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
# Preprocessing
numeric_features = ['age', 'income', 'orders']
categorical_features = ['segment', 'region']
numeric_transformer = Pipeline(steps=[
('scaler', StandardScaler())
])
categorical_transformer = Pipeline(steps=[
('onehot', OneHotEncoder(handle_unknown='ignore'))
])
preprocessor = ColumnTransformer(
transformers=[
('num', numeric_transformer, numeric_features),
('cat', categorical_transformer, categorical_features)
])
# Complete Pipeline
ml_pipeline = Pipeline(steps=[
('preprocessor', preprocessor),
('regressor', RandomForestRegressor(n_estimators=100))
])
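Zur Veranschaulichung eine mögliche Verwendung einer solchen Pipeline, hier als eigenständiges Beispiel mit frei gewählten synthetischen Daten (die Spaltennamen entsprechen den oben definierten Features):

```python
import numpy as np
import pandas as pd
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split

# Pipeline wie oben: Skalierung numerischer, One-Hot-Encoding kategorischer Features
preprocessor = ColumnTransformer(transformers=[
    ('num', StandardScaler(), ['age', 'income', 'orders']),
    ('cat', OneHotEncoder(handle_unknown='ignore'), ['segment', 'region']),
])
ml_pipeline = Pipeline(steps=[
    ('preprocessor', preprocessor),
    ('regressor', RandomForestRegressor(n_estimators=100, random_state=42)),
])

# Synthetische Beispieldaten mit den erwarteten Spalten
rng = np.random.default_rng(42)
X = pd.DataFrame({
    'age': rng.integers(18, 70, 200),
    'income': rng.uniform(20000, 90000, 200),
    'orders': rng.poisson(5, 200),
    'segment': rng.choice(['Premium', 'Standard', 'Budget'], 200),
    'region': rng.choice(['North', 'South', 'East', 'West'], 200),
})
y = X['income'] * 0.001 + X['orders'] * 10 + rng.normal(0, 5, 200)

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
ml_pipeline.fit(X_train, y_train)
predictions = ml_pipeline.predict(X_test)
```

Der Vorteil: Preprocessing und Modell werden gemeinsam trainiert und gemeinsam serialisiert, sodass im Deployment keine Transformationsschritte vergessen werden können.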
MLOps Practices
- Version Control: Git für Code, DVC für Daten
- Experiment Tracking: MLflow, Weights & Biases
- Model Registry: MLflow Model Registry
- CI/CD: GitHub Actions, Jenkins
- Monitoring: Prometheus, Grafana
- Drift Detection: Evidently AI, NannyML
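Die Grundidee des Experiment Trackings – Parameter und Metriken pro Lauf reproduzierbar ablegen – lässt sich minimal mit Bordmitteln skizzieren; in der Praxis übernehmen das Tools wie MLflow. Funktionsname und Ablagestruktur sind hier frei gewählt:

```python
import json
import time
from pathlib import Path

def log_experiment(run_dir, params, metrics):
    """Parameter und Metriken eines Laufs als JSON ablegen (vereinfachtes Tracking)."""
    run_dir = Path(run_dir)
    run_dir.mkdir(parents=True, exist_ok=True)
    record = {'timestamp': time.time(), 'params': params, 'metrics': metrics}
    run_file = run_dir / f"run_{int(record['timestamp'] * 1000)}.json"
    run_file.write_text(json.dumps(record, indent=2))
    return run_file

run_file = log_experiment(
    'experiments/rf_baseline',
    params={'n_estimators': 100, 'max_depth': None},
    metrics={'rmse': 123.4, 'r2': 0.87},
)
```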
Data Visualization Principles
Chart Selection Guide
| Daten-Typ | Visualisierung | Zweck |
|---|---|---|
| Zeitreihe | Line Chart | Trends über Zeit |
| Kategorisch | Bar Chart | Vergleiche |
| Verteilung | Histogram | Häufigkeiten |
| Beziehung | Scatter Plot | Korrelationen |
| Komposition | Pie Chart | Anteile |
| Geografisch | Map | Standorte |
Color Theory für Data Viz
- Primärfarben: Blau, Rot, Gelb
- Sekundärfarben: Grün, Orange, Lila
- Farbpsychologie: Blau=Vertrauen, Rot=Gefahr, Grün=Erfolg
- Kontraste: Hell/Dunkel für Lesbarkeit
- Farbenblindheit: 8-10% der Bevölkerung
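Für farbfehlsichtigkeits-freundliche Visualisierungen hat sich u. a. die Okabe-Ito-Palette etabliert; die Hex-Werte lassen sich direkt in Matplotlib einsetzen:

```python
# Okabe-Ito-Palette: acht gut unterscheidbare, farbfehlsichtigkeits-freundliche Farben
OKABE_ITO = [
    '#E69F00',  # Orange
    '#56B4E9',  # Himmelblau
    '#009E73',  # Blaugrün
    '#F0E442',  # Gelb
    '#0072B2',  # Blau
    '#D55E00',  # Zinnoberrot
    '#CC79A7',  # Rosa
    '#000000',  # Schwarz
]

# Mögliche Verwendung in Matplotlib (setzt installiertes matplotlib voraus):
# import matplotlib.pyplot as plt
# plt.rcParams['axes.prop_cycle'] = plt.cycler(color=OKABE_ITO)
```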
ETL Best Practices
Data Quality Checks
def validate_data_quality(df):
"""Datenqualität validieren"""
quality_report = {
'total_rows': len(df),
'missing_values': df.isnull().sum().to_dict(),
'duplicate_rows': df.duplicated().sum(),
'data_types': df.dtypes.to_dict(),
'outliers': {col: detect_outliers(df, col)
for col in df.select_dtypes(include='number').columns}
}
return quality_report
def detect_outliers(df, column):
"""Outliers mit IQR-Methode erkennen"""
Q1 = df[column].quantile(0.25)
Q3 = df[column].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
outliers = df[(df[column] < lower_bound) | (df[column] > upper_bound)]
return len(outliers)
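Zur Veranschaulichung der IQR-Methode ein kleines, eigenständiges Beispiel mit frei gewählten Werten:

```python
import pandas as pd

# 500 ist ein offensichtlicher Ausreißer in ansonsten engen Werten
df = pd.DataFrame({'price': [10, 12, 11, 13, 12, 11, 10, 500]})

Q1 = df['price'].quantile(0.25)
Q3 = df['price'].quantile(0.75)
IQR = Q3 - Q1
mask = (df['price'] < Q1 - 1.5 * IQR) | (df['price'] > Q3 + 1.5 * IQR)
print(df[mask])  # findet genau die Zeile mit 500
```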
Performance Optimization
- Parallel Processing: Multiprocessing, Dask
- Memory Management: Chunking, Generators
- Indexing: Database Indexes
- Caching: Redis, Memcached
- Batch Processing: Bulk Operations
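Das Chunking-Prinzip als eigenständige Skizze: Ein Generator liefert die Daten blockweise, sodass nie der gesamte Datensatz im Speicher liegt. In pandas leistet `pd.read_csv(pfad, chunksize=...)` Ähnliches für CSV-Dateien:

```python
def chunked(iterable, chunk_size):
    """Zerlegt ein Iterable speicherschonend in Blöcke fester Größe."""
    chunk = []
    for item in iterable:
        chunk.append(item)
        if len(chunk) == chunk_size:
            yield chunk
            chunk = []
    if chunk:
        yield chunk  # Restblock

# Große Datenmenge blockweise aggregieren, ohne alles im Speicher zu halten
total = 0
for block in chunked(range(1_000_000), 10_000):
    total += sum(block)
print(total)  # 499999500000
```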
Analytics Techniques
Statistical Methods
- Descriptive Statistics: Mean, Median, Mode, Std Dev
- Inferential Statistics: Hypothesis Testing, Confidence Intervals
- Correlation Analysis: Pearson, Spearman, Kendall
- Regression Analysis: Linear, Logistic, Polynomial
- Time Series: ARIMA, Prophet, LSTM
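Deskriptive Statistik und Korrelationsmaße lassen sich direkt mit NumPy demonstrieren – hier an frei gewählten synthetischen Daten (Spearman vereinfacht über Ränge, gültig ohne Bindungen):

```python
import numpy as np

rng = np.random.default_rng(0)
x = rng.normal(50, 10, 1000)           # z. B. Alter
y = 2 * x + rng.normal(0, 5, 1000)     # linear abhängige Größe

# Deskriptive Statistik
print(f"Mittelwert: {x.mean():.1f}, Median: {np.median(x):.1f}, Std: {x.std(ddof=1):.1f}")

# Pearson: misst lineare Zusammenhänge
pearson = np.corrcoef(x, y)[0, 1]

# Spearman: Pearson auf den Rängen (monotone Zusammenhänge)
rank = lambda a: a.argsort().argsort()
spearman = np.corrcoef(rank(x), rank(y))[0, 1]
print(f"Pearson: {pearson:.3f}, Spearman: {spearman:.3f}")
```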
Advanced Analytics
- Clustering: K-Means, Hierarchical, DBSCAN
- Classification: Decision Trees, Random Forest, SVM
- Anomaly Detection: Isolation Forest, One-Class SVM
- Dimensionality Reduction: PCA, t-SNE, UMAP
- Association Rules: Apriori, FP-Growth
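Zur Illustration des Clustering-Gedankens eine minimale K-Means-Skizze in reinem NumPy – stark vereinfacht gegenüber `sklearn.cluster.KMeans` (keine mehrfachen Starts, keine Behandlung leerer Cluster):

```python
import numpy as np

def kmeans(X, k, iterations=100, seed=0):
    """Minimaler K-Means: Zentren aus den Daten wählen, dann Zuweisung/Update iterieren."""
    rng = np.random.default_rng(seed)
    centers = X[rng.choice(len(X), k, replace=False)]
    for _ in range(iterations):
        # Zuweisung: jeder Punkt zum nächstgelegenen Zentrum
        labels = np.linalg.norm(X[:, None] - centers[None, :], axis=2).argmin(axis=1)
        # Update: neues Zentrum = Mittelwert der zugewiesenen Punkte
        new_centers = np.array([X[labels == j].mean(axis=0) for j in range(k)])
        if np.allclose(new_centers, centers):
            break
        centers = new_centers
    return labels, centers

# Zwei klar getrennte Punktwolken als Testfall
rng = np.random.default_rng(1)
cloud_a = rng.normal([0, 0], 0.5, (100, 2))
cloud_b = rng.normal([5, 5], 0.5, (100, 2))
X = np.vstack([cloud_a, cloud_b])
labels, centers = kmeans(X, k=2)
```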
Vorteile und Nachteile
Vorteile von Data Science
- Data-Driven Decisions: Bessere Geschäftsentscheidungen
- Pattern Recognition: Verborgene Muster entdecken
- Predictive Analytics: Zukünftige Trends vorhersagen
- Process Optimization: Effizienz steigern
- Competitive Advantage: Wettbewerbsvorteile
Nachteile
- Data Quality: Abhängig von Datenqualität
- Complexity: Komplexe Algorithmen und Tools
- Privacy Concerns: Datenschutz und Ethik
- Resource Intensive: Rechenleistung und Speicher
- Interpretability: Black-Box-Modelle
Häufige Prüfungsfragen
- Was ist der Unterschied zwischen Big Data und traditionellen Daten? Big Data zeichnet sich durch die 3V (Volume, Velocity, Variety) aus und erfordert spezielle Technologien zur Verarbeitung.
- Erklären Sie den ETL-Prozess! ETL steht für Extract (Daten extrahieren), Transform (Daten bereinigen und umformen) und Load (Daten in Zielsystem laden).
- Wann verwendet man welche Chart-Art? Zeitreihen für Trends, Bar Charts für Vergleiche, Histogramme für Verteilungen, Scatter Plots für Beziehungen.
- Was ist der Zweck von Machine Learning Pipelines? ML-Pipelines automatisieren und standardisieren den gesamten Workflow von Datenaufbereitung bis zum Model-Deployment.
Wichtigste Quellen
- https://spark.apache.org/
- https://airflow.apache.org/
- https://matplotlib.org/
- https://plotly.com/python/