Skip to content
IRC-Coding IRC-Coding
Reactive Programming Observables Subjects RxJava Project Reactor Asynchrone Datenströme

Reactive Programming: Asynchrone Datenströme, Observables & Subjects

Reactive Programming mit asynchronen Datenströmen. Observables, Subjects, Operators, Backpressure und praktische Beispiele mit RxJava, Project Reactor und JavaScript.

S

schutzgeist

2 min read

Reactive Programming: Asynchrone Datenströme, Observables & Subjects

Dieser Beitrag ist eine umfassende Einführung in das Reactive Programming – inklusive asynchroner Datenströme, Observables, Subjects und praktischen Beispielen.

In a Nutshell

Reactive Programming dreht sich um asynchrone Datenströme. Anstatt zu warten, abonniert man Datenströme und reagiert auf Ereignisse, sobald sie eintreffen.

Kompakte Fachbeschreibung

Reactive Programming ist ein Paradigma für die Verarbeitung asynchroner Datenströme mit nicht-blockierenden, ereignisgesteuerten Programmen.

Kernprinzipien:

  • Asynchronität: Aufgaben werden im Hintergrund ausgeführt, Hauptprogramm wird nicht blockiert
  • Datenströme (Streams): Alles wird als Sequenz von Ereignissen betrachtet
  • Observer Pattern: Datenströme werden abonniert, nicht abgefragt
  • Reactive Streams: Standard für asynchrone Stream-Verarbeitung mit Backpressure

Wichtige Konzepte:

  • Observable: Datenstrom, der 0..n Werte emitieren kann
  • Observer: Empfänger der Daten vom Observable
  • Subscription: Verbindung zwischen Observable und Observer
  • Operators: Transformationen und Filterung von Datenströmen
  • Scheduler: Kontrolle über Thread-Ausführung
  • Backpressure: Schutz vor Überlastung bei schnellen Produzenten

Prüfungsrelevante Stichpunkte

  • Reactive Programming: Asynchrone, nicht-blockierende Verarbeitung
  • Observable: Datenstrom mit 0..n Werten
  • Observer: Empfänger für Datenereignisse
  • Subscription: Verbindungsmanagement zwischen Observable und Observer
  • Operators: map, filter, flatMap für Stream-Transformation
  • Backpressure: Schutz vor Überlastung, Flow Control
  • Scheduler: Thread-Kontrolle für asynchrone Operationen
  • IHK-relevant: Modernes Architekturparadigma für skalierbare Systeme

Kernkomponenten

  1. Observable: Quelle von Datenereignissen
  2. Observer: Empfänger der Daten
  3. Subscription: Verbindung und Ressourcenmanagement
  4. Operators: Datenstrom-Transformationen
  5. Subjects: Sowohl Observable als auch Observer
  6. Scheduler: Thread-Ausführungskontrolle
  7. Backpressure: Flow Control Mechanismen
  8. Reactive Streams: Standard-Spezifikation

Praxisbeispiele

1. Reactive Programming mit RxJava

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;

public class ReactiveProgrammingDemo {
    
    public static void main(String[] args) throws InterruptedException {
        // Einfaches Observable
        Observable<String> observable = Observable.just("Hallo", "Reactive", "World");
        
        // Observer abonnieren
        observable.subscribe(
            value -> System.out.println("Next: " + value),
            error -> System.err.println("Error: " + error),
            () -> System.out.println("Completed!")
        );
        
        // Asynchrone Operationen
        asyncOperations();
        
        // Operators Demonstration
        operatorsDemo();
        
        // Backpressure mit Flowable
        backpressureDemo();
        
        // Subjects für Multicasting
        subjectsDemo();
    }
    
    private static void asyncOperations() {
        System.out.println("\n=== Asynchrone Operationen ===");
        
        // Observable mit Scheduler
        Observable.fromArray("Task 1", "Task 2", "Task 3")
            .subscribeOn(Schedulers.io()) // Ausführung auf IO-Thread
            .observeOn(Schedulers.single()) // Beobachtung auf Single-Thread
            .subscribe(
                task -> System.out.println("Verarbeite: " + task + " auf " + Thread.currentThread().getName()),
                error -> System.err.println("Fehler: " + error),
                () -> System.out.println("Alle Tasks abgeschlossen")
            );
        
        // Zeitbasiertes Observable
        Observable.interval(1, TimeUnit.SECONDS)
            .take(5)
            .map(tick -> "Tick " + (tick + 1))
            .subscribe(
                tick -> System.out.println(tick + " auf " + Thread.currentThread().getName())
            );
        
        // Asynchrone API-Aufrufe simulieren
        Observable<String> apiCall = Observable.fromCallable(() -> {
            Thread.sleep(1000); // Simulierter API-Aufruf
            return "API-Daten";
        }).subscribeOn(Schedulers.io());
        
        apiCall
            .observeOn(Schedulers.single())
            .subscribe(
                data -> System.out.println("API-Ergebnis: " + data),
                error -> System.err.println("API-Fehler: " + error)
            );
    }
    
    private static void operatorsDemo() {
        System.out.println("\n=== Operators Demonstration ===");
        
        Observable.range(1, 10)
            .filter(n -> n % 2 == 0) // Gerade Zahlen filtern
            .map(n -> n * n) // Quadrieren
            .take(3) // Nur erste 3 Elemente
            .subscribe(
                result -> System.out.println("Result: " + result),
                error -> System.err.println("Error: " + error),
                () -> System.out.println("Operators Demo abgeschlossen")
            );
        
        // flatMap für asynchrone Transformation
        Observable.just("user1", "user2", "user3")
            .flatMap(userId -> getUserData(userId)
                .subscribeOn(Schedulers.io()) // Jeder Aufruf auf eigenem Thread
            )
            .subscribe(
                userData -> System.out.println("User Data: " + userData),
                error -> System.err.println("User Error: " + error)
            );
    }
    
    private static void backpressureDemo() {
        System.out.println("\n=== Backpressure Demo ===");
        
        // Fast Producer mit Flowable
        Flowable.range(1, 1000)
            .onBackpressureBuffer(100) // Buffer mit Größe 100
            .observeOn(Schedulers.io())
            .subscribe(
                item -> {
                    try {
                        Thread.sleep(10); // Langsamer Consumer
                        System.out.println("Processed: " + item);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                },
                error -> System.err.println("Backpressure Error: " + error)
            );
    }
    
    private static void subjectsDemo() {
        System.out.println("\n=== Subjects Demo ===");
        
        // PublishSubject für Multicasting
        PublishSubject<String> publishSubject = PublishSubject.create();
        
        // Mehrere Observer abonnieren
        Observer<String> observer1 = new Observer<String>() {
            @Override
            public void onNext(String value) {
                System.out.println("Observer 1: " + value);
            }
            
            @Override
            public void onError(Throwable e) {
                System.err.println("Observer 1 Error: " + e);
            }
            
            @Override
            public void onComplete() {
                System.out.println("Observer 1 Completed");
            }
        };
        
        Observer<String> observer2 = new Observer<String>() {
            @Override
            public void onNext(String value) {
                System.out.println("Observer 2: " + value);
            }
            
            @Override
            public void onError(Throwable e) {
                System.err.println("Observer 2 Error: " + e);
            }
            
            @Override
            public void onComplete() {
                System.out.println("Observer 2 Completed");
            }
        };
        
        publishSubject.subscribe(observer1);
        publishSubject.subscribe(observer2);
        
        // Daten emittieren
        publishSubject.onNext("Message 1");
        publishSubject.onNext("Message 2");
        publishSubject.onComplete();
        
        // BehaviorSubject für letzten Wert
        BehaviorSubject<Integer> behaviorSubject = BehaviorSubject.createDefault(0);
        
        behaviorSubject.subscribe(
            value -> System.out.println("Behavior Subject: " + value)
        );
        
        behaviorSubject.onNext(10);
        behaviorSubject.onNext(20);
        
        // Späterer Subscriber erhält letzten Wert
        behaviorSubject.subscribe(
            value -> System.out.println("Late Subscriber: " + value)
        );
    }
    
    // Simulierter API-Aufruf
    private static Observable<String> getUserData(String userId) {
        return Observable.fromCallable(() -> {
            Thread.sleep(500);
            return "UserData for " + userId;
        });
    }
}

2. Reactive Programming mit Project Reactor (Spring)

import reactor.core.publisher.*;
import reactor.core.scheduler.Schedulers;
import java.time.Duration;
import java.util.List;

public class ReactorDemo {
    
    public static void main(String[] args) throws InterruptedException {
        // Mono für 0..1 Werte
        Mono<String> mono = Mono.just("Hello Reactor");
        
        mono.subscribe(
            value -> System.out.println("Mono: " + value),
            error -> System.err.println("Error: " + error)
        );
        
        // Flux für 0..n Werte
        Flux<String> flux = Flux.just("A", "B", "C", "D", "E");
        
        flux
            .filter(letter -> !"C".equals(letter))
            .map(String::toUpperCase)
            .subscribe(
                letter -> System.out.println("Flux: " + letter),
                error -> System.err.println("Flux Error: " + error),
                () -> System.out.println("Flux Completed")
            );
        
        // Asynchrone Web-API Simulation
        webApiSimulation();
        
        // Error Handling
        errorHandling();
        
        // Backpressure
        backpressureHandling();
    }
    
    private static void webApiSimulation() {
        System.out.println("\n=== Web API Simulation ===");
        
        // Simuliere Web-Request
        Mono<String> webResponse = Mono.fromCallable(() -> {
            Thread.sleep(1000); // Network latency
            return "Response Data";
        }).subscribeOn(Schedulers.boundedElastic());
        
        webResponse
            .map(response -> "Processed: " + response)
            .timeout(Duration.ofSeconds(2))
            .onErrorResume(error -> Mono.just("Fallback Response"))
            .subscribe(
                result -> System.out.println("Web Result: " + result),
                error -> System.err.println("Web Error: " + error)
            );
    }
    
    private static void errorHandling() {
        System.out.println("\n=== Error Handling ===");
        
        Flux<Integer> numbers = Flux.range(1, 10)
            .map(n -> {
                if (n == 5) {
                    throw new RuntimeException("Error at " + n);
                }
                return n * n;
            });
        
        numbers
            .onErrorContinue((error, item) -> 
                System.err.println("Error processing " + item + ": " + error))
            .subscribe(
                result -> System.out.println("Result: " + result),
                error -> System.err.println("Final Error: " + error),
                () -> System.out.println("Error Handling Completed")
            );
    }
    
    private static void backpressureHandling() {
        System.out.println("\n=== Backpressure Handling ===");
        
        // Fast Producer
        Flux<Long> fastProducer = Flux.interval(Duration.ofMillis(10))
            .take(100);
        
        // Slow Consumer with backpressure
        fastProducer
            .onBackpressureBuffer(50) // Buffer mit Drop-Strategy
            .publishOn(Schedulers.boundedElastic())
            .delayElements(Duration.ofMillis(50)) // Langsamer Consumer
            .subscribe(
                item -> System.out.println("Processed: " + item),
                error -> System.err.println("Backpressure Error: " + error)
            );
    }
}

// Reactive Service Beispiel
@Service
public class ReactiveUserService {
    
    public Flux<User> getAllUsers() {
        return Flux.fromIterable(getUserList())
            .subscribeOn(Schedulers.parallel());
    }
    
    public Mono<User> getUserById(String id) {
        return Mono.fromCallable(() -> findUserById(id))
            .subscribeOn(Schedulers.boundedElastic())
            .filter(user -> user != null)
            .switchIfEmpty(Mono.error(new UserNotFoundException("User not found: " + id)));
    }
    
    public Flux<User> searchUsers(String query) {
        return getAllUsers()
            .filter(user -> user.getName().toLowerCase().contains(query.toLowerCase()))
            .take(10); // Limit results
    }
    
    // Simulierte Datenquelle
    private List<User> getUserList() {
        return List.of(
            new User("1", "Alice", "alice@example.com"),
            new User("2", "Bob", "bob@example.com"),
            new User("3", "Charlie", "charlie@example.com")
        );
    }
    
    private User findUserById(String id) {
        return getUserList().stream()
            .filter(user -> user.getId().equals(id))
            .findFirst()
            .orElse(null);
    }
}

class User {
    private String id;
    private String name;
    private String email;
    
    public User(String id, String name, String email) {
        this.id = id;
        this.name = name;
        this.email = email;
    }
    
    // Getter
    public String getId() { return id; }
    public String getName() { return name; }
    public String getEmail() { return email; }
}

3. Reactive Programming mit JavaScript (RxJS)

// RxJS Reactive Programming
const { Observable, Subject, BehaviorSubject, fromEvent, interval, of } = require('rxjs');
const { map, filter, switchMap, take, debounceTime, distinctUntilChanged } = require('rxjs/operators');

// Einfaches Observable
const helloObservable = of('Hello', 'Reactive', 'JavaScript');

helloObservable.subscribe({
    next: value => console.log('Next:', value),
    error: error => console.error('Error:', error),
    complete: () => console.log('Complete!')
});

// DOM Events als Observable
const button = document.createElement('button');
button.textContent = 'Click me!';
document.body.appendChild(button);

const clickObservable = fromEvent(button, 'click');

clickObservable
    .pipe(
        map(event => ({ type: 'click', timestamp: Date.now() })),
        debounceTime(300),
        distinctUntilChanged()
    )
    .subscribe(event => {
        console.log('Button clicked:', event);
        // API-Aufruf simulieren
        simulateApiCall();
    });

// API-Aufrufe mit Reactive Pattern
function simulateApiCall() {
    return new Observable(subscriber => {
        console.log('API Call started...');
        
        setTimeout(() => {
            const success = Math.random() > 0.3;
            
            if (success) {
                subscriber.next({ data: 'API Response', status: 'success' });
                subscriber.complete();
            } else {
                subscriber.error(new Error('API Error'));
            }
        }, 1000);
    });
}

// Search mit Reactive Pattern
const searchInput = document.createElement('input');
searchInput.placeholder = 'Search...';
document.body.appendChild(searchInput);

const searchObservable = fromEvent(searchInput, 'input')
    .pipe(
        map(event => event.target.value),
        filter(query => query.length >= 3),
        debounceTime(500),
        distinctUntilChanged(),
        switchMap(query => searchApi(query))
    );

searchObservable.subscribe({
    next: results => console.log('Search results:', results),
    error: error => console.error('Search error:', error)
});

function searchApi(query) {
    return new Observable(subscriber => {
        console.log('Searching for:', query);
        
        setTimeout(() => {
            const results = [`Result 1 for ${query}`, `Result 2 for ${query}`];
            subscriber.next(results);
            subscriber.complete();
        }, 300);
    });
}

// Subject für Multicasting
const subject = new Subject();

// Mehrere Subscriber
const subscriber1 = {
    next: value => console.log('Subscriber 1:', value),
    error: error => console.error('Subscriber 1 Error:', error),
    complete: () => console.log('Subscriber 1 Complete')
};

const subscriber2 = {
    next: value => console.log('Subscriber 2:', value),
    error: error => console.error('Subscriber 2 Error:', error),
    complete: () => console.log('Subscriber 2 Complete')
};

subject.subscribe(subscriber1);
subject.subscribe(subscriber2);

// Daten emittieren
subject.next('Message 1');
subject.next('Message 2');
subject.complete();

// BehaviorSubject für letzten Wert
const behaviorSubject = new BehaviorSubject(0);

behaviorSubject.subscribe(value => console.log('Initial:', value));

behaviorSubject.next(10);
behaviorSubject.next(20);

// Späterer Subscriber erhält letzten Wert
behaviorSubject.subscribe(value => console.log('Late Subscriber:', value));

// WebSocket mit Reactive Pattern
class ReactiveWebSocket {
    constructor(url) {
        this.url = url;
        this.messageSubject = new Subject();
        this.connectionStatus = new BehaviorSubject('disconnected');
    }
    
    connect() {
        this.connectionStatus.next('connecting');
        
        // WebSocket Verbindung simulieren
        this.socket = {
            send: (data) => console.log('Sending:', data),
            close: () => console.log('WebSocket closed')
        };
        
        // Simuliere eingehende Nachrichten
        setInterval(() => {
            const message = { type: 'data', payload: Math.random() };
            this.messageSubject.next(message);
        }, 2000);
        
        this.connectionStatus.next('connected');
        
        return this.connectionStatus.asObservable();
    }
    
    disconnect() {
        if (this.socket) {
            this.socket.close();
            this.connectionStatus.next('disconnected');
        }
    }
    
    sendMessage(message) {
        if (this.socket) {
            this.socket.send(JSON.stringify(message));
        }
    }
    
    getMessages() {
        return this.messageSubject.asObservable();
    }
    
    getConnectionStatus() {
        return this.connectionStatus.asObservable();
    }
}

// WebSocket verwenden
const webSocket = new ReactiveWebSocket('ws://localhost:8080');

webSocket.connect().subscribe(status => {
    console.log('Connection Status:', status);
});

webSocket.getMessages().subscribe(message => {
    console.log('Received Message:', message);
});

// Nachricht senden
setTimeout(() => {
    webSocket.sendMessage({ type: 'ping', data: 'Hello Server' });
}, 5000);

4. Reactive Programming mit Python (RxPY)

import rx
from rx import operators as ops
import time
import threading
from datetime import datetime

# Einfaches Observable
def simple_observable():
    source = rx.of("Python", "Reactive", "Programming")
    
    source.subscribe(
        on_next=lambda value: print(f"Next: {value}"),
        on_error=lambda error: print(f"Error: {error}"),
        on_completed=lambda: print("Completed!")
    )

# Asynchrone Operationen
def async_operations():
    print("\n=== Asynchrone Operationen ===")
    
    # Zeitbasiertes Observable
    rx.interval(1.0).pipe(
        ops.take(5),
        ops.map(lambda i: f"Tick {i + 1}")
    ).subscribe(
        on_next=lambda value: print(f"{value} at {datetime.now().second}s"),
        on_completed=lambda: print("Timer completed!")
    )
    
    # Asynchrone API-Aufrufe simulieren
    def simulate_api_call(item):
        def observer(observer):
            def worker():
                time.sleep(1)  # Simuliere Netzwerkverzögerung
                if item % 3 == 0:
                    observer.on_next(f"Data for {item}")
                    observer.on_completed()
                else:
                    observer.on_error(Exception(f"Error for {item}"))
            
            thread = threading.Thread(target=worker)
            thread.start()
        
        return rx.create(observer)
    
    rx.range(1, 6).pipe(
        ops.map(simulate_api_call),
        ops.merge_all()
    ).subscribe(
        on_next=lambda value: print(f"API Result: {value}"),
        on_error=lambda error: print(f"API Error: {error}")
    )

# Operators Demonstration
def operators_demo():
    print("\n=== Operators Demo ===")
    
    rx.range(1, 11).pipe(
        ops.filter(lambda x: x % 2 == 0),
        ops.map(lambda x: x * x),
        ops.take(3)
    ).subscribe(
        on_next=lambda value: print(f"Result: {value}"),
        on_completed=lambda: print("Operators completed!")
    )
    
    # flatMap für asynchrone Transformation
    def get_user_data(user_id):
        return rx.of(f"UserData for {user_id}").pipe(
            ops.delay(0.5)  # Simuliere Verzögerung
        )
    
    rx.of("user1", "user2", "user3").pipe(
        ops.flat_map(get_user_data)
    ).subscribe(
        on_next=lambda data: print(f"User: {data}")
    )

# Subject für Multicasting
def subjects_demo():
    print("\n=== Subjects Demo ===")
    
    # PublishSubject
    subject = rx.Subject()
    
    # Mehrere Subscriber
    def observer1(value):
        print(f"Observer 1: {value}")
    
    def observer2(value):
        print(f"Observer 2: {value}")
    
    subject.subscribe(observer1)
    subject.subscribe(observer2)
    
    # Daten emittieren
    subject.on_next("Message 1")
    subject.on_next("Message 2")
    subject.on_completed()
    
    # BehaviorSubject
    behavior_subject = rx.BehaviorSubject(0)
    
    behavior_subject.subscribe(
        on_next=lambda value: print(f"Behavior Subject: {value}")
    )
    
    behavior_subject.on_next(10)
    behavior_subject.on_next(20)
    
    # Späterer Subscriber
    behavior_subject.subscribe(
        on_next=lambda value: print(f"Late Subscriber: {value}")
    )

# Hot vs Cold Observable
def hot_cold_demo():
    print("\n=== Hot vs Cold Observable ===")
    
    # Cold Observable (jeder Subscriber bekommt alle Werte)
    cold = rx.of("A", "B", "C")
    
    print("Cold Observable:")
    cold.subscribe(on_next=lambda x: print(f"Subscriber 1: {x}"))
    time.sleep(1)
    cold.subscribe(on_next=lambda x: print(f"Subscriber 2: {x}"))
    
    # Hot Observable (geteilte Werte)
    hot = rx.Subject()
    
    print("\nHot Observable:")
    hot.subscribe(on_next=lambda x: print(f"Subscriber 1: {x}"))
    hot.on_next("X")
    hot.on_next("Y")
    hot.subscribe(on_next=lambda x: print(f"Subscriber 2: {x}"))
    hot.on_next("Z")

if __name__ == "__main__":
    simple_observable()
    async_operations()
    operators_demo()
    subjects_demo()
    hot_cold_demo()
    
    # Programm laufen lassen für asynchrone Operationen
    time.sleep(10)

Reactive Streams Backpressure

Backpressure Strategien

// RxJava Backpressure
Flowable.range(1, 1000)
    .onBackpressureBuffer()       // Buffer (Standard)
    .onBackpressureDrop()         // Überschüssige Elemente verwerfen
    .onBackpressureLatest()       // Nur neuestes Element behalten
    .onBackpressureError()        // Fehler bei Überlastung
    .subscribe(item -> processItem(item));

// Project Reactor Backpressure
Flux.range(1, 1000)
    .onBackpressureBuffer(100)    // Buffer mit Größe
    .onBackpressureDrop()         // Elemente verwerfen
    .onBackpressureLatest()       // Nur letztes Element
    .limitRate(100)               // Rate Limiting
    .subscribe(item -> processItem(item));

Scheduler für Thread-Kontrolle

RxJava Scheduler

Observable.just("data")
    .subscribeOn(Schedulers.io())           // Ausführung auf IO-Thread
    .observeOn(Schedulers.single())          // Beobachtung auf Single-Thread
    .observeOn(Schedulers.computation())     // Berechnungen auf CPU-Thread
    .subscribe(result -> handleResult(result));

Project Reactor Scheduler

Mono.just("data")
    .subscribeOn(Schedulers.boundedElastic())  // IO-Operationen
    .publishOn(Schedulers.parallel())           // Parallelverarbeitung
    .publishOn(Schedulers.single())             // Single Thread
    .subscribe(result -> handleResult(result));

Vorteile und Nachteile

Vorteile von Reactive Programming

  • Asynchronität: Nicht-blockierende Verarbeitung
  • Skalierbarkeit: Bessere Nutzung von Systemressourcen
  • Responsiveness: Schnellere Antwortzeiten
  • Flexibilität: Leichte Komposition von Operationen
  • Error Handling: Zentralisierte Fehlerbehandlung

Nachteile

  • Komplexität: Steile Lernkurve
  • Debugging: Schwierigere Fehlersuche
  • Overhead: Zusätzliche Abstraktionsschicht
  • Ressourcenmanagement: Komplexeres Lifecycle-Management

Häufige Prüfungsfragen

  1. Was ist der Unterschied zwischen Observable und Subject? Observable ist nur Quelle, Subject kann sowohl als Quelle als auch als Empfänger fungieren.

  2. Erklären Sie Backpressure! Mechanismus zum Schutz vor Überlastung bei schnellen Produzenten und langsamen Konsumenten.

  3. Wann verwendet man Reactive Programming? Bei asynchronen Operationen, Echtzeitanwendungen und Systemen mit hohen Anforderungen an Skalierbarkeit.

  4. Was ist der Zweck von Schedulern? Kontrolle über Thread-Ausführung und Parallelverarbeitung in reaktiven Systemen.

Wichtigste Quellen

  1. https://reactivex.io/
  2. https://projectreactor.io/
  3. https://github.com/ReactiveX/RxJava
Zurück zum Blog
Share: