🔄 Idempotency is Critical
Make operations safe to retry. Essential for distributed systems, retries, at-least-once delivery.
In distributed systems, messages/requests can be duplicated:
Problem: Without idempotency, retries create duplicates!
Solution: Idempotency - Make operations safe to retry!
Idempotent operation = Same result when executed multiple times.
f(f(x)) = f(x)
Calling the function twice = calling once.
✅ Idempotent Operations:
- GET /users/123 - Always returns the same user
- PUT /users/123 - Replaces the user (same result)
- DELETE /users/123 - Deletes the user (same result if already deleted)
- SET balance = 100 - Sets the balance to 100 (same result)

❌ Non-Idempotent Operations:
- POST /orders - Creates a new order each time
- balance = balance + 50 - Adds 50 each time
- send_email() - Sends an email each time

The client provides a unique key. The server checks whether it has seen that key before.
How it works:
import hashlib
import json
from datetime import datetime, timedelta, timezone
from functools import wraps
from typing import Any, Dict, Optional
class IdempotencyStore:
    """In-memory store mapping idempotency keys to cached results with a TTL.

    Each entry in ``self.store`` has the shape
    ``{'result': <cached value>, 'expires_at': <timezone-aware UTC datetime>}``.
    Expired entries are evicted lazily: an entry is only removed when ``get``
    is called for its key after it has expired.
    """

    def __init__(self):
        self.store: Dict[str, Dict[str, Any]] = {}

    def get(self, key: str) -> Optional[Dict[str, Any]]:
        """Return the cached result for ``key``.

        Returns:
            The stored result, or ``None`` when the key is unknown or its
            entry has expired (an expired entry is deleted as a side effect).
        """
        entry = self.store.get(key)
        if entry is None:
            return None

        # Compare aware UTC timestamps: naive local time jumps at DST
        # transitions, which would make entries expire an hour early or late.
        if datetime.now(timezone.utc) > entry['expires_at']:
            del self.store[key]
            return None

        return entry['result']

    def set(self, key: str, result: Dict[str, Any], ttl_seconds: int = 3600):
        """Store ``result`` under ``key``, replacing any existing entry.

        Args:
            key: Idempotency key.
            result: Value to hand back on later ``get`` calls.
            ttl_seconds: Lifetime of the entry, counted from now.
        """
        self.store[key] = {
            'result': result,
            'expires_at': datetime.now(timezone.utc) + timedelta(seconds=ttl_seconds),
        }
class IdempotentHandler:
    """Runs an operation at most once per idempotency key.

    The store passed in must provide ``get(key)`` (returning ``None`` for an
    unknown key) and ``set(key, result)``.
    """

    def __init__(self, idempotency_store: "IdempotencyStore"):
        self.store = idempotency_store

    def handle_request(self, idempotency_key: str, operation: callable, *args, **kwargs):
        """Return the cached result for the key, or run ``operation`` and cache it.

        Args:
            idempotency_key: Key identifying this logical request.
            operation: Callable invoked as ``operation(*args, **kwargs)`` when
                no result is cached for the key.

        Returns:
            The cached result if one exists, otherwise the fresh result.

        Raises:
            Whatever ``operation`` raises. Nothing is stored in that case, so
            a failed request can be retried with the same key.
        """
        cached_result = self.store.get(idempotency_key)

        # Compare against None rather than truthiness: a legitimately cached
        # falsy result (e.g. an empty dict) must still count as a hit, otherwise
        # the operation would be executed again. A result of None cannot be
        # told apart from a miss and is therefore re-executed.
        if cached_result is not None:
            print(f"Idempotency key {idempotency_key} seen before. Returning cached result.")
            return cached_result

        # Exceptions propagate before set() is reached, so failures are never cached.
        result = operation(*args, **kwargs)
        self.store.set(idempotency_key, result)
        return result
# Decorator for idempotent endpoints
def idempotent(idempotency_store: "IdempotencyStore"):
    """Build a decorator that makes a request handler idempotent.

    The wrapped function must take the request as its first positional
    argument; the request must expose ``headers`` (with ``.get``) and ``json``.

    The idempotency key is read from the ``Idempotency-Key`` header. When the
    header is absent or empty, a key is derived from the endpoint and the
    request body, so byte-identical bodies sent to the same endpoint are
    treated as the same request (this includes all body-less requests).

    Args:
        idempotency_store: Store shared by every call of the decorated function.

    Returns:
        A decorator that wraps the handler.
    """
    def decorator(func):
        # Scope derived keys to the endpoint so two handlers sharing one store
        # cannot return each other's cached result for identical bodies.
        scope = "{}.{}".format(
            getattr(func, '__module__', ''),
            getattr(func, '__qualname__', type(func).__name__),
        )

        @wraps(func)
        def wrapper(request, *args, **kwargs):
            idempotency_key = request.headers.get('Idempotency-Key')

            if not idempotency_key:
                # sort_keys gives a canonical form: the same payload with a
                # different key order must hash to the same key.
                content = json.dumps(request.json or {}, sort_keys=True)
                idempotency_key = hashlib.sha256(
                    f"{scope}:{content}".encode()
                ).hexdigest()

            handler = IdempotentHandler(idempotency_store)
            return handler.handle_request(
                idempotency_key, func, request, *args, **kwargs
            )
        return wrapper
    return decorator
# Usage
store = IdempotencyStore()


@idempotent(store)
def create_order(request):
    """Example endpoint: read the request body and report a created order."""
    order_data = request.json
    # Create order...
    response = dict(order_id=123, status='created')
    return response
# Client sends request with idempotency key# POST /orders# Idempotency-Key: abc123# { "user_id": 456, "amount": 99.99 }import java.util.*;import java.util.concurrent.ConcurrentHashMap;import java.time.LocalDateTime;import java.time.temporal.ChronoUnit;
/**
 * In-memory store mapping idempotency keys to cached results with an expiry time.
 * Expired entries are evicted lazily, when {@link #get(String)} is called for their key.
 */
class IdempotencyStore {
    private final Map<String, IdempotencyEntry> store = new ConcurrentHashMap<>();

    /**
     * Looks up the cached result for a key.
     *
     * @param key idempotency key
     * @return the cached result, or empty when the key is unknown, expired,
     *         or was stored with a null result
     */
    public Optional<Map<String, Object>> get(String key) {
        IdempotencyEntry entry = store.get(key);
        if (entry == null) {
            return Optional.empty();
        }

        if (LocalDateTime.now().isAfter(entry.getExpiresAt())) {
            // Remove only the entry that was actually inspected: another thread
            // may have stored a fresh entry for this key in the meantime, and an
            // unconditional remove(key) would throw that fresh result away.
            store.remove(key, entry);
            return Optional.empty();
        }

        // ofNullable: a null stored result is reported as "no cached result"
        // instead of failing with a NullPointerException.
        return Optional.ofNullable(entry.getResult());
    }

    /**
     * Stores a result under a key, replacing any existing entry.
     *
     * @param key        idempotency key
     * @param result     value to return from later {@code get} calls
     * @param ttlSeconds lifetime of the entry in seconds, counted from now
     */
    public void set(String key, Map<String, Object> result, int ttlSeconds) {
        LocalDateTime expiresAt = LocalDateTime.now().plus(ttlSeconds, ChronoUnit.SECONDS);
        store.put(key, new IdempotencyEntry(result, expiresAt));
    }
}
class IdempotentHandler { private final IdempotencyStore store;
public Map<String, Object> handleRequest( String idempotencyKey, java.util.function.Supplier<Map<String, Object>> operation) {
// Check if key exists Optional<Map<String, Object>> cached = store.get(idempotencyKey);
if (cached.isPresent()) { System.out.println("Idempotency key " + idempotencyKey + " seen before. Returning cached result."); return cached.get(); }
// Execute operation try { Map<String, Object> result = operation.get();
// Store result store.set(idempotencyKey, result, 3600);
return result; } catch (Exception e) { // Don't store failed operations throw e; } }}
// Usage in controller@PostMapping("/orders")public ResponseEntity<Order> createOrder( @RequestHeader(value = "Idempotency-Key", required = false) String idempotencyKey, @RequestBody OrderRequest request) {
if (idempotencyKey == null) { // Generate from request idempotencyKey = generateKey(request); }
IdempotentHandler handler = new IdempotentHandler(idempotencyStore);
Order order = handler.handleRequest(idempotencyKey, () -> { // Create order return orderService.createOrder(request); });
return ResponseEntity.ok(order);}Ensures message processed exactly once.
from typing import Setimport hashlibimport jsonfrom datetime import datetime, timedelta
class MessageDeduplicator:
    """Remembers message IDs for a limited time so repeats can be detected.

    ``processed_messages`` holds the IDs currently considered seen and
    ``message_timestamps`` records when each ID was first marked
    (timezone-aware UTC). Both are kept in step by ``_clean_expired``.
    """

    def __init__(self, ttl_seconds: int = 3600):
        self.processed_messages: Set[str] = set()
        self.message_timestamps: Dict[str, datetime] = {}
        self.ttl_seconds = ttl_seconds

    def generate_message_id(self, message: Dict[str, Any]) -> str:
        """Return a SHA-256 hex digest of the message's canonical JSON form.

        Keys are sorted, so dicts that differ only in key order get the same
        ID. The message must be JSON-serialisable.
        """
        content = json.dumps(message, sort_keys=True)
        return hashlib.sha256(content.encode()).hexdigest()

    def is_duplicate(self, message: Dict[str, Any]) -> bool:
        """Report whether ``message`` was seen within the TTL, marking it if not.

        Note that a ``False`` return has a side effect: the message is recorded
        as processed, so the next identical message returns ``True``.
        """
        message_id = self.generate_message_id(message)

        # Drop expired IDs first so an old message is not reported as a duplicate.
        self._clean_expired()

        if message_id in self.processed_messages:
            return True

        self.processed_messages.add(message_id)
        # Aware UTC timestamp: naive local time shifts at DST transitions and
        # would make IDs expire an hour early or live an hour too long.
        self.message_timestamps[message_id] = datetime.now(timezone.utc)
        return False

    def _clean_expired(self):
        """Forget every message ID whose age exceeds ``ttl_seconds``."""
        now = datetime.now(timezone.utc)
        # Collect first; the dict must not change size while it is iterated.
        expired = [
            msg_id
            for msg_id, timestamp in self.message_timestamps.items()
            if (now - timestamp).total_seconds() > self.ttl_seconds
        ]

        for msg_id in expired:
            self.processed_messages.discard(msg_id)
            del self.message_timestamps[msg_id]
class ExactlyOnceProcessor:
    """Runs a handler for each distinct message, skipping recognised duplicates.

    The deduplicator must provide ``is_duplicate(message)``,
    ``generate_message_id(message)`` and the ``processed_messages`` /
    ``message_timestamps`` collections.
    """

    def __init__(self, deduplicator: "MessageDeduplicator"):
        self.deduplicator = deduplicator

    def process(self, message: Dict[str, Any], handler: callable):
        """Call ``handler(message)`` unless the message is a duplicate.

        Returns:
            The handler's return value, or ``None`` when the message was
            skipped as a duplicate (indistinguishable from a handler that
            itself returns ``None``).

        Raises:
            Whatever ``handler`` raises. In that case the message is forgotten
            again so that a retry is processed rather than skipped.
        """
        if self.deduplicator.is_duplicate(message):
            print("Duplicate message detected. Skipping.")
            return None

        try:
            return handler(message)
        except Exception:
            # is_duplicate() already marked the message; undo both the ID and
            # its timestamp so the two collections stay consistent.
            message_id = self.deduplicator.generate_message_id(message)
            self.deduplicator.processed_messages.discard(message_id)
            self.deduplicator.message_timestamps.pop(message_id, None)
            raise
# Usage
deduplicator = MessageDeduplicator(ttl_seconds=3600)
processor = ExactlyOnceProcessor(deduplicator)


def handle_order_message(message):
    """Process the order named by the message's 'order_id' and return the outcome.

    Delegates to process_order, which is defined elsewhere and is expected to
    be idempotent.
    """
    return process_order(message['order_id'])


# Process message
result = processor.process(
    {'order_id': 123, 'user_id': 456, 'amount': 99.99},
    handle_order_message,
)
# Retry same message (will be skipped)result = processor.process({ 'order_id': 123, 'user_id': 456, 'amount': 99.99}, handle_order_message) # Returns None (duplicate)import java.util.*;import java.util.concurrent.ConcurrentHashMap;import java.security.MessageDigest;import java.time.LocalDateTime;
class MessageDeduplicator { private final Set<String> processedMessages = ConcurrentHashMap.newKeySet(); private final Map<String, LocalDateTime> messageTimestamps = new ConcurrentHashMap<>(); private final int ttlSeconds;
public String generateMessageId(Map<String, Object> message) { // Generate unique ID from message content String content = message.toString(); // Simplified return sha256(content); }
public boolean isDuplicate(Map<String, Object> message) { String messageId = generateMessageId(message);
// Clean expired entries cleanExpired();
// Check if seen if (processedMessages.contains(messageId)) { return true; }
// Mark as processed processedMessages.add(messageId); messageTimestamps.put(messageId, LocalDateTime.now());
return false; }
private void cleanExpired() { LocalDateTime now = LocalDateTime.now(); messageTimestamps.entrySet().removeIf(entry -> ChronoUnit.SECONDS.between(entry.getValue(), now) > ttlSeconds ); }}
class ExactlyOnceProcessor { private final MessageDeduplicator deduplicator;
public Object process(Map<String, Object> message, java.util.function.Function<Map<String, Object>, Object> handler) { // Check if duplicate if (deduplicator.isDuplicate(message)) { System.out.println("Duplicate message detected. Skipping."); return null; }
// Process message try { return handler.apply(message); } catch (Exception e) { // On error, remove from processed set String messageId = deduplicator.generateMessageId(message); deduplicator.processedMessages.remove(messageId); throw e; } }}For multiple servers, use shared storage (Redis/Database):
import redisimport jsonimport hashlib
class DistributedIdempotencyStore:
    """Idempotency store backed by Redis, usable from several servers.

    Two keys are used per idempotency key:
    ``idempotency:<key>`` holds the marker ``"processing"`` or ``"completed"``,
    and ``idempotency:result:<key>`` holds the JSON-encoded result.
    """

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    def check_and_set(self, key: str, operation: callable, ttl_seconds: int = 3600):
        """Run ``operation`` once per key and return its (possibly cached) result.

        Args:
            key: Idempotency key.
            operation: Zero-argument callable; its result must be JSON-serialisable.
            ttl_seconds: Lifetime of the marker and of the cached result.

        Returns:
            The operation's result, or the JSON-decoded cached result when the
            key has already completed.

        Raises:
            Exception: ``"Operation in progress"`` when the key is claimed but
                no result is stored yet.
            Exception: Whatever ``operation`` (or storing its result) raises;
                the marker is deleted in that case so a retry can run.
        """
        lock_key = f"idempotency:{key}"
        result_key = f"idempotency:result:{key}"

        # SET NX claims the key atomically; only one caller gets to execute.
        acquired = self.redis.set(lock_key, "processing", nx=True, ex=ttl_seconds)

        if not acquired:
            cached = self.redis.get(result_key)
            if cached is not None:
                return json.loads(cached)
            # Still processing, wait or return error
            raise Exception("Operation in progress")

        try:
            result = operation()
            payload = json.dumps(result)

            # Write marker and result in one transaction, marker first, so the
            # result never expires before the marker. Otherwise a finished
            # operation could briefly be reported as "in progress" at expiry.
            with self.redis.pipeline(transaction=True) as pipe:
                pipe.setex(lock_key, ttl_seconds, "completed")
                pipe.setex(result_key, ttl_seconds, payload)
                pipe.execute()

            return result
        except Exception:
            # On error, remove key (allow retry)
            self.redis.delete(lock_key)
            raise
# Usage
redis_client = redis.Redis(host='localhost', port=6379)
store = DistributedIdempotencyStore(redis_client)


def create_order(order_data):
    """Example operation: stands in for real order creation and returns a fixed order."""
    # Create order logic...
    created = dict(order_id=123)
    return created
# Process with idempotencyidempotency_key = "order-123-abc"result = store.check_and_set( idempotency_key, lambda: create_order(order_data), ttl_seconds=3600)import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPool;import com.fasterxml.jackson.databind.ObjectMapper;
class DistributedIdempotencyStore { private final JedisPool jedisPool; private final ObjectMapper objectMapper = new ObjectMapper();
public <T> T checkAndSet(String key, java.util.function.Supplier<T> operation, int ttlSeconds) throws Exception { try (Jedis jedis = jedisPool.getResource()) { // Try to set key (only if not exists) String lockKey = "idempotency:" + key; String result = jedis.set(lockKey, "processing", "NX", "EX", ttlSeconds);
if (!"OK".equals(result)) { // Key exists - get cached result String cached = jedis.get("idempotency:result:" + key); if (cached != null) { return (T) objectMapper.readValue(cached, Object.class); } else { throw new Exception("Operation in progress"); } }
try { // Execute operation T result = operation.get();
// Store result jedis.setex( "idempotency:result:" + key, ttlSeconds, objectMapper.writeValueAsString(result) );
// Mark as completed jedis.setex(lockKey, ttlSeconds, "completed");
return result; } catch (Exception e) { // On error, remove key jedis.del(lockKey); throw e; } } }}Always provide idempotency keys for mutating operations:
POST /orders
Idempotency-Key: abc123-xyz789
Content-Type: application/json

{ "user_id": 456, "amount": 99.99 }

The client generates a unique key:
Cache results with keys:
Always check key before processing:
Design operations to be idempotent:
🔄 Idempotency is Critical
Make operations safe to retry. Essential for distributed systems, retries, at-least-once delivery.
🔑 Idempotency Keys
Use unique keys to detect duplicates. Check before processing. Cache results. Return cached on duplicate.
✅ Exactly-Once
Exactly-once requires idempotency + deduplication. Track processed messages. Use distributed storage for multiple servers.
💾 Store Results
Cache successful results with keys. Don’t cache failures. Set TTL. Check before processing.