Producer-Consumer
One producer, multiple consumers. Each message processed once. Perfect for task distribution.
Synchronous systems: Service A calls Service B and waits for response.
Problems:
Solution: Message Queues - Asynchronous, decoupled communication!
Message queue = Buffer that stores messages between senders and receivers.
One producer, one or more consumers. Each message processed once.
Characteristics:
Use cases:
import queueimport threadingimport timefrom typing import Optional
class MessageQueue:
    """Simple in-process message queue (producer-consumer pattern).

    Producers add messages with :meth:`produce`; consumer threads started
    with :meth:`start_consumer` pull them off and pass each one to a handler.
    Every message is delivered to exactly one consumer.
    """

    def __init__(self, maxsize: int = 1000):
        """Create an empty queue that holds at most *maxsize* messages."""
        self.queue = queue.Queue(maxsize=maxsize)
        self.consumers = []   # consumer threads created by start_consumer()
        self.running = False  # consumers loop only while this is True

    def produce(self, message: dict):
        """Producer: add *message* to the queue without blocking.

        A full queue rejects the message (a notice is printed) instead of
        raising or waiting for space.
        """
        try:
            self.queue.put_nowait(message)
        except queue.Full:
            print("Queue full! Message rejected.")
        else:
            print(f"Produced: {message}")

    def consume(self, handler):
        """Consumer: pass queued messages to *handler* until the queue stops.

        The loop wakes at least once per second to re-check ``running``.
        A handler that raises is reported and the loop carries on: the
        message is still acknowledged, otherwise ``queue.join()`` in
        :meth:`stop` would wait forever for a task that can never finish.
        """
        while self.running:
            try:
                message = self.queue.get(timeout=1)
            except queue.Empty:
                continue  # idle; loop round to re-check the running flag

            try:
                handler(message)
            except Exception as exc:
                print(f"Error processing message {message}: {exc}")
            finally:
                # Acknowledge exactly once per successful get().
                self.queue.task_done()

    def start_consumer(self, handler, consumer_id: int):
        """Start a daemon thread that runs :meth:`consume` with *handler*."""
        def consumer_loop():
            print(f"Consumer {consumer_id} started")
            self.consume(handler)

        thread = threading.Thread(target=consumer_loop, daemon=True)
        thread.start()
        self.consumers.append(thread)

    def start(self):
        """Start queue processing (allow consumers to run)."""
        self.running = True

    def stop(self):
        """Wait for all queued messages to be processed, then stop consumers.

        The drain must happen *before* ``running`` is cleared: consumers exit
        their loop as soon as the flag is False, and messages still in the
        queue at that point would never be acknowledged, leaving ``join()``
        blocked forever.
        """
        self.queue.join()  # Wait for all tasks to complete
        self.running = False
# Usage
# NOTE: the instance must not be called `queue`. That would rebind the
# imported `queue` module at module level, and MessageQueue's
# `except queue.Empty` / `except queue.Full` clauses would then raise
# AttributeError the first time a consumer idles or the queue fills up.
order_queue = MessageQueue(maxsize=100)
order_queue.start()


# Message handler
def process_order(message):
    """Handle one order message; the sleep stands in for real work."""
    print(f"Processing order: {message['order_id']}")
    # Process order...
    time.sleep(1)  # Simulate work
    print(f"Order {message['order_id']} processed")


# Start multiple consumers
order_queue.start_consumer(process_order, consumer_id=1)
order_queue.start_consumer(process_order, consumer_id=2)

# Producer sends messages
for i in range(10):
    order_queue.produce({'order_id': i, 'amount': 100 + i})
    time.sleep(0.1)

time.sleep(5)  # Let consumers process
order_queue.stop()

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
public class MessageQueue { private final BlockingQueue<Message> queue; private final ExecutorService executor; private volatile boolean running = false;
public MessageQueue(int maxSize) { this.queue = new LinkedBlockingQueue<>(maxSize); this.executor = Executors.newCachedThreadPool(); }
public void produce(Message message) { // Producer: Add message to queue if (queue.offer(message)) { System.out.println("Produced: " + message); } else { System.out.println("Queue full! Message rejected."); } }
public void consume(Consumer<Message> handler) { // Consumer: Process messages from queue while (running) { try { Message message = queue.take(); // Blocks until message available handler.accept(message); } catch (InterruptedException e) { Thread.currentThread().interrupt(); break; } } }
public void startConsumer(Consumer<Message> handler, int consumerId) { // Start a consumer thread executor.submit(() -> { System.out.println("Consumer " + consumerId + " started"); consume(handler); }); }
public void start() { running = true; }
public void stop() { running = false; executor.shutdown(); }}
// UsageMessageQueue queue = new MessageQueue(1000);queue.start();
// Message handlerConsumer<Message> processOrder = message -> { System.out.println("Processing order: " + message.getOrderId()); // Process order... try { Thread.sleep(1000); // Simulate work } catch (InterruptedException e) { Thread.currentThread().interrupt(); } System.out.println("Order " + message.getOrderId() + " processed");};
// Start multiple consumersqueue.startConsumer(processOrder, 1);queue.startConsumer(processOrder, 2);
// Producer sends messagesfor (int i = 0; i < 10; i++) { queue.produce(new Message(i, 100 + i)); Thread.sleep(100);}
Thread.sleep(5000); // Let consumers processqueue.stop();One publisher, multiple subscribers. Each subscriber gets copy of message.
Characteristics:
Use cases:
from typing import List, Callable, Dict, Anyfrom threading import Lockimport threading
class Topic:
    """Pub-sub topic: every subscriber receives its own copy of each message."""

    def __init__(self, name: str):
        """Create a topic called *name* with no subscribers."""
        self.name = name
        self.subscribers: List[Callable] = []
        self.lock = Lock()  # guards self.subscribers

    def subscribe(self, handler: Callable):
        """Subscribe *handler* to this topic."""
        with self.lock:
            self.subscribers.append(handler)
            print(f"Subscriber added to {self.name}. Total: {len(self.subscribers)}")

    def unsubscribe(self, handler: Callable):
        """Unsubscribe *handler*; unknown handlers are ignored."""
        with self.lock:
            if handler in self.subscribers:
                self.subscribers.remove(handler)

    def publish(self, message: Dict[str, Any]):
        """Publish *message* to all current subscribers.

        Each subscriber is called on its own daemon thread, so a slow
        subscriber blocks neither the publisher nor the other subscribers.
        """
        # Snapshot under the lock, notify outside it, so handlers may
        # subscribe or unsubscribe without deadlocking.
        with self.lock:
            subscribers = self.subscribers.copy()

        for subscriber in subscribers:
            try:
                threading.Thread(
                    target=self._notify,
                    args=(subscriber, message),
                    daemon=True,
                ).start()
            except Exception as e:
                # Only reached when the thread itself cannot be started.
                print(f"Error notifying subscriber: {e}")

    def _notify(self, subscriber: Callable, message: Dict[str, Any]):
        """Call one subscriber and report any error it raises.

        The handler runs on a worker thread, so its exceptions have to be
        caught here; a try/except around ``Thread.start()`` never sees them.
        """
        try:
            subscriber(message)
        except Exception as e:
            print(f"Error notifying subscriber: {e}")
class PubSubBroker:
    """Registry of named topics that fronts publish and subscribe calls.

    Topics are created on first use, so neither publishers nor subscribers
    need to declare them ahead of time.
    """

    def __init__(self):
        self.lock = Lock()  # guards self.topics
        self.topics: Dict[str, Topic] = {}

    def get_topic(self, name: str) -> Topic:
        """Return the topic registered under *name*, creating it if absent."""
        with self.lock:
            try:
                return self.topics[name]
            except KeyError:
                created = Topic(name)
                self.topics[name] = created
                return created

    def publish(self, topic_name: str, message: Dict[str, Any]):
        """Deliver *message* to the subscribers of *topic_name*."""
        self.get_topic(topic_name).publish(message)

    def subscribe(self, topic_name: str, handler: Callable):
        """Register *handler* on *topic_name*."""
        self.get_topic(topic_name).subscribe(handler)
# Usage
broker = PubSubBroker()


# Subscribers: each one reacts to the same event in its own way.
def email_handler(message):
    """Announce the e-mail that would be sent for the event."""
    print(f"Email Service: Sending email for {message['event']}")


def sms_handler(message):
    """Announce the SMS that would be sent for the event."""
    print(f"SMS Service: Sending SMS for {message['event']}")


def analytics_handler(message):
    """Announce that the event is being recorded."""
    print(f"Analytics Service: Recording {message['event']}")


# Subscribe to 'user.created' topic
broker.subscribe('user.created', email_handler)
broker.subscribe('user.created', sms_handler)
broker.subscribe('user.created', analytics_handler)
# Publisher publishes eventbroker.publish('user.created', { 'event': 'user.created', 'user_id': 123,})import java.util.*;import java.util.concurrent.CopyOnWriteArrayList;import java.util.function.Consumer;
class Topic { private final String name; private final List<Consumer<Message>> subscribers = new CopyOnWriteArrayList<>();
public Topic(String name) { this.name = name; }
public void subscribe(Consumer<Message> handler) { subscribers.add(handler); System.out.println("Subscriber added to " + name + ". Total: " + subscribers.size()); }
public void unsubscribe(Consumer<Message> handler) { subscribers.remove(handler); }
public void publish(Message message) { // Notify all subscribers subscribers.forEach(subscriber -> { try { subscriber.accept(message); } catch (Exception e) { System.err.println("Error notifying subscriber: " + e.getMessage()); } }); }}
/** Registry of named topics; topics are created the first time they are used. */
class PubSubBroker {
    // Plain HashMap: all access goes through the synchronized getTopic().
    private final Map<String, Topic> topics = new HashMap<>();

    /** Returns the topic with the given name, creating it if it does not exist. */
    public synchronized Topic getTopic(String name) {
        Topic existing = topics.get(name);
        if (existing != null) {
            return existing;
        }
        Topic created = new Topic(name);
        topics.put(name, created);
        return created;
    }

    /** Publishes the message to the named topic. */
    public void publish(String topicName, Message message) {
        getTopic(topicName).publish(message);
    }

    /** Subscribes the handler to the named topic. */
    public void subscribe(String topicName, Consumer<Message> handler) {
        getTopic(topicName).subscribe(handler);
    }
}
// UsagePubSubBroker broker = new PubSubBroker();
// SubscribersConsumer<Message> emailHandler = message -> System.out.println("Email Service: Sending email for " + message.getEvent());
Consumer<Message> smsHandler = message -> System.out.println("SMS Service: Sending SMS for " + message.getEvent());
Consumer<Message> analyticsHandler = message -> System.out.println("Analytics Service: Recording " + message.getEvent());
// Subscribe to 'user.created' topicbroker.subscribe("user.created", emailHandler);broker.subscribe("user.created", smsHandler);broker.subscribe("user.created", analyticsHandler);
// Publisher publishes event

Message may be lost, but never duplicated.
Characteristics:
Use when: Non-critical messages, metrics, logs
Message delivered at least once, may have duplicates.
Characteristics:
Use when: Critical messages, order processing, payments
Message delivered exactly once. Requires deduplication.
Characteristics:
Use when: Financial transactions, critical operations
Ordering ensures messages processed in sequence.
Example: User account balance updates
Message 1: Balance = 100
Message 2: Balance = 150 (add 50)
Message 3: Balance = 120 (subtract 30)
Correct order: 100 → 150 → 120 ✓
Wrong order: 100 → 120 → 150 = 150 ✗ (wrong!)
1. Per-Partition Ordering (Kafka)
2. Per-Queue Ordering (RabbitMQ)
3. Global Ordering
At the code level, message queues translate to producer/consumer classes, message handlers, and acknowledgment logic.
from abc import ABC, abstractmethodfrom typing import Dict, Any, Optionalimport json
class MessageHandler(ABC):
    """Contract that every queue message handler implements."""

    @abstractmethod
    def handle(self, message: Dict[str, Any]) -> bool:
        """Process *message* and report the outcome.

        Return True on success. Implementations should be idempotent,
        because at-least-once delivery can hand over the same message
        more than once.
        """

    @abstractmethod
    def can_handle(self, message_type: str) -> bool:
        """Return True if this handler processes messages of *message_type*."""
class OrderProcessor(MessageHandler):
    """Handler for 'order.created' messages with in-memory de-duplication."""

    def __init__(self, order_service):
        self.order_service = order_service
        # IDs of orders already handled; makes redelivered messages a no-op.
        self.processed_ids = set()

    def can_handle(self, message_type: str) -> bool:
        """Accept only 'order.created' messages."""
        return message_type == 'order.created'

    def handle(self, message: Dict[str, Any]) -> bool:
        """Process the order once; return False on failure so it is retried."""
        order_id = message.get('order_id')

        # Idempotency guard: a repeat delivery counts as success.
        if order_id in self.processed_ids:
            print(f"Order {order_id} already processed. Skipping.")
            return True

        try:
            self.order_service.process_order(order_id, message)
            # Recorded only after processing succeeded, so a failed
            # attempt stays eligible for retry.
            self.processed_ids.add(order_id)
        except Exception as e:
            print(f"Error processing order {order_id}: {e}")
            return False

        return True
class MessageConsumer:
    """Consumer that routes messages to handlers"""

    def __init__(self, queue, handlers: "List[MessageHandler]"):
        """Store the source queue and the candidate handlers.

        The code only relies on ``queue`` offering ``get(timeout=...)``,
        ``put`` and ``task_done`` — presumably a ``queue.Queue`` of JSON
        strings; confirm against the caller.
        """
        self.queue = queue
        self.handlers = handlers

    def consume(self):
        """Consume messages from the queue forever.

        Each message is JSON-decoded and routed through ``find_handler``.
        A handler that returns False gets its message put back for another
        attempt; there is no retry limit, so a message that always fails
        is retried indefinitely.
        """
        # Local import: this scope needs the exception class, and a
        # module-level `queue` name may be rebound by other code.
        from queue import Empty

        while True:
            try:
                message_data = self.queue.get(timeout=1)
            except Empty:
                # An idle queue is normal, not an error worth reporting.
                continue

            try:
                message = json.loads(message_data)

                # Find handler
                handler = self.find_handler(message.get('type'))

                if not handler:
                    print(f"No handler for message type: {message.get('type')}")
                elif not handler.handle(message):
                    # Return to queue for retry. The put() happens before the
                    # task_done() below, so join() cannot return while a
                    # retry is still pending.
                    self.queue.put(message_data)
            except Exception as e:
                print(f"Error consuming message: {e}")
            finally:
                # Acknowledge exactly once per get(): the unfinished-task
                # count stays balanced on success, retry, missing handler
                # and error alike.
                self.queue.task_done()
def find_handler(self, message_type: str) -> Optional[MessageHandler]: """Find handler for message type""" for handler in self.handlers: if handler.can_handle(message_type): return handler return Noneimport java.util.*;
/** Contract for components that process one kind of queue message. */
interface MessageHandler {
    /** Returns true if this handler processes messages of the given type. */
    boolean canHandle(String messageType);

    /** Processes the message; returns true on success. */
    boolean handle(Message message);
}
class OrderProcessor implements MessageHandler { private final OrderService orderService; private final Set<String> processedIds = new HashSet<>();
public OrderProcessor(OrderService orderService) { this.orderService = orderService; }
@Override public boolean canHandle(String messageType) { return "order.created".equals(messageType); }
@Override public boolean handle(Message message) { String orderId = message.getOrderId();
// Idempotency check synchronized (processedIds) { if (processedIds.contains(orderId)) { System.out.println("Order " + orderId + " already processed. Skipping."); return true; // Already processed } }
try { // Process order orderService.processOrder(orderId, message);
// Mark as processed synchronized (processedIds) { processedIds.add(orderId); }
return true; } catch (Exception e) { System.err.println("Error processing order " + orderId + ": " + e.getMessage()); return false; // Return false to trigger retry } }}
class MessageConsumer { private final BlockingQueue<String> queue; private final List<MessageHandler> handlers;
public MessageConsumer(BlockingQueue<String> queue, List<MessageHandler> handlers) { this.queue = queue; this.handlers = handlers; }
public void consume() { while (true) { try { String messageData = queue.take(); Message message = parseMessage(messageData);
// Find handler MessageHandler handler = findHandler(message.getType());
if (handler != null) { // Process message boolean success = handler.handle(message);
if (success) { // Message processed successfully // (Acknowledgment handled by queue) } else { // Return to queue for retry queue.put(messageData); } } else { System.err.println("No handler for message type: " + message.getType()); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); break; } catch (Exception e) { System.err.println("Error consuming message: " + e.getMessage()); } } }
private MessageHandler findHandler(String messageType) { return handlers.stream() .filter(h -> h.canHandle(messageType)) .findFirst() .orElse(null); }}๐ Producer-Consumer
One producer, multiple consumers. Each message processed once. Perfect for task distribution.
📢 Pub-Sub
One publisher, multiple subscribers. Each gets copy. Perfect for event broadcasting.
Delivery Guarantees
At-least-once is the most common guarantee and requires idempotent consumers. Exactly-once is the hardest to achieve but the most reliable.
Ordering Matters
Message ordering critical for state changes. Per-partition ordering balances order and parallelism.