
Kafka Deep Dive

Distributed streaming platform for high-throughput event processing

Apache Kafka is a distributed streaming platform for building real-time data pipelines and streaming applications.

Key characteristics:
  1. Distributed - Runs on multiple servers (brokers)
  2. Fault-tolerant - Replicates data across brokers
  3. High-throughput - Millions of messages per second
  4. Scalable - Add brokers/partitions to scale
  5. Durable - Messages persisted to disk
  6. Real-time - Low latency streaming

Topic = Category/stream of messages. Like a table or folder.

Characteristics:

  • ✅ Immutable log - Messages appended, never modified
  • ✅ Partitioned - Split into partitions for parallelism
  • ✅ Replicated - Copies across brokers for reliability
  • ✅ Ordered - Messages ordered within partition
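
A minimal sketch of creating such a topic with kafka-python's admin client is shown below; the partition count and replication factor are illustrative placeholders:

# create_topic.py - sketch: create a partitioned, replicated topic
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

# Each of the 3 partitions is an independent append-only log,
# and each partition is copied to 3 brokers (replication factor)
admin.create_topics([
    NewTopic(name='user-events', num_partitions=3, replication_factor=3)
])
admin.close()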

Partition = Ordered sequence of messages within topic.

Why Partitions?

  • ✅ Parallelism - Multiple consumers process different partitions
  • ✅ Scalability - Add partitions to scale throughput
  • ✅ Ordering - Messages ordered within partition (not globally)

Partitioning Strategy:

  • Key-based - Same key → same partition (ensures per-key ordering; see the sketch below)
  • Round-robin - Messages without a key are distributed evenly across partitions
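
The key-based strategy can be pictured as a hash of the key modulo the partition count. The real client uses a murmur2 hash of the serialized key bytes; the helper below substitutes a simple MD5-based hash purely for illustration:

# partitioner_sketch.py - simplified illustration, not the actual client partitioner
import hashlib

def choose_partition(key: str, num_partitions: int) -> int:
    """Same key -> same partition, so per-key ordering is preserved."""
    digest = hashlib.md5(key.encode('utf-8')).digest()
    return int.from_bytes(digest[:4], 'big') % num_partitions

# All events keyed by user id '123' land on the same partition
print(choose_partition('123', 3))  # always the same partition for '123'
# Messages without a key are instead spread across partitions by the client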

Consumer Group = Group of consumers working together.

Key Rules:

  • ✅ One partition → one consumer (within the same group)
  • ✅ One consumer → multiple partitions (a single consumer can handle several)
  • ✅ Rebalancing - When a consumer joins or leaves, partitions are redistributed

Example (sketched below):

  • Topic has 3 partitions
  • Consumer group has 2 consumers
  • Consumer 1 gets partitions 0, 1
  • Consumer 2 gets partition 2
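
A simplified, range-style sketch of that assignment follows; in a real cluster the group coordinator and the configured assignment strategy decide the mapping, so this is purely illustrative:

# assignment_sketch.py - illustrative only; real assignment is negotiated by the group coordinator
def assign_partitions(partitions: list, consumers: list) -> dict:
    """Spread partitions over consumers; each partition goes to exactly one consumer."""
    assignment = {c: [] for c in consumers}
    base, extra = divmod(len(partitions), len(consumers))
    i = 0
    for idx, consumer in enumerate(consumers):
        count = base + (1 if idx < extra else 0)
        assignment[consumer] = partitions[i:i + count]
        i += count
    return assignment

print(assign_partitions([0, 1, 2], ['consumer-1', 'consumer-2']))
# {'consumer-1': [0, 1], 'consumer-2': [2]}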

Offset = Position of message in partition.

Offset Management:

  • Consumer tracks current offset
  • After processing, commits offset
  • On restart, resumes from committed offset
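
A small sketch of checking where a group stands in a partition, using kafka-python's committed-offset and end-offset lookups (topic, group, and partition here are placeholders):

# offset_check.py - sketch: how far behind the end of the log is the committed offset?
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    group_id='event-processors',
    enable_auto_commit=False
)

tp = TopicPartition('user-events', 0)
consumer.assign([tp])

committed = consumer.committed(tp)    # last committed offset (None if nothing committed yet)
end = consumer.end_offsets([tp])[tp]  # offset the next produced message will get
lag = end - committed if committed is not None else end
print(f"committed={committed} end={end} lag={lag}")
consumer.close()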

Broker = Kafka server. Cluster = Multiple brokers.

Replication:

  • Leader - Handles reads/writes for partition
  • Followers - Replicate leader's data
  • ISR (In-Sync Replicas) - Followers in sync with leader
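
Replication only protects acknowledged data if producers wait for it. A common pairing is acks='all' on the producer (as in the example below) together with a min.insync.replicas topic setting; a sketch with illustrative values, building on the topic-creation example above:

# durable_topic.py - sketch: replication settings that make acks='all' meaningful
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

# replication_factor=3 keeps a leader plus two followers per partition.
# min.insync.replicas=2 means an acks='all' write succeeds only once at least
# two replicas have it, so one broker can fail without losing acknowledged data.
admin.create_topics([
    NewTopic(
        name='order-events',
        num_partitions=3,
        replication_factor=3,
        topic_configs={'min.insync.replicas': '2'}
    )
])
admin.close()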

Producer example:

# kafka_producer.py
from kafka import KafkaProducer
import json


class KafkaEventProducer:
    """Kafka producer for events"""

    def __init__(self, bootstrap_servers: list):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8') if k else None,
            # Idempotent producer: the broker deduplicates retries using a
            # producer ID and sequence numbers (requires a client version
            # that supports idempotence)
            enable_idempotence=True,
            acks='all',  # Wait until all in-sync replicas have the write
            retries=3,
            max_in_flight_requests_per_connection=1  # Preserve ordering across retries
        )

    def send_event(self, topic: str, event: dict, key: str = None):
        """Send event to topic and block until acknowledged"""
        future = self.producer.send(topic, key=key, value=event)
        # Wait for acknowledgment
        try:
            record_metadata = future.get(timeout=10)
            print(f"Event sent to {record_metadata.topic} "
                  f"partition {record_metadata.partition} "
                  f"offset {record_metadata.offset}")
            return record_metadata
        except Exception as e:
            print(f"Error sending event: {e}")
            raise

    def send_with_callback(self, topic: str, event: dict, key: str = None):
        """Send event asynchronously with success/error callbacks"""
        def on_send_success(record_metadata):
            print(f"Event sent: {record_metadata.topic}/"
                  f"{record_metadata.partition}/"
                  f"{record_metadata.offset}")

        def on_send_error(exception):
            print(f"Error sending event: {exception}")

        self.producer.send(
            topic, key=key, value=event
        ).add_callback(on_send_success).add_errback(on_send_error)

    def flush(self):
        """Flush pending messages"""
        self.producer.flush()

    def close(self):
        """Close producer"""
        self.producer.close()


# Usage
producer = KafkaEventProducer(['localhost:9092'])

# Send event; the key ensures all events for this user go to the same partition
producer.send_event('user-events', {
    'event_type': 'user.created',
    'user_id': 123,
    'email': '[email protected]',
    'timestamp': '2024-01-01T10:00:00Z'
}, key='123')

producer.flush()
producer.close()

Consumer example:

# kafka_consumer.py
from kafka import KafkaConsumer
from kafka.structs import OffsetAndMetadata
import json
from typing import Callable


class KafkaEventConsumer:
    """Kafka consumer for events"""

    def __init__(self, bootstrap_servers: list, group_id: str):
        self.consumer = KafkaConsumer(
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            key_deserializer=lambda k: k.decode('utf-8') if k else None,
            enable_auto_commit=False,      # Manual offset commit
            auto_offset_reset='earliest',  # Start from the beginning if no committed offset
            max_poll_records=100           # Max messages per poll (batch size)
        )

    def subscribe(self, topics: list):
        """Subscribe to topics"""
        self.consumer.subscribe(topics)
        print(f"Subscribed to topics: {topics}")

    def consume(self, handler: Callable):
        """Consume messages, committing offsets only after a full batch succeeds"""
        try:
            while True:
                # Poll for a batch of messages
                message_batch = self.consumer.poll(timeout_ms=1000)
                for topic_partition, messages in message_batch.items():
                    for message in messages:
                        handler(message.value, message.key, message.offset)
                # Commit consumed positions once the whole batch was processed
                if message_batch:
                    self.consumer.commit()
        except KeyboardInterrupt:
            print("Stopping consumer...")
        except Exception as e:
            # Don't commit - uncommitted messages are re-read after a restart
            print(f"Error processing message: {e}")
        finally:
            self.consumer.close()

    def consume_with_manual_commit(self, handler: Callable):
        """Consume, committing an explicit offset per partition"""
        try:
            while True:
                message_batch = self.consumer.poll(timeout_ms=1000)
                offsets_to_commit = {}
                for topic_partition, messages in message_batch.items():
                    for message in messages:
                        try:
                            # Process message
                            handler(message.value, message.key, message.offset)
                            # Commit the offset *after* the processed message
                            # (newer clients may also accept a leader_epoch argument)
                            offsets_to_commit[topic_partition] = \
                                OffsetAndMetadata(message.offset + 1, None)
                        except Exception as e:
                            print(f"Error processing message: {e}")
                            # Stop at the failed message; don't commit past it
                            break
                # Commit all successfully processed offsets
                if offsets_to_commit:
                    self.consumer.commit(offsets_to_commit)
        except KeyboardInterrupt:
            print("Stopping consumer...")
        finally:
            self.consumer.close()


# Usage
def handle_event(event, key, offset):
    print(f"Processing event: {event['event_type']} "
          f"key: {key} offset: {offset}")
    # Process event...


consumer = KafkaEventConsumer(
    ['localhost:9092'],
    group_id='event-processors'
)
consumer.subscribe(['user-events', 'order-events'])
consumer.consume(handle_event)

Exactly-Once Semantics = Each message is processed exactly once, even across producer retries and consumer restarts.

  1. Idempotent Producer

    • Prevents duplicate messages from retries
    • Uses producer ID and sequence numbers
  2. Transactional Producer

    • Atomic writes across partitions
    • Uses transactions
  3. Idempotent Consumer

    • Tracks processed offsets
    • Deduplicates messages

Exactly-once example:

# exactly_once.py
from kafka import KafkaProducer, KafkaConsumer
import json
from typing import Set


class ExactlyOnceProcessor:
    """Exactly-once message processing (consume-transform-produce)"""

    def __init__(self, bootstrap_servers: list, group_id: str):
        # Transactional (and therefore idempotent) producer; requires a
        # client version that supports transactions
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            enable_idempotence=True,
            acks='all',
            transactional_id='exactly-once-producer',
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
        self.producer.init_transactions()  # Register the transactional ID with the broker
        # Consumer with manual commit; read_committed hides aborted transactions
        self.consumer = KafkaConsumer(
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            enable_auto_commit=False,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            isolation_level='read_committed'  # Only read committed messages
        )
        # Track processed offsets for consumer-side idempotency
        # (a production system would keep these in a durable store)
        self.processed_offsets: Set[tuple] = set()

    def process_exactly_once(self, topic: str, handler):
        """Process messages exactly once"""
        self.consumer.subscribe([topic])
        try:
            while True:
                message_batch = self.consumer.poll(timeout_ms=1000)
                for topic_partition, messages in message_batch.items():
                    # Begin a transaction for this partition's batch
                    self.producer.begin_transaction()
                    try:
                        for message in messages:
                            # Skip messages that were already processed (idempotency)
                            offset_key = (topic_partition.topic,
                                          topic_partition.partition,
                                          message.offset)
                            if offset_key in self.processed_offsets:
                                print(f"Skipping duplicate: {offset_key}")
                                continue
                            # Process message
                            result = handler(message.value)
                            # Send result to the output topic within the transaction
                            if result:
                                self.producer.send('output-topic', value=result)
                            # Mark as processed
                            self.processed_offsets.add(offset_key)
                        # Commit transaction (atomic), then the consumer offset
                        # (a fully end-to-end setup commits the offsets inside the transaction)
                        self.producer.commit_transaction()
                        self.consumer.commit()
                    except Exception as e:
                        # Abort transaction on error; read_committed consumers never see it
                        self.producer.abort_transaction()
                        print(f"Error processing: {e}")
        finally:
            self.producer.close()
            self.consumer.close()

📊 Partitions Enable Parallelism

Partitions allow multiple consumers to process a topic in parallel; ordering is guaranteed only within a partition.

👥 Consumer Groups Scale

Consumer groups distribute partitions across consumers. Add consumers (up to the partition count) to scale throughput.

📍 Offsets Track Progress

Offsets track consumer position. Commit offsets to resume after restart. Critical for reliability.

✅ Exactly-Once is Complex

Exactly-once requires an idempotent producer, transactions, and an idempotent consumer. Use it when duplicates are unacceptable.