Producer-Consumer Pattern
Introduction: The Producer-Consumer Problem
The Producer-Consumer pattern is one of the most common concurrency patterns and comes up often in low-level design (LLD) interviews. It decouples the code that produces data from the code that consumes it by placing a shared buffer between the two.
Visual: The Problem
Why Use Producer-Consumer?
Benefits:
- Decoupling: Producers and consumers don’t need to know about each other
- Rate Buffering: Handles speed mismatches (fast producer, slow consumer)
- Scalability: Easy to add more producers or consumers
- Backpressure: A bounded buffer blocks producers when it is full, which caps memory use
Common Use Cases:
- Message queues
- Task queues
- Log processing
- Data pipelines
- Event-driven systems
The Basic Pattern
Visual: Producer-Consumer Flow
Implementation Approach 1: Low-Level (Condition Variables)
Let’s implement a bounded buffer from scratch using condition variables. This is often asked in interviews!
Visual: Condition Variable Flow
Example: Bounded Buffer Implementation

Python:

    import threading
    import time
    from collections import deque

    class BoundedBuffer:
        def __init__(self, capacity):
            self.capacity = capacity
            self.buffer = deque()  # O(1) removal from the front
            self.lock = threading.Lock()
            # Both conditions share one lock because they guard the same buffer
            self.not_full = threading.Condition(self.lock)
            self.not_empty = threading.Condition(self.lock)

        def put(self, item):
            """Add item to buffer, blocking if full"""
            with self.lock:
                # Wait while buffer is full
                while len(self.buffer) >= self.capacity:
                    self.not_full.wait()  # Releases lock, waits

                self.buffer.append(item)
                print(f"Produced: {item}, Buffer size: {len(self.buffer)}")
                self.not_empty.notify()  # Wake up waiting consumers

        def get(self):
            """Remove item from buffer, blocking if empty"""
            with self.lock:
                # Wait while buffer is empty
                while len(self.buffer) == 0:
                    self.not_empty.wait()  # Releases lock, waits

                item = self.buffer.popleft()
                print(f"Consumed: {item}, Buffer size: {len(self.buffer)}")
                self.not_full.notify()  # Wake up waiting producers
                return item

    # Usage
    buffer = BoundedBuffer(capacity=5)

    def producer():
        for i in range(10):
            buffer.put(i)
            time.sleep(0.1)

    def consumer():
        for _ in range(10):
            item = buffer.get()
            time.sleep(0.2)

    threads = [threading.Thread(target=producer), threading.Thread(target=consumer)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # Wait for both threads so the program does not exit early

Java:

    import java.util.LinkedList;
    import java.util.Queue;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    public class BoundedBuffer<T> {
        private final Queue<T> buffer;
        private final int capacity;
        private final Lock lock = new ReentrantLock();
        private final Condition notFull = lock.newCondition();
        private final Condition notEmpty = lock.newCondition();

        public BoundedBuffer(int capacity) {
            this.capacity = capacity;
            this.buffer = new LinkedList<>();
        }

        public void put(T item) throws InterruptedException {
            lock.lock();
            try {
                // Wait while buffer is full
                while (buffer.size() >= capacity) {
                    notFull.await(); // Releases lock, waits
                }

                buffer.offer(item);
                System.out.println("Produced: " + item + ", Buffer size: " + buffer.size());
                notEmpty.signal(); // Wake up waiting consumers
            } finally {
                lock.unlock();
            }
        }

        public T get() throws InterruptedException {
            lock.lock();
            try {
                // Wait while buffer is empty
                while (buffer.isEmpty()) {
                    notEmpty.await(); // Releases lock, waits
                }

                T item = buffer.poll();
                System.out.println("Consumed: " + item + ", Buffer size: " + buffer.size());
                notFull.signal(); // Wake up waiting producers
                return item;
            } finally {
                lock.unlock();
            }
        }

        public static void main(String[] args) throws InterruptedException {
            BoundedBuffer<Integer> buffer = new BoundedBuffer<>(5);

            Thread producer = new Thread(() -> {
                try {
                    for (int i = 0; i < 10; i++) {
                        buffer.put(i);
                        Thread.sleep(100);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });

            Thread consumer = new Thread(() -> {
                try {
                    for (int i = 0; i < 10; i++) {
                        buffer.get();
                        Thread.sleep(200);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });

            producer.start();
            consumer.start();

            producer.join();
            consumer.join();
        }
    }

Implementation Approach 2: High-Level (Blocking Queues)
Using built-in blocking queues is much simpler and less error-prone!
Example: Using BlockingQueue (Java)

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    public class BlockingQueueExample {
        public static void main(String[] args) throws InterruptedException {
            // Create bounded blocking queue
            BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);

            // Producer
            Thread producer = new Thread(() -> {
                try {
                    for (int i = 0; i < 10; i++) {
                        queue.put(i); // Blocks if full
                        System.out.println("Produced: " + i);
                        Thread.sleep(100);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });

            // Consumer
            Thread consumer = new Thread(() -> {
                try {
                    for (int i = 0; i < 10; i++) {
                        int item = queue.take(); // Blocks if empty
                        System.out.println("Consumed: " + item);
                        Thread.sleep(200);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });

            producer.start();
            consumer.start();

            producer.join();
            consumer.join();
        }
    }

Example: Using queue.Queue (Python)

    import queue
    import threading
    import time

    # Create bounded queue
    q = queue.Queue(maxsize=5)

    def producer():
        for i in range(10):
            q.put(i)  # Blocks if full
            print(f"Produced: {i}")
            time.sleep(0.1)

    def consumer():
        for _ in range(10):
            item = q.get()  # Blocks if empty
            print(f"Consumed: {item}")
            q.task_done()  # Mark task as done
            time.sleep(0.2)

    producer_thread = threading.Thread(target=producer)
    consumer_thread = threading.Thread(target=consumer, daemon=True)
    producer_thread.start()
    consumer_thread.start()

    # q.join() alone can return as soon as the queue is momentarily empty,
    # so wait for the producer to finish enqueueing first
    producer_thread.join()
    q.join()  # Wait for all tasks to be done

Comparison: ArrayBlockingQueue vs LinkedBlockingQueue
| Feature | ArrayBlockingQueue | LinkedBlockingQueue |
|---|---|---|
| Backing | Array | Linked nodes |
| Bounded | Always bounded | Optionally bounded (defaults to Integer.MAX_VALUE) |
| Memory | Array preallocated at construction | Node allocated per element |
| Locking | Single lock for put and take | Separate put and take locks |
| Throughput | Lower | Higher (typically) |
| Fairness | Optional (constructor flag) | Not supported |
Multiple Producers and Consumers
The pattern scales to multiple producers and consumers: every thread shares the same queue, and the queue itself handles the synchronization.
Visual: Multiple Producers/Consumers
Example: Multiple Producers/Consumers

Python:

    import queue
    import threading
    import time

    q = queue.Queue(maxsize=10)

    def producer(producer_id):
        for i in range(5):
            item = f"P{producer_id}-{i}"
            q.put(item)
            print(f"Producer {producer_id} produced: {item}")
            time.sleep(0.1)

    def consumer(consumer_id):
        while True:
            item = q.get()
            if item is None:  # Poison pill
                q.task_done()
                break
            print(f"Consumer {consumer_id} consumed: {item}")
            q.task_done()
            time.sleep(0.2)

    # Create multiple producers
    producers = []
    for i in range(3):
        p = threading.Thread(target=producer, args=(i,))
        producers.append(p)
        p.start()

    # Create multiple consumers
    consumers = []
    for i in range(2):
        c = threading.Thread(target=consumer, args=(i,))
        consumers.append(c)
        c.start()

    # Wait for producers
    for p in producers:
        p.join()

    # Send poison pills (one per consumer)
    for _ in consumers:
        q.put(None)

    # Wait for consumers
    for c in consumers:
        c.join()

Java:

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    public class MultipleProducersConsumers {
        private static final int POISON_PILL = -1;

        public static void main(String[] args) throws InterruptedException {
            BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);

            // Multiple producers
            Thread[] producers = new Thread[3];
            for (int i = 0; i < 3; i++) {
                final int producerId = i;
                producers[i] = new Thread(() -> {
                    try {
                        for (int j = 0; j < 5; j++) {
                            int item = producerId * 10 + j;
                            queue.put(item);
                            System.out.println("Producer " + producerId + " produced: " + item);
                            Thread.sleep(100);
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                producers[i].start();
            }

            // Multiple consumers
            Thread[] consumers = new Thread[2];
            for (int i = 0; i < 2; i++) {
                final int consumerId = i;
                consumers[i] = new Thread(() -> {
                    try {
                        while (true) {
                            int item = queue.take();
                            if (item == POISON_PILL) {
                                break; // Exit on poison pill
                            }
                            System.out.println("Consumer " + consumerId + " consumed: " + item);
                            Thread.sleep(200);
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                consumers[i].start();
            }

            // Wait for producers
            for (Thread producer : producers) {
                producer.join();
            }

            // Send poison pills (one per consumer)
            for (int i = 0; i < consumers.length; i++) {
                queue.put(POISON_PILL);
            }

            // Wait for consumers
            for (Thread consumer : consumers) {
                consumer.join();
            }
        }
    }

Handling Backpressure
Backpressure is the mechanism that slows producers down when they generate data faster than consumers can process it. A bounded buffer provides it automatically: once the buffer is full, put() blocks until a consumer frees a slot.
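A minimal sketch of a bounded queue pushing back on a producer, using Python's queue.Queue. The helper name offer() and the 0.05 second timeout are assumptions chosen for illustration; they are not part of the standard library. A timed put() either succeeds or raises queue.Full, which tells the producer that consumers are falling behind.

```python
import queue

def offer(q, item, timeout=0.05):
    """Hypothetical helper: try to enqueue, give up after `timeout` seconds."""
    try:
        q.put(item, timeout=timeout)  # Blocks for up to `timeout` while the queue is full
        return True
    except queue.Full:
        return False  # Caller decides: retry, drop, or slow down

q = queue.Queue(maxsize=2)  # Bounded: at most 2 items waiting
results = [offer(q, i) for i in range(3)]  # No consumer is running yet
print(results)  # [True, True, False]

q.get()  # A consumer frees one slot...
print(offer(q, 99))  # ...so the next put succeeds: True
```

Calling put() without a timeout gives the pure blocking behavior described above; the timeout variant is useful when the producer should be able to react instead of waiting indefinitely.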
Visual: Backpressure Handling
Poison Pill Pattern
The poison pill pattern enables graceful shutdown by sending a special sentinel value that signals consumers to stop.
Visual: Poison Pill Pattern
Example: Poison Pill Implementation

Python:

    import queue
    import threading

    POISON_PILL = object()  # Sentinel value

    def producer(q):
        for i in range(10):
            q.put(i)
            print(f"Produced: {i}")
        q.put(POISON_PILL)  # Send poison pill
        print("Producer finished")

    def consumer(q, consumer_id):
        while True:
            item = q.get()
            if item is POISON_PILL:
                q.put(POISON_PILL)  # Put it back for other consumers
                q.task_done()
                print(f"Consumer {consumer_id} shutting down")
                break
            print(f"Consumer {consumer_id} consumed: {item}")
            q.task_done()

    q = queue.Queue()
    threading.Thread(target=producer, args=(q,)).start()

    # Multiple consumers
    for i in range(3):
        threading.Thread(target=consumer, args=(q, i)).start()

Java:

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    public class PoisonPillExample {
        private static final String POISON_PILL = "POISON_PILL";

        public static void main(String[] args) throws InterruptedException {
            BlockingQueue<String> queue = new LinkedBlockingQueue<>();

            // Producer
            Thread producer = new Thread(() -> {
                try {
                    for (int i = 0; i < 10; i++) {
                        queue.put("Item-" + i);
                        System.out.println("Produced: Item-" + i);
                    }
                    queue.put(POISON_PILL); // Send poison pill
                    System.out.println("Producer finished");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });

            // Consumers
            Thread[] consumers = new Thread[3];
            for (int i = 0; i < 3; i++) {
                final int consumerId = i;
                consumers[i] = new Thread(() -> {
                    try {
                        while (true) {
                            String item = queue.take();
                            if (POISON_PILL.equals(item)) {
                                queue.put(POISON_PILL); // Put back for others
                                System.out.println("Consumer " + consumerId + " shutting down");
                                break;
                            }
                            System.out.println("Consumer " + consumerId + " consumed: " + item);
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                consumers[i].start();
            }

            producer.start();
            producer.join();

            for (Thread consumer : consumers) {
                consumer.join();
            }
        }
    }

Priority Queue Producer-Consumer
Sometimes you need to process items by priority, not just FIFO order.
Example: Priority Queue

Python:

    import queue
    import threading

    class Task:
        def __init__(self, priority, data):
            self.priority = priority
            self.data = data

        def __lt__(self, other):
            return self.priority < other.priority  # Lower priority number = higher priority

    # Sentinel that sorts after every real task, so queued tasks are dequeued before it
    # (None cannot be used here because it cannot be compared with a Task)
    POISON_PILL = Task(float("inf"), None)

    pq = queue.PriorityQueue()

    def producer():
        tasks = [
            Task(3, "Low priority task"),
            Task(1, "High priority task"),
            Task(2, "Medium priority task"),
        ]
        for task in tasks:
            pq.put(task)
            print(f"Produced: {task.data} (priority: {task.priority})")
        pq.put(POISON_PILL)

    def consumer():
        while True:
            task = pq.get()
            if task is POISON_PILL:
                pq.task_done()
                break
            print(f"Consumed: {task.data} (priority: {task.priority})")
            pq.task_done()

    producer_thread = threading.Thread(target=producer)
    producer_thread.start()
    producer_thread.join()  # Enqueue everything first so the output shows priority order
    threading.Thread(target=consumer).start()

Java:

    import java.util.concurrent.PriorityBlockingQueue;

    public class PriorityQueueExample {
        static class Task implements Comparable<Task> {
            final int priority;
            final String data;

            Task(int priority, String data) {
                this.priority = priority;
                this.data = data;
            }

            @Override
            public int compareTo(Task other) {
                return Integer.compare(this.priority, other.priority);
            }
        }

        // Sentinel that sorts after every real task, so queued tasks are dequeued before it
        private static final Task POISON_PILL = new Task(Integer.MAX_VALUE, "POISON_PILL");

        public static void main(String[] args) {
            PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue<>();

            // Producer
            new Thread(() -> {
                queue.put(new Task(3, "Low priority task"));
                queue.put(new Task(1, "High priority task"));
                queue.put(new Task(2, "Medium priority task"));
                queue.put(POISON_PILL); // Lets the consumer thread exit
            }).start();

            // Consumer
            new Thread(() -> {
                try {
                    while (true) {
                        Task task = queue.take();
                        if (task == POISON_PILL) {
                            break;
                        }
                        System.out.println("Consumed: " + task.data + " (priority: " + task.priority + ")");
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }).start();
        }
    }

Real-World Example: Message Queue System
Let’s build a more realistic example: a message queue system with multiple topics.
Example: Multi-Topic Message Queue

Python:

    import queue
    import threading

    class MessageQueue:
        def __init__(self):
            self.queues = {}
            self.lock = threading.Lock()

        def _get_queue(self, topic):
            """Return the queue for a topic, creating it on first use"""
            with self.lock:
                if topic not in self.queues:
                    self.queues[topic] = queue.Queue(maxsize=100)
                return self.queues[topic]

        def publish(self, topic, message):
            """Publish message to a topic, dropping it if the topic queue is full"""
            try:
                self._get_queue(topic).put_nowait(message)
                print(f"Published to {topic}: {message}")
            except queue.Full:
                print(f"Topic {topic} queue full, dropping message")

        def subscribe(self, topic, callback):
            """Subscribe to a topic and process messages"""
            # Create the queue before the thread starts so publish() never races with it
            topic_queue = self._get_queue(topic)

            def consumer():
                while True:
                    message = topic_queue.get()
                    try:
                        if message is None:  # Poison pill
                            break
                        callback(topic, message)
                    finally:
                        topic_queue.task_done()

            thread = threading.Thread(target=consumer, daemon=True)
            thread.start()
            return thread

    # Usage
    mq = MessageQueue()

    # Subscribers
    def handle_order(topic, message):
        print(f"Processing order: {message}")

    def handle_payment(topic, message):
        print(f"Processing payment: {message}")

    mq.subscribe("orders", handle_order)
    mq.subscribe("payments", handle_payment)

    # Publishers
    mq.publish("orders", "Order #123")
    mq.publish("payments", "Payment $100")

    # Wait until every topic queue is drained before the main thread exits
    for topic_queue in mq.queues.values():
        topic_queue.join()

Java:

    import java.util.Map;
    import java.util.concurrent.*;
    import java.util.function.Consumer;

    public class MessageQueue {
        // BlockingQueue rejects null, so shutdown uses a sentinel string instead
        private static final String POISON_PILL = "__POISON_PILL__";

        private final Map<String, BlockingQueue<String>> queues = new ConcurrentHashMap<>();

        private BlockingQueue<String> queueFor(String topic) {
            return queues.computeIfAbsent(topic, k -> new LinkedBlockingQueue<>(100));
        }

        public void publish(String topic, String message) {
            // offer() is non-blocking and returns false when the queue is full
            if (queueFor(topic).offer(message)) {
                System.out.println("Published to " + topic + ": " + message);
            } else {
                System.out.println("Topic " + topic + " queue full, dropping message");
            }
        }

        public void subscribe(String topic, Consumer<String> callback) {
            BlockingQueue<String> queue = queueFor(topic);

            new Thread(() -> {
                try {
                    while (true) {
                        String message = queue.take();
                        if (POISON_PILL.equals(message)) {
                            queue.put(POISON_PILL); // Put back for other subscribers
                            break;
                        }
                        callback.accept(message);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }).start();
        }

        public void shutdown() throws InterruptedException {
            for (BlockingQueue<String> queue : queues.values()) {
                queue.put(POISON_PILL); // Subscribers exit after draining earlier messages
            }
        }

        public static void main(String[] args) throws InterruptedException {
            MessageQueue mq = new MessageQueue();

            // Subscribers
            mq.subscribe("orders", msg -> System.out.println("Processing order: " + msg));
            mq.subscribe("payments", msg -> System.out.println("Processing payment: " + msg));

            // Publishers
            mq.publish("orders", "Order #123");
            mq.publish("payments", "Payment $100");

            mq.shutdown(); // Without this the subscriber threads keep the JVM alive
        }
    }

Practice Problems
Easy: Bounded Buffer
Design a bounded buffer with blocking put() and get() operations.
Solution
See the BoundedBuffer implementation in the “Implementation Approach 1: Low-Level (Condition Variables)” section above.
Medium: Message Queue with Topics
Design a message queue system supporting multiple topics/channels with separate queues.
Solution
See the MessageQueue example in the “Real-World Example: Message Queue System” section above.
Interview Questions
Q1: “How would you implement a producer-consumer pattern?”
Answer:
- Low-level: Use condition variables with a lock
  - Producers wait on the not_full condition when the buffer is full
  - Consumers wait on the not_empty condition when the buffer is empty
  - Use while loops to handle spurious wakeups
- High-level: Use blocking queues
  - BlockingQueue in Java or queue.Queue in Python
  - put() blocks when full, take()/get() blocks when empty
  - Much simpler and less error-prone
Q2: “What happens when the buffer is full? How do you handle backpressure?”
Answer:
- Bounded buffer: Producers block when the buffer is full (automatic backpressure)
- Unbounded buffer: Can exhaust memory if producers stay faster than consumers
- Backpressure strategies:
  - Block producers (bounded buffer)
  - Drop messages (with notification)
  - Use backpressure signals to slow producers
  - Implement priority-based dropping
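The "drop messages (with notification)" strategy can be sketched as follows. The helper name put_drop_oldest and the choice to evict the oldest item are assumptions for illustration only. The eviction and the retry are not atomic, so this version is only safe with a single producer and no concurrent consumers; a production version would need its own lock.

```python
import queue

def put_drop_oldest(q, item):
    """Hypothetical helper: enqueue `item`, evicting the oldest entry if full.

    Returns the evicted item, or None if nothing was dropped.
    """
    try:
        q.put_nowait(item)
        return None
    except queue.Full:
        dropped = q.get_nowait()  # Evict the oldest item
        q.put_nowait(item)        # A slot was just freed (single-producer assumption)
        return dropped

q = queue.Queue(maxsize=3)
dropped = []
for i in range(5):
    evicted = put_drop_oldest(q, i)
    if evicted is not None:
        dropped.append(evicted)   # The "notification": record what was lost

remaining = [q.get_nowait() for _ in range(q.qsize())]
print(dropped, remaining)  # [0, 1] [2, 3, 4]
```

Dropping the oldest item favors fresh data (metrics, UI events); dropping the newest item instead is a one-line change and suits workloads where earlier requests must not be lost.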
Q3: “How would you implement a bounded buffer from scratch?”
Answer:
- Use a lock for mutual exclusion
- Use two condition variables: not_full and not_empty
- put(): Acquire lock, wait on not_full while full, add item, signal not_empty
- get(): Acquire lock, wait on not_empty while empty, remove item, signal not_full
- Always use while loops, not if, to handle spurious wakeups
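In Python, Condition.wait_for(predicate) wraps that while loop: it re-checks the predicate after every wakeup and returns only once it holds. A minimal sketch of the steps above, assuming a class name (WaitForBuffer) chosen purely for illustration:

```python
import threading
from collections import deque

class WaitForBuffer:
    """Bounded buffer sketch; the class name is illustrative."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.items = deque()
        lock = threading.Lock()
        # Two conditions sharing one lock, as in the answer above
        self.not_full = threading.Condition(lock)
        self.not_empty = threading.Condition(lock)

    def put(self, item):
        with self.not_full:  # Acquires the shared lock
            # wait_for loops internally until the predicate is true
            self.not_full.wait_for(lambda: len(self.items) < self.capacity)
            self.items.append(item)
            self.not_empty.notify()

    def get(self):
        with self.not_empty:  # Same underlying lock as not_full
            self.not_empty.wait_for(lambda: len(self.items) > 0)
            item = self.items.popleft()
            self.not_full.notify()
            return item

buf = WaitForBuffer(capacity=2)
received = []

def consume():
    for _ in range(6):
        received.append(buf.get())

t = threading.Thread(target=consume)
t.start()
for i in range(6):
    buf.put(i)  # Blocks whenever two items are already waiting
t.join()
print(received)  # [0, 1, 2, 3, 4, 5]
```

With one producer and one consumer the FIFO order is preserved, which is why the output is deterministic here; with several consumers the interleaving would vary.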
Q4: “What’s the difference between ArrayBlockingQueue and LinkedBlockingQueue?”
Answer:
- ArrayBlockingQueue: Array-backed, always bounded, storage preallocated, single lock, optional fairness, typically lower throughput
- LinkedBlockingQueue: Node-based, optionally bounded, allocates a node per element, separate put and take locks, typically higher throughput
- Choose: ArrayBlockingQueue when you want a fixed capacity with predictable memory use or fair ordering of waiting threads; LinkedBlockingQueue when throughput under contention matters more (pass an explicit capacity to keep backpressure)
Key Takeaways
Next Steps
Continue learning concurrency patterns:
- Thread Pools & Executors - Efficient resource management
- Concurrent Collections - Thread-safe data structures
The Producer-Consumer pattern is fundamental to many concurrent systems! 🏭