Microservices Architecture Fundamentals: Service Mesh, API Gateway, Circuit Breaker & Distributed Tracing
This article introduces the fundamentals of microservices architecture: service mesh, API gateway, circuit breaker, and distributed tracing, each with practical examples.
In a Nutshell
Microservices architecture is an approach in which an application is developed as a collection of small, independent services, each fulfilling a specific business function.
Compact Technical Description
Microservices Architecture is an architectural style that divides an application into loosely coupled, autonomously deployable services that communicate via network protocols.
Core Components:
Service Mesh
- Sidecar Pattern: Proxy next to each service
- Traffic Management: Routing, Load Balancing
- Security: mTLS, Access Control
- Observability: Metrics, Logging, Tracing
- Implementations: Istio, Linkerd, Consul Connect
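To make the traffic-management bullet concrete, here is a minimal Python sketch of weighted routing, the mechanism a mesh proxy typically relies on for canary releases. The function name `pick_version` and the version labels are hypothetical; real meshes usually configure such splits declaratively in the proxy rather than in application code.

```python
import random
from typing import Dict, Optional


def pick_version(weights: Dict[str, int], rng: Optional[random.Random] = None) -> str:
    """Pick a backend version with probability proportional to its weight."""
    if not weights or any(w < 0 for w in weights.values()) or sum(weights.values()) == 0:
        raise ValueError("weights must be non-negative and sum to more than zero")
    # Use the injected generator when given (handy for reproducible tests)
    choose = rng.choices if rng is not None else random.choices
    versions = list(weights)
    return choose(versions, weights=[weights[v] for v in versions], k=1)[0]


if __name__ == "__main__":
    # Send roughly 10% of requests to the canary version "v2" (illustrative values)
    sample = [pick_version({"v1": 90, "v2": 10}) for _ in range(1000)]
    print("v2 share:", sample.count("v2") / len(sample))
```

Because the split is decided per request, the share of canary traffic can be changed without redeploying either version.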
API Gateway
- Single Entry Point: Centralized API access
- Request Routing: Forward requests to services
- Authentication: Verify client identity and authorize access before forwarding
- Rate Limiting: Enforce request limits
- Protocol Translation: Convert between client-facing and internal protocols (e.g., REST to gRPC)
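Rate limiting at a gateway is commonly implemented as a token bucket. The following is a minimal single-process sketch in Python; the class name `TokenBucket` and its parameters are illustrative assumptions, and a real gateway would normally keep the bucket state in a shared store so that all gateway instances see the same counts.

```python
import time
from typing import Callable


class TokenBucket:
    """Token bucket: `rate` tokens are added per second, up to `capacity`."""

    def __init__(self, rate: float, capacity: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.rate = rate
        self.capacity = capacity
        self._clock = clock
        self._tokens = capacity          # start with a full bucket
        self._last_refill = clock()

    def allow(self, cost: float = 1.0) -> bool:
        """Return True and consume `cost` tokens if the request may proceed."""
        now = self._clock()
        elapsed = now - self._last_refill
        # Refill according to elapsed time, never exceeding the capacity
        self._tokens = min(self.capacity, self._tokens + elapsed * self.rate)
        self._last_refill = now
        if self._tokens >= cost:
            self._tokens -= cost
            return True
        return False


if __name__ == "__main__":
    bucket = TokenBucket(rate=10, capacity=20)   # 10 requests/s, bursts of 20
    allowed = sum(bucket.allow() for _ in range(25))
    print(f"{allowed} of 25 burst requests allowed")
```

The capacity bounds how large a burst is tolerated, while the rate bounds the sustained throughput per client.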
Resilience Patterns
- Circuit Breaker: Prevent failure cascades
- Retry Pattern: Repeat failed requests, ideally with backoff between attempts
- Timeout Pattern: Set time limits for requests
- Bulkhead Pattern: Resource isolation
- Fallback Pattern: Fallback solutions on outages
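The retry and fallback patterns can be combined in a few lines. The sketch below is one possible Python formulation, not a reference implementation: `call_with_retry` and its parameter names are hypothetical, and the delays use exponential backoff with random jitter so that many clients do not retry in lockstep.

```python
import random
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def call_with_retry(
    operation: Callable[[], T],
    max_attempts: int = 3,
    base_delay: float = 0.1,
    max_delay: float = 0.5,
    factor: float = 2.0,
    fallback: Optional[Callable[[], T]] = None,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Run `operation`, retrying on any exception; use `fallback` if all attempts fail."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    delay = base_delay
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except Exception:
            if attempt == max_attempts:
                if fallback is not None:
                    return fallback()   # degrade gracefully instead of failing
                raise
            # Sleep a random time up to the current backoff ceiling, then grow it
            sleep(random.uniform(0, delay))
            delay = min(max_delay, delay * factor)
    raise AssertionError("unreachable")


if __name__ == "__main__":
    def unreliable() -> str:
        raise ConnectionError("downstream unavailable")

    print(call_with_retry(unreliable, fallback=lambda: "cached response"))
```

Retries are only safe for idempotent operations; a non-idempotent call such as a payment should not be repeated blindly.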
Distributed Tracing
- Request Tracing: Track requests through services
- Span Trees: Hierarchical trace structure
- Context Propagation: Pass trace and span IDs between services (e.g., in request headers)
- Sampling: Record only a subset of traces to limit overhead
- Visualization: Visualize trace data
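Context propagation and the resulting span tree can be illustrated without any tracing library. The sketch below assumes the W3C Trace Context `traceparent` header layout (`version-traceid-spanid-flags`); the helper names `start_span`, `inject`, and `extract` are hypothetical and only loosely mirror what tracing SDKs provide.

```python
import secrets
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass(frozen=True)
class SpanContext:
    trace_id: str                    # 32 hex chars, shared by every span of one request
    span_id: str                     # 16 hex chars, unique per span
    parent_id: Optional[str] = None  # span_id of the caller, None for the root span
    sampled: bool = True


def start_span(parent: Optional[SpanContext] = None) -> SpanContext:
    """Start a root span, or a child span that inherits the parent's trace ID."""
    if parent is None:
        return SpanContext(secrets.token_hex(16), secrets.token_hex(8))
    return SpanContext(parent.trace_id, secrets.token_hex(8), parent.span_id, parent.sampled)


def inject(ctx: SpanContext, headers: Dict[str, str]) -> None:
    """Write the context into outgoing request headers."""
    flags = "01" if ctx.sampled else "00"
    headers["traceparent"] = f"00-{ctx.trace_id}-{ctx.span_id}-{flags}"


def extract(headers: Dict[str, str]) -> Optional[SpanContext]:
    """Read the caller's context from incoming headers; None if absent or malformed."""
    parts = headers.get("traceparent", "").split("-")
    if len(parts) != 4 or len(parts[1]) != 32 or len(parts[2]) != 16:
        return None
    try:
        sampled = bool(int(parts[3], 16) & 0x01)
    except ValueError:
        return None
    return SpanContext(parts[1], parts[2], sampled=sampled)


if __name__ == "__main__":
    gateway_span = start_span()                      # root span at the gateway
    outgoing: Dict[str, str] = {}
    inject(gateway_span, outgoing)                   # gateway -> user service
    service_span = start_span(extract(outgoing))     # child span in the user service
    print(service_span.trace_id == gateway_span.trace_id)   # same trace
    print(service_span.parent_id == gateway_span.span_id)   # linked to its parent
```

A tracing backend rebuilds the span tree purely from these `trace_id` and `parent_id` links, which is why every hop must forward the header.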
Exam-Relevant Key Points
- Microservices: Small, autonomous services with their own responsibilities
- Service Mesh: Infrastructure layer for service communication
- API Gateway: Central entry point for client requests
- Circuit Breaker: Protection mechanism against failure cascades
- Distributed Tracing: Track requests across service boundaries
- Service Discovery: Automatic detection of service instances
- Load Balancing: Distribute requests across multiple instances
- Resilience: Ability to keep operating and to recover when individual components fail
- Chamber of Commerce Relevant: Modern distributed system architectures
Core Components
- Service Discovery: Dynamic service detection and registration
- API Gateway: Centralized API management and routing
- Service Mesh: Communication control between services
- Circuit Breaker: Error handling and isolation
- Distributed Tracing: Request tracking and performance analysis
- Configuration Management: Centralized, externalized configuration shared by all services
- Monitoring: Metrics collection and alerting
- Deployment: Continuous delivery and orchestration
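Service discovery and load balancing work together: instances register themselves, and callers pick one of the currently healthy instances. The following in-memory Python sketch shows the idea under simplifying assumptions; the class `InMemoryRegistry`, its heartbeat TTL, and the addresses are hypothetical, and a production registry would be a separate, replicated service.

```python
import itertools
import time
from typing import Callable, Dict, List


class InMemoryRegistry:
    """Toy service registry with heartbeat expiry and round-robin selection."""

    def __init__(self, ttl: float = 30.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._ttl = ttl
        self._clock = clock
        self._instances: Dict[str, Dict[str, float]] = {}  # service -> address -> last heartbeat
        self._counters: Dict[str, itertools.count] = {}

    def register(self, service: str, address: str) -> None:
        """Register an instance, or refresh its heartbeat if already known."""
        self._instances.setdefault(service, {})[address] = self._clock()

    heartbeat = register  # a heartbeat simply refreshes the registration time

    def deregister(self, service: str, address: str) -> None:
        self._instances.get(service, {}).pop(address, None)

    def healthy(self, service: str) -> List[str]:
        """Addresses whose last heartbeat is no older than the TTL."""
        now = self._clock()
        known = self._instances.get(service, {})
        return sorted(a for a, seen in known.items() if now - seen <= self._ttl)

    def pick(self, service: str) -> str:
        """Round-robin over the healthy instances of a service."""
        candidates = self.healthy(service)
        if not candidates:
            raise LookupError(f"no healthy instance of {service!r}")
        index = next(self._counters.setdefault(service, itertools.count()))
        return candidates[index % len(candidates)]


if __name__ == "__main__":
    registry = InMemoryRegistry()
    registry.register("user-service", "10.0.0.1:8081")
    registry.register("user-service", "10.0.0.2:8081")
    print([registry.pick("user-service") for _ in range(4)])
```

Instances that stop sending heartbeats silently drop out of the rotation, so callers never need a hard-coded address list.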
Practical Examples
1. Microservice with Go and Service Discovery
// main.go
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"os"
"os/signal" // needed for signal.Notify in main
"strconv"
"syscall" // needed for SIGINT/SIGTERM in main
"time"
"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/jaeger"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.opentelemetry.io/otel/trace"
// Internal packages (unused imports removed, since Go rejects them at compile time)
"github.com/company/microservices/internal/circuitbreaker"
"github.com/company/microservices/internal/config"
"github.com/company/microservices/internal/database"
"github.com/company/microservices/internal/models"
"github.com/company/microservices/internal/registry"
)
// Service configuration
type ServiceConfig struct {
Name string `json:"name"`
Version string `json:"version"`
Port int `json:"port"`
DatabaseURL string `json:"database_url"`
RegistryURL string `json:"registry_url"`
JaegerURL string `json:"jaeger_url"`
}
// Metrics
var (
httpRequestsTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "http_requests_total",
Help: "Total number of HTTP requests",
},
[]string{"method", "endpoint", "status"},
)
httpRequestDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "http_request_duration_seconds",
Help: "HTTP request duration in seconds",
},
[]string{"method", "endpoint"},
)
activeConnections = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "active_connections",
Help: "Number of active connections",
},
)
)
func init() {
prometheus.MustRegister(httpRequestsTotal)
prometheus.MustRegister(httpRequestDuration)
prometheus.MustRegister(activeConnections)
}
// UserService handles user-related operations
type UserService struct {
config *ServiceConfig
db *database.Database
registry *registry.ServiceRegistry
tracer trace.Tracer
breaker *circuitbreaker.CircuitBreaker
}
// NewUserService creates a new user service
func NewUserService(cfg *ServiceConfig) (*UserService, error) {
// Initialize database
db, err := database.NewDatabase(cfg.DatabaseURL)
if err != nil {
return nil, fmt.Errorf("failed to initialize database: %w", err)
}
// Initialize service registry
reg, err := registry.NewServiceRegistry(cfg.RegistryURL)
if err != nil {
return nil, fmt.Errorf("failed to initialize service registry: %w", err)
}
// Initialize circuit breaker
breakerConfig := circuitbreaker.Config{
MaxRequests: 100,
Interval: 60 * time.Second,
Timeout: 30 * time.Second,
ReadyToTrip: circuitbreaker.DefaultReadyToTrip,
OnStateChange: circuitbreaker.DefaultOnStateChange,
FallbackFunc: nil,
}
breaker := circuitbreaker.NewCircuitBreaker(breakerConfig)
// Initialize tracer
tracer := otel.Tracer("user-service")
return &UserService{
config: cfg,
db: db,
registry: reg,
tracer: tracer,
breaker: breaker,
}, nil
}
// RegisterService registers the service with the service registry
func (s *UserService) RegisterService(ctx context.Context) error {
serviceInfo := registry.ServiceInfo{
Name: s.config.Name,
Version: s.config.Version,
Address: fmt.Sprintf("localhost:%d", s.config.Port),
Tags: []string{"user", "service", "v1"},
Metadata: map[string]string{
"port": strconv.Itoa(s.config.Port),
"version": s.config.Version,
"health_check": "/health",
},
}
return s.registry.Register(ctx, serviceInfo)
}
// DeregisterService removes the service from the registry
func (s *UserService) DeregisterService(ctx context.Context) error {
return s.registry.Deregister(ctx, s.config.Name)
}
// HealthCheck returns the health status of the service
func (s *UserService) HealthCheck(w http.ResponseWriter, r *http.Request) {
ctx, span := s.tracer.Start(r.Context(), "health-check")
defer span.End()
// Check database connectivity
if err := s.db.Ping(ctx); err != nil {
span.SetAttributes(semconv.ExceptionMessageKey.String(err.Error()))
w.WriteHeader(http.StatusServiceUnavailable)
json.NewEncoder(w).Encode(map[string]string{
"status": "unhealthy",
"error": err.Error(),
})
return
}
// Check circuit breaker state
breakerState := s.breaker.State()
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]interface{}{
"status": "healthy",
"service": s.config.Name,
"version": s.config.Version,
"timestamp": time.Now().Unix(),
"breaker_state": breakerState.String(),
})
}
// CreateUser creates a new user
func (s *UserService) CreateUser(w http.ResponseWriter, r *http.Request) {
ctx, span := s.tracer.Start(r.Context(), "create-user")
defer span.End()
start := time.Now()
defer func() {
httpRequestDuration.WithLabelValues("POST", "/users").Observe(time.Since(start).Seconds())
}()
var user models.User
if err := json.NewDecoder(r.Body).Decode(&user); err != nil {
span.SetAttributes(semconv.ExceptionMessageKey.String(err.Error()))
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(map[string]string{"error": err.Error()})
return
}
// Validate user
if err := user.Validate(); err != nil {
span.SetAttributes(semconv.ExceptionMessageKey.String(err.Error()))
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(map[string]string{"error": err.Error()})
return
}
// Create user with circuit breaker
result, err := s.breaker.Execute(func() (interface{}, error) {
return s.db.CreateUser(ctx, &user)
})
if err != nil {
span.SetAttributes(semconv.ExceptionMessageKey.String(err.Error()))
if s.breaker.State() == circuitbreaker.StateOpen {
w.WriteHeader(http.StatusServiceUnavailable)
json.NewEncoder(w).Encode(map[string]string{
"error": "Service temporarily unavailable",
"code": "CIRCUIT_BREAKER_OPEN",
})
return
}
w.WriteHeader(http.StatusInternalServerError)
json.NewEncoder(w).Encode(map[string]string{"error": err.Error()})
return
}
createdUser := result.(*models.User)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusCreated)
json.NewEncoder(w).Encode(createdUser)
// The request counter is incremented once per request in metricsMiddleware
}
// GetUser retrieves a user by ID
func (s *UserService) GetUser(w http.ResponseWriter, r *http.Request) {
ctx, span := s.tracer.Start(r.Context(), "get-user")
defer span.End()
start := time.Now()
defer func() {
httpRequestDuration.WithLabelValues("GET", "/users/{id}").Observe(time.Since(start).Seconds())
}()
vars := mux.Vars(r)
userID := vars["id"]
if userID == "" {
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(map[string]string{"error": "User ID is required"})
return
}
// Get user with circuit breaker
result, err := s.breaker.Execute(func() (interface{}, error) {
return s.db.GetUser(ctx, userID)
})
if err != nil {
span.SetAttributes(semconv.ExceptionMessageKey.String(err.Error()))
if s.breaker.State() == circuitbreaker.StateOpen {
w.WriteHeader(http.StatusServiceUnavailable)
json.NewEncoder(w).Encode(map[string]string{
"error": "Service temporarily unavailable",
"code": "CIRCUIT_BREAKER_OPEN",
})
return
}
if err.Error() == "user not found" {
w.WriteHeader(http.StatusNotFound)
json.NewEncoder(w).Encode(map[string]string{"error": "User not found"})
return
}
w.WriteHeader(http.StatusInternalServerError)
json.NewEncoder(w).Encode(map[string]string{"error": err.Error()})
return
}
user := result.(*models.User)
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(user)
// The request counter is incremented once per request in metricsMiddleware
}
// UpdateUser updates an existing user
func (s *UserService) UpdateUser(w http.ResponseWriter, r *http.Request) {
ctx, span := s.tracer.Start(r.Context(), "update-user")
defer span.End()
start := time.Now()
defer func() {
httpRequestDuration.WithLabelValues("PUT", "/users/{id}").Observe(time.Since(start).Seconds())
}()
vars := mux.Vars(r)
userID := vars["id"]
if userID == "" {
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(map[string]string{"error": "User ID is required"})
return
}
var user models.User
if err := json.NewDecoder(r.Body).Decode(&user); err != nil {
span.SetAttributes(semconv.ExceptionMessageKey.String(err.Error()))
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(map[string]string{"error": err.Error()})
return
}
user.ID = userID
// Update user with circuit breaker
result, err := s.breaker.Execute(func() (interface{}, error) {
return s.db.UpdateUser(ctx, &user)
})
if err != nil {
span.SetAttributes(semconv.ExceptionMessageKey.String(err.Error()))
if s.breaker.State() == circuitbreaker.StateOpen {
w.WriteHeader(http.StatusServiceUnavailable)
json.NewEncoder(w).Encode(map[string]string{
"error": "Service temporarily unavailable",
"code": "CIRCUIT_BREAKER_OPEN",
})
return
}
if err.Error() == "user not found" {
w.WriteHeader(http.StatusNotFound)
json.NewEncoder(w).Encode(map[string]string{"error": "User not found"})
return
}
w.WriteHeader(http.StatusInternalServerError)
json.NewEncoder(w).Encode(map[string]string{"error": err.Error()})
return
}
updatedUser := result.(*models.User)
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(updatedUser)
// The request counter is incremented once per request in metricsMiddleware
}
// DeleteUser deletes a user by ID
func (s *UserService) DeleteUser(w http.ResponseWriter, r *http.Request) {
ctx, span := s.tracer.Start(r.Context(), "delete-user")
defer span.End()
start := time.Now()
defer func() {
httpRequestDuration.WithLabelValues("DELETE", "/users/{id}").Observe(time.Since(start).Seconds())
}()
vars := mux.Vars(r)
userID := vars["id"]
if userID == "" {
w.WriteHeader(http.StatusBadRequest)
json.NewEncoder(w).Encode(map[string]string{"error": "User ID is required"})
return
}
// Delete user with circuit breaker
_, err := s.breaker.Execute(func() (interface{}, error) {
return nil, s.db.DeleteUser(ctx, userID)
})
if err != nil {
span.SetAttributes(semconv.ExceptionMessageKey.String(err.Error()))
if s.breaker.State() == circuitbreaker.StateOpen {
w.WriteHeader(http.StatusServiceUnavailable)
json.NewEncoder(w).Encode(map[string]string{
"error": "Service temporarily unavailable",
"code": "CIRCUIT_BREAKER_OPEN",
})
return
}
if err.Error() == "user not found" {
w.WriteHeader(http.StatusNotFound)
json.NewEncoder(w).Encode(map[string]string{"error": "User not found"})
return
}
w.WriteHeader(http.StatusInternalServerError)
json.NewEncoder(w).Encode(map[string]string{"error": err.Error()})
return
}
w.WriteHeader(http.StatusNoContent)
// The request counter is incremented once per request in metricsMiddleware
}
// ListUsers retrieves a list of users with pagination
func (s *UserService) ListUsers(w http.ResponseWriter, r *http.Request) {
ctx, span := s.tracer.Start(r.Context(), "list-users")
defer span.End()
start := time.Now()
defer func() {
httpRequestDuration.WithLabelValues("GET", "/users").Observe(time.Since(start).Seconds())
}()
// Parse query parameters
page := 1
limit := 10
if p := r.URL.Query().Get("page"); p != "" {
if parsed, err := strconv.Atoi(p); err == nil && parsed > 0 {
page = parsed
}
}
if l := r.URL.Query().Get("limit"); l != "" {
if parsed, err := strconv.Atoi(l); err == nil && parsed > 0 && parsed <= 100 {
limit = parsed
}
}
// Get users with circuit breaker
result, err := s.breaker.Execute(func() (interface{}, error) {
return s.db.ListUsers(ctx, page, limit)
})
if err != nil {
span.SetAttributes(semconv.ExceptionMessageKey.String(err.Error()))
if s.breaker.State() == circuitbreaker.StateOpen {
w.WriteHeader(http.StatusServiceUnavailable)
json.NewEncoder(w).Encode(map[string]string{
"error": "Service temporarily unavailable",
"code": "CIRCUIT_BREAKER_OPEN",
})
return
}
w.WriteHeader(http.StatusInternalServerError)
json.NewEncoder(w).Encode(map[string]string{"error": err.Error()})
return
}
users := result.([]*models.User)
response := map[string]interface{}{
"users": users,
"pagination": map[string]interface{}{
"page": page,
"limit": limit,
"total": len(users),
},
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(response)
// The request counter is incremented once per request in metricsMiddleware
}
// GetServiceInfo returns service information
func (s *UserService) GetServiceInfo(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]interface{}{
"name": s.config.Name,
"version": s.config.Version,
"port": s.config.Port,
"timestamp": time.Now().Unix(),
"breaker_state": s.breaker.State().String(),
})
}
// MetricsHandler exposes Prometheus metrics
func (s *UserService) MetricsHandler(w http.ResponseWriter, r *http.Request) {
promhttp.Handler().ServeHTTP(w, r)
}
// SetupRoutes configures the HTTP routes
func (s *UserService) SetupRoutes() *mux.Router {
r := mux.NewRouter()
// Add OpenTelemetry middleware
r.Use(otelmux.Middleware("user-service"))
// Add middleware for metrics and logging
r.Use(s.metricsMiddleware)
r.Use(s.loggingMiddleware)
// API routes
api := r.PathPrefix("/api/v1").Subrouter()
api.HandleFunc("/users", s.ListUsers).Methods("GET")
api.HandleFunc("/users", s.CreateUser).Methods("POST")
api.HandleFunc("/users/{id}", s.GetUser).Methods("GET")
api.HandleFunc("/users/{id}", s.UpdateUser).Methods("PUT")
api.HandleFunc("/users/{id}", s.DeleteUser).Methods("DELETE")
// Health and info endpoints
r.HandleFunc("/health", s.HealthCheck).Methods("GET")
r.HandleFunc("/info", s.GetServiceInfo).Methods("GET")
// Metrics endpoint
r.Handle("/metrics", promhttp.Handler()).Methods("GET")
return r
}
// metricsMiddleware tracks HTTP request metrics
func (s *UserService) metricsMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
activeConnections.Inc()
defer activeConnections.Dec()
// Wrap response writer to capture status code
wrapped := &responseWriter{ResponseWriter: w, statusCode: http.StatusOK}
next.ServeHTTP(wrapped, r)
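// Record metrics using the matched route template rather than the raw path,
// so that IDs in the URL do not create an unbounded number of label values
endpoint := r.URL.Path
if route := mux.CurrentRoute(r); route != nil {
if tmpl, err := route.GetPathTemplate(); err == nil {
endpoint = tmpl
}
}
httpRequestsTotal.WithLabelValues(r.Method, endpoint, strconv.Itoa(wrapped.statusCode)).Inc()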
})
}
// loggingMiddleware logs HTTP requests
func (s *UserService) loggingMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
start := time.Now()
// Wrap response writer
wrapped := &responseWriter{ResponseWriter: w, statusCode: http.StatusOK}
next.ServeHTTP(wrapped, r)
duration := time.Since(start)
log.Printf(
"%s %s %d %v %s",
r.Method,
r.URL.Path,
wrapped.statusCode,
duration,
r.UserAgent(),
)
})
}
// responseWriter wraps http.ResponseWriter to capture status code
type responseWriter struct {
http.ResponseWriter
statusCode int
}
func (rw *responseWriter) WriteHeader(code int) {
rw.statusCode = code
rw.ResponseWriter.WriteHeader(code)
}
// initTracer initializes OpenTelemetry tracing
func initTracer(serviceName, jaegerURL string) (*sdktrace.TracerProvider, error) {
exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(jaegerURL)))
if err != nil {
return nil, err
}
tp := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exp),
sdktrace.WithResource(resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceNameKey.String(serviceName),
semconv.ServiceVersionKey.String("1.0.0"),
)),
)
otel.SetTracerProvider(tp)
return tp, nil
}
func main() {
// Load configuration
cfg, err := config.Load[ServiceConfig]("config.json")
if err != nil {
log.Fatalf("Failed to load configuration: %v", err)
}
// Initialize tracing
tp, err := initTracer(cfg.Name, cfg.JaegerURL)
if err != nil {
log.Fatalf("Failed to initialize tracer: %v", err)
}
defer func() {
if err := tp.Shutdown(context.Background()); err != nil {
log.Printf("Error shutting down tracer provider: %v", err)
}
}()
// Create user service
userService, err := NewUserService(cfg)
if err != nil {
log.Fatalf("Failed to create user service: %v", err)
}
// Register service
ctx := context.Background()
if err := userService.RegisterService(ctx); err != nil {
log.Fatalf("Failed to register service: %v", err)
}
// Setup graceful shutdown
go func() {
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
<-sigChan
log.Println("Shutting down service...")
// Deregister service
if err := userService.DeregisterService(ctx); err != nil {
log.Printf("Failed to deregister service: %v", err)
}
// Shutdown tracer
if err := tp.Shutdown(ctx); err != nil {
log.Printf("Error shutting down tracer provider: %v", err)
}
os.Exit(0)
}()
// Setup routes and start server
router := userService.SetupRoutes()
log.Printf("Starting %s on port %d", cfg.Name, cfg.Port)
log.Printf("Health check: http://localhost:%d/health", cfg.Port)
log.Printf("Metrics: http://localhost:%d/metrics", cfg.Port)
if err := http.ListenAndServe(fmt.Sprintf(":%d", cfg.Port), router); err != nil {
log.Fatalf("Failed to start server: %v", err)
}
}
2. API Gateway with Java Spring Cloud
// GatewayApplication.java
package com.company.gateway;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.gateway.filter.ratelimit.KeyResolver;
import org.springframework.cloud.gateway.filter.ratelimit.RedisRateLimiter;
import org.springframework.cloud.gateway.route.RouteLocator;
import org.springframework.cloud.gateway.route.builder.RouteLocatorBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity;
import org.springframework.security.config.web.server.ServerHttpSecurity;
import org.springframework.security.oauth2.jwt.NimbusReactiveJwtDecoder;
import org.springframework.security.oauth2.jwt.ReactiveJwtDecoder;
import org.springframework.security.web.server.SecurityWebFilterChain;
import org.springframework.web.cors.CorsConfiguration;
import org.springframework.web.cors.reactive.CorsWebFilter;
import org.springframework.web.cors.reactive.UrlBasedCorsConfigurationSource;
import reactor.core.publisher.Mono;
import java.time.Duration;
@SpringBootApplication
@EnableWebFluxSecurity
public class GatewayApplication {
public static void main(String[] args) {
SpringApplication.run(GatewayApplication.class, args);
}
@Bean
public RouteLocator customRouteLocator(RouteLocatorBuilder builder) {
return builder.routes()
// User Service Routes
.route("user-service", r -> r.path("/api/v1/users/**")
.filters(f -> f
.stripPrefix(2)
.addRequestHeader("X-Request-User-Service", "Gateway")
.addResponseHeader("X-Response-User-Service", "Gateway")
.circuitBreaker(config -> config
.setName("user-service")
.setFallbackUri("forward:/fallback/user-service"))
.retry(retryConfig -> retryConfig
.setRetries(3)
.setBackoff(Duration.ofMillis(100), Duration.ofMillis(500), 2, false))
.requestRateLimiter(config -> config
.setRateLimiter(redisRateLimiter())
.setKeyResolver(userKeyResolver()))
)
.uri("lb://user-service"))
// Order Service Routes
.route("order-service", r -> r.path("/api/v1/orders/**")
.filters(f -> f
.stripPrefix(2)
.addRequestHeader("X-Request-Order-Service", "Gateway")
.addResponseHeader("X-Response-Order-Service", "Gateway")
.circuitBreaker(config -> config
.setName("order-service")
.setFallbackUri("forward:/fallback/order-service"))
.retry(retryConfig -> retryConfig
.setRetries(3)
.setBackoff(Duration.ofMillis(100), Duration.ofMillis(500), 2, false))
.requestRateLimiter(config -> config
.setRateLimiter(redisRateLimiter())
.setKeyResolver(userKeyResolver()))
)
.uri("lb://order-service"))
// Product Service Routes
.route("product-service", r -> r.path("/api/v1/products/**")
.filters(f -> f
.stripPrefix(2)
.addRequestHeader("X-Request-Product-Service", "Gateway")
.addResponseHeader("X-Response-Product-Service", "Gateway")
.circuitBreaker(config -> config
.setName("product-service")
.setFallbackUri("forward:/fallback/product-service"))
.retry(retryConfig -> retryConfig
.setRetries(3)
.setBackoff(Duration.ofMillis(100), Duration.ofMillis(500), 2, false))
.requestRateLimiter(config -> config
.setRateLimiter(redisRateLimiter())
.setKeyResolver(userKeyResolver()))
)
.uri("lb://product-service"))
// Notification Service Routes
.route("notification-service", r -> r.path("/api/v1/notifications/**")
.filters(f -> f
.stripPrefix(2)
.addRequestHeader("X-Request-Notification-Service", "Gateway")
.addResponseHeader("X-Response-Notification-Service", "Gateway")
.circuitBreaker(config -> config
.setName("notification-service")
.setFallbackUri("forward:/fallback/notification-service"))
.retry(retryConfig -> retryConfig
.setRetries(3)
.setBackoff(Duration.ofMillis(100), Duration.ofMillis(500), 2, false))
.requestRateLimiter(config -> config
.setRateLimiter(redisRateLimiter())
.setKeyResolver(userKeyResolver()))
)
.uri("lb://notification-service"))
// Authentication Service Routes
.route("auth-service", r -> r.path("/api/v1/auth/**")
.filters(f -> f
.stripPrefix(2)
.addRequestHeader("X-Request-Auth-Service", "Gateway")
.addResponseHeader("X-Response-Auth-Service", "Gateway")
.circuitBreaker(config -> config
.setName("auth-service")
.setFallbackUri("forward:/fallback/auth-service"))
.retry(retryConfig -> retryConfig
.setRetries(3)
.setBackoff(Duration.ofMillis(100), Duration.ofMillis(500), 2, false))
)
.uri("lb://auth-service"))
// Health Check Routes (no rate limiting)
.route("health-check", r -> r.path("/health/**")
.filters(f -> f.stripPrefix(1))
.uri("lb://user-service"))
.build();
}
@Bean
public SecurityWebFilterChain securityWebFilterChain(ServerHttpSecurity http) {
return http
.csrf(ServerHttpSecurity.CsrfSpec::disable)
.authorizeExchange(exchanges -> exchanges
.pathMatchers("/api/v1/auth/**").permitAll()
.pathMatchers("/health/**").permitAll()
.pathMatchers("/actuator/**").permitAll()
.anyExchange().authenticated()
)
.oauth2ResourceServer(oauth2 -> oauth2
.jwt(jwt -> jwt.jwtDecoder(jwtDecoder()))
)
.build();
}
@Bean
public CorsWebFilter corsWebFilter() {
CorsConfiguration corsConfig = new CorsConfiguration();
corsConfig.setAllowCredentials(true);
corsConfig.addAllowedOriginPattern("*");
corsConfig.addAllowedHeader("*");
corsConfig.addAllowedMethod("*");
UrlBasedCorsConfigurationSource source = new UrlBasedCorsConfigurationSource();
source.registerCorsConfiguration("/**", corsConfig);
return new CorsWebFilter(source);
}
@Bean
public RedisRateLimiter redisRateLimiter() {
return new RedisRateLimiter(10, 20, 1); // replenishRate, burstCapacity, requestedTokens
}
@Bean
public KeyResolver userKeyResolver() {
// getFirst returns a plain String (or null), so wrap it in a Mono before applying the default
return exchange -> Mono.justOrEmpty(exchange.getRequest()
.getHeaders()
.getFirst("X-User-ID"))
.defaultIfEmpty("anonymous");
}
@Bean
public ReactiveJwtDecoder jwtDecoder() {
// Configure JWT decoder based on your authentication service
// (WebFlux security expects the reactive decoder variant)
return NimbusReactiveJwtDecoder.withJwkSetUri("http://auth-service/.well-known/jwks.json").build();
}
}
// GatewayController.java
package com.company.gateway.controller;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
import java.util.HashMap;
import java.util.Map;
@RestController
@RequestMapping("/fallback")
public class GatewayController {
@GetMapping("/user-service")
public Mono<ResponseEntity<Map<String, Object>>> userServiceFallback() {
Map<String, Object> response = new HashMap<>();
response.put("service", "user-service");
response.put("status", "fallback");
response.put("message", "User service is temporarily unavailable");
response.put("timestamp", System.currentTimeMillis());
return Mono.just(ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).body(response));
}
@GetMapping("/order-service")
public Mono<ResponseEntity<Map<String, Object>>> orderServiceFallback() {
Map<String, Object> response = new HashMap<>();
response.put("service", "order-service");
response.put("status", "fallback");
response.put("message", "Order service is temporarily unavailable");
response.put("timestamp", System.currentTimeMillis());
return Mono.just(ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).body(response));
}
@GetMapping("/product-service")
public Mono<ResponseEntity<Map<String, Object>>> productServiceFallback() {
Map<String, Object> response = new HashMap<>();
response.put("service", "product-service");
response.put("status", "fallback");
response.put("message", "Product service is temporarily unavailable");
response.put("timestamp", System.currentTimeMillis());
return Mono.just(ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).body(response));
}
@GetMapping("/notification-service")
public Mono<ResponseEntity<Map<String, Object>>> notificationServiceFallback() {
Map<String, Object> response = new HashMap<>();
response.put("service", "notification-service");
response.put("status", "fallback");
response.put("message", "Notification service is temporarily unavailable");
response.put("timestamp", System.currentTimeMillis());
return Mono.just(ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).body(response));
}
@GetMapping("/auth-service")
public Mono<ResponseEntity<Map<String, Object>>> authServiceFallback() {
Map<String, Object> response = new HashMap<>();
response.put("service", "auth-service");
response.put("status", "fallback");
response.put("message", "Authentication service is temporarily unavailable");
response.put("timestamp", System.currentTimeMillis());
return Mono.just(ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).body(response));
}
}
// GatewayFilter.java
package com.company.gateway.filter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.http.HttpHeaders;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import java.util.UUID;
@Component
public class GatewayFilter implements GlobalFilter, Ordered {
private static final Logger logger = LoggerFactory.getLogger(GatewayFilter.class);
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
// Generate unique request ID
String requestId = UUID.randomUUID().toString();
// Requests are immutable: build a mutated exchange that carries the extra headers
ServerWebExchange mutatedExchange = exchange.mutate()
.request(builder -> builder
.header("X-Request-ID", requestId)
.header("X-Gateway-Timestamp", String.valueOf(System.currentTimeMillis())))
.build();
// Response headers must be added before the response is committed
mutatedExchange.getResponse().beforeCommit(() -> {
HttpHeaders headers = mutatedExchange.getResponse().getHeaders();
headers.add("X-Request-ID", requestId);
headers.add("X-Gateway-Response-Time", String.valueOf(System.currentTimeMillis()));
return Mono.empty();
});
// Log request
logger.info("Request: {} {} - ID: {} - User: {}",
mutatedExchange.getRequest().getMethod(),
mutatedExchange.getRequest().getURI(),
requestId,
mutatedExchange.getRequest().getHeaders().getFirst("X-User-ID"));
// Process request and log response
return chain.filter(mutatedExchange).then(Mono.fromRunnable(() -> {
// Log response
logger.info("Response: {} {} - ID: {} - Status: {}",
mutatedExchange.getRequest().getMethod(),
mutatedExchange.getRequest().getURI(),
requestId,
mutatedExchange.getResponse().getStatusCode());
}));
}
@Override
public int getOrder() {
return -1; // High precedence
}
}
// application.yml
server:
port: 8080
spring:
application:
name: api-gateway
cloud:
gateway:
discovery:
locator:
enabled: true
lower-case-service-id: true
routes:
# Additional configuration can be added here
redis:
host: localhost
port: 6379
timeout: 2000ms
lettuce:
pool:
max-active: 8
max-idle: 8
min-idle: 0
security:
oauth2:
resourceserver:
jwt:
issuer-uri: http://auth-service
management:
endpoints:
web:
exposure:
include: health,info,metrics,prometheus,gateway
endpoint:
gateway:
enabled: true
logging:
level:
org.springframework.cloud.gateway: DEBUG
org.springframework.web.reactive: DEBUG
com.company.gateway: INFO
# Resilience4j configuration
resilience4j:
  circuitbreaker:
    instances:
      user-service:
        registerHealthIndicator: true
        slidingWindowSize: 10
        minimumNumberOfCalls: 5
        permittedNumberOfCallsInHalfOpenState: 3
        automaticTransitionFromOpenToHalfOpenEnabled: true
        waitDurationInOpenState: 5s
        failureRateThreshold: 50
        eventConsumerBufferSize: 10
        recordExceptions:
          - org.springframework.web.reactive.function.client.WebClientResponseException
      order-service:
        registerHealthIndicator: true
        slidingWindowSize: 10
        minimumNumberOfCalls: 5
        permittedNumberOfCallsInHalfOpenState: 3
        automaticTransitionFromOpenToHalfOpenEnabled: true
        waitDurationInOpenState: 5s
        failureRateThreshold: 50
        eventConsumerBufferSize: 10
        recordExceptions:
          - org.springframework.web.reactive.function.client.WebClientResponseException
      product-service:
        registerHealthIndicator: true
        slidingWindowSize: 10
        minimumNumberOfCalls: 5
        permittedNumberOfCallsInHalfOpenState: 3
        automaticTransitionFromOpenToHalfOpenEnabled: true
        waitDurationInOpenState: 5s
        failureRateThreshold: 50
        eventConsumerBufferSize: 10
        recordExceptions:
          - org.springframework.web.reactive.function.client.WebClientResponseException
  retry:
    instances:
      user-service:
        maxAttempts: 3
        waitDuration: 1s
        retryExceptions:
          - org.springframework.web.reactive.function.client.WebClientResponseException
      order-service:
        maxAttempts: 3
        waitDuration: 1s
        retryExceptions:
          - org.springframework.web.reactive.function.client.WebClientResponseException
      product-service:
        maxAttempts: 3
        waitDuration: 1s
        retryExceptions:
          - org.springframework.web.reactive.function.client.WebClientResponseException
  ratelimiter:
    instances:
      user-service:
        limitForPeriod: 10
        limitRefreshPeriod: 1s
        timeoutDuration: 0
        registerHealthIndicator: true
      order-service:
        limitForPeriod: 20
        limitRefreshPeriod: 1s
        timeoutDuration: 0
        registerHealthIndicator: true
      product-service:
        limitForPeriod: 15
        limitRefreshPeriod: 1s
        timeoutDuration: 0
        registerHealthIndicator: true
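To see how the circuit breaker settings slidingWindowSize, minimumNumberOfCalls, and failureRateThreshold interact, the following is a minimal Python sketch of a count-based sliding window. It is an illustration under stated assumptions, not Resilience4j's actual code: the class name `CountBasedWindow` and its methods are hypothetical, and the defaults simply mirror the values configured above.

```python
from collections import deque


class CountBasedWindow:
    """Hypothetical helper: remembers the outcome of the last `size` calls."""

    def __init__(self, size: int = 10, minimum_calls: int = 5,
                 failure_rate_threshold: float = 50.0):
        # deque(maxlen=...) silently drops the oldest outcome when full
        self.outcomes = deque(maxlen=size)
        self.minimum_calls = minimum_calls
        self.failure_rate_threshold = failure_rate_threshold

    def record(self, failed: bool) -> None:
        self.outcomes.append(failed)

    def failure_rate(self) -> float:
        """Failure rate in percent over the calls currently in the window."""
        if not self.outcomes:
            return 0.0
        return 100.0 * sum(self.outcomes) / len(self.outcomes)

    def should_open(self) -> bool:
        # No decision is made until enough calls have been recorded
        if len(self.outcomes) < self.minimum_calls:
            return False
        return self.failure_rate() >= self.failure_rate_threshold


window = CountBasedWindow()
for failed in (True, True, True, True):
    window.record(failed)
print(window.should_open())  # still below minimum_calls
window.record(False)
print(window.should_open())  # 4 of 5 calls failed
```

With these values, four failures alone never open the circuit; the decision is only taken once the fifth call has been recorded.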
3. Circuit Breaker Implementation with Python
# circuit_breaker.py
import asyncio
import inspect
import logging
import random
import time
from dataclasses import dataclass
from enum import Enum
from functools import wraps
from threading import Lock
from typing import Any, Callable, Optional


class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitBreakerOpenException(Exception):
    """Raised when a call is rejected because the circuit is OPEN."""


@dataclass
class CircuitBreakerConfig:
    # Doubles as the failure threshold (consecutive failures that open the
    # circuit) and the success threshold (successes in HALF_OPEN that close it).
    max_requests: int = 100
    interval: float = 60.0  # seconds; not evaluated by this simplified breaker
    timeout: float = 30.0  # seconds; not evaluated by this simplified breaker
    expected_exception: type = Exception  # only this type counts as a failure
    reset_timeout: float = 60.0  # seconds to stay OPEN before a trial call


class CircuitBreaker:
    def __init__(self, config: CircuitBreakerConfig):
        self.config = config
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time: Optional[float] = None
        self.success_count = 0
        self.request_count = 0
        self.lock = Lock()
        self.logger = logging.getLogger(__name__)

    def __call__(self, func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs):
            return self.call(func, *args, **kwargs)
        return wrapper

    def _before_call(self) -> None:
        # Reject the call while OPEN, or let a trial call through once
        # reset_timeout has elapsed.
        with self.lock:
            if self.state == CircuitState.OPEN:
                if not self._should_attempt_reset():
                    raise CircuitBreakerOpenException("Circuit breaker is OPEN")
                self.state = CircuitState.HALF_OPEN
                self.success_count = 0
                self.logger.info("Circuit breaker transitioning to HALF_OPEN")
            self.request_count += 1

    def call(self, func: Callable, *args, **kwargs) -> Any:
        self._before_call()
        try:
            result = func(*args, **kwargs)
        except self.config.expected_exception:
            self._on_failure()
            raise
        self._on_success()
        return result

    async def call_async(self, func: Callable, *args, **kwargs) -> Any:
        self._before_call()
        try:
            result = func(*args, **kwargs)
            # Supports both coroutine functions and plain callables
            if inspect.isawaitable(result):
                result = await result
        except self.config.expected_exception:
            self._on_failure()
            raise
        self._on_success()
        return result

    def _should_attempt_reset(self) -> bool:
        if self.last_failure_time is None:
            return True
        return (time.time() - self.last_failure_time) >= self.config.reset_timeout

    def _on_success(self):
        with self.lock:
            if self.state == CircuitState.HALF_OPEN:
                self.success_count += 1
                if self.success_count >= self.config.max_requests:
                    self._reset()
            else:
                self.failure_count = 0

    def _on_failure(self):
        with self.lock:
            self.failure_count += 1
            self.last_failure_time = time.time()
            # Any failure in HALF_OPEN reopens the circuit immediately
            if (self.state == CircuitState.HALF_OPEN
                    or self.failure_count >= self.config.max_requests):
                self.state = CircuitState.OPEN
                self.logger.warning("Circuit breaker transitioning to OPEN")

    def _reset(self):
        # Caller must hold self.lock
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.request_count = 0
        self.logger.info("Circuit breaker reset to CLOSED")

    def get_state(self) -> CircuitState:
        return self.state

    def get_stats(self) -> dict:
        return {
            "state": self.state.value,
            "failure_count": self.failure_count,
            "success_count": self.success_count,
            "request_count": self.request_count,
            "last_failure_time": self.last_failure_time,
        }


# Usage examples
def example_service():
    """Example service that can fail"""
    if random.random() < 0.3:  # 30% chance of failure
        raise ValueError("Service temporarily unavailable")
    return "Service response"


# Create circuit breaker
config = CircuitBreakerConfig(
    max_requests=5,
    interval=60.0,
    timeout=30.0,
    expected_exception=ValueError,
    reset_timeout=30.0,
)
circuit_breaker = CircuitBreaker(config)

# Apply circuit breaker to service
protected_service = circuit_breaker(example_service)


# Test the circuit breaker
def test_circuit_breaker():
    for i in range(20):
        try:
            result = protected_service()
            print(f"Request {i+1}: SUCCESS - {result}")
        except CircuitBreakerOpenException as e:
            print(f"Request {i+1}: CIRCUIT BREAKER OPEN - {e}")
        except ValueError as e:
            print(f"Request {i+1}: SERVICE FAILURE - {e}")
        except Exception as e:
            print(f"Request {i+1}: UNEXPECTED ERROR - {e}")
        time.sleep(0.1)


# Async circuit breaker decorator
def circuit_breaker_async(config: CircuitBreakerConfig):
    def decorator(func):
        cb = CircuitBreaker(config)

        @wraps(func)
        async def wrapper(*args, **kwargs):
            return await cb.call_async(func, *args, **kwargs)
        return wrapper
    return decorator


# Usage with async functions
@circuit_breaker_async(config)
async def async_service():
    await asyncio.sleep(0.1)  # Simulate async operation
    if random.random() < 0.3:
        raise ValueError("Async service temporarily unavailable")
    return "Async service response"


# Test async circuit breaker
async def test_async_circuit_breaker():
    for i in range(20):
        try:
            result = await async_service()
            print(f"Async Request {i+1}: SUCCESS - {result}")
        except CircuitBreakerOpenException as e:
            print(f"Async Request {i+1}: CIRCUIT BREAKER OPEN - {e}")
        except ValueError as e:
            print(f"Async Request {i+1}: SERVICE FAILURE - {e}")
        except Exception as e:
            print(f"Async Request {i+1}: UNEXPECTED ERROR - {e}")
        await asyncio.sleep(0.1)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    print("Testing synchronous circuit breaker:")
    test_circuit_breaker()
    print("\nTesting asynchronous circuit breaker:")
    asyncio.run(test_async_circuit_breaker())
Microservices Architecture
Service Communication Patterns
graph TD
A[Client] --> B[API Gateway]
B --> C[Service Discovery]
B --> D[User Service]
B --> E[Order Service]
B --> F[Product Service]
C --> G[Service Registry]
D --> H[Database]
E --> I[Database]
F --> J[Database]
D --> K[Message Queue]
E --> K
F --> K
K --> L[Notification Service]
M[Service Mesh] --> D
M --> E
M --> F
N[Monitoring] --> M
O[Logging] --> M
P[Tracing] --> M
Service Mesh Implementations
Istio vs. Linkerd vs. Consul Connect
| Feature | Istio | Linkerd | Consul Connect |
|---|---|---|---|
| Complexity | High | Low | Medium |
| Performance | Good | Very Good | Good |
| Integration | Kubernetes | Kubernetes | Multi-Cloud |
| Security | mTLS, RBAC | mTLS | mTLS, Intentions |
| Observability | Jaeger, Prometheus | Grafana, Prometheus | Prometheus |
Service Mesh Features
- Traffic Management: Routing, Load Balancing, Traffic Splitting
- Security: mTLS, Access Control, Certificate Management
- Observability: Metrics, Logging, Tracing
- Reliability: Retries, Timeouts, Circuit Breaking
- Policy Enforcement: Access Policies, Rate Limiting
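Traffic splitting, listed above under Traffic Management, usually means sending a fixed share of requests to a new version of a service. In a real mesh the sidecar proxy does this based on routing rules; the sketch below only illustrates the idea in Python. The function name `pick_version` and the version labels are hypothetical.

```python
import random


def pick_version(weights: dict[str, int], rng: random.Random) -> str:
    """Choose a target version with probability proportional to its weight."""
    versions = list(weights)
    return rng.choices(versions, weights=[weights[v] for v in versions], k=1)[0]


# Assumed canary setup: roughly 90% of requests to v1, 10% to v2
weights = {"v1": 90, "v2": 10}
rng = random.Random(42)  # seeded so that repeated runs are comparable
sample = [pick_version(weights, rng) for _ in range(10_000)]
print(sample.count("v2") / len(sample))
```

Raising the weight of the new version step by step while watching error rates is the usual way to turn this into a canary rollout.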
API Gateway Patterns
Gateway Routing Strategies
| Pattern | Description | Application |
|---|---|---|
| Routing | Forward requests to services | Load Balancing |
| Composition | Combine multiple services | API Composition |
| Protocol Translation | Protocol conversion | Legacy Integration |
| Aggregation | Data from multiple services | Data Aggregation |
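The Composition and Aggregation rows can be illustrated with a gateway handler that calls two services concurrently and merges the results into one response. This is a minimal sketch: `fetch_user` and `fetch_orders` are hypothetical stand-ins for real HTTP calls, and the returned data is made up for the example.

```python
import asyncio


async def fetch_user(user_id: int) -> dict:
    # Stand-in for a call to the user service
    await asyncio.sleep(0)
    return {"id": user_id, "name": "Ada"}


async def fetch_orders(user_id: int) -> list:
    # Stand-in for a call to the order service
    await asyncio.sleep(0)
    return [{"order_id": 1, "user_id": user_id, "total": 42.0}]


async def user_overview(user_id: int) -> dict:
    """Call both backends concurrently and merge the answers into one payload."""
    user, orders = await asyncio.gather(fetch_user(user_id), fetch_orders(user_id))
    return {"user": user, "orders": orders}


if __name__ == "__main__":
    print(asyncio.run(user_overview(7)))
```

Because the two calls run concurrently, the client waits roughly as long as the slower backend rather than the sum of both.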
Gateway Features
- Authentication: JWT, OAuth2, API Keys
- Authorization: RBAC, Policy Enforcement
- Rate Limiting: User-based, Service-based
- Caching: Response Caching, Edge Caching
- Monitoring: Metrics, Logging, Tracing
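Rate limiting is often implemented with a token bucket: each request consumes a token, and tokens are refilled at a fixed rate up to a maximum. The following is a minimal single-process sketch; the class name `TokenBucket` and its parameters are assumptions for illustration, and a production gateway would typically keep this state in a shared store such as Redis.

```python
import time


class TokenBucket:
    """Hypothetical in-memory token bucket for a single client."""

    def __init__(self, capacity: int, refill_per_second: float,
                 clock=time.monotonic):
        self.capacity = capacity
        self.refill_per_second = refill_per_second
        self.clock = clock  # injectable so the behavior can be tested
        self.tokens = float(capacity)
        self.last_refill = clock()

    def allow(self) -> bool:
        """Return True if the request may pass, False if it should be rejected."""
        now = self.clock()
        elapsed = now - self.last_refill
        # Refill proportionally to elapsed time, capped at capacity
        self.tokens = min(self.capacity,
                          self.tokens + elapsed * self.refill_per_second)
        self.last_refill = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False


bucket = TokenBucket(capacity=2, refill_per_second=1.0)
print([bucket.allow() for _ in range(3)])  # the burst of two passes, the third is rejected
```

User-based limiting then amounts to keeping one bucket per user or API key, for example in a dictionary keyed by the client identifier.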
Resilience Patterns
Circuit Breaker States
stateDiagram-v2
[*] --> CLOSED
CLOSED --> OPEN: Failure Threshold Reached
OPEN --> HALF_OPEN: Reset Timeout
HALF_OPEN --> CLOSED: Success Threshold Reached
HALF_OPEN --> OPEN: Failure Occurs
CLOSED --> CLOSED: Success
OPEN --> OPEN: Reset Timeout Not Reached
Resilience Patterns Overview
| Pattern | Purpose | Implementation |
|---|---|---|
| Circuit Breaker | Prevent failure cascades | State Machine |
| Retry | Overcome temporary errors | Exponential Backoff |
| Timeout | Avoid waiting indefinitely for a response | Time Limits |
| Bulkhead | Resource isolation | Thread Pools |
| Fallback | Provide alternative solutions | Alternative Services |
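The Retry and Fallback rows combine naturally: retry a failing call with growing delays, and return an alternative result once the attempts are exhausted. The sketch below shows one possible shape. The function `retry_with_backoff` and its parameters are hypothetical, and a real implementation would usually add jitter and restrict which exceptions are retried.

```python
import time


def retry_with_backoff(func, max_attempts=3, base_delay=0.5, factor=2.0,
                       fallback=None, sleep=time.sleep):
    """Call func; on failure wait base_delay, then base_delay * factor, and so on."""
    delay = base_delay
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except Exception:
            if attempt == max_attempts:
                # Attempts exhausted: use the fallback if there is one
                if fallback is not None:
                    return fallback()
                raise
            sleep(delay)
            delay *= factor


def always_down():
    raise ConnectionError("service unavailable")


# The sleep function is replaced here so the example finishes immediately
print(retry_with_backoff(always_down, fallback=lambda: "cached response",
                         sleep=lambda seconds: None))
```

Retries only help with transient errors; combined with a circuit breaker, they should stop as soon as the circuit is open so that a struggling service is not flooded with repeated calls.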
Distributed Tracing
Trace Context Propagation
# Trace context example
import json
import uuid
from typing import Optional


class TraceContext:
    def __init__(self, trace_id: str, span_id: str,
                 parent_span_id: Optional[str] = None,
                 baggage: Optional[dict] = None):
        self.trace_id = trace_id
        self.span_id = span_id
        self.parent_span_id = parent_span_id
        # Avoid a shared mutable default: each context gets its own dict
        self.baggage = baggage if baggage is not None else {}

    def to_headers(self) -> dict:
        return {
            "X-Trace-ID": self.trace_id,
            "X-Span-ID": self.span_id,
            "X-Parent-Span-ID": self.parent_span_id or "",
            "X-Baggage": json.dumps(self.baggage),
        }

    @classmethod
    def from_headers(cls, headers: dict) -> 'TraceContext':
        # Missing IDs start a new trace; an empty parent header means "no parent"
        return cls(
            trace_id=headers.get("X-Trace-ID") or str(uuid.uuid4()),
            span_id=headers.get("X-Span-ID") or str(uuid.uuid4()),
            parent_span_id=headers.get("X-Parent-Span-ID") or None,
            baggage=json.loads(headers.get("X-Baggage") or "{}"),
        )
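The class above uses custom X- headers. The W3C Trace Context standard instead defines a single `traceparent` header of the form version-traceid-parentid-flags, with a 32-character hex trace ID and a 16-character hex span ID. The following sketch shows how such a header could be created and passed on to a downstream call; the helper names `new_traceparent` and `child_traceparent` are hypothetical, and in practice a library such as OpenTelemetry would handle this.

```python
import secrets


def new_traceparent(sampled: bool = True) -> str:
    """Start a new trace: fresh trace ID and span ID."""
    trace_id = secrets.token_hex(16)  # 16 bytes -> 32 hex characters
    span_id = secrets.token_hex(8)    # 8 bytes -> 16 hex characters
    flags = "01" if sampled else "00"
    return f"00-{trace_id}-{span_id}-{flags}"


def child_traceparent(parent: str) -> str:
    """Keep the trace ID and flags, but generate a new span ID for the next hop."""
    version, trace_id, _parent_span_id, flags = parent.split("-")
    return f"{version}-{trace_id}-{secrets.token_hex(8)}-{flags}"


incoming = new_traceparent()
outgoing = child_traceparent(incoming)
print(incoming)
print(outgoing)  # same trace ID, different span ID
```

Every service that forwards the header this way contributes spans to the same trace, which is what allows a tracing backend to reconstruct the span tree.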
Tracing Implementations
| Tool | Integration | Features | Application |
|---|---|---|---|
| Jaeger | OpenTelemetry | Distributed Tracing | Microservices |
| Zipkin | Spring Cloud | Request Tracing | Legacy Systems |
| AWS X-Ray | AWS Services | Cloud Tracing | AWS Environment |
| Google Cloud Trace | GCP Services | Performance Analysis | GCP Environment |
Advantages and Disadvantages
Advantages of Microservices
- Scalability: Scale individual services independently
- Flexibility: Different technologies per service
- Resilience: With proper isolation, the failure of one service need not bring down the entire system
- Team Autonomy: Small teams can work independently
- Faster Deployment: Small, frequent deployments
Disadvantages
- Complexity: High infrastructural complexity
- Communication Overhead: Network calls add latency and new failure modes
- Data Consistency: Distributed transactions are hard, so eventual consistency is often accepted
- Monitoring: Complex monitoring required
- Testing: End-to-end tests are complex
Common Exam Questions
- What is the difference between Service Mesh and API Gateway? Service Mesh manages communication between services, while API Gateway manages incoming client requests and routes them to services.
- Explain the Circuit Breaker Pattern! Circuit Breaker prevents failure cascades by blocking requests to error-prone services when a certain failure threshold is reached.
- When do you use Distributed Tracing? Distributed Tracing is used to track requests through multiple microservices, identify performance bottlenecks, and debug errors.
- What are the main advantages of Microservices? Independent scalability, technological flexibility, improved resilience, and faster deployment cycles.
Key Resources
- https://istio.io/
- https://linkerd.io/
- https://spring.io/projects/spring-cloud-gateway
- https://opentelemetry.io/