Producer-Consumer Pattern

Decoupling production from consumption for scalable systems.

Introduction: The Producer-Consumer Problem

The Producer-Consumer pattern is one of the most common concurrency patterns and a frequent topic in LLD interviews. Producers place items into a shared buffer and consumers remove them, which decouples data production from data consumption.

Benefits:

  1. Decoupling: Producers and consumers don’t need to know about each other
  2. Rate Buffering: Handles speed mismatches (fast producer, slow consumer)
  3. Scalability: Easy to add more producers or consumers
  4. Backpressure: A bounded buffer blocks producers when it is full, which caps memory use

Common Use Cases:

  • Message queues
  • Task queues
  • Log processing
  • Data pipelines
  • Event-driven systems

Implementation Approach 1: Low-Level (Condition Variables)

Let’s implement a bounded buffer from scratch using condition variables. This is often asked in interviews!

bounded_buffer.py
import threading
import time
from collections import deque


class BoundedBuffer:
    def __init__(self, capacity):
        self.capacity = capacity
        self.buffer = deque()  # O(1) removal from the front, unlike list.pop(0)
        self.lock = threading.Lock()
        # Both conditions share one lock, so they guard the same buffer state
        self.not_full = threading.Condition(self.lock)
        self.not_empty = threading.Condition(self.lock)

    def put(self, item):
        """Add item to buffer, blocking if full"""
        with self.lock:
            # Wait while buffer is full (re-checked after every wakeup)
            while len(self.buffer) >= self.capacity:
                self.not_full.wait()  # Releases lock while waiting, reacquires on wakeup
            self.buffer.append(item)
            print(f"Produced: {item}, Buffer size: {len(self.buffer)}")
            self.not_empty.notify()  # Wake up one waiting consumer

    def get(self):
        """Remove item from buffer, blocking if empty"""
        with self.lock:
            # Wait while buffer is empty (re-checked after every wakeup)
            while len(self.buffer) == 0:
                self.not_empty.wait()  # Releases lock while waiting, reacquires on wakeup
            item = self.buffer.popleft()
            print(f"Consumed: {item}, Buffer size: {len(self.buffer)}")
            self.not_full.notify()  # Wake up one waiting producer
            return item


# Usage
buffer = BoundedBuffer(capacity=5)


def producer():
    for i in range(10):
        buffer.put(i)
        time.sleep(0.1)


def consumer():
    for _ in range(10):
        item = buffer.get()
        time.sleep(0.2)


producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)
producer_thread.start()
consumer_thread.start()
# Join both threads so the program does not exit before all items are processed
producer_thread.join()
consumer_thread.join()

Implementation Approach 2: High-Level (Blocking Queues)

Using built-in blocking queues is much simpler and less error-prone!

BlockingQueueExample.java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueExample {
    public static void main(String[] args) throws InterruptedException {
        // Create bounded blocking queue
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);

        // Producer
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    queue.put(i); // Blocks if full
                    System.out.println("Produced: " + i);
                    Thread.sleep(100);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Consumer
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    int item = queue.take(); // Blocks if empty
                    System.out.println("Consumed: " + item);
                    Thread.sleep(200);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
    }
}
queue_example.py
import queue
import threading
import time

# Create bounded queue
q = queue.Queue(maxsize=5)


def producer():
    for i in range(10):
        q.put(i)  # Blocks if full
        print(f"Produced: {i}")
        time.sleep(0.1)


def consumer():
    for _ in range(10):
        item = q.get()  # Blocks if empty
        print(f"Consumed: {item}")
        q.task_done()  # Mark task as done
        time.sleep(0.2)


producer_thread = threading.Thread(target=producer, daemon=True)
producer_thread.start()
threading.Thread(target=consumer, daemon=True).start()

# Wait for the producer first: q.join() returns as soon as the count of
# unfinished tasks hits zero, which can happen between two put() calls
producer_thread.join()
q.join()  # Wait for all tasks to be done

Comparison: ArrayBlockingQueue vs LinkedBlockingQueue

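| Feature | ArrayBlockingQueue | LinkedBlockingQueue |
| --- | --- | --- |
| Backing | Array | Linked nodes |
| Bounded | Always bounded | Optionally bounded (defaults to Integer.MAX_VALUE) |
| Memory | Fixed size, allocated up front | Nodes allocated dynamically |
| Throughput | Lower | Higher (typically) |
| Fairness | Optional (constructor flag) | Not supported |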

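The pattern scales to multiple producers and consumers: all coordination happens in the shared queue, so you can add threads on either side without changing the other.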

multiple_producers_consumers.py
import queue
import threading
import time

q = queue.Queue(maxsize=10)


def producer(producer_id):
    for i in range(5):
        item = f"P{producer_id}-{i}"
        q.put(item)
        print(f"Producer {producer_id} produced: {item}")
        time.sleep(0.1)


def consumer(consumer_id):
    while True:
        item = q.get()
        if item is None:  # Poison pill
            q.task_done()
            break
        print(f"Consumer {consumer_id} consumed: {item}")
        q.task_done()
        time.sleep(0.2)


# Create multiple producers
producers = []
for i in range(3):
    p = threading.Thread(target=producer, args=(i,))
    producers.append(p)
    p.start()

# Create multiple consumers
consumers = []
for i in range(2):
    c = threading.Thread(target=consumer, args=(i,))
    consumers.append(c)
    c.start()

# Wait for producers
for p in producers:
    p.join()

# Send poison pills (one per consumer)
for _ in consumers:
    q.put(None)

# Wait for consumers
for c in consumers:
    c.join()

Backpressure occurs when producers generate data faster than consumers can process it. A bounded buffer handles this automatically: once it is full, put() blocks, so producers slow down to the consumers' pace.
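
A producer does not have to block forever on a full queue. The minimal sketch below uses the `timeout` argument of `queue.Queue.put`, which raises `queue.Full` when no slot frees up in time. The helper name `try_put` and the 0.05 second timeout are illustrative assumptions, not part of the original examples.

```python
import queue


def try_put(q, item, timeout=0.05):
    """Return True if the item was enqueued, False if the queue stayed full."""
    try:
        q.put(item, timeout=timeout)  # Blocks for at most `timeout` seconds
        return True
    except queue.Full:
        return False


q = queue.Queue(maxsize=2)
# No consumer is running, so the third put cannot succeed
results = [try_put(q, i) for i in range(3)]
print(results)  # [True, True, False]
```

The caller can use the `False` result as a signal to retry later, slow down, or shed load.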

The poison pill pattern enables graceful shutdown by sending a special sentinel value that signals consumers to stop.

poison_pill.py
import queue
import threading

POISON_PILL = object()  # Sentinel value


def producer(q):
    for i in range(10):
        q.put(i)
        print(f"Produced: {i}")
    q.put(POISON_PILL)  # Send poison pill
    print("Producer finished")


def consumer(q, consumer_id):
    while True:
        item = q.get()
        if item is POISON_PILL:
            q.put(POISON_PILL)  # Put it back for other consumers
            q.task_done()
            print(f"Consumer {consumer_id} shutting down")
            break
        print(f"Consumer {consumer_id} consumed: {item}")
        q.task_done()


q = queue.Queue()
threading.Thread(target=producer, args=(q,)).start()

# Multiple consumers
for i in range(3):
    threading.Thread(target=consumer, args=(q, i)).start()

Sometimes you need to process items by priority, not just FIFO order.

priority_queue.py
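import queue
import threading


class Task:
    def __init__(self, priority, data):
        self.priority = priority
        self.data = data

    def __lt__(self, other):
        return self.priority < other.priority  # Lower priority number = higher priority


# None cannot be compared with Task objects inside a PriorityQueue,
# so the poison pill is a Task that sorts after all real work
POISON_PILL = Task(float("inf"), None)

pq = queue.PriorityQueue()


def producer():
    tasks = [
        Task(3, "Low priority task"),
        Task(1, "High priority task"),
        Task(2, "Medium priority task"),
    ]
    for task in tasks:
        pq.put(task)
        print(f"Produced: {task.data} (priority: {task.priority})")
    pq.put(POISON_PILL)  # Signal that no more tasks will arrive


def consumer():
    while True:
        task = pq.get()
        if task is POISON_PILL:
            pq.task_done()
            break
        print(f"Consumed: {task.data} (priority: {task.priority})")
        pq.task_done()


producer_thread = threading.Thread(target=producer)
producer_thread.start()
producer_thread.join()  # Enqueue everything first so the output shows priority order
threading.Thread(target=consumer).start()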
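
A Task that compares only on priority gives no guaranteed order between tasks of equal priority. A common way to keep equal-priority items in insertion order is to enqueue tuples with a monotonically increasing counter as a tie-breaker. The sketch below assumes that approach; the `submit` helper and the task strings are hypothetical.

```python
import itertools
import queue

pq = queue.PriorityQueue()
counter = itertools.count()  # Increasing sequence number used as a tie-breaker


def submit(priority, data):
    # Tuples compare element by element: priority first, then arrival order
    pq.put((priority, next(counter), data))


submit(2, "write report")
submit(1, "fix outage")
submit(2, "review PR")
submit(1, "page on-call")

order = []
while not pq.empty():
    priority, _, data = pq.get()
    order.append(data)

print(order)  # ['fix outage', 'page on-call', 'write report', 'review PR']
```

Because the counter is unique, the comparison never reaches the payload, so the data itself does not need to be orderable.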

Let’s build a more realistic example: a message queue system with multiple topics.

message_queue.py
import queue
import threading
from collections import defaultdict


class MessageQueue:
    def __init__(self):
        self.queues = defaultdict(lambda: queue.Queue(maxsize=100))
        self.lock = threading.Lock()

    def publish(self, topic, message):
        """Publish message to a topic that has at least one subscriber"""
        with self.lock:
            topic_queue = self.queues.get(topic)  # .get() does not create a queue
        if topic_queue is None:
            print(f"No subscribers for {topic}, dropping message")
            return
        try:
            topic_queue.put_nowait(message)
            print(f"Published to {topic}: {message}")
        except queue.Full:
            print(f"Topic {topic} queue full, dropping message")

    def subscribe(self, topic, callback):
        """Subscribe to a topic and process messages"""
        # Create the queue before the thread starts so publish() sees it immediately
        with self.lock:
            topic_queue = self.queues[topic]

        def consumer():
            while True:
                try:
                    message = topic_queue.get(timeout=1)
                except queue.Empty:
                    continue
                if message is None:  # Poison pill
                    topic_queue.task_done()
                    break
                callback(topic, message)
                topic_queue.task_done()

        thread = threading.Thread(target=consumer, daemon=True)
        thread.start()
        return thread


# Usage
mq = MessageQueue()


# Subscribers
def handle_order(topic, message):
    print(f"Processing order: {message}")


def handle_payment(topic, message):
    print(f"Processing payment: {message}")


mq.subscribe("orders", handle_order)
mq.subscribe("payments", handle_payment)

# Publishers
mq.publish("orders", "Order #123")
mq.publish("payments", "Payment $100")

# Consumers are daemon threads, so wait for delivery before the program exits
for topic_queue in mq.queues.values():
    topic_queue.join()

Design a bounded buffer with blocking put() and get() operations.

Solution

See the BoundedBuffer implementation in the “Implementation Approach 1: Low-Level (Condition Variables)” section above.

Design a message queue system supporting multiple topics/channels with separate queues.

Solution

See the MessageQueue example (message_queue.py) above.


Q1: “How would you implement a producer-consumer pattern?”

Answer:

  1. Low-level: Use condition variables with a lock

    • Producers wait on not_full condition when buffer is full
    • Consumers wait on not_empty condition when buffer is empty
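    • Use while loops to re-check the condition after every wakeup (this covers spurious wakeups and another thread winning the race for the slot or item)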
  2. High-level: Use blocking queues

    • BlockingQueue in Java or queue.Queue in Python
    • put() blocks when full, take()/get() blocks when empty
    • Much simpler and less error-prone

Q2: “What happens when the buffer is full? How do you handle backpressure?”

Answer:

  • Bounded buffer: Producers block when buffer is full (automatic backpressure)
  • Unbounded buffer: Can cause memory issues if producers are faster
  • Backpressure strategies:
    • Block producers (bounded buffer)
    • Drop messages (with notification)
    • Use backpressure signals to slow producers
    • Implement priority-based dropping
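
As an illustration of the "drop messages (with notification)" strategy listed above, here is a minimal sketch that evicts the oldest entry when the queue is full and reports what was dropped. The function name `put_drop_oldest` is hypothetical, and evicting the oldest item (rather than the newest) is an assumption about the desired policy.

```python
import queue


def put_drop_oldest(q, item):
    """Enqueue item without blocking; evict the oldest entries if q is full.

    Returns the list of evicted items so the caller can log or count them.
    """
    dropped = []
    while True:
        try:
            q.put_nowait(item)
            return dropped
        except queue.Full:
            try:
                dropped.append(q.get_nowait())
                q.task_done()  # Keep q.join() accounting balanced
            except queue.Empty:
                pass  # A consumer drained the queue first; retry the put


q = queue.Queue(maxsize=2)
evicted = [put_drop_oldest(q, i) for i in range(4)]
remaining = [q.get_nowait() for _ in range(q.qsize())]

print(evicted)    # [[], [], [0], [1]]
print(remaining)  # [2, 3]
```

The producer never blocks with this approach, at the cost of losing data, so it suits streams where fresh values matter more than complete history (for example metrics or UI updates).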

Q3: “How would you implement a bounded buffer from scratch?”

Answer:

  1. Use a lock for mutual exclusion
  2. Use two condition variables: not_full and not_empty
  3. put(): Acquire lock, wait on not_full while full, add item, signal not_empty
  4. get(): Acquire lock, wait on not_empty while empty, remove item, signal not_full
  5. Always use while loops, not if, so the condition is re-checked after every wakeup (spurious wakeups, or another thread taking the slot or item first)
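
The steps above can also be sketched with `threading.Condition.wait_for`, which takes a predicate and performs the re-check loop internally. This is one possible variant, not the only correct one; the class name `WaitForBuffer` is an assumption made for the example.

```python
import threading
from collections import deque


class WaitForBuffer:
    def __init__(self, capacity):
        self.capacity = capacity
        self.items = deque()
        lock = threading.Lock()
        # Two conditions sharing one lock, as in the steps above
        self.not_full = threading.Condition(lock)
        self.not_empty = threading.Condition(lock)

    def put(self, item):
        with self.not_full:  # Acquires the shared lock
            # wait_for loops until the predicate is true
            self.not_full.wait_for(lambda: len(self.items) < self.capacity)
            self.items.append(item)
            self.not_empty.notify()

    def get(self):
        with self.not_empty:  # Acquires the same shared lock
            self.not_empty.wait_for(lambda: len(self.items) > 0)
            item = self.items.popleft()
            self.not_full.notify()
            return item


buf = WaitForBuffer(capacity=2)
producer = threading.Thread(target=lambda: [buf.put(i) for i in range(5)])
producer.start()
received = [buf.get() for _ in range(5)]
producer.join()

print(received)  # [0, 1, 2, 3, 4]
```

With one producer and one consumer the buffer is strictly FIFO, so the items arrive in the order they were produced even though the capacity is smaller than the number of items.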

Q4: “What’s the difference between ArrayBlockingQueue and LinkedBlockingQueue?”

Answer:

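  • ArrayBlockingQueue: Array-backed, always bounded, fixed memory allocated up front, optional fairness policy, slightly lower throughput
  • LinkedBlockingQueue: Node-based, optionally bounded (defaults to Integer.MAX_VALUE), dynamic memory, no fairness option, typically higher throughput
  • Choose: ArrayBlockingQueue when you want a fixed capacity and predictable memory use, LinkedBlockingQueue when throughput matters more than predictability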


The Producer-Consumer pattern is fundamental to many concurrent systems! 🏭