Skip to content
Low Level Design Mastery Logo
LowLevelDesign Mastery

Message Queues Fundamentals

Decoupling systems with asynchronous messaging

Synchronous systems: Service A calls Service B and waits for response.

Diagram

Problems:

  • โŒ Service A blocks waiting for B
  • โŒ If B is slow, A is slow
  • โŒ If B crashes, A fails
  • โŒ Tight coupling

Solution: Message Queues - Asynchronous, decoupled communication!


Message queue = Buffer that stores messages between senders and receivers.

Diagram
  1. Decoupling - Services donโ€™t know about each other
  2. Asynchronous - Non-blocking communication
  3. Reliability - Messages persisted, can retry
  4. Scalability - Multiple consumers, load distribution
  5. Resilience - If consumer fails, messages stay in queue

One producer, one or more consumers. Each message processed once.

Diagram

Characteristics:

  • โœ… Load balancing - Multiple consumers share work
  • โœ… Task distribution - Work distributed across consumers
  • โœ… Scalability - Add more consumers to scale
  • โœ… Each message processed once - One consumer gets each message

Use cases:

  • Background job processing
  • Email sending
  • Image processing
  • Data transformation
  • Task queues
"producer_consumer.py
import queue
import threading
import time
from typing import Optional
class MessageQueue:
"""Simple message queue implementation"""
def __init__(self, maxsize: int = 1000):
self.queue = queue.Queue(maxsize=maxsize)
self.consumers = []
self.running = False
def produce(self, message: dict):
"""Producer: Add message to queue"""
try:
self.queue.put_nowait(message)
print(f"Produced: {message}")
except queue.Full:
print("Queue full! Message rejected.")
def consume(self, handler):
"""Consumer: Process messages from queue"""
while self.running:
try:
message = self.queue.get(timeout=1)
# Process message
handler(message)
# Acknowledge
self.queue.task_done()
except queue.Empty:
continue
def start_consumer(self, handler, consumer_id: int):
"""Start a consumer thread"""
def consumer_loop():
print(f"Consumer {consumer_id} started")
self.consume(handler)
thread = threading.Thread(target=consumer_loop, daemon=True)
thread.start()
self.consumers.append(thread)
def start(self):
"""Start queue processing"""
self.running = True
def stop(self):
"""Stop queue processing"""
self.running = False
self.queue.join() # Wait for all tasks to complete
# Usage
queue = MessageQueue(maxsize=100)
queue.start()
# Message handler
def process_order(message):
print(f"Processing order: {message['order_id']}")
# Process order...
time.sleep(1) # Simulate work
print(f"Order {message['order_id']} processed")
# Start multiple consumers
queue.start_consumer(process_order, consumer_id=1)
queue.start_consumer(process_order, consumer_id=2)
# Producer sends messages
for i in range(10):
queue.produce({'order_id': i, 'amount': 100 + i})
time.sleep(0.1)
time.sleep(5) # Let consumers process
queue.stop()

One publisher, multiple subscribers. Each subscriber gets copy of message.

Diagram

Characteristics:

  • โœ… Fan-out - One message โ†’ multiple consumers
  • โœ… Decoupling - Publisher doesnโ€™t know subscribers
  • โœ… Event broadcasting - Notify multiple services
  • โœ… Each subscriber gets copy - Independent processing

Use cases:

  • Event notifications
  • Real-time updates
  • Cache invalidation
  • Log aggregation
  • Analytics events
"pub_sub.py
from typing import List, Callable, Dict, Any
from threading import Lock
import threading
class Topic:
"""Pub-sub topic"""
def __init__(self, name: str):
self.name = name
self.subscribers: List[Callable] = []
self.lock = Lock()
def subscribe(self, handler: Callable):
"""Subscribe to topic"""
with self.lock:
self.subscribers.append(handler)
print(f"Subscriber added to {self.name}. Total: {len(self.subscribers)}")
def unsubscribe(self, handler: Callable):
"""Unsubscribe from topic"""
with self.lock:
if handler in self.subscribers:
self.subscribers.remove(handler)
def publish(self, message: Dict[str, Any]):
"""Publish message to all subscribers"""
with self.lock:
subscribers = self.subscribers.copy()
# Notify all subscribers (asynchronously)
for subscriber in subscribers:
try:
# Run in separate thread to avoid blocking
threading.Thread(
target=subscriber,
args=(message,),
daemon=True
).start()
except Exception as e:
print(f"Error notifying subscriber: {e}")
class PubSubBroker:
"""Pub-sub message broker"""
def __init__(self):
self.topics: Dict[str, Topic] = {}
self.lock = Lock()
def get_topic(self, name: str) -> Topic:
"""Get or create topic"""
with self.lock:
if name not in self.topics:
self.topics[name] = Topic(name)
return self.topics[name]
def publish(self, topic_name: str, message: Dict[str, Any]):
"""Publish message to topic"""
topic = self.get_topic(topic_name)
topic.publish(message)
def subscribe(self, topic_name: str, handler: Callable):
"""Subscribe to topic"""
topic = self.get_topic(topic_name)
topic.subscribe(handler)
# Usage
broker = PubSubBroker()
# Subscribers
def email_handler(message):
print(f"Email Service: Sending email for {message['event']}")
def sms_handler(message):
print(f"SMS Service: Sending SMS for {message['event']}")
def analytics_handler(message):
print(f"Analytics Service: Recording {message['event']}")
# Subscribe to 'user.created' topic
broker.subscribe('user.created', email_handler)
broker.subscribe('user.created', sms_handler)
broker.subscribe('user.created', analytics_handler)
# Publisher publishes event
broker.publish('user.created', {
'event': 'user.created',
'user_id': 123,
'email': '[email protected]'
})

Message may be lost, but never duplicated.

Diagram

Characteristics:

  • โœ… Simple
  • โœ… Low latency
  • โŒ May lose messages
  • โŒ No retry

Use when: Non-critical messages, metrics, logs

Message delivered at least once, may have duplicates.

Diagram

Characteristics:

  • โœ… Reliable (wonโ€™t lose messages)
  • โœ… Retries on failure
  • โŒ May have duplicates
  • โŒ Consumer must be idempotent

Use when: Critical messages, order processing, payments

Message delivered exactly once. Requires deduplication.

Diagram

Characteristics:

  • โœ… No duplicates
  • โœ… No lost messages
  • โŒ Complex implementation
  • โŒ Performance overhead

Use when: Financial transactions, critical operations


Ordering ensures messages processed in sequence.

Example: User account balance updates

Message 1: Balance = 100
Message 2: Balance = 150 (add 50)
Message 3: Balance = 120 (subtract 30)

Correct order: 100 โ†’ 150 โ†’ 120 โœ…
Wrong order: 100 โ†’ 120 โ†’ 150 = 150 โŒ (wrong!)

1. Per-Partition Ordering (Kafka)

  • Messages in same partition processed in order
  • Different partitions can process in parallel

2. Per-Queue Ordering (RabbitMQ)

  • Single consumer per queue
  • Messages processed sequentially

3. Global Ordering

  • All messages processed in order
  • Slower (no parallelism)

  • โœ… Decoupling needed - Services shouldnโ€™t know about each other
  • โœ… Async processing - Donโ€™t want to wait for response
  • โœ… Load leveling - Smooth out traffic spikes
  • โœ… Reliability - Need guaranteed delivery
  • โœ… Scalability - Need to scale consumers independently
  • โŒ Synchronous operations - Need immediate response
  • โŒ Simple request-response - Overhead not worth it
  • โŒ Ordering not needed - Can use simpler patterns
  • โŒ Low latency critical - Queues add latency

At the code level, message queues translate to producer/consumer classes, message handlers, and acknowledgment logic.

"message_handler.py
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional
import json
class MessageHandler(ABC):
"""Base message handler interface"""
@abstractmethod
def handle(self, message: Dict[str, Any]) -> bool:
"""
Handle message. Returns True if successful.
Should be idempotent for at-least-once delivery.
"""
pass
@abstractmethod
def can_handle(self, message_type: str) -> bool:
"""Check if handler can process this message type"""
pass
class OrderProcessor(MessageHandler):
"""Process order messages"""
def __init__(self, order_service):
self.order_service = order_service
self.processed_ids = set() # For idempotency
def can_handle(self, message_type: str) -> bool:
return message_type == 'order.created'
def handle(self, message: Dict[str, Any]) -> bool:
order_id = message.get('order_id')
# Idempotency check
if order_id in self.processed_ids:
print(f"Order {order_id} already processed. Skipping.")
return True # Already processed, consider success
try:
# Process order
self.order_service.process_order(order_id, message)
# Mark as processed
self.processed_ids.add(order_id)
return True
except Exception as e:
print(f"Error processing order {order_id}: {e}")
return False # Return False to trigger retry
class MessageConsumer:
"""Consumer that routes messages to handlers"""
def __init__(self, queue, handlers: List[MessageHandler]):
self.queue = queue
self.handlers = handlers
def consume(self):
"""Consume messages from queue"""
while True:
try:
message_data = self.queue.get(timeout=1)
message = json.loads(message_data)
# Find handler
handler = self.find_handler(message.get('type'))
if handler:
# Process message
success = handler.handle(message)
if success:
# Acknowledge message
self.queue.task_done()
else:
# Return to queue for retry
self.queue.put(message_data)
else:
print(f"No handler for message type: {message.get('type')}")
self.queue.task_done()
except Exception as e:
print(f"Error consuming message: {e}")
def find_handler(self, message_type: str) -> Optional[MessageHandler]:
"""Find handler for message type"""
for handler in self.handlers:
if handler.can_handle(message_type):
return handler
return None

๐Ÿ”„ Producer-Consumer

One producer, multiple consumers. Each message processed once. Perfect for task distribution.

๐Ÿ“ข Pub-Sub

One publisher, multiple subscribers. Each gets copy. Perfect for event broadcasting.

โœ… Delivery Guarantees

At-least-once most common. Requires idempotent consumers. Exactly-once is hardest but most reliable.

๐Ÿ“Š Ordering Matters

Message ordering critical for state changes. Per-partition ordering balances order and parallelism.