Thread Pools & Executors
Why Thread Pools?
Creating threads manually has significant overhead. Thread pools solve this by reusing threads and managing resources efficiently.
Visual: Manual Threads vs Thread Pool
Benefits of Thread Pools
- Resource Reuse: Threads are reused, avoiding creation/destruction overhead
- Overhead Reduction: Thread lifecycle is managed efficiently
- Resource Limits: Prevents system overload with bounded thread counts
- Task Queuing: Tasks wait in queue when all threads are busy
- Better Performance: Reduced context switching and memory usage
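To make the contrast concrete, here is a minimal sketch (the class name ManualVsPool and the counts are illustrative) that runs the same task first on manually created threads and then on a small fixed pool that reuses two workers:
```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ManualVsPool {
    public static void main(String[] args) {
        Runnable task = () ->
                System.out.println("Handled by " + Thread.currentThread().getName());

        // Manual approach: every task pays for thread creation and teardown
        for (int i = 0; i < 5; i++) {
            new Thread(task).start();
        }

        // Pooled approach: two threads are created once and reused for all tasks
        ExecutorService pool = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 5; i++) {
            pool.submit(task);
        }
        pool.shutdown();
    }
}
```
In the pooled version the number of tasks can grow without the number of threads growing with it; the queue absorbs the excess.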
Java ExecutorService
Java’s ExecutorService provides a high-level abstraction for thread pool management.
Visual: ExecutorService Architecture
```mermaid
classDiagram
    class ExecutorService {
        <<interface>>
        +submit(Callable) Future
        +submit(Runnable) Future
        +shutdown() void
        +shutdownNow() List~Runnable~
        +awaitTermination(timeout) boolean
    }
    class ThreadPoolExecutor {
        -corePoolSize: int
        -maximumPoolSize: int
        -keepAliveTime: long
        -workQueue: BlockingQueue
        +execute(Runnable) void
    }
    class Executors {
        <<utility>>
        +newFixedThreadPool(int) ExecutorService
        +newCachedThreadPool() ExecutorService
        +newScheduledThreadPool(int) ScheduledExecutorService
        +newSingleThreadExecutor() ExecutorService
    }
    ExecutorService <|.. ThreadPoolExecutor
    Executors --> ExecutorService : creates
```
FixedThreadPool
A FixedThreadPool has a fixed number of threads with an unbounded queue.
```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FixedThreadPoolExample {
    public static void main(String[] args) {
        // Create fixed thread pool with 4 threads
        ExecutorService executor = Executors.newFixedThreadPool(4);

        // Submit tasks
        for (int i = 0; i < 10; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("Task " + taskId + " executed by "
                        + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        // Shutdown
        executor.shutdown();
        try {
            executor.awaitTermination(60, java.util.concurrent.TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```
Characteristics:
- Fixed number of threads
- Unbounded queue (tasks wait if all threads busy)
- Threads live until pool shutdown
- Good for: CPU-bound tasks, predictable workloads
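These characteristics follow directly from how the pool is constructed. As a sketch, Executors.newFixedThreadPool(4) is roughly equivalent to building a ThreadPoolExecutor by hand with the fields from the class diagram above (the wrapper class FixedPoolEquivalent is illustrative):
```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class FixedPoolEquivalent {
    public static void main(String[] args) {
        // Roughly what Executors.newFixedThreadPool(4) constructs internally
        ExecutorService executor = new ThreadPoolExecutor(
                4,                                   // corePoolSize: threads kept alive
                4,                                   // maximumPoolSize: hard upper bound
                0L, TimeUnit.MILLISECONDS,           // keepAliveTime for excess threads
                new LinkedBlockingQueue<Runnable>()  // workQueue: unbounded task queue
        );
        executor.submit(() ->
                System.out.println("Running on " + Thread.currentThread().getName()));
        executor.shutdown();
    }
}
```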
CachedThreadPool
A CachedThreadPool creates threads on demand and reuses them.
```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CachedThreadPoolExample {
    public static void main(String[] args) {
        // Creates threads as needed, reuses idle threads
        ExecutorService executor = Executors.newCachedThreadPool();

        for (int i = 0; i < 20; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("Task " + taskId + " executed by "
                        + Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        executor.shutdown();
    }
}
```
Characteristics:
- Creates threads on demand
- No task queueing (a new thread is created if no idle thread is available)
- Idle threads terminated after 60 seconds
- Good for: I/O-bound tasks, short-lived tasks, unpredictable workloads
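These characteristics also map onto a direct ThreadPoolExecutor construction: zero core threads, an effectively unbounded maximum, a 60-second idle timeout, and a SynchronousQueue that hands each task straight to a thread instead of buffering it. A sketch (the wrapper class CachedPoolEquivalent is illustrative):
```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CachedPoolEquivalent {
    public static void main(String[] args) {
        // Roughly what Executors.newCachedThreadPool() constructs internally
        ExecutorService executor = new ThreadPoolExecutor(
                0, Integer.MAX_VALUE,             // no core threads, unbounded maximum
                60L, TimeUnit.SECONDS,            // idle threads terminated after 60 seconds
                new SynchronousQueue<Runnable>()  // direct handoff, no task buffering
        );
        executor.submit(() ->
                System.out.println("Running on " + Thread.currentThread().getName()));
        executor.shutdown();
    }
}
```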
ScheduledThreadPool
A ScheduledThreadPool executes tasks after a delay or periodically.
```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledThreadPoolExample {
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);

        // Execute after delay
        scheduler.schedule(() -> {
            System.out.println("Delayed task executed");
        }, 2, TimeUnit.SECONDS);

        // Execute periodically
        scheduler.scheduleAtFixedRate(() -> {
            System.out.println("Periodic task executed");
        }, 0, 1, TimeUnit.SECONDS);

        Thread.sleep(5000);
        scheduler.shutdown();
    }
}
```
SingleThreadExecutor
A SingleThreadExecutor ensures sequential execution (one task at a time).
```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SingleThreadExecutorExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();

        // Tasks execute sequentially
        for (int i = 0; i < 5; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("Task " + taskId + " executed");
            });
        }

        executor.shutdown();
    }
}
```
ForkJoinPool: Work Stealing
ForkJoinPool uses a work-stealing algorithm for divide-and-conquer tasks.
Visual: Work Stealing
Example: Parallel Merge Sort with ForkJoinPool
```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class ForkJoinMergeSort {
    static class MergeSortTask extends RecursiveAction {
        private final int[] array;
        private final int left;
        private final int right;

        MergeSortTask(int[] array, int left, int right) {
            this.array = array;
            this.left = left;
            this.right = right;
        }

        @Override
        protected void compute() {
            if (right - left < 10) {
                // Base case: sort directly
                insertionSort(array, left, right);
            } else {
                // Divide: split into two halves
                int mid = (left + right) / 2;
                MergeSortTask leftTask = new MergeSortTask(array, left, mid);
                MergeSortTask rightTask = new MergeSortTask(array, mid + 1, right);

                // Fork: execute subtasks in parallel
                invokeAll(leftTask, rightTask);

                // Conquer: merge results
                merge(array, left, mid, right);
            }
        }

        private void insertionSort(int[] arr, int left, int right) {
            for (int i = left + 1; i <= right; i++) {
                int key = arr[i];
                int j = i - 1;
                while (j >= left && arr[j] > key) {
                    arr[j + 1] = arr[j];
                    j--;
                }
                arr[j + 1] = key;
            }
        }

        private void merge(int[] arr, int left, int mid, int right) {
            int[] temp = new int[right - left + 1];
            int i = left, j = mid + 1, k = 0;

            while (i <= mid && j <= right) {
                if (arr[i] <= arr[j]) {
                    temp[k++] = arr[i++];
                } else {
                    temp[k++] = arr[j++];
                }
            }

            while (i <= mid) temp[k++] = arr[i++];
            while (j <= right) temp[k++] = arr[j++];

            System.arraycopy(temp, 0, arr, left, temp.length);
        }
    }

    public static void main(String[] args) {
        int[] array = {64, 34, 25, 12, 22, 11, 90, 5};

        ForkJoinPool pool = ForkJoinPool.commonPool();
        MergeSortTask task = new MergeSortTask(array, 0, array.length - 1);
        pool.invoke(task);

        System.out.println("Sorted array: " + java.util.Arrays.toString(array));
    }
}
```
RecursiveTask vs RecursiveAction
- RecursiveAction: For tasks that don’t return a value
- RecursiveTask: For tasks that return a value
```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class RecursiveTaskExample {
    static class SumTask extends RecursiveTask<Long> {
        private final int[] array;
        private final int start;
        private final int end;
        private static final int THRESHOLD = 1000;

        SumTask(int[] array, int start, int end) {
            this.array = array;
            this.start = start;
            this.end = end;
        }

        @Override
        protected Long compute() {
            if (end - start < THRESHOLD) {
                // Base case: compute directly
                long sum = 0;
                for (int i = start; i < end; i++) {
                    sum += array[i];
                }
                return sum;
            } else {
                // Divide
                int mid = (start + end) / 2;
                SumTask leftTask = new SumTask(array, start, mid);
                SumTask rightTask = new SumTask(array, mid, end);

                // Fork and join
                leftTask.fork();
                long rightResult = rightTask.compute();
                long leftResult = leftTask.join();

                return leftResult + rightResult;
            }
        }
    }

    public static void main(String[] args) {
        int[] array = new int[10000];
        for (int i = 0; i < array.length; i++) {
            array[i] = i + 1;
        }

        ForkJoinPool pool = ForkJoinPool.commonPool();
        SumTask task = new SumTask(array, 0, array.length);
        long result = pool.invoke(task);

        System.out.println("Sum: " + result);
    }
}
```
Python concurrent.futures
Python’s concurrent.futures provides a high-level interface for thread and process pools.
ThreadPoolExecutor
```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def process_task(task_id):
    """I/O-bound task"""
    print(f"Task {task_id} started")
    time.sleep(1)  # Simulate I/O
    return f"Result from task {task_id}"

# Using context manager (recommended)
with ThreadPoolExecutor(max_workers=4) as executor:
    # Submit tasks
    futures = [executor.submit(process_task, i) for i in range(10)]

    # Get results as they complete
    for future in as_completed(futures):
        result = future.result()
        print(result)

# Or use map
with ThreadPoolExecutor(max_workers=4) as executor:
    results = executor.map(process_task, range(10))
    for result in results:
        print(result)
```
ProcessPoolExecutor
```python
from concurrent.futures import ProcessPoolExecutor

def cpu_intensive_task(n):
    """CPU-bound task"""
    result = sum(i * i for i in range(n))
    return result

# ProcessPoolExecutor bypasses the GIL.
# The __main__ guard is required so worker processes can import this module safely.
if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(cpu_intensive_task, 1000000) for _ in range(8)]

        results = [future.result() for future in futures]
        print(f"Results: {results}")
```
submit() vs map()
```python
from concurrent.futures import ThreadPoolExecutor

def process_item(item):
    return item * 2

# submit() - returns Future objects
with ThreadPoolExecutor() as executor:
    futures = [executor.submit(process_item, i) for i in range(5)]
    results = [f.result() for f in futures]
    print(results)  # [0, 2, 4, 6, 8]

# map() - simpler, returns iterator
with ThreadPoolExecutor() as executor:
    results = list(executor.map(process_item, range(5)))
    print(results)  # [0, 2, 4, 6, 8]
```
Sizing Thread Pools
Choosing the right thread pool size is crucial for performance!
Visual: Thread Pool Sizing
CPU-Bound Tasks
For CPU-bound tasks, use approximately N_cores threads.
Formula: N_threads = N_cores (or N_cores + 1)
Why? More threads than cores add overhead without benefit (context switching).
```python
import multiprocessing
from concurrent.futures import ProcessPoolExecutor

def cpu_task(n):
    return sum(i * i for i in range(n))

if __name__ == "__main__":
    # Use number of CPU cores
    num_cores = multiprocessing.cpu_count()
    print(f"CPU cores: {num_cores}")

    with ProcessPoolExecutor(max_workers=num_cores) as executor:
        results = executor.map(cpu_task, [1000000] * 8)
        print(list(results))
```
```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CPUBoundPool {
    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("CPU cores: " + cores);

        // Use number of cores
        ExecutorService executor = Executors.newFixedThreadPool(cores);

        for (int i = 0; i < 10; i++) {
            executor.submit(() -> {
                // CPU-intensive work (long arithmetic avoids int overflow)
                long sum = 0;
                for (int j = 0; j < 1000000; j++) {
                    sum += (long) j * j;
                }
                return sum;
            });
        }

        executor.shutdown();
    }
}
```
I/O-Bound Tasks
For I/O-bound tasks, use more threads to account for waiting time.
Formula: N_threads = N_cores * (1 + W/C)
- W = Wait time (I/O wait)
- C = Compute time (CPU time)
Example: 4 cores, 9s wait, 1s compute = 4 * (1 + 9/1) = 40 threads
```python
from concurrent.futures import ThreadPoolExecutor
import requests

def fetch_url(url):
    """I/O-bound task"""
    response = requests.get(url)
    return response.status_code

# For I/O-bound, use more threads
# Example: 4 cores, 9s wait, 1s compute = 40 threads
num_threads = 40

with ThreadPoolExecutor(max_workers=num_threads) as executor:
    urls = ["https://httpbin.org/delay/1"] * 100
    results = executor.map(fetch_url, urls)
    print(f"Processed {len(list(results))} URLs")
```
```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class IOBoundPool {
    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        // For I/O-bound: more threads
        // Example: 4 cores * (1 + 9/1) = 40 threads
        int threads = cores * 10; // Simplified calculation

        ExecutorService executor = Executors.newFixedThreadPool(threads);

        for (int i = 0; i < 100; i++) {
            executor.submit(() -> {
                // I/O operation
                try {
                    Thread.sleep(1000); // Simulate I/O wait
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        executor.shutdown();
    }
}
```
General Formula
Formula: N_threads = N_cores * U_CPU * (1 + W/C)
- U_CPU = Target CPU utilization (0.0 to 1.0)
- W = Wait time
- C = Compute time
Considerations:
- System load
- Memory constraints
- Task characteristics
- Response time requirements
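As an illustration, a small helper (the class PoolSizer, its method poolSize, and the sample numbers are ours) can plug a target utilization and measured wait/compute times into the formula:
```java
// Hypothetical sizing helper applying N_threads = N_cores * U_CPU * (1 + W/C)
public class PoolSizer {
    static int poolSize(double targetCpuUtilization, double waitTime, double computeTime) {
        int cores = Runtime.getRuntime().availableProcessors();
        double threads = cores * targetCpuUtilization * (1 + waitTime / computeTime);
        return Math.max(1, (int) threads);
    }

    public static void main(String[] args) {
        // Example: 80% target utilization, tasks wait 50 ms per 10 ms of CPU time.
        // On an 8-core machine: 8 * 0.8 * (1 + 5) = 38 threads (rounded down).
        System.out.println("Suggested pool size: " + poolSize(0.8, 50, 10));
    }
}
```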
Shutdown Handling
Proper shutdown is crucial to avoid resource leaks and ensure clean termination.
Visual: Shutdown Flow
Example: Proper Shutdown
Section titled “Example: Proper Shutdown”import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;
public class ShutdownExample { public static void main(String[] args) throws InterruptedException { ExecutorService executor = Executors.newFixedThreadPool(4);
// Submit tasks for (int i = 0; i < 10; i++) { executor.submit(() -> { try { Thread.sleep(1000); System.out.println("Task completed"); } catch (InterruptedException e) { System.out.println("Task interrupted"); Thread.currentThread().interrupt(); } }); }
// Graceful shutdown executor.shutdown(); // Stop accepting new tasks
try { // Wait for tasks to complete if (!executor.awaitTermination(60, TimeUnit.SECONDS)) { // Force shutdown if timeout executor.shutdownNow(); if (!executor.awaitTermination(60, TimeUnit.SECONDS)) { System.err.println("Pool did not terminate"); } } } catch (InterruptedException e) { executor.shutdownNow(); Thread.currentThread().interrupt(); } }}from concurrent.futures import ThreadPoolExecutorimport time
def task(n): time.sleep(1) return n * 2
# Context manager handles shutdown automaticallywith ThreadPoolExecutor(max_workers=4) as executor: futures = [executor.submit(task, i) for i in range(10)] results = [f.result() for f in futures] print(results)
# Manual shutdownexecutor = ThreadPoolExecutor(max_workers=4)futures = [executor.submit(task, i) for i in range(10)]
executor.shutdown(wait=True) # Wait for all tasksresults = [f.result() for f in futures]Future and Callable
Future objects represent the result of an asynchronous computation.
Visual: Future Pattern
Example: Using Future
```java
import java.util.concurrent.*;

public class FutureExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(4);

        // Submit Callable (returns value)
        Future<Integer> future = executor.submit(() -> {
            Thread.sleep(2000);
            return 42;
        });

        // Do other work
        System.out.println("Doing other work...");

        // Get result (blocks until ready)
        Integer result = future.get();
        System.out.println("Result: " + result);

        executor.shutdown();
    }
}
```
```python
from concurrent.futures import ThreadPoolExecutor
import time

def compute_value(n):
    time.sleep(2)
    return n * 2

with ThreadPoolExecutor() as executor:
    # Submit returns Future immediately
    future = executor.submit(compute_value, 21)

    # Do other work
    print("Doing other work...")

    # Get result (blocks until ready)
    result = future.result()
    print(f"Result: {result}")
```
Practice Problems
Easy: Process Tasks with Thread Pool
Use a thread pool to process a list of tasks concurrently.
Solution
```python
from concurrent.futures import ThreadPoolExecutor

def process_task(item):
    return item * 2

items = list(range(10))

with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(process_task, items))
    print(results)
```
```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

ExecutorService executor = Executors.newFixedThreadPool(4);
IntStream.range(0, 10)
        .forEach(i -> executor.submit(() -> System.out.println(i * 2)));
executor.shutdown();
```
Interview Questions
Section titled “Interview Questions”Q1: “How would you size a thread pool for CPU-bound vs I/O-bound tasks?”
Answer:
- CPU-bound: N_threads = N_cores (more threads waste resources)
- I/O-bound: N_threads = N_cores * (1 + W/C), where W = wait time, C = compute time
- Reason: I/O-bound tasks spend time waiting, so more threads can be utilized
Q2: “What’s the difference between FixedThreadPool and CachedThreadPool?”
Answer:
- FixedThreadPool: Fixed threads, unbounded queue, threads live until shutdown
- CachedThreadPool: Creates threads on demand, no queue, idle threads terminated after 60s
- Use FixedThreadPool: Predictable workloads, CPU-bound tasks
- Use CachedThreadPool: Short-lived tasks, I/O-bound, unpredictable workloads
Q3: “How does work stealing work in ForkJoinPool?”
Answer:
- Each thread has its own deque (double-ended queue)
- Threads push/pop from their own deque (LIFO)
- When a thread’s deque is empty, it steals from the tail of another thread’s deque
- Benefits: Better load balancing, higher CPU utilization
- Ideal for: Divide-and-conquer algorithms, recursive tasks
Q4: “When would you use ProcessPoolExecutor vs ThreadPoolExecutor in Python?”
Answer:
- ProcessPoolExecutor: CPU-bound tasks (bypasses GIL), true parallelism
- ThreadPoolExecutor: I/O-bound tasks (GIL released during I/O), lower overhead
- Choose based on: Task type (CPU vs I/O), not just performance
Key Takeaways
Next Steps
Continue learning concurrency:
- Concurrent Collections - Thread-safe data structures
- Asynchronous Patterns - Futures and async/await
Mastering thread pools is essential for building efficient concurrent systems! ⚙️