Skip to content
IRC-Coding IRC-Coding
Reactive Programming Observables Subjects RxJava Project Reactor Async Data Streams

Reactive Programming: Async Streams & Observables

Master reactive programming with async data streams. Learn Observables, Subjects, operators, backpressure with RxJava, Project Reactor & JavaScript.

S

schutzgeist

2 min read
Reactive Programming: Async Streams & Observables

Reactive Programming: Asynchronous Data Streams, Observables & Subjects

This article is a comprehensive introduction to Reactive Programming – including asynchronous data streams, Observables, Subjects and practical examples.

In a Nutshell

Reactive Programming revolves around asynchronous data streams. Instead of waiting, you subscribe to data streams and react to events as they arrive.

Compact Technical Description

Reactive Programming is a paradigm for processing asynchronous data streams with non-blocking, event-driven programs.

Core Principles:

  • Asynchronicity: Tasks are executed in the background, main program is not blocked
  • Data Streams (Streams): Everything is considered as a sequence of events
  • Observer Pattern: Data streams are subscribed to, not polled
  • Reactive Streams: Standard for asynchronous stream processing with backpressure

Important Concepts:

  • Observable: Data stream that can emit 0..n values
  • Observer: Recipient of data from the Observable
  • Subscription: Connection between Observable and Observer
  • Operators: Transformations and filtering of data streams
  • Scheduler: Control over thread execution
  • Backpressure: Protection against overload from fast producers

Exam-Relevant Key Points

  • Reactive Programming: Asynchronous, non-blocking processing
  • Observable: Data stream with 0..n values
  • Observer: Recipient for data events
  • Subscription: Connection management between Observable and Observer
  • Operators: map, filter, flatMap for stream transformation
  • Backpressure: Protection against overload, flow control
  • Scheduler: Thread control for asynchronous operations
  • IHK-relevant: Modern architecture paradigm for scalable systems

Core Components

  1. Observable: Source of data events
  2. Observer: Recipient of data
  3. Subscription: Connection and resource management
  4. Operators: Data stream transformations
  5. Subjects: Both Observable and Observer
  6. Scheduler: Thread execution control
  7. Backpressure: Flow control mechanisms
  8. Reactive Streams: Standard specification

Practical Examples

1. Reactive Programming with RxJava

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;

public class ReactiveProgrammingDemo {
    
    public static void main(String[] args) throws InterruptedException {
        // Simple Observable
        Observable<String> observable = Observable.just("Hello", "Reactive", "World");
        
        // Subscribe Observer
        observable.subscribe(
            value -> System.out.println("Next: " + value),
            error -> System.err.println("Error: " + error),
            () -> System.out.println("Completed!")
        );
        
        // Asynchronous operations
        asyncOperations();
        
        // Operators demonstration
        operatorsDemo();
        
        // Backpressure with Flowable
        backpressureDemo();
        
        // Subjects for multicasting
        subjectsDemo();
    }
    
    private static void asyncOperations() {
        System.out.println("\n=== Asynchronous Operations ===");
        
        // Observable with Scheduler
        Observable.fromArray("Task 1", "Task 2", "Task 3")
            .subscribeOn(Schedulers.io()) // Execution on IO-Thread
            .observeOn(Schedulers.single()) // Observation on single-thread
            .subscribe(
                task -> System.out.println("Processing: " + task + " on " + Thread.currentThread().getName()),
                error -> System.err.println("Error: " + error),
                () -> System.out.println("All tasks completed")
            );
        
        // Time-based Observable
        Observable.interval(1, TimeUnit.SECONDS)
            .take(5)
            .map(tick -> "Tick " + (tick + 1))
            .subscribe(
                tick -> System.out.println(tick + " on " + Thread.currentThread().getName())
            );
        
        // Simulate asynchronous API calls
        Observable<String> apiCall = Observable.fromCallable(() -> {
            Thread.sleep(1000); // Simulated API call
            return "API-Data";
        }).subscribeOn(Schedulers.io());
        
        apiCall
            .observeOn(Schedulers.single())
            .subscribe(
                data -> System.out.println("API result: " + data),
                error -> System.err.println("API error: " + error)
            );
    }
    
    private static void operatorsDemo() {
        System.out.println("\n=== Operators Demonstration ===");
        
        Observable.range(1, 10)
            .filter(n -> n % 2 == 0) // Filter even numbers
            .map(n -> n * n) // Square
            .take(3) // Only first 3 elements
            .subscribe(
                result -> System.out.println("Result: " + result),
                error -> System.err.println("Error: " + error),
                () -> System.out.println("Operators demo completed")
            );
        
        // flatMap for asynchronous transformation
        Observable.just("user1", "user2", "user3")
            .flatMap(userId -> getUserData(userId)
                .subscribeOn(Schedulers.io()) // Each call on own thread
            )
            .subscribe(
                userData -> System.out.println("User Data: " + userData),
                error -> System.err.println("User Error: " + error)
            );
    }
    
    private static void backpressureDemo() {
        System.out.println("\n=== Backpressure Demo ===");
        
        // Fast producer with Flowable
        Flowable.range(1, 1000)
            .onBackpressureBuffer(100) // Buffer with size 100
            .observeOn(Schedulers.io())
            .subscribe(
                item -> {
                    try {
                        Thread.sleep(10); // Slow consumer
                        System.out.println("Processed: " + item);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                },
                error -> System.err.println("Backpressure Error: " + error)
            );
    }
    
    private static void subjectsDemo() {
        System.out.println("\n=== Subjects Demo ===");
        
        // PublishSubject for multicasting
        PublishSubject<String> publishSubject = PublishSubject.create();
        
        // Multiple observers subscribe
        Observer<String> observer1 = new Observer<String>() {
            @Override
            public void onNext(String value) {
                System.out.println("Observer 1: " + value);
            }
            
            @Override
            public void onError(Throwable e) {
                System.err.println("Observer 1 Error: " + e);
            }
            
            @Override
            public void onComplete() {
                System.out.println("Observer 1 Completed");
            }
        };
        
        Observer<String> observer2 = new Observer<String>() {
            @Override
            public void onNext(String value) {
                System.out.println("Observer 2: " + value);
            }
            
            @Override
            public void onError(Throwable e) {
                System.err.println("Observer 2 Error: " + e);
            }
            
            @Override
            public void onComplete() {
                System.out.println("Observer 2 Completed");
            }
        };
        
        publishSubject.subscribe(observer1);
        publishSubject.subscribe(observer2);
        
        // Emit data
        publishSubject.onNext("Message 1");
        publishSubject.onNext("Message 2");
        publishSubject.onComplete();
        
        // BehaviorSubject for last value
        BehaviorSubject<Integer> behaviorSubject = BehaviorSubject.createDefault(0);
        
        behaviorSubject.subscribe(
            value -> System.out.println("Behavior Subject: " + value)
        );
        
        behaviorSubject.onNext(10);
        behaviorSubject.onNext(20);
        
        // Late subscriber receives last value
        behaviorSubject.subscribe(
            value -> System.out.println("Late Subscriber: " + value)
        );
    }
    
    // Simulated API call
    private static Observable<String> getUserData(String userId) {
        return Observable.fromCallable(() -> {
            Thread.sleep(500);
            return "UserData for " + userId;
        });
    }
}

2. Reactive Programming with Project Reactor (Spring)

import reactor.core.publisher.*;
import reactor.core.scheduler.Schedulers;
import java.time.Duration;
import java.util.List;

public class ReactorDemo {
    
    public static void main(String[] args) throws InterruptedException {
        // Mono for 0..1 values
        Mono<String> mono = Mono.just("Hello Reactor");
        
        mono.subscribe(
            value -> System.out.println("Mono: " + value),
            error -> System.err.println("Error: " + error)
        );
        
        // Flux for 0..n values
        Flux<String> flux = Flux.just("A", "B", "C", "D", "E");
        
        flux
            .filter(letter -> !"C".equals(letter))
            .map(String::toUpperCase)
            .subscribe(
                letter -> System.out.println("Flux: " + letter),
                error -> System.err.println("Flux Error: " + error),
                () -> System.out.println("Flux Completed")
            );
        
        // Asynchronous Web-API simulation
        webApiSimulation();
        
        // Error Handling
        errorHandling();
        
        // Backpressure
        backpressureHandling();
    }
    
    private static void webApiSimulation() {
        System.out.println("\n=== Web API Simulation ===");
        
        // Simulate web request
        Mono<String> webResponse = Mono.fromCallable(() -> {
            Thread.sleep(1000); // Network latency
            return "Response Data";
        }).subscribeOn(Schedulers.boundedElastic());
        
        webResponse
            .map(response -> "Processed: " + response)
            .timeout(Duration.ofSeconds(2))
            .onErrorResume(error -> Mono.just("Fallback Response"))
            .subscribe(
                result -> System.out.println("Web Result: " + result),
                error -> System.err.println("Web Error: " + error)
            );
    }
    
    private static void errorHandling() {
        System.out.println("\n=== Error Handling ===");
        
        Flux<Integer> numbers = Flux.range(1, 10)
            .map(n -> {
                if (n == 5) {
                    throw new RuntimeException("Error at " + n);
                }
                return n * n;
            });
        
        numbers
            .onErrorContinue((error, item) -> 
                System.err.println("Error processing " + item + ": " + error))
            .subscribe(
                result -> System.out.println("Result: " + result),
                error -> System.err.println("Final Error: " + error),
                () -> System.out.println("Error Handling Completed")
            );
    }
    
    private static void backpressureHandling() {
        System.out.println("\n=== Backpressure Handling ===");
        
        // Fast Producer
        Flux<Long> fastProducer = Flux.interval(Duration.ofMillis(10))
            .take(100);
        
        // Slow Consumer with backpressure
        fastProducer
            .onBackpressureBuffer(50) // Buffer with drop strategy
            .publishOn(Schedulers.boundedElastic())
            .delayElements(Duration.ofMillis(50)) // Slow consumer
            .subscribe(
                item -> System.out.println("Processed: " + item),
                error -> System.err.println("Backpressure Error: " + error)
            );
    }
}

// Reactive Service example
@Service
public class ReactiveUserService {
    
    public Flux<User> getAllUsers() {
        return Flux.fromIterable(getUserList())
            .subscribeOn(Schedulers.parallel());
    }
    
    public Mono<User> getUserById(String id) {
        return Mono.fromCallable(() -> findUserById(id))
            .subscribeOn(Schedulers.boundedElastic())
            .filter(user -> user != null)
            .switchIfEmpty(Mono.error(new UserNotFoundException("User not found: " + id)));
    }
    
    public Flux<User> searchUsers(String query) {
        return getAllUsers()
            .filter(user -> user.getName().toLowerCase().contains(query.toLowerCase()))
            .take(10); // Limit results
    }
    
    // Simulated data source
    private List<User> getUserList() {
        return List.of(
            new User("1", "Alice", "alice@example.com"),
            new User("2", "Bob", "bob@example.com"),
            new User("3", "Charlie", "charlie@example.com")
        );
    }
    
    private User findUserById(String id) {
        return getUserList().stream()
            .filter(user -> user.getId().equals(id))
            .findFirst()
            .orElse(null);
    }
}

class User {
    private String id;
    private String name;
    private String email;
    
    public User(String id, String name, String email) {
        this.id = id;
        this.name = name;
        this.email = email;
    }
    
    // Getter
    public String getId() { return id; }
    public String getName() { return name; }
    public String getEmail() { return email; }
}

3. Reactive Programming with JavaScript (RxJS)

// RxJS Reactive Programming
const { Observable, Subject, BehaviorSubject, fromEvent, interval, of } = require('rxjs');
const { map, filter, switchMap, take, debounceTime, distinctUntilChanged } = require('rxjs/operators');

// Simple Observable
const helloObservable = of('Hello', 'Reactive', 'JavaScript');

helloObservable.subscribe({
    next: value => console.log('Next:', value),
    error: error => console.error('Error:', error),
    complete: () => console.log('Complete!')
});

// DOM Events as Observable
const button = document.createElement('button');
button.textContent = 'Click me!';
document.body.appendChild(button);

const clickObservable = fromEvent(button, 'click');

clickObservable
    .pipe(
        map(event => ({ type: 'click', timestamp: Date.now() })),
        debounceTime(300),
        distinctUntilChanged()
    )
    .subscribe(event => {
        console.log('Button clicked:', event);
        // Simulate API call
        simulateApiCall();
    });

// API calls with reactive pattern
function simulateApiCall() {
    return new Observable(subscriber => {
        console.log('API Call started...');
        
        setTimeout(() => {
            const success = Math.random() > 0.3;
            
            if (success) {
                subscriber.next({ data: 'API Response', status: 'success' });
                subscriber.complete();
            } else {
                subscriber.error(new Error('API Error'));
            }
        }, 1000);
    });
}

// Search with reactive pattern
const searchInput = document.createElement('input');
searchInput.placeholder = 'Search...';
document.body.appendChild(searchInput);

const searchObservable = fromEvent(searchInput, 'input')
    .pipe(
        map(event => event.target.value),
        filter(query => query.length >= 3),
        debounceTime(500),
        distinctUntilChanged(),
        switchMap(query => searchApi(query))
    );

searchObservable.subscribe({
    next: results => console.log('Search results:', results),
    error: error => console.error('Search error:', error)
});

function searchApi(query) {
    return new Observable(subscriber => {
        console.log('Searching for:', query);
        
        setTimeout(() => {
            const results = [`Result 1 for ${query}`, `Result 2 for ${query}`];
            subscriber.next(results);
            subscriber.complete();
        }, 300);
    });
}

// Subject for multicasting
const subject = new Subject();

// Multiple subscribers
const subscriber1 = {
    next: value => console.log('Subscriber 1:', value),
    error: error => console.error('Subscriber 1 Error:', error),
    complete: () => console.log('Subscriber 1 Complete')
};

const subscriber2 = {
    next: value => console.log('Subscriber 2:', value),
    error: error => console.error('Subscriber 2 Error:', error),
    complete: () => console.log('Subscriber 2 Complete')
};

subject.subscribe(subscriber1);
subject.subscribe(subscriber2);

// Emit data
subject.next('Message 1');
subject.next('Message 2');
subject.complete();

// BehaviorSubject for last value
const behaviorSubject = new BehaviorSubject(0);

behaviorSubject.subscribe(value => console.log('Initial:', value));

behaviorSubject.next(10);
behaviorSubject.next(20);

// Late subscriber receives last value
behaviorSubject.subscribe(value => console.log('Late Subscriber:', value));

// WebSocket with reactive pattern
class ReactiveWebSocket {
    constructor(url) {
        this.url = url;
        this.messageSubject = new Subject();
        this.connectionStatus = new BehaviorSubject('disconnected');
    }
    
    connect() {
        this.connectionStatus.next('connecting');
        
        // Simulate WebSocket connection
        this.socket = {
            send: (data) => console.log('Sending:', data),
            close: () => console.log('WebSocket closed')
        };
        
        // Simulate incoming messages
        setInterval(() => {
            const message = { type: 'data', payload: Math.random() };
            this.messageSubject.next(message);
        }, 2000);
        
        this.connectionStatus.next('connected');
        
        return this.connectionStatus.asObservable();
    }
    
    disconnect() {
        if (this.socket) {
            this.socket.close();
            this.connectionStatus.next('disconnected');
        }
    }
    
    sendMessage(message) {
        if (this.socket) {
            this.socket.send(JSON.stringify(message));
        }
    }
    
    getMessages() {
        return this.messageSubject.asObservable();
    }
    
    getConnectionStatus() {
        return this.connectionStatus.asObservable();
    }
}

// Use WebSocket
const webSocket = new ReactiveWebSocket('ws://localhost:8080');

webSocket.connect().subscribe(status => {
    console.log('Connection Status:', status);
});

webSocket.getMessages().subscribe(message => {
    console.log('Received Message:', message);
});

// Send message
setTimeout(() => {
    webSocket.sendMessage({ type: 'ping', data: 'Hello Server' });
}, 5000);

4. Reactive Programming with Python (RxPY)

import rx
from rx import operators as ops
import time
import threading
from datetime import datetime

# Simple Observable
def simple_observable():
    source = rx.of("Python", "Reactive", "Programming")
    
    source.subscribe(
        on_next=lambda value: print(f"Next: {value}"),
        on_error=lambda error: print(f"Error: {error}"),
        on_completed=lambda: print("Completed!")
    )

# Asynchronous Operations
def async_operations():
    print("\n=== Asynchronous Operations ===")
    
    # Time-based Observable
    rx.interval(1.0).pipe(
        ops.take(5),
        ops.map(lambda i: f"Tick {i + 1}")
    ).subscribe(
        on_next=lambda value: print(f"{value} at {datetime.now().second}s"),
        on_completed=lambda: print("Timer completed!")
    )
    
    # Simulate asynchronous API calls
    def simulate_api_call(item):
        def observer(observer):
            def worker():
                time.sleep(1)  # Simulate network delay
                if item % 3 == 0:
                    observer.on_next(f"Data for {item}")
                    observer.on_completed()
                else:
                    observer.on_error(Exception(f"Error for {item}"))
            
            thread = threading.Thread(target=worker)
            thread.start()
        
        return rx.create(observer)
    
    rx.range(1, 6).pipe(
        ops.map(simulate_api_call),
        ops.merge_all()
    ).subscribe(
        on_next=lambda value: print(f"API Result: {value}"),
        on_error=lambda error: print(f"API Error: {error}")
    )

# Operators Demonstration
def operators_demo():
    print("\n=== Operators Demo ===")
    
    rx.range(1, 11).pipe(
        ops.filter(lambda x: x % 2 == 0),
        ops.map(lambda x: x * x),
        ops.take(3)
    ).subscribe(
        on_next=lambda value: print(f"Result: {value}"),
        on_completed=lambda: print("Operators completed!")
    )
    
    # flatMap for asynchronous transformation
    def get_user_data(user_id):
        return rx.of(f"UserData for {user_id}").pipe(
            ops.delay(0.5)  # Simulate delay
        )
    
    rx.of("user1", "user2", "user3").pipe(
        ops.flat_map(get_user_data)
    ).subscribe(
        on_next=lambda data: print(f"User: {data}")
    )

# Subject for Multicasting
def subjects_demo():
    print("\n=== Subjects Demo ===")
    
    # PublishSubject
    subject = rx.Subject()
    
    # Multiple Subscribers
    def observer1(value):
        print(f"Observer 1: {value}")
    
    def observer2(value):
        print(f"Observer 2: {value}")
    
    subject.subscribe(observer1)
    subject.subscribe(observer2)
    
    # Emit data
    subject.on_next("Message 1")
    subject.on_next("Message 2")
    subject.on_completed()
    
    # BehaviorSubject
    behavior_subject = rx.BehaviorSubject(0)
    
    behavior_subject.subscribe(
        on_next=lambda value: print(f"Behavior Subject: {value}")
    )
    
    behavior_subject.on_next(10)
    behavior_subject.on_next(20)
    
    # Late Subscriber
    behavior_subject.subscribe(
        on_next=lambda value: print(f"Late Subscriber: {value}")
    )

# Hot vs Cold Observable
def hot_cold_demo():
    print("\n=== Hot vs Cold Observable ===")
    
    # Cold Observable (each subscriber gets all values)
    cold = rx.of("A", "B", "C")
    
    print("Cold Observable:")
    cold.subscribe(on_next=lambda x: print(f"Subscriber 1: {x}"))
    time.sleep(1)
    cold.subscribe(on_next=lambda x: print(f"Subscriber 2: {x}"))
    
    # Hot Observable (shared values)
    hot = rx.Subject()
    
    print("\nHot Observable:")
    hot.subscribe(on_next=lambda x: print(f"Subscriber 1: {x}"))
    hot.on_next("X")
    hot.on_next("Y")
    hot.subscribe(on_next=lambda x: print(f"Subscriber 2: {x}"))
    hot.on_next("Z")

if __name__ == "__main__":
    simple_observable()
    async_operations()
    operators_demo()
    subjects_demo()
    hot_cold_demo()
    
    # Keep program running for asynchronous operations
    time.sleep(10)

Reactive Streams Backpressure

Backpressure Strategies

// RxJava Backpressure
Flowable.range(1, 1000)
    .onBackpressureBuffer()       // Buffer (Standard)
    .onBackpressureDrop()         // Drop excess elements
    .onBackpressureLatest()       // Keep only latest element
    .onBackpressureError()        // Error on overload
    .subscribe(item -> processItem(item));

// Project Reactor Backpressure
Flux.range(1, 1000)
    .onBackpressureBuffer(100)    // Buffer with size
    .onBackpressureDrop()         // Drop elements
    .onBackpressureLatest()       // Only last element
    .limitRate(100)               // Rate Limiting
    .subscribe(item -> processItem(item));

Scheduler for Thread Control

RxJava Scheduler

Observable.just("data")
    .subscribeOn(Schedulers.io())           // Execution on IO thread
    .observeOn(Schedulers.single())          // Observation on single thread
    .observeOn(Schedulers.computation())     // Computations on CPU thread
    .subscribe(result -> handleResult(result));

Project Reactor Scheduler

Mono.just("data")
    .subscribeOn(Schedulers.boundedElastic())  // IO operations
    .publishOn(Schedulers.parallel())           // Parallel processing
    .publishOn(Schedulers.single())             // Single thread
    .subscribe(result -> handleResult(result));

Advantages and Disadvantages

Advantages of Reactive Programming

  • Asynchronicity: Non-blocking processing
  • Scalability: Better utilization of system resources
  • Responsiveness: Faster response times
  • Flexibility: Easy composition of operations
  • Error Handling: Centralized error handling

Disadvantages

  • Complexity: Steep learning curve
  • Debugging: More difficult error tracing
  • Overhead: Additional abstraction layer
  • Resource Management: More complex lifecycle management

Common Exam Questions

  1. What is the difference between Observable and Subject? Observable is only a source, Subject can function both as a source and as a receiver.

  2. Explain Backpressure! Mechanism to protect against overload when fast producers and slow consumers interact.

  3. When do you use Reactive Programming? For asynchronous operations, real-time applications, and systems with high scalability requirements.

  4. What is the purpose of Schedulers? Control over thread execution and parallel processing in reactive systems.

Most Important Sources

  1. https://reactivex.io/
  2. https://projectreactor.io/
  3. https://github.com/ReactiveX/RxJava
Back to Blog
Share: