📜 Event Sourcing
Store all changes as events. State = replay of events. Enables audit trail, time travel, replay.
Event-Driven Architecture (EDA) = Systems communicate through events (things that happened).
Store all changes as sequence of events. State = replay of events.
Problem: Lost history! Can’t see how state changed.
Benefits:
from typing import List, Dict, Anyfrom datetime import datetimefrom dataclasses import dataclassfrom enum import Enum
class EventType(Enum): USER_CREATED = "user.created" BALANCE_ADDED = "balance.added" BALANCE_SUBTRACTED = "balance.subtracted"
@dataclassclass Event: """Base event class""" event_id: str event_type: EventType aggregate_id: str data: Dict[str, Any] timestamp: datetime version: int
class EventStore: """Event store - append-only log"""
def __init__(self): self.events: List[Event] = []
def append(self, event: Event): """Append event (immutable)""" self.events.append(event)
def get_events(self, aggregate_id: str) -> List[Event]: """Get all events for aggregate""" return [e for e in self.events if e.aggregate_id == aggregate_id]
def replay(self, aggregate_id: str) -> Dict[str, Any]: """Replay events to rebuild state""" events = self.get_events(aggregate_id) state = {}
for event in events: if event.event_type == EventType.USER_CREATED: state['id'] = event.data['user_id'] state['balance'] = 0 state['status'] = 'active' elif event.event_type == EventType.BALANCE_ADDED: state['balance'] = state.get('balance', 0) + event.data['amount'] elif event.event_type == EventType.BALANCE_SUBTRACTED: state['balance'] = state.get('balance', 0) - event.data['amount']
return state
class UserAggregate: """User aggregate with event sourcing"""
def __init__(self, event_store: EventStore): self.event_store = event_store
def create_user(self, user_id: str, initial_balance: int = 0): """Create user (command)""" event = Event( event_id=f"evt-{datetime.now().timestamp()}", event_type=EventType.USER_CREATED, aggregate_id=user_id, data={'user_id': user_id, 'initial_balance': initial_balance}, timestamp=datetime.now(), version=1 ) self.event_store.append(event)
def add_balance(self, user_id: str, amount: int): """Add balance (command)""" event = Event( event_id=f"evt-{datetime.now().timestamp()}", event_type=EventType.BALANCE_ADDED, aggregate_id=user_id, data={'amount': amount}, timestamp=datetime.now(), version=self._get_next_version(user_id) ) self.event_store.append(event)
def get_user_state(self, user_id: str) -> Dict[str, Any]: """Get current state (query) - replay events""" return self.event_store.replay(user_id)
def _get_next_version(self, aggregate_id: str) -> int: """Get next version number""" events = self.event_store.get_events(aggregate_id) return len(events) + 1
# Usageevent_store = EventStore()user_aggregate = UserAggregate(event_store)
# Create useruser_aggregate.create_user('user-123', initial_balance=0)
# Add balanceuser_aggregate.add_balance('user-123', 50)user_aggregate.add_balance('user-123', 100)user_aggregate.add_balance('user-123', -30)
# Get current state (replay events)state = user_aggregate.get_user_state('user-123')print(f"Current balance: {state['balance']}") # 120
# Get full historyevents = event_store.get_events('user-123')for event in events: print(f"{event.event_type}: {event.data}")import java.time.LocalDateTime;import java.util.*;
class Event { private final String eventId; private final EventType eventType; private final String aggregateId; private final Map<String, Object> data; private final LocalDateTime timestamp; private final int version;
// Constructor, getters...}
enum EventType { USER_CREATED, BALANCE_ADDED, BALANCE_SUBTRACTED}
class EventStore { private final List<Event> events = new ArrayList<>();
public void append(Event event) { events.add(event); }
public List<Event> getEvents(String aggregateId) { return events.stream() .filter(e -> e.getAggregateId().equals(aggregateId)) .collect(Collectors.toList()); }
public Map<String, Object> replay(String aggregateId) { List<Event> events = getEvents(aggregateId); Map<String, Object> state = new HashMap<>();
for (Event event : events) { switch (event.getEventType()) { case USER_CREATED: state.put("id", event.getData().get("user_id")); state.put("balance", 0); state.put("status", "active"); break; case BALANCE_ADDED: state.put("balance", (Integer) state.getOrDefault("balance", 0) + (Integer) event.getData().get("amount")); break; case BALANCE_SUBTRACTED: state.put("balance", (Integer) state.getOrDefault("balance", 0) - (Integer) event.getData().get("amount")); break; } }
return state; }}
class UserAggregate { private final EventStore eventStore;
public void createUser(String userId, int initialBalance) { Event event = new Event( UUID.randomUUID().toString(), EventType.USER_CREATED, userId, Map.of("user_id", userId, "initial_balance", initialBalance), LocalDateTime.now(), 1 ); eventStore.append(event); }
public Map<String, Object> getUserState(String userId) { return eventStore.replay(userId); }}Separate read and write models.
Benefits:
class WriteModel: """Write model - handles commands"""
def __init__(self, event_store): self.event_store = event_store
def create_user(self, user_id: str, email: str): """Command: Create user""" event = Event( event_id=generate_id(), event_type=EventType.USER_CREATED, aggregate_id=user_id, data={'user_id': user_id, 'email': email}, timestamp=datetime.now(), version=1 ) self.event_store.append(event) # Publish event for read model update event_bus.publish(event)
class ReadModel: """Read model - optimized for queries"""
def __init__(self): self.users = {} # Denormalized, optimized for reads
def handle_user_created(self, event: Event): """Projection: Update read model""" self.users[event.aggregate_id] = { 'id': event.data['user_id'], 'email': event.data['email'], 'created_at': event.timestamp }
def get_user(self, user_id: str) -> Dict: """Query: Get user (fast!)""" return self.users.get(user_id)
def list_users(self) -> List[Dict]: """Query: List users (fast!)""" return list(self.users.values())
# Usageevent_store = EventStore()write_model = WriteModel(event_store)read_model = ReadModel()
# Subscribe read model to eventsevent_bus.subscribe(EventType.USER_CREATED, read_model.handle_user_created)
# Write (command)
# Read (query) - fast, optimizeduser = read_model.get_user('user-123')users = read_model.list_users()class WriteModel { private final EventStore eventStore; private final EventBus eventBus;
public void createUser(String userId, String email) { Event event = new Event( UUID.randomUUID().toString(), EventType.USER_CREATED, userId, Map.of("user_id", userId, "email", email), LocalDateTime.now(), 1 ); eventStore.append(event); eventBus.publish(event); }}
class ReadModel { private final Map<String, UserDTO> users = new HashMap<>();
public void handleUserCreated(Event event) { users.put(event.getAggregateId(), new UserDTO( event.getData().get("user_id"), event.getData().get("email"), event.getTimestamp() )); }
public UserDTO getUser(String userId) { return users.get(userId); }
public List<UserDTO> listUsers() { return new ArrayList<>(users.values()); }}Rebuild state by replaying events.
Problem: Replaying millions of events is slow!
Solution: Snapshots!
Strategy:
📜 Event Sourcing
Store all changes as events. State = replay of events. Enables audit trail, time travel, replay.
🔄 CQRS
Separate read and write models. Optimize independently. Scale independently. Better performance.
⏪ Event Replay
Replay events to rebuild state. Use snapshots for performance. Enables debugging, testing, time travel.
🎯 Events are Facts
Events represent something that happened. Immutable. Past tense. Full history of changes.