Apache Kafka is a distributed streaming platform for building real-time data pipelines and streaming applications.

Topic = Category/stream of messages. Like a table or folder.
Characteristics: an append-only, durable log; messages are retained for a configurable period, and many independent consumers can read the same topic.

Partition = Ordered sequence of messages within a topic.
Why Partitions? They let a topic spread across brokers and let the consumers in a group work in parallel; ordering is guaranteed only within a single partition.
Partitioning Strategy: messages with the same key always go to the same partition (hash of the key); messages without a key are spread across partitions.

Consumer Group = Group of consumers working together on a topic.
Key Rules: each partition is assigned to at most one consumer within a group; consumers beyond the partition count sit idle; separate groups each receive the full stream.
Example: a topic with 6 partitions read by a group of 3 consumers gives each consumer 2 partitions.

Offset = Position of a message in a partition.
Offset Management: consumers commit offsets (automatically or manually) so they can resume from the last committed position after a restart.

Broker = Kafka server. Cluster = Multiple brokers.
Replication: each partition has a configurable number of replicas; one replica is the leader, the others are followers, and a follower is promoted if the leader fails (see the sketch below).
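As a concrete illustration of partitions and replication, here is a minimal sketch using kafka-python's admin client. The topic name, partition count, and replication factor are arbitrary choices for illustration, not something the later examples require.

```python
from kafka.admin import KafkaAdminClient, NewTopic

# Connect to the cluster (assumes a broker on localhost:9092)
admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

# 3 partitions -> up to 3 consumers in one group can read in parallel;
# replication_factor=2 -> each partition has a leader plus one follower
admin.create_topics([
    NewTopic(name='user-events', num_partitions=3, replication_factor=2)
])

admin.close()
```

The producer examples below (Python, then Java) publish to a topic like this one.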
```python
from kafka import KafkaProducer
import json


class KafkaEventProducer:
    """Kafka producer for events"""

    def __init__(self, bootstrap_servers: list):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8') if k else None,
            # Idempotent producer (exactly-once)
            enable_idempotence=True,
            acks='all',  # Wait for all replicas
            retries=3,
            max_in_flight_requests_per_connection=1
        )

    def send_event(self, topic: str, event: dict, key: str = None):
        """Send event to topic"""
        future = self.producer.send(topic, key=key, value=event)

        # Wait for acknowledgment
        try:
            record_metadata = future.get(timeout=10)
            print(f"Event sent to {record_metadata.topic} "
                  f"partition {record_metadata.partition} "
                  f"offset {record_metadata.offset}")
            return record_metadata
        except Exception as e:
            print(f"Error sending event: {e}")
            raise

    def send_with_callback(self, topic: str, event: dict, key: str = None):
        """Send event with callback"""
        def on_send_success(record_metadata):
            print(f"Event sent: {record_metadata.topic}/"
                  f"{record_metadata.partition}/"
                  f"{record_metadata.offset}")

        def on_send_error(exception):
            print(f"Error sending event: {exception}")

        self.producer.send(
            topic, key=key, value=event
        ).add_callback(on_send_success).add_errback(on_send_error)

    def flush(self):
        """Flush pending messages"""
        self.producer.flush()

    def close(self):
        """Close producer"""
        self.producer.close()


# Usage
producer = KafkaEventProducer(['localhost:9092'])

# Send event -- the key ensures events for the same user go to the same partition
producer.send_event('user-events', {
    'event_type': 'user.created',
    'user_id': 123,
    'timestamp': '2024-01-01T10:00:00Z'
}, key='123')

producer.flush()
producer.close()
```

```java
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class KafkaEventProducer {
    private final KafkaProducer<String, String> producer;
    private final ObjectMapper objectMapper = new ObjectMapper();

    public KafkaEventProducer(String bootstrapServers) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // Idempotent producer (exactly-once)
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);

        this.producer = new KafkaProducer<>(props);
    }

    public void sendEvent(String topic, Map<String, Object> event, String key) {
        try {
            String value = objectMapper.writeValueAsString(event);

            ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);

            // Send with callback
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    System.err.println("Error sending event: " + exception.getMessage());
                } else {
                    System.out.println("Event sent to " + metadata.topic() + "/"
                            + metadata.partition() + "/" + metadata.offset());
                }
            });
        } catch (Exception e) {
            System.err.println("Error serializing event: " + e.getMessage());
        }
    }

    public void flush() {
        producer.flush();
    }

    public void close() {
        producer.close();
    }
}

// Usage
KafkaEventProducer producer = new KafkaEventProducer("localhost:9092");

Map<String, Object> event = new HashMap<>();
event.put("event_type", "user.created");
event.put("user_id", 123);
event.put("timestamp", "2024-01-01T10:00:00Z");

producer.sendEvent("user-events", event, "123");
producer.flush();
producer.close();
```

```python
from kafka import KafkaConsumer, OffsetAndMetadata
from kafka.errors import KafkaError
import json
from typing import Callable
class KafkaEventConsumer:
    """Kafka consumer for events"""

    def __init__(self, bootstrap_servers: list, group_id: str):
        self.consumer = KafkaConsumer(
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            key_deserializer=lambda k: k.decode('utf-8') if k else None,
            enable_auto_commit=False,      # Manual offset commit
            auto_offset_reset='earliest',  # Start from beginning if no offset
            max_poll_records=100           # Batch size
        )

    def subscribe(self, topics: list):
        """Subscribe to topics"""
        self.consumer.subscribe(topics)
        print(f"Subscribed to topics: {topics}")

    def consume(self, handler: Callable):
        """Consume messages"""
        try:
            while True:
                # Poll for messages (batch)
                message_batch = self.consumer.poll(timeout_ms=1000)

                for topic_partition, messages in message_batch.items():
                    for message in messages:
                        try:
                            # Process message
                            handler(message.value, message.key, message.offset)

                            # Commit offset after processing
                            self.consumer.commit()
                        except Exception as e:
                            print(f"Error processing message: {e}")
                            # Don't commit - will retry
        except KeyboardInterrupt:
            print("Stopping consumer...")
        finally:
            self.consumer.close()

    def consume_with_manual_commit(self, handler: Callable):
        """Consume with manual offset commit"""
        try:
            while True:
                message_batch = self.consumer.poll(timeout_ms=1000)

                offsets_to_commit = {}

                for topic_partition, messages in message_batch.items():
                    for message in messages:
                        try:
                            # Process message
                            handler(message.value, message.key, message.offset)

                            # Track offset for commit
                            offsets_to_commit[topic_partition] = \
                                OffsetAndMetadata(message.offset + 1, None)
                        except Exception as e:
                            print(f"Error processing message: {e}")
                            # Don't commit failed messages
                            break

                # Commit all processed offsets
                if offsets_to_commit:
                    self.consumer.commit(offsets_to_commit)
        except KeyboardInterrupt:
            print("Stopping consumer...")
        finally:
            self.consumer.close()


# Usage
def handle_event(event, key, offset):
    print(f"Processing event: {event['event_type']} "
          f"key: {key} offset: {offset}")
    # Process event...


consumer = KafkaEventConsumer(
    ['localhost:9092'],
    group_id='event-processors'
)
consumer.subscribe(['user-events', 'order-events'])
consumer.consume(handle_event)
```

```java
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.time.Duration;
import java.util.*;
public class KafkaEventConsumer {
    private final KafkaConsumer<String, String> consumer;
    private final ObjectMapper objectMapper = new ObjectMapper();

    public KafkaEventConsumer(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // Manual commit
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);

        this.consumer = new KafkaConsumer<>(props);
    }

    public void subscribe(List<String> topics) {
        consumer.subscribe(topics);
        System.out.println("Subscribed to topics: " + topics);
    }

    public void consume(java.util.function.Consumer<Map<String, Object>> handler) {
        try {
            while (true) {
                // Poll for messages (batch)
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

                for (ConsumerRecord<String, String> record : records) {
                    try {
                        // Deserialize event
                        Map<String, Object> event = objectMapper.readValue(record.value(), Map.class);

                        // Process event
                        handler.accept(event);

                        // Commit offset after processing
                        consumer.commitSync();
                    } catch (Exception e) {
                        System.err.println("Error processing message: " + e.getMessage());
                        // Don't commit - will retry
                    }
                }
            }
        } catch (Exception e) {
            System.err.println("Consumer error: " + e.getMessage());
        } finally {
            consumer.close();
        }
    }
}

// Usage
KafkaEventConsumer consumer = new KafkaEventConsumer(
    "localhost:9092",
    "event-processors"
);

consumer.subscribe(Arrays.asList("user-events", "order-events"));

consumer.consume(event -> {
    System.out.println("Processing event: " + event.get("event_type"));
    // Process event...
});
```

Exactly-once semantics ensure that each message is processed exactly once, even across retries and failures. In Kafka this rests on three pieces:
- Idempotent Producer: broker-side deduplication so retried sends cannot create duplicate records (enable.idempotence=true).
- Transactional Producer: output records are written atomically, so readers configured with read_committed never see partial results.
- Idempotent Consumer: processing is safe to repeat, e.g. by tracking already-processed offsets or event IDs, as sketched below.
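The idempotent-consumer piece can also be handled at the application level by deduplicating on a business identifier rather than on offsets. A minimal sketch, assuming each event carries a unique event_id field (the field name and the in-memory set are illustrative; a real system would persist the seen IDs in a durable store):

```python
from kafka import KafkaConsumer
import json

seen_ids = set()  # illustrative only; persist this in production

consumer = KafkaConsumer(
    'user-events',
    bootstrap_servers=['localhost:9092'],
    group_id='dedup-processors',
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

for message in consumer:
    event = message.value
    event_id = event.get('event_id')  # assumed unique per event
    if event_id in seen_ids:
        continue  # duplicate delivery -> skip, processing stays idempotent
    # ... process the event ...
    seen_ids.add(event_id)
```

The processors below combine all three pieces, using a transactional producer, a read_committed consumer, and offset tracking.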
```python
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError
import json
from typing import Set


class ExactlyOnceProcessor:
    """Exactly-once message processing"""

    def __init__(self, bootstrap_servers: list, group_id: str):
        # Idempotent producer
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            enable_idempotence=True,
            acks='all',
            transactional_id='exactly-once-producer'
        )

        # Consumer with manual commit
        self.consumer = KafkaConsumer(
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            enable_auto_commit=False,
            isolation_level='read_committed'  # Only read committed messages
        )

        # Track processed offsets (for idempotency)
        self.processed_offsets: Set[tuple] = set()

    def process_exactly_once(self, topic: str, handler):
        """Process messages exactly once"""
        self.consumer.subscribe([topic])

        try:
            while True:
                message_batch = self.consumer.poll(timeout_ms=1000)

                for topic_partition, messages in message_batch.items():
                    # Begin transaction
                    self.producer.begin_transaction()

                    try:
                        for message in messages:
                            # Check if already processed (idempotency)
                            offset_key = (topic_partition.topic,
                                          topic_partition.partition,
                                          message.offset)

                            if offset_key in self.processed_offsets:
                                print(f"Skipping duplicate: {offset_key}")
                                continue

                            # Process message
                            result = handler(message.value)

                            # Send result to output topic (in transaction)
                            if result:
                                self.producer.send(
                                    'output-topic',
                                    value=json.dumps(result).encode('utf-8')
                                )

                            # Mark as processed
                            self.processed_offsets.add(offset_key)

                        # Commit transaction (atomic)
                        self.producer.commit_transaction()

                        # Commit consumer offset
                        self.consumer.commit()
                    except Exception as e:
                        # Abort transaction on error
                        self.producer.abort_transaction()
                        print(f"Error processing: {e}")
        finally:
            self.producer.close()
            self.consumer.close()
```

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.*;
public class ExactlyOnceProcessor {
    private final KafkaProducer<String, String> producer;
    private final KafkaConsumer<String, String> consumer;
    private final Set<String> processedOffsets = new HashSet<>();

    public ExactlyOnceProcessor(String bootstrapServers, String groupId) {
        // Idempotent producer
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
        producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "exactly-once-producer");
        this.producer = new KafkaProducer<>(producerProps);
        this.producer.initTransactions();

        // Consumer with manual commit
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        this.consumer = new KafkaConsumer<>(consumerProps);
    }

    public void processExactlyOnce(String topic, java.util.function.Function<String, String> handler) {
        consumer.subscribe(Collections.singletonList(topic));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

                // Begin transaction
                producer.beginTransaction();

                try {
                    for (ConsumerRecord<String, String> record : records) {
                        // Check if already processed
                        String offsetKey = record.topic() + "-" + record.partition() + "-" + record.offset();

                        if (processedOffsets.contains(offsetKey)) {
                            System.out.println("Skipping duplicate: " + offsetKey);
                            continue;
                        }

                        // Process message
                        String result = handler.apply(record.value());

                        // Send result (in transaction)
                        if (result != null) {
                            producer.send(new ProducerRecord<>("output-topic", result));
                        }

                        // Mark as processed
                        processedOffsets.add(offsetKey);
                    }

                    // Commit transaction
                    producer.commitTransaction();

                    // Commit consumer offset (producer.sendOffsetsToTransaction would make this fully atomic)
                    consumer.commitSync();
                } catch (Exception e) {
                    // Abort transaction on error
                    producer.abortTransaction();
                    System.err.println("Error processing: " + e.getMessage());
                }
            }
        } finally {
            producer.close();
            consumer.close();
        }
    }
}
```

Partitions Enable Parallelism
Partitions allow multiple consumers to process a topic in parallel; ordering is guaranteed only within a partition.
Consumer Groups Scale
Consumer groups distribute a topic's partitions across their consumers; add consumers (up to the partition count) to scale throughput.
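In practice, scaling out just means starting more instances of the same consumer with the same group_id; Kafka rebalances partitions across them automatically. A minimal sketch, reusing the topic and group names from the examples above:

```python
from kafka import KafkaConsumer

# Run this same script in several processes; every instance that joins the
# 'event-processors' group is assigned a share of the topic's partitions.
consumer = KafkaConsumer(
    'user-events',
    bootstrap_servers=['localhost:9092'],
    group_id='event-processors'
)

for message in consumer:
    print(f"partition={message.partition} offset={message.offset}")
```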
Offsets Track Progress
Offsets track consumer position. Commit offsets to resume after restart. Critical for reliability.
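For illustration, a group's committed offset can be inspected directly with kafka-python; the topic name and partition number below are placeholders:

```python
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    group_id='event-processors'
)

tp = TopicPartition('user-events', 0)
consumer.assign([tp])

# Last committed offset for this group on partition 0 (None if never committed);
# after a restart, the consumer resumes from here.
print(consumer.committed(tp))
```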
Exactly-Once is Complex
Exactly-once requires an idempotent producer, transactions, and an idempotent consumer. Use it only when duplicate processing is unacceptable.