Skip to content
Low Level Design Mastery Logo
LowLevelDesign Mastery

Event-Driven Architecture

Building systems around events, not requests

Event-Driven Architecture (EDA) = Systems communicate through events (things that happened).

Diagram
  1. Events are Immutable - Can’t change past
  2. Events are Facts - Something happened
  3. Decoupled - Producers don’t know consumers
  4. Asynchronous - Non-blocking
  5. Reactive - Consumers react to events

Store all changes as sequence of events. State = replay of events.

Diagram

Problem: Lost history! Can’t see how state changed.

Diagram

Benefits:

  • Full history - Every change recorded
  • Replay - Rebuild state from events
  • Audit trail - See what happened when
  • Time travel - See state at any point
"event_sourcing.py
from typing import List, Dict, Any
from datetime import datetime
from dataclasses import dataclass
from enum import Enum
class EventType(Enum):
USER_CREATED = "user.created"
BALANCE_ADDED = "balance.added"
BALANCE_SUBTRACTED = "balance.subtracted"
@dataclass
class Event:
"""Base event class"""
event_id: str
event_type: EventType
aggregate_id: str
data: Dict[str, Any]
timestamp: datetime
version: int
class EventStore:
"""Event store - append-only log"""
def __init__(self):
self.events: List[Event] = []
def append(self, event: Event):
"""Append event (immutable)"""
self.events.append(event)
def get_events(self, aggregate_id: str) -> List[Event]:
"""Get all events for aggregate"""
return [e for e in self.events if e.aggregate_id == aggregate_id]
def replay(self, aggregate_id: str) -> Dict[str, Any]:
"""Replay events to rebuild state"""
events = self.get_events(aggregate_id)
state = {}
for event in events:
if event.event_type == EventType.USER_CREATED:
state['id'] = event.data['user_id']
state['balance'] = 0
state['status'] = 'active'
elif event.event_type == EventType.BALANCE_ADDED:
state['balance'] = state.get('balance', 0) + event.data['amount']
elif event.event_type == EventType.BALANCE_SUBTRACTED:
state['balance'] = state.get('balance', 0) - event.data['amount']
return state
class UserAggregate:
"""User aggregate with event sourcing"""
def __init__(self, event_store: EventStore):
self.event_store = event_store
def create_user(self, user_id: str, initial_balance: int = 0):
"""Create user (command)"""
event = Event(
event_id=f"evt-{datetime.now().timestamp()}",
event_type=EventType.USER_CREATED,
aggregate_id=user_id,
data={'user_id': user_id, 'initial_balance': initial_balance},
timestamp=datetime.now(),
version=1
)
self.event_store.append(event)
def add_balance(self, user_id: str, amount: int):
"""Add balance (command)"""
event = Event(
event_id=f"evt-{datetime.now().timestamp()}",
event_type=EventType.BALANCE_ADDED,
aggregate_id=user_id,
data={'amount': amount},
timestamp=datetime.now(),
version=self._get_next_version(user_id)
)
self.event_store.append(event)
def get_user_state(self, user_id: str) -> Dict[str, Any]:
"""Get current state (query) - replay events"""
return self.event_store.replay(user_id)
def _get_next_version(self, aggregate_id: str) -> int:
"""Get next version number"""
events = self.event_store.get_events(aggregate_id)
return len(events) + 1
# Usage
event_store = EventStore()
user_aggregate = UserAggregate(event_store)
# Create user
user_aggregate.create_user('user-123', initial_balance=0)
# Add balance
user_aggregate.add_balance('user-123', 50)
user_aggregate.add_balance('user-123', 100)
user_aggregate.add_balance('user-123', -30)
# Get current state (replay events)
state = user_aggregate.get_user_state('user-123')
print(f"Current balance: {state['balance']}") # 120
# Get full history
events = event_store.get_events('user-123')
for event in events:
print(f"{event.event_type}: {event.data}")

CQRS (Command Query Responsibility Segregation)

Section titled “CQRS (Command Query Responsibility Segregation)”

Separate read and write models.

Diagram

Benefits:

  • Optimize independently - Write for consistency, read for speed
  • Scale independently - More read replicas
  • Different databases - SQL for writes, NoSQL for reads
  • Performance - Read model optimized for queries
"cqrs.py
class WriteModel:
"""Write model - handles commands"""
def __init__(self, event_store):
self.event_store = event_store
def create_user(self, user_id: str, email: str):
"""Command: Create user"""
event = Event(
event_id=generate_id(),
event_type=EventType.USER_CREATED,
aggregate_id=user_id,
data={'user_id': user_id, 'email': email},
timestamp=datetime.now(),
version=1
)
self.event_store.append(event)
# Publish event for read model update
event_bus.publish(event)
class ReadModel:
"""Read model - optimized for queries"""
def __init__(self):
self.users = {} # Denormalized, optimized for reads
def handle_user_created(self, event: Event):
"""Projection: Update read model"""
self.users[event.aggregate_id] = {
'id': event.data['user_id'],
'email': event.data['email'],
'created_at': event.timestamp
}
def get_user(self, user_id: str) -> Dict:
"""Query: Get user (fast!)"""
return self.users.get(user_id)
def list_users(self) -> List[Dict]:
"""Query: List users (fast!)"""
return list(self.users.values())
# Usage
event_store = EventStore()
write_model = WriteModel(event_store)
read_model = ReadModel()
# Subscribe read model to events
event_bus.subscribe(EventType.USER_CREATED, read_model.handle_user_created)
# Write (command)
write_model.create_user('user-123', '[email protected]')
# Read (query) - fast, optimized
user = read_model.get_user('user-123')
users = read_model.list_users()

Rebuild state by replaying events.

  1. Rebuild Read Model - After crash or migration
  2. Debugging - See state at any point
  3. Testing - Replay events in test
  4. Time Travel - See past state

Problem: Replaying millions of events is slow!

Solution: Snapshots!

Diagram

Strategy:

  • Create snapshot every N events (e.g., 10,000)
  • Replay from latest snapshot
  • Much faster!

📜 Event Sourcing

Store all changes as events. State = replay of events. Enables audit trail, time travel, replay.

🔄 CQRS

Separate read and write models. Optimize independently. Scale independently. Better performance.

⏪ Event Replay

Replay events to rebuild state. Use snapshots for performance. Enables debugging, testing, time travel.

🎯 Events are Facts

Events represent something that happened. Immutable. Past tense. Full history of changes.