🔄 Exchanges Route
Exchanges route messages to queues based on routing keys. Direct, topic, fanout, headers.
RabbitMQ is a traditional message broker implementing the AMQP (Advanced Message Queuing Protocol) standard.
Routes to queue with matching routing key.
Use case: Point-to-point messaging, task queues
Routes based on pattern matching (wildcards).
Patterns:
* - Matches one word# - Matches zero or more wordsUse case: Categorized messages, event routing
Broadcasts to all bound queues (ignores routing key).
Use case: Pub-sub, notifications, cache invalidation
Routes based on message headers (ignores routing key).
Use case: Complex routing logic
import pikaimport json
class RabbitMQProducer: """RabbitMQ producer"""
def __init__(self, host='localhost'): self.connection = pika.BlockingConnection( pika.ConnectionParameters(host=host) ) self.channel = self.connection.channel()
def setup_exchange(self, exchange_name: str, exchange_type: str = 'direct'): """Declare exchange""" self.channel.exchange_declare( exchange=exchange_name, exchange_type=exchange_type, durable=True # Survive broker restart )
def publish(self, exchange: str, routing_key: str, message: dict): """Publish message""" self.channel.basic_publish( exchange=exchange, routing_key=routing_key, body=json.dumps(message), properties=pika.BasicProperties( delivery_mode=2, # Make message persistent content_type='application/json' ) ) print(f"Published to {exchange} with key {routing_key}")
def close(self): """Close connection""" self.connection.close()
# Usageproducer = RabbitMQProducer()producer.setup_exchange('orders', 'direct')producer.publish('orders', 'order.created', { 'order_id': 123, 'user_id': 456, 'amount': 99.99})producer.close()import com.rabbitmq.client.*;
public class RabbitMQProducer { private final Connection connection; private final Channel channel;
public RabbitMQProducer(String host) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); this.connection = factory.newConnection(); this.channel = connection.createChannel(); }
public void setupExchange(String exchangeName, String exchangeType) throws Exception { // Declare exchange channel.exchangeDeclare(exchangeName, exchangeType, true); // Durable }
public void publish(String exchange, String routingKey, String message) throws Exception { // Publish message channel.basicPublish( exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, // Make persistent message.getBytes() ); System.out.println("Published to " + exchange + " with key " + routingKey); }
public void close() throws Exception { channel.close(); connection.close(); }}
// UsageRabbitMQProducer producer = new RabbitMQProducer("localhost");producer.setupExchange("orders", "direct");producer.publish("orders", "order.created", "{\"order_id\":123,\"user_id\":456,\"amount\":99.99}");producer.close();import pikaimport json
class RabbitMQConsumer: """RabbitMQ consumer"""
def __init__(self, host='localhost'): self.connection = pika.BlockingConnection( pika.ConnectionParameters(host=host) ) self.channel = self.connection.channel()
def setup_queue(self, queue_name: str, durable: bool = True): """Declare queue""" self.channel.queue_declare( queue=queue_name, durable=durable # Survive broker restart )
def bind_queue(self, queue: str, exchange: str, routing_key: str): """Bind queue to exchange""" self.channel.queue_bind( queue=queue, exchange=exchange, routing_key=routing_key )
def consume(self, queue: str, handler, auto_ack: bool = False): """Consume messages""" def callback(ch, method, properties, body): try: message = json.loads(body) # Process message handler(message)
# Acknowledge message if not auto_ack: ch.basic_ack(delivery_tag=method.delivery_tag) except Exception as e: print(f"Error processing message: {e}") # Reject and requeue if not auto_ack: ch.basic_nack( delivery_tag=method.delivery_tag, requeue=True )
# Set prefetch (how many unacked messages per consumer) self.channel.basic_qos(prefetch_count=1)
# Start consuming self.channel.basic_consume( queue=queue, on_message_callback=callback, auto_ack=auto_ack )
print(f"Consuming from {queue}...") self.channel.start_consuming()
def close(self): """Close connection""" self.connection.close()
# Usagedef handle_order(message): print(f"Processing order: {message['order_id']}") # Process order...
consumer = RabbitMQConsumer()consumer.setup_queue('order-processor', durable=True)consumer.bind_queue('order-processor', 'orders', 'order.created')consumer.consume('order-processor', handle_order, auto_ack=False)import com.rabbitmq.client.*;
public class RabbitMQConsumer { private final Connection connection; private final Channel channel;
public RabbitMQConsumer(String host) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); this.connection = factory.newConnection(); this.channel = connection.createChannel(); }
public void setupQueue(String queueName, boolean durable) throws Exception { // Declare queue channel.queueDeclare(queueName, durable, false, false, null); }
public void bindQueue(String queue, String exchange, String routingKey) throws Exception { // Bind queue to exchange channel.queueBind(queue, exchange, routingKey); }
public void consume(String queue, java.util.function.Consumer<String> handler, boolean autoAck) throws Exception { // Set prefetch channel.basicQos(1);
// Consumer callback DeliverCallback deliverCallback = (consumerTag, delivery) -> { try { String message = new String(delivery.getBody(), "UTF-8"); // Process message handler.accept(message);
// Acknowledge if (!autoAck) { channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } catch (Exception e) { System.err.println("Error processing message: " + e.getMessage()); // Reject and requeue if (!autoAck) { channel.basicNack( delivery.getEnvelope().getDeliveryTag(), false, true // Requeue ); } } };
// Start consuming channel.basicConsume(queue, autoAck, deliverCallback, consumerTag -> {}); System.out.println("Consuming from " + queue + "..."); }
public void close() throws Exception { channel.close(); connection.close(); }}
// UsageRabbitMQConsumer consumer = new RabbitMQConsumer("localhost");consumer.setupQueue("order-processor", true);consumer.bindQueue("order-processor", "orders", "order.created");consumer.consume("order-processor", message -> { System.out.println("Processing: " + message);}, false);Critical for reliable message processing.
# Message removed immediately when deliveredchannel.basic_consume(queue='orders', on_message_callback=callback, auto_ack=True)Problem: If consumer crashes, message lost!
# Message removed only after ackdef callback(ch, method, properties, body): process_message(body) ch.basic_ack(delivery_tag=method.delivery_tag) # Acknowledge
channel.basic_consume(queue='orders', on_message_callback=callback, auto_ack=False)Benefits:
| Feature | RabbitMQ | Kafka |
|---|---|---|
| Model | Traditional broker | Streaming platform |
| Message Retention | Removed after consumption | Retained (configurable) |
| Routing | Flexible (exchanges) | Simple (topics/partitions) |
| Ordering | Per queue | Per partition |
| Throughput | Good | Excellent |
| Use Case | Task queues, RPC | Event streaming, logs |
Choose RabbitMQ when:
Choose Kafka when:
🔄 Exchanges Route
Exchanges route messages to queues based on routing keys. Direct, topic, fanout, headers.
✅ Manual Ack
Use manual acknowledgment for reliability. Auto-ack removes messages immediately (risky).
💾 Durability
Make queues/exchanges/messages durable to survive broker restart. Critical for production.
📊 Flexible Routing
RabbitMQ’s flexible routing (exchanges) makes it great for complex routing scenarios.