🔄 Saga Pattern
Sequence of local transactions with compensation. No distributed locks, better scalability.
Problem: How to maintain consistency across multiple services?
Traditional solution: 2PC (Two-Phase Commit) - but it’s blocking, relies on distributed locks, and scales poorly.
Solution: Saga Pattern - Sequence of local transactions with compensation!
Saga = Sequence of local transactions. If any step fails, compensate previous steps.
Central coordinator orchestrates all steps.
Characteristics: centralized control, easier to understand and debug, good for complex workflows.
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable, Dict, List
class SagaStatus(Enum):
    """Lifecycle states a saga moves through while it is being run."""

    PENDING = "pending"            # created, not yet executed
    IN_PROGRESS = "in_progress"    # steps are currently being executed
    COMPENSATING = "compensating"  # a step failed; undoing executed steps
    COMPLETED = "completed"        # every step succeeded
    FAILED = "failed"              # saga failed and compensation has finished
@dataclass
class SagaStep:
    """Saga step definition.

    Pairs the forward action of one local transaction with the action that
    undoes it.
    """

    # Human-readable step name; also used to build the "<name>_result"
    # key under which the step's result is stored in the saga context.
    name: str
    # Step execution function. Called with the saga context dict; a falsy
    # return value is treated as failure by the orchestrator.
    execute: Callable
    # Compensation function. Called with this SagaStep instance when the
    # saga has to undo an already executed step.
    compensate: Callable
    # Name of the service that owns this step (e.g. 'order-service').
    service: str
class SagaOrchestrator:
    """Saga orchestrator.

    Runs the registered steps in order. If a step reports failure (falsy
    return value) or raises, every step that already succeeded in the
    current run is compensated in reverse order.
    """

    def __init__(self):
        self.steps: List["SagaStep"] = []
        self.executed_steps: List["SagaStep"] = []
        self.status = SagaStatus.PENDING

    def add_step(self, step: "SagaStep"):
        """Add step to saga (steps run in the order they were added)."""
        self.steps.append(step)

    def execute(self, context: Dict[str, Any]) -> bool:
        """Execute saga.

        Args:
            context: Mutable dict shared by all steps. After a step succeeds
                its result is stored under ``"<step name>_result"``.

        Returns:
            True if every step succeeded, False if the saga failed and
            compensation was run.
        """
        self.status = SagaStatus.IN_PROGRESS
        # Start each run with a clean slate: without this, re-running the
        # orchestrator would compensate steps recorded by previous runs.
        self.executed_steps = []

        try:
            for step in self.steps:
                print(f"Executing step: {step.name}")

                # Execute step
                result = step.execute(context)

                # NOTE: any falsy result (None, False, {}, 0, ...) counts
                # as a failed step, not only False.
                if not result:
                    # Step failed - compensate
                    print(f"Step {step.name} failed. Compensating...")
                    self._compensate()
                    return False

                # Track executed step
                self.executed_steps.append(step)
                context[f"{step.name}_result"] = result

            # All steps succeeded
            self.status = SagaStatus.COMPLETED
            return True

        except Exception as e:
            # A raising step is handled like a failed one; the step that
            # raised is not in executed_steps, so it is not compensated.
            print(f"Saga failed: {e}")
            self._compensate()
            return False

    def _compensate(self):
        """Compensate executed steps (in reverse order)."""
        self.status = SagaStatus.COMPENSATING

        # Compensate in reverse order
        for step in reversed(self.executed_steps):
            try:
                print(f"Compensating step: {step.name}")
                step.compensate(step)
            except Exception as e:
                # Log but continue compensating the remaining steps
                print(f"Compensation failed for {step.name}: {e}")

        self.status = SagaStatus.FAILED
# Example: Create Order Saga
def create_order(context):
    """Example saga step: create the order.

    Stub that prints a message and returns a fixed, truthy result so the
    saga treats the step as successful.
    """
    print("Creating order...")
    # Call order service
    return {'order_id': 123}
def cancel_order(step):
    """Example compensation for the order step: print a message only."""
    # Call order service to cancel
    print("Cancelling order...")
    return None
def reserve_inventory(context):
    """Example saga step: reserve inventory.

    Stub that prints a message and returns a fixed, truthy result so the
    saga treats the step as successful.
    """
    print("Reserving inventory...")
    # Call inventory service
    return {'reservation_id': 456}
def release_inventory(step):
    """Example compensation for the inventory step: print a message only."""
    # Call inventory service to release
    print("Releasing inventory...")
    return None
def charge_payment(context):
    """Example saga step: charge the payment.

    Stub that always returns False to simulate a failing step, which makes
    the orchestrator compensate the steps executed before it.
    """
    print("Charging payment...")
    # Call payment service
    return False  # Simulate failure
def refund_payment(step):
    """Example compensation for the payment step: print a message only."""
    # Call payment service to refund
    print("Refunding payment...")
    return None
# Create saga
# Steps run in the order they are added: order -> inventory -> payment.
orchestrator = SagaOrchestrator()
orchestrator.add_step(SagaStep(
    name='create_order',
    execute=create_order,
    compensate=cancel_order,
    service='order-service'
))
orchestrator.add_step(SagaStep(
    name='reserve_inventory',
    execute=reserve_inventory,
    compensate=release_inventory,
    service='inventory-service'
))
orchestrator.add_step(SagaStep(
    name='charge_payment',
    execute=charge_payment,
    compensate=refund_payment,
    service='payment-service'
))
# Execute saga
success = orchestrator.execute({})

import java.util.*;
/** Lifecycle states a saga moves through while it is being run. */
enum SagaStatus {
    PENDING,
    IN_PROGRESS,
    COMPENSATING,
    COMPLETED,
    FAILED
}
class SagaStep { private final String name; private final Function<Map<String, Object>, Object> execute; private final Consumer<SagaStep> compensate; private final String service;
// Constructor, getters...}
class SagaOrchestrator { private final List<SagaStep> steps = new ArrayList<>(); private final List<SagaStep> executedSteps = new ArrayList<>(); private SagaStatus status = SagaStatus.PENDING;
public void addStep(SagaStep step) { steps.add(step); }
public boolean execute(Map<String, Object> context) { status = SagaStatus.IN_PROGRESS;
try { for (SagaStep step : steps) { System.out.println("Executing step: " + step.getName());
// Execute step Object result = step.getExecute().apply(context);
if (result == null) { // Step failed - compensate System.out.println("Step " + step.getName() + " failed. Compensating..."); compensate(); return false; }
// Track executed step executedSteps.add(step); context.put(step.getName() + "_result", result); }
// All steps succeeded status = SagaStatus.COMPLETED; return true; } catch (Exception e) { System.err.println("Saga failed: " + e.getMessage()); compensate(); return false; } }
private void compensate() { status = SagaStatus.COMPENSATING;
// Compensate in reverse order Collections.reverse(executedSteps); for (SagaStep step : executedSteps) { try { System.out.println("Compensating step: " + step.getName()); step.getCompensate().accept(step); } catch (Exception e) { System.err.println("Compensation failed for " + step.getName() + ": " + e.getMessage()); } }
status = SagaStatus.FAILED; }}Distributed coordination. Each service knows next step.
Characteristics: distributed control, lower coupling and better scalability, but harder to debug.
class OrderService:
    """Order service - starts saga"""

    def create_order(self, order_data):
        """Create order and publish event.

        Creates the order as a local transaction, then publishes an
        'OrderCreated' event and returns the created order.
        """
        # Create order (local transaction)
        order = self._create_order_local(order_data)

        # Publish event
        event_bus.publish('OrderCreated', {
            'order_id': order.id,
            'user_id': order.user_id,
            'amount': order.amount,
            # InventoryService.handle_order_created reads event['items'];
            # without this key the reservation always fails with KeyError.
            # NOTE(review): assumes the order exposes `items` - confirm
            # against the real order model.
            'items': getattr(order, 'items', []),
        })

        return order

    def handle_payment_failed(self, event):
        """Compensate: Cancel order"""
        order_id = event['order_id']
        self._cancel_order(order_id)
class InventoryService:
    """Inventory service - listens to OrderCreated"""

    def handle_order_created(self, event):
        """Reserve inventory.

        Publishes 'InventoryReserved' on success and
        'InventoryReservationFailed' if anything in the attempt raises.
        """
        try:
            # Reserve inventory (local transaction)
            reservation = self._reserve_inventory_local(
                event['order_id'],
                event['items']
            )

            # Publish success event
            event_bus.publish('InventoryReserved', {
                'order_id': event['order_id'],
                'reservation_id': reservation.id,
                # Forward the amount: PaymentService.handle_inventory_reserved
                # reads event['amount'] from this event. `.get` keeps a
                # missing amount from turning a successful reservation into
                # a failure event.
                # NOTE(review): assumes `event` is a dict - confirm.
                'amount': event.get('amount'),
            })
        except Exception as e:
            # Publish failure event
            event_bus.publish('InventoryReservationFailed', {
                'order_id': event['order_id'],
                'error': str(e)
            })

    def handle_payment_failed(self, event):
        """Compensate: Release inventory"""
        order_id = event['order_id']
        self._release_inventory(order_id)
class PaymentService:
    """Payment service - listens to InventoryReserved"""

    def handle_inventory_reserved(self, event):
        """Charge payment.

        Publishes 'PaymentCharged' on success and 'PaymentFailed' if
        anything in the attempt raises.
        """
        try:
            # Charge payment (local transaction)
            payment = self._charge_payment_local(
                event['order_id'],
                event['amount']
            )

            # Publish success event
            event_bus.publish('PaymentCharged', {
                'order_id': event['order_id'],
                'payment_id': payment.id
            })
        except Exception as e:
            # Publish failure event - triggers compensation
            event_bus.publish('PaymentFailed', {
                'order_id': event['order_id'],
                'error': str(e)
            })
# Setup event subscriptions
event_bus.subscribe('OrderCreated', inventory_service.handle_order_created)
event_bus.subscribe('InventoryReserved', payment_service.handle_inventory_reserved)
event_bus.subscribe('PaymentFailed', order_service.handle_payment_failed)
event_bus.subscribe('PaymentFailed', inventory_service.handle_payment_failed)

/** Order service - starts the saga and cancels the order when payment fails. */
class OrderService {
    /** Creates the order as a local transaction, then publishes "OrderCreated". */
    public void createOrder(OrderData orderData) {
        // Create order (local transaction)
        Order order = createOrderLocal(orderData);

        // Publish event
        eventBus.publish("OrderCreated", Map.of(
            "order_id", order.getId(),
            "user_id", order.getUserId(),
            "amount", order.getAmount()
        ));
    }

    @EventHandler
    public void handlePaymentFailed(PaymentFailedEvent event) {
        // Compensate: Cancel order
        cancelOrder(event.getOrderId());
    }
}
class InventoryService { @EventHandler public void handleOrderCreated(OrderCreatedEvent event) { try { // Reserve inventory (local transaction) Reservation reservation = reserveInventoryLocal( event.getOrderId(), event.getItems() );
// Publish success event eventBus.publish("InventoryReserved", Map.of( "order_id", event.getOrderId(), "reservation_id", reservation.getId() )); } catch (Exception e) { // Publish failure event eventBus.publish("InventoryReservationFailed", Map.of( "order_id", event.getOrderId(), "error", e.getMessage() )); } }
@EventHandler public void handlePaymentFailed(PaymentFailedEvent event) { // Compensate: Release inventory releaseInventory(event.getOrderId()); }}| Aspect | Orchestrator | Choreography |
|---|---|---|
| Control | Centralized | Distributed |
| Complexity | Lower | Higher |
| Coupling | Higher | Lower |
| Debugging | Easier | Harder |
| Scalability | Lower | Higher |
| Use Case | Complex workflows | Simple workflows |
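Choose Orchestrator when: the workflow is complex, or you need centralized control and easier debugging.
Choose Choreography when: the workflow is simple, and loose coupling and scalability matter more than easy debugging.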
Undo the action (if possible):
Opposite transaction:
Automatic expiration:
🔄 Saga Pattern
Sequence of local transactions with compensation. No distributed locks, better scalability.
🎭 Orchestrator
Central coordinator. Easier to understand and debug. Good for complex workflows.
💃 Choreography
Distributed coordination. More decoupled and scalable. Good for simple workflows.
↩️ Compensation
Undo completed steps when saga fails. Not rollback, but compensating action. Critical for consistency.