🔄 WebSocket: Full-Duplex
WebSocket provides bidirectional, persistent connection. Best for chat, gaming, real-time collaboration.
Traditional HTTP: Client requests, server responds. Client must poll for updates.
Problems:
Solution: Real-time communication - Server pushes updates to client!
Bidirectional, persistent connection.
Characteristics:
Server-to-client streaming over HTTP.
Characteristics:
Hold request open until data available.
Characteristics:
1. Handshake (HTTP Upgrade):
GET /ws HTTP/1.1Host: example.comUpgrade: websocketConnection: UpgradeSec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==Sec-WebSocket-Version: 13Server responds:
HTTP/1.1 101 Switching ProtocolsUpgrade: websocketConnection: UpgradeSec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=2. Connection Established - Now bidirectional!
import asyncioimport websocketsimport json
class WebSocketServer: def __init__(self): self.clients = set()
async def register_client(self, websocket): """Register new client""" self.clients.add(websocket) print(f"Client connected. Total: {len(self.clients)}")
async def unregister_client(self, websocket): """Unregister client""" self.clients.discard(websocket) print(f"Client disconnected. Total: {len(self.clients)}")
async def handle_client(self, websocket, path): """Handle client connection""" await self.register_client(websocket)
try: async for message in websocket: # Handle incoming message data = json.loads(message) await self.process_message(websocket, data)
except websockets.exceptions.ConnectionClosed: pass finally: await self.unregister_client(websocket)
async def process_message(self, websocket, data): """Process client message""" message_type = data.get('type')
if message_type == 'chat': # Broadcast to all clients await self.broadcast({ 'type': 'chat', 'user': data.get('user'), 'message': data.get('message'), 'timestamp': data.get('timestamp') })
elif message_type == 'ping': # Respond to ping await websocket.send(json.dumps({'type': 'pong'}))
async def broadcast(self, message): """Broadcast message to all clients""" if self.clients: message_json = json.dumps(message) await asyncio.gather( *[client.send(message_json) for client in self.clients], return_exceptions=True )
async def send_to_client(self, websocket, message): """Send message to specific client""" try: await websocket.send(json.dumps(message)) except websockets.exceptions.ConnectionClosed: await self.unregister_client(websocket)
# Start serverserver = WebSocketServer()start_server = websockets.serve( server.handle_client, "localhost", 8765)
asyncio.get_event_loop().run_until_complete(start_server)asyncio.get_event_loop().run_forever()import org.java_websocket.WebSocket;import org.java_websocket.handshake.ClientHandshake;import org.java_websocket.server.WebSocketServer;import java.net.InetSocketAddress;import java.util.Collections;import java.util.Set;import java.util.concurrent.CopyOnWriteArraySet;
public class ChatServer extends WebSocketServer { private final Set<WebSocket> clients = new CopyOnWriteArraySet<>();
public ChatServer(int port) { super(new InetSocketAddress(port)); }
@Override public void onOpen(WebSocket conn, ClientHandshake handshake) { // Register new client clients.add(conn); System.out.println("Client connected. Total: " + clients.size()); }
@Override public void onClose(WebSocket conn, int code, String reason, boolean remote) { // Unregister client clients.remove(conn); System.out.println("Client disconnected. Total: " + clients.size()); }
@Override public void onMessage(WebSocket conn, String message) { // Handle incoming message try { JSONObject data = new JSONObject(message); String type = data.getString("type");
if ("chat".equals(type)) { // Broadcast to all clients broadcast(message); } else if ("ping".equals(type)) { // Respond to ping conn.send("{\"type\":\"pong\"}"); } } catch (JSONException e) { e.printStackTrace(); } }
@Override public void onError(WebSocket conn, Exception ex) { ex.printStackTrace(); }
public void broadcast(String message) { // Broadcast to all clients for (WebSocket client : clients) { client.send(message); } }
public static void main(String[] args) { ChatServer server = new ChatServer(8765); server.start(); }}class WebSocketClient { constructor(url) { this.url = url; this.ws = null; this.reconnectInterval = 1000; this.maxReconnectAttempts = 5; this.reconnectAttempts = 0; }
connect() { this.ws = new WebSocket(this.url);
this.ws.onopen = () => { console.log('WebSocket connected'); this.reconnectAttempts = 0; this.onOpen(); };
this.ws.onmessage = (event) => { const data = JSON.parse(event.data); this.onMessage(data); };
this.ws.onerror = (error) => { console.error('WebSocket error:', error); this.onError(error); };
this.ws.onclose = () => { console.log('WebSocket closed'); this.onClose(); this.reconnect(); }; }
send(message) { if (this.ws && this.ws.readyState === WebSocket.OPEN) { this.ws.send(JSON.stringify(message)); } else { console.error('WebSocket not connected'); } }
reconnect() { if (this.reconnectAttempts < this.maxReconnectAttempts) { this.reconnectAttempts++; setTimeout(() => { console.log(`Reconnecting... (${this.reconnectAttempts}/${this.maxReconnectAttempts})`); this.connect(); }, this.reconnectInterval * this.reconnectAttempts); } }
onOpen() { // Override in subclass }
onMessage(data) { // Override in subclass }
onError(error) { // Override in subclass }
onClose() { // Override in subclass }
close() { if (this.ws) { this.ws.close(); } }}
// Usageconst client = new WebSocketClient('ws://localhost:8765');
client.onMessage = (data) => { if (data.type === 'chat') { console.log(`${data.user}: ${data.message}`); }};
client.connect();
// Send messageclient.send({ type: 'chat', user: 'John', message: 'Hello!', timestamp: Date.now()});Client opens HTTP connection, server streams events:
GET /events HTTP/1.1Host: example.comAccept: text/event-streamCache-Control: no-cacheServer responds with stream:
HTTP/1.1 200 OKContent-Type: text/event-streamCache-Control: no-cacheConnection: keep-alive
event: messagedata: Hello World
event: updatedata: {"user": "John", "status": "online"}
event: messagedata: Goodbyefrom flask import Flask, Response, jsonifyimport jsonimport timeimport threading
app = Flask(__name__)
@app.route('/events')def stream_events(): """SSE endpoint""" def event_stream(): while True: # Send event data = { 'timestamp': time.time(), 'message': 'Server update' } yield f"event: update\ndata: {json.dumps(data)}\n\n"
time.sleep(1) # Send every second
return Response( event_stream(), mimetype='text/event-stream', headers={ 'Cache-Control': 'no-cache', 'Connection': 'keep-alive', 'X-Accel-Buffering': 'no' # Disable buffering in nginx } )
@app.route('/notify')def notify(): """Trigger notification""" # In real app, this would trigger event return jsonify({'status': 'notification sent'})
if __name__ == '__main__': app.run(threaded=True)import org.springframework.http.MediaType;import org.springframework.web.bind.annotation.*;import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import java.io.IOException;import java.util.concurrent.Executors;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.TimeUnit;
@RestControllerpublic class SSEServer { private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);
@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public SseEmitter streamEvents() { SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);
executor.scheduleAtFixedRate(() -> { try { SseEmitter.SseEventBuilder event = SseEmitter.event() .name("update") .data(Map.of("timestamp", System.currentTimeMillis(), "message", "Server update"));
emitter.send(event); } catch (IOException e) { emitter.completeWithError(e); } }, 0, 1, TimeUnit.SECONDS);
emitter.onCompletion(() -> executor.shutdown()); emitter.onTimeout(() -> executor.shutdown());
return emitter; }}class SSEClient { constructor(url) { this.url = url; this.eventSource = null; }
connect() { this.eventSource = new EventSource(this.url);
this.eventSource.onopen = () => { console.log('SSE connected'); this.onOpen(); };
this.eventSource.onmessage = (event) => { const data = JSON.parse(event.data); this.onMessage(data); };
// Listen for specific event types this.eventSource.addEventListener('update', (event) => { const data = JSON.parse(event.data); this.onUpdate(data); });
this.eventSource.onerror = (error) => { console.error('SSE error:', error); this.onError(error); }; }
onOpen() { // Override in subclass }
onMessage(data) { // Override in subclass }
onUpdate(data) { // Override in subclass }
onError(error) { // Override in subclass }
close() { if (this.eventSource) { this.eventSource.close(); } }}
// Usageconst client = new SSEClient('/events');
client.onUpdate = (data) => { console.log('Update:', data);};
client.connect();Client sends request, server holds it open:
from flask import Flask, jsonify, requestimport timeimport threading
app = Flask(__name__)pending_requests = []
@app.route('/poll')def poll(): """Long polling endpoint""" timeout = int(request.args.get('timeout', 30)) last_id = int(request.args.get('last_id', 0))
# Check for new data new_data = get_data_since(last_id)
if new_data: return jsonify(new_data)
# No data - hold request event = threading.Event() pending_requests.append({ 'event': event, 'last_id': last_id, 'timeout': timeout })
# Wait for data or timeout if event.wait(timeout): new_data = get_data_since(last_id) return jsonify(new_data) else: return jsonify({'status': 'timeout'}), 200
@app.route('/notify')def notify(): """Trigger notification""" # Wake up pending requests for req in pending_requests: req['event'].set()
pending_requests.clear() return jsonify({'status': 'notified'})import org.springframework.web.bind.annotation.*;import org.springframework.http.ResponseEntity;import java.util.concurrent.CompletableFuture;import java.util.concurrent.TimeUnit;
@RestControllerpublic class LongPollingServer {
@GetMapping("/poll") public CompletableFuture<ResponseEntity<Map<String, Object>>> poll( @RequestParam(defaultValue = "0") int lastId, @RequestParam(defaultValue = "30") int timeout) {
// Check for new data List<Data> newData = getDataSince(lastId);
if (!newData.isEmpty()) { return CompletableFuture.completedFuture( ResponseEntity.ok(Map.of("data", newData)) ); }
// No data - wait for new data or timeout return CompletableFuture.supplyAsync(() -> { try { // Wait for data or timeout Thread.sleep(timeout * 1000);
newData = getDataSince(lastId); if (!newData.isEmpty()) { return ResponseEntity.ok(Map.of("data", newData)); } else { return ResponseEntity.ok(Map.of("status", "timeout")); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); return ResponseEntity.ok(Map.of("status", "interrupted")); } }); }}| Pattern | Use When | Don’t Use When |
|---|---|---|
| WebSocket | Bidirectional needed, low latency, high frequency | Simple one-way updates, HTTP-only environments |
| SSE | Server-to-client only, simple implementation, HTTP-based | Bidirectional needed, client-to-server messages |
| Long Polling | WebSocket/SSE not available, simple use case | High frequency, low latency needed |
Need bidirectional? Yes → WebSocket No → Need low latency? Yes → SSE No → Long Polling (fallback)Managing connections is critical:
from typing import Dict, Setimport asynciofrom datetime import datetime, timedelta
class ConnectionManager: """Manages WebSocket connections"""
def __init__(self): self.connections: Dict[str, Set] = {} self.heartbeat_interval = 30 # seconds self.connection_timeout = 60 # seconds
async def add_connection(self, connection_id: str, websocket): """Add new connection""" if connection_id not in self.connections: self.connections[connection_id] = set()
self.connections[connection_id].add(websocket)
# Start heartbeat asyncio.create_task(self.heartbeat(websocket))
async def remove_connection(self, connection_id: str, websocket): """Remove connection""" if connection_id in self.connections: self.connections[connection_id].discard(websocket) if not self.connections[connection_id]: del self.connections[connection_id]
async def heartbeat(self, websocket): """Send heartbeat to keep connection alive""" try: while True: await asyncio.sleep(self.heartbeat_interval) await websocket.send(json.dumps({'type': 'ping'})) except: await self.remove_connection('unknown', websocket)
async def broadcast(self, message, connection_id: str = None): """Broadcast message""" targets = self.connections.get(connection_id, set()) if connection_id else set().union(*self.connections.values())
disconnected = set() for ws in targets: try: await ws.send(json.dumps(message)) except: disconnected.add(ws)
# Clean up disconnected for ws in disconnected: await self.remove_connection('unknown', ws)import java.util.concurrent.ConcurrentHashMap;import java.util.Set;import java.util.concurrent.CopyOnWriteArraySet;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;
public class ConnectionManager { private final Map<String, Set<WebSocket>> connections = new ConcurrentHashMap<>(); private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(10); private final int heartbeatInterval = 30; // seconds
public void addConnection(String connectionId, WebSocket websocket) { connections.computeIfAbsent(connectionId, k -> new CopyOnWriteArraySet<>()) .add(websocket);
// Start heartbeat scheduler.scheduleAtFixedRate(() -> { try { websocket.send("{\"type\":\"ping\"}"); } catch (Exception e) { removeConnection(connectionId, websocket); } }, heartbeatInterval, heartbeatInterval, TimeUnit.SECONDS); }
public void removeConnection(String connectionId, WebSocket websocket) { Set<WebSocket> conns = connections.get(connectionId); if (conns != null) { conns.remove(websocket); if (conns.isEmpty()) { connections.remove(connectionId); } } }
public void broadcast(String message, String connectionId) { Set<WebSocket> targets = connectionId != null ? connections.getOrDefault(connectionId, Collections.emptySet()) : connections.values().stream() .flatMap(Set::stream) .collect(Collectors.toSet());
for (WebSocket ws : targets) { try { ws.send(message); } catch (Exception e) { removeConnection(connectionId, ws); } } }}🔄 WebSocket: Full-Duplex
WebSocket provides bidirectional, persistent connection. Best for chat, gaming, real-time collaboration.
📡 SSE: Simple Streaming
SSE is HTTP-based, server-to-client streaming. Simpler than WebSocket, automatic reconnection.
⏳ Long Polling: Fallback
Long polling holds requests open. Use when WebSocket/SSE not available. Less efficient but works everywhere.
🔌 Connection Management
Manage connections carefully: heartbeat, cleanup, reconnection. Critical for production systems.