LowLevelDesign Mastery

Thread Pools & Executors

Efficiently manage concurrent tasks with thread pools.

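Creating a new thread for every task is expensive: each thread needs its own stack and OS-level resources, and creating and destroying threads takes time. Thread pools avoid this by keeping a set of worker threads alive and reusing them across many tasks.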

  1. Resource Reuse: Threads are reused across tasks, avoiding repeated creation/destruction overhead
  2. Overhead Reduction: The pool manages the thread lifecycle, so callers only submit tasks
  3. Resource Limits: A bounded thread count prevents the system from being overloaded
  4. Task Queuing: Tasks wait in a queue when all threads are busy
  5. Better Performance: Fewer live threads mean less context switching and lower memory usage
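To see resource reuse in action, the sketch below submits many short tasks to a small fixed pool and records which threads ran them. The class `ThreadReuseDemo` and the helper `distinctWorkerThreads` are hypothetical names used only for illustration; with a pool of 2 threads, all 100 tasks should be executed by the same two worker threads.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ThreadReuseDemo {
    // Runs `tasks` short tasks on a fixed pool of `poolSize` threads and
    // returns how many distinct threads actually executed them.
    static int distinctWorkerThreads(int poolSize, int tasks) throws InterruptedException {
        Set<String> workerNames = ConcurrentHashMap.newKeySet(); // thread-safe set
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < tasks; i++) {
            executor.submit(() -> {
                workerNames.add(Thread.currentThread().getName());
            });
        }
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
        return workerNames.size();
    }

    public static void main(String[] args) throws InterruptedException {
        int used = distinctWorkerThreads(2, 100);
        System.out.println("100 tasks ran on " + used + " threads");
    }
}
```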

Java’s ExecutorService provides a high-level abstraction for thread pool management.

```mermaid
classDiagram
    class ExecutorService {
        <<interface>>
        +submit(Callable) Future
        +submit(Runnable) Future
        +shutdown() void
        +shutdownNow() List~Runnable~
        +awaitTermination(long, TimeUnit) boolean
    }

    class ThreadPoolExecutor {
        -corePoolSize: int
        -maximumPoolSize: int
        -keepAliveTime: long
        -workQueue: BlockingQueue
        +execute(Runnable) void
    }

    class Executors {
        <<utility>>
        +newFixedThreadPool(int) ExecutorService
        +newCachedThreadPool() ExecutorService
        +newScheduledThreadPool(int) ScheduledExecutorService
        +newSingleThreadExecutor() ExecutorService
    }

    ExecutorService <|.. ThreadPoolExecutor
    Executors --> ExecutorService : creates
```
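The `Executors` factory methods are convenience wrappers; `newFixedThreadPool` and `newCachedThreadPool`, for example, construct a `ThreadPoolExecutor` with preset parameters. Constructing one directly lets you set the fields shown in the diagram yourself. The sketch below assumes illustrative values (2 core threads, 4 maximum, a bounded queue of 100, and `CallerRunsPolicy` for overflow); `BoundedPoolExample` and `newBoundedPool` are hypothetical names, and the numbers should be tuned for your workload.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedPoolExample {
    // Builds a pool whose parameters mirror the fields in the class diagram.
    static ThreadPoolExecutor newBoundedPool() {
        return new ThreadPoolExecutor(
                2,                                 // corePoolSize: threads kept alive even when idle
                4,                                 // maximumPoolSize: upper bound once the queue is full
                30, TimeUnit.SECONDS,              // keepAliveTime for threads above the core size
                new ArrayBlockingQueue<>(100),     // workQueue: bounded, so it cannot grow forever
                new ThreadPoolExecutor.CallerRunsPolicy()); // when saturated, run the task in the submitting thread
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = newBoundedPool();
        for (int i = 0; i < 10; i++) {
            final int taskId = i;
            pool.execute(() -> System.out.println(
                    "Task " + taskId + " on " + Thread.currentThread().getName()));
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```

Note that the pool grows beyond `corePoolSize` only after the queue is full; with an unbounded queue, `maximumPoolSize` has no effect.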

A FixedThreadPool has a fixed number of threads with an unbounded queue.

FixedThreadPoolExample.java
```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FixedThreadPoolExample {
    public static void main(String[] args) {
        // Create fixed thread pool with 4 threads
        ExecutorService executor = Executors.newFixedThreadPool(4);
        // Submit tasks
        for (int i = 0; i < 10; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("Task " + taskId +
                        " executed by " + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        // Shutdown
        executor.shutdown();
        try {
            executor.awaitTermination(60, java.util.concurrent.TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Characteristics:

  • Fixed number of threads
  • Unbounded queue (tasks wait if all threads are busy; under sustained overload the queue can grow until memory runs out)
  • Threads live until pool shutdown
  • Good for: CPU-bound tasks, predictable workloads
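Because the queue is unbounded, a running fixed pool does not reject work; excess tasks simply pile up. The sketch below makes that visible by blocking every worker on a `CountDownLatch` and then inspecting the queue. It casts the result of `newFixedThreadPool` to `ThreadPoolExecutor`, which matches current JDK behavior but is an implementation detail rather than a documented guarantee; `FixedPoolQueueDemo` and `queuedWhileBusy` are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class FixedPoolQueueDemo {
    // Occupies every worker of a fixed pool, submits `extra` more tasks,
    // and returns how many of them are waiting in the queue.
    static int queuedWhileBusy(int poolSize, int extra) throws InterruptedException {
        // Assumption: newFixedThreadPool returns a ThreadPoolExecutor (true in current JDKs)
        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(poolSize);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // hold the worker until we say so
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        for (int i = 0; i < poolSize + extra; i++) {
            executor.execute(blocker);
        }
        int queued = executor.getQueue().size(); // tasks that found no free worker
        release.countDown();
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
        return queued;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Queued tasks: " + queuedWhileBusy(2, 5));
    }
}
```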

A CachedThreadPool creates threads on demand and reuses them.

CachedThreadPoolExample.java
```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CachedThreadPoolExample {
    public static void main(String[] args) {
        // Creates threads as needed, reuses idle threads
        ExecutorService executor = Executors.newCachedThreadPool();
        for (int i = 0; i < 20; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("Task " + taskId +
                        " executed by " + Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        executor.shutdown();
    }
}
```

Characteristics:

  • Creates threads on demand
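  • No task queue: each task is handed directly to an idle thread, or a new thread is created if none is idle
  • No upper bound on thread count: a burst of long-running tasks can create a very large number of threads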
  • Idle threads terminated after 60 seconds
  • Good for: I/O-bound tasks, short-lived tasks, unpredictable workloads
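The direct-handoff behavior shows up when tasks overlap: if every existing thread is busy, each new submission gets a brand-new thread. The sketch below submits tasks that all block on a latch and then reads the pool size. As before, the cast to `ThreadPoolExecutor` relies on an implementation detail of current JDKs, and `CachedPoolGrowthDemo` / `threadsForBlockingTasks` are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CachedPoolGrowthDemo {
    // Submits `tasks` tasks that all block at once and returns the resulting pool size.
    static int threadsForBlockingTasks(int tasks) throws InterruptedException {
        // Assumption: newCachedThreadPool returns a ThreadPoolExecutor (true in current JDKs)
        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            executor.execute(() -> {
                try {
                    release.await(); // keep this thread busy so it cannot be reused
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int poolSize = executor.getPoolSize(); // one thread per concurrently running task
        release.countDown();
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
        return poolSize;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Threads created: " + threadsForBlockingTasks(5));
    }
}
```

A fixed pool given the same workload would keep its thread count and queue the extra tasks instead.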

A ScheduledThreadPool executes tasks after a delay or periodically.

ScheduledThreadPoolExample.java
```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledThreadPoolExample {
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
        // Execute after delay
        scheduler.schedule(() -> {
            System.out.println("Delayed task executed");
        }, 2, TimeUnit.SECONDS);
        // Execute periodically
        scheduler.scheduleAtFixedRate(() -> {
            System.out.println("Periodic task executed");
        }, 0, 1, TimeUnit.SECONDS);
        Thread.sleep(5000);
        scheduler.shutdown();
    }
}
```
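The example above relies on `Thread.sleep(5000)` to keep `main` alive long enough to see output. A sketch of a more deterministic alternative, assuming you want a periodic task to run a known number of times: count runs with a `CountDownLatch`, then cancel the task through the `ScheduledFuture` returned by `scheduleAtFixedRate`. `PeriodicUntilDone` and `runPeriodically` are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PeriodicUntilDone {
    // Runs a periodic task until it has executed at least `runs` times, then cancels it.
    static int runPeriodically(int runs, long periodMillis) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        CountDownLatch done = new CountDownLatch(runs);
        AtomicInteger executions = new AtomicInteger();
        ScheduledFuture<?> handle = scheduler.scheduleAtFixedRate(() -> {
            executions.incrementAndGet();
            done.countDown();
        }, 0, periodMillis, TimeUnit.MILLISECONDS);

        done.await();          // wait until the task has run `runs` times
        handle.cancel(false);  // stop future runs without interrupting one in progress
        scheduler.shutdown();
        scheduler.awaitTermination(5, TimeUnit.SECONDS);
        return executions.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Periodic task ran " + runPeriodically(3, 100) + " times");
    }
}
```

One extra run can slip in between the latch reaching zero and `cancel`, so the count is at least `runs` rather than exactly `runs`.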

A SingleThreadExecutor runs tasks sequentially on a single worker thread: one task at a time, in submission order.

SingleThreadExecutorExample.java
```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SingleThreadExecutorExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        // Tasks execute sequentially
        for (int i = 0; i < 5; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("Task " + taskId + " executed");
            });
        }
        executor.shutdown();
    }
}
```
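Because there is only one worker and tasks are taken from a FIFO queue, tasks run in the order they were submitted. A quick way to check this, as a hypothetical sketch (`SequentialOrderDemo` and `executionOrder` are illustrative names): record each task's id in a synchronized list and compare it with the submission order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SequentialOrderDemo {
    // Records the order in which a single-thread executor runs `tasks` tasks.
    static List<Integer> executionOrder(int tasks) throws InterruptedException {
        // Synchronized list so the main thread safely reads what the worker wrote
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        ExecutorService executor = Executors.newSingleThreadExecutor();
        for (int i = 0; i < tasks; i++) {
            final int taskId = i;
            executor.submit(() -> {
                order.add(taskId);
            });
        }
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
        return order;
    }

    public static void main(String[] args) throws InterruptedException {
        // prints "Execution order: [0, 1, 2, 3, 4]"
        System.out.println("Execution order: " + executionOrder(5));
    }
}
```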

ForkJoinPool uses a work-stealing algorithm for divide-and-conquer tasks.


Example: Parallel Merge Sort with ForkJoinPool

ForkJoinMergeSort.java
```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class ForkJoinMergeSort {
    static class MergeSortTask extends RecursiveAction {
        private final int[] array;
        private final int left;
        private final int right;

        MergeSortTask(int[] array, int left, int right) {
            this.array = array;
            this.left = left;
            this.right = right;
        }

        @Override
        protected void compute() {
            if (right - left < 10) {
                // Base case: sort directly
                insertionSort(array, left, right);
            } else {
                // Divide: split into two halves
                int mid = (left + right) / 2;
                MergeSortTask leftTask = new MergeSortTask(array, left, mid);
                MergeSortTask rightTask = new MergeSortTask(array, mid + 1, right);
                // Fork: execute subtasks in parallel
                invokeAll(leftTask, rightTask);
                // Conquer: merge results
                merge(array, left, mid, right);
            }
        }

        private void insertionSort(int[] arr, int left, int right) {
            for (int i = left + 1; i <= right; i++) {
                int key = arr[i];
                int j = i - 1;
                while (j >= left && arr[j] > key) {
                    arr[j + 1] = arr[j];
                    j--;
                }
                arr[j + 1] = key;
            }
        }

        private void merge(int[] arr, int left, int mid, int right) {
            int[] temp = new int[right - left + 1];
            int i = left, j = mid + 1, k = 0;
            while (i <= mid && j <= right) {
                if (arr[i] <= arr[j]) {
                    temp[k++] = arr[i++];
                } else {
                    temp[k++] = arr[j++];
                }
            }
            while (i <= mid) temp[k++] = arr[i++];
            while (j <= right) temp[k++] = arr[j++];
            System.arraycopy(temp, 0, arr, left, temp.length);
        }
    }

    public static void main(String[] args) {
        int[] array = {64, 34, 25, 12, 22, 11, 90, 5};
        ForkJoinPool pool = ForkJoinPool.commonPool();
        MergeSortTask task = new MergeSortTask(array, 0, array.length - 1);
        pool.invoke(task);
        System.out.println("Sorted array: " + java.util.Arrays.toString(array));
    }
}
```
ForkJoinPool tasks typically extend one of two base classes:

  • RecursiveAction: For tasks that don’t return a value
  • RecursiveTask: For tasks that return a value
RecursiveTaskExample.java
```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class RecursiveTaskExample {
    static class SumTask extends RecursiveTask<Long> {
        private final int[] array;
        private final int start;
        private final int end;
        private static final int THRESHOLD = 1000;

        SumTask(int[] array, int start, int end) {
            this.array = array;
            this.start = start;
            this.end = end;
        }

        @Override
        protected Long compute() {
            if (end - start < THRESHOLD) {
                // Base case: compute directly
                long sum = 0;
                for (int i = start; i < end; i++) {
                    sum += array[i];
                }
                return sum;
            } else {
                // Divide
                int mid = (start + end) / 2;
                SumTask leftTask = new SumTask(array, start, mid);
                SumTask rightTask = new SumTask(array, mid, end);
                // Fork and join
                leftTask.fork();
                long rightResult = rightTask.compute();
                long leftResult = leftTask.join();
                return leftResult + rightResult;
            }
        }
    }

    public static void main(String[] args) {
        int[] array = new int[10000];
        for (int i = 0; i < array.length; i++) {
            array[i] = i + 1;
        }
        ForkJoinPool pool = ForkJoinPool.commonPool();
        SumTask task = new SumTask(array, 0, array.length);
        long result = pool.invoke(task);
        System.out.println("Sum: " + result);
    }
}
```

Python’s concurrent.futures provides a high-level interface for thread and process pools.

thread_pool_executor.py
```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def process_task(task_id):
    """I/O-bound task"""
    print(f"Task {task_id} started")
    time.sleep(1)  # Simulate I/O
    return f"Result from task {task_id}"

# Using context manager (recommended)
with ThreadPoolExecutor(max_workers=4) as executor:
    # Submit tasks
    futures = [executor.submit(process_task, i) for i in range(10)]
    # Get results as they complete (completion order, not submission order)
    for future in as_completed(futures):
        result = future.result()
        print(result)

# Or use map (yields results in input order)
with ThreadPoolExecutor(max_workers=4) as executor:
    results = executor.map(process_task, range(10))
    for result in results:
        print(result)
```
process_pool_executor.py
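```python
from concurrent.futures import ProcessPoolExecutor

def cpu_intensive_task(n):
    """CPU-bound task"""
    return sum(i * i for i in range(n))

# Each worker is a separate process with its own interpreter (and its own GIL),
# so CPU-bound work runs in parallel across cores.
if __name__ == "__main__":
    # The __main__ guard is required when workers are started with "spawn"
    # (the default on Windows and macOS), which re-imports this module.
    with ProcessPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(cpu_intensive_task, 1000000)
                   for _ in range(8)]
        results = [future.result() for future in futures]
    print(f"Results: {results}")
```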
submit_vs_map.py
```python
from concurrent.futures import ThreadPoolExecutor

def process_item(item):
    return item * 2

# submit() - returns Future objects
with ThreadPoolExecutor() as executor:
    futures = [executor.submit(process_item, i) for i in range(5)]
    results = [f.result() for f in futures]
print(results)  # [0, 2, 4, 6, 8]

# map() - simpler, returns iterator
with ThreadPoolExecutor() as executor:
    results = list(executor.map(process_item, range(5)))
print(results)  # [0, 2, 4, 6, 8]
```

Choosing the right thread pool size is crucial for performance!


For CPU-bound tasks, use approximately N_cores threads.

Formula: N_threads = N_cores (or N_cores + 1)

Why? More threads than cores add overhead without benefit (context switching).

cpu_bound_pool.py
```python
import multiprocessing
from concurrent.futures import ProcessPoolExecutor

def cpu_task(n):
    return sum(i * i for i in range(n))

# In Python, CPU-bound work uses processes rather than threads because of the GIL
if __name__ == "__main__":  # required when workers are started with "spawn"
    # Use number of CPU cores
    num_cores = multiprocessing.cpu_count()
    print(f"CPU cores: {num_cores}")
    with ProcessPoolExecutor(max_workers=num_cores) as executor:
        results = executor.map(cpu_task, [1000000] * 8)
        print(list(results))
```

For I/O-bound tasks, use more threads to account for waiting time.

Formula: N_threads = N_cores * (1 + W/C)

  • W = Wait time (I/O wait)
  • C = Compute time (CPU time)

Example: 4 cores, 9s wait, 1s compute = 4 * (1 + 9/1) = 40 threads

io_bound_pool.py
```python
from concurrent.futures import ThreadPoolExecutor
import requests  # third-party package: pip install requests

def fetch_url(url):
    """I/O-bound task"""
    response = requests.get(url, timeout=10)  # avoid hanging forever on a slow server
    return response.status_code

# For I/O-bound, use more threads
# Example: 4 cores, 9s wait, 1s compute = 4 * (1 + 9/1) = 40 threads
num_threads = 40
with ThreadPoolExecutor(max_workers=num_threads) as executor:
    urls = ["https://httpbin.org/delay/1"] * 100
    results = executor.map(fetch_url, urls)
    print(f"Processed {len(list(results))} URLs")
```

A more general formula also accounts for the CPU utilization you want to target:

Formula: N_threads = N_cores * U_CPU * (1 + W/C)

  • U_CPU = Target CPU utilization (0.0 to 1.0)
  • W = Wait time
  • C = Compute time
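The sizing formulas can be folded into a small helper. This is a sketch, not a library API: `PoolSizing.poolSize` is a hypothetical name, and it rounds the result to the nearest whole thread (never below 1). Setting the wait time to 0 and the utilization to 1.0 reduces it to the CPU-bound rule, and the worked example (4 cores, 9 s wait, 1 s compute) gives 40.

```java
public class PoolSizing {
    // nThreads = nCores * targetUtilization * (1 + waitTime / computeTime)
    static int poolSize(int nCores, double targetUtilization, double waitTime, double computeTime) {
        if (nCores < 1 || targetUtilization <= 0 || targetUtilization > 1
                || waitTime < 0 || computeTime <= 0) {
            throw new IllegalArgumentException("invalid sizing inputs");
        }
        double size = nCores * targetUtilization * (1 + waitTime / computeTime);
        return Math.max(1, (int) Math.round(size));
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        // CPU-bound: no waiting, full utilization -> one thread per core
        System.out.println("CPU-bound on this machine: " + poolSize(cores, 1.0, 0, 1));
        // Worked example: 4 cores, 9 s wait, 1 s compute
        System.out.println("I/O-bound example: " + poolSize(4, 1.0, 9, 1));   // 40
        // Same workload, but targeting 50% CPU utilization
        System.out.println("I/O-bound at 50% CPU: " + poolSize(4, 0.5, 9, 1)); // 20
    }
}
```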

Treat these formulas as starting points and adjust for:

  • System load
  • Memory constraints
  • Task characteristics
  • Response time requirements

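Proper shutdown is crucial to avoid resource leaks and ensure clean termination. Threads created by the Executors factory methods are non-daemon threads, so a pool that is never shut down can keep the JVM from exiting.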

ShutdownExample.java
```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownExample {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        // Submit tasks
        for (int i = 0; i < 10; i++) {
            executor.submit(() -> {
                try {
                    Thread.sleep(1000);
                    System.out.println("Task completed");
                } catch (InterruptedException e) {
                    System.out.println("Task interrupted");
                    Thread.currentThread().interrupt();
                }
            });
        }
        // Graceful shutdown
        executor.shutdown(); // Stop accepting new tasks
        try {
            // Wait for tasks to complete
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                // Force shutdown if timeout
                executor.shutdownNow();
                if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                    System.err.println("Pool did not terminate");
                }
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}
```
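`shutdownNow()` differs from `shutdown()` in two ways: it attempts to stop running tasks (typically by interrupting them) and returns the tasks that were still waiting in the queue. The sketch below, using hypothetical names, occupies a single-thread executor with a long task, queues three more, and counts what `shutdownNow()` hands back. For tasks added with `submit()`, the returned `Runnable`s are the executor's internal wrappers, not the original lambdas.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownNowDemo {
    // Returns how many queued tasks shutdownNow() hands back without running them.
    static int pendingAfterShutdownNow(int queuedTasks) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        // Long-running task that occupies the only worker thread
        executor.submit(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                System.out.println("Running task interrupted by shutdownNow()");
                Thread.currentThread().interrupt();
            }
        });
        started.await(); // ensure the first task is running, not still queued
        for (int i = 0; i < queuedTasks; i++) {
            executor.submit(() -> System.out.println("never printed"));
        }
        List<Runnable> neverStarted = executor.shutdownNow();
        executor.awaitTermination(5, TimeUnit.SECONDS);
        return neverStarted.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Tasks returned by shutdownNow(): " + pendingAfterShutdownNow(3));
    }
}
```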

Future objects represent the result of an asynchronous computation.

FutureExample.java
```java
import java.util.concurrent.*;

public class FutureExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        // Submit Callable (returns value)
        Future<Integer> future = executor.submit(() -> {
            Thread.sleep(2000);
            return 42;
        });
        // Do other work
        System.out.println("Doing other work...");
        // Get result (blocks until ready)
        Integer result = future.get();
        System.out.println("Result: " + result);
        executor.shutdown();
    }
}
```
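`future.get()` with no arguments blocks indefinitely, and any exception thrown by the task is rethrown wrapped in an `ExecutionException`. A sketch of more defensive retrieval, assuming a timeout is acceptable for your use case: call `get(timeout, unit)`, cancel on `TimeoutException`, and unwrap `ExecutionException.getCause()` to see the task's own error. `FutureErrorHandling`, `outcome`, and `runDemo` are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureErrorHandling {
    // Waits up to `timeoutMillis` for the future and describes how it completed.
    static String outcome(Future<Integer> future, long timeoutMillis) throws InterruptedException {
        try {
            return "value=" + future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // stop waiting and interrupt the task
            return "timed out";
        } catch (ExecutionException e) {
            // The exception thrown inside the task is wrapped; getCause() unwraps it
            return "failed: " + e.getCause().getMessage();
        }
    }

    static List<String> runDemo() throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        Callable<Integer> quick = () -> 42;
        Callable<Integer> slow = () -> {
            Thread.sleep(5_000);
            return 1;
        };
        Callable<Integer> failing = () -> {
            throw new IllegalStateException("boom");
        };
        Future<Integer> quickFuture = executor.submit(quick);
        Future<Integer> slowFuture = executor.submit(slow);
        Future<Integer> failingFuture = executor.submit(failing);
        List<String> results = Arrays.asList(
                outcome(quickFuture, 1_000),
                outcome(slowFuture, 100),
                outcome(failingFuture, 1_000));
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
        return results;
    }

    public static void main(String[] args) throws InterruptedException {
        // prints [value=42, timed out, failed: boom]
        System.out.println(runDemo());
    }
}
```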

Use a thread pool to process a list of tasks concurrently.

Solution
```python
from concurrent.futures import ThreadPoolExecutor

def process_task(item):
    return item * 2

items = list(range(10))
with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(process_task, items))
print(results)
```

Q1: “How would you size a thread pool for CPU-bound vs I/O-bound tasks?”


Answer:

  • CPU-bound: N_threads = N_cores (more threads waste resources)
  • I/O-bound: N_threads = N_cores * (1 + W/C) where W=wait time, C=compute time
  • Reason: I/O-bound tasks spend time waiting, so more threads can be utilized

Q2: “What’s the difference between FixedThreadPool and CachedThreadPool?”


Answer:

  • FixedThreadPool: Fixed threads, unbounded queue, threads live until shutdown
  • CachedThreadPool: Creates threads on demand, no task queue (direct handoff to a thread), no upper bound on thread count, idle threads terminated after 60s
  • Use FixedThreadPool: Predictable workloads, CPU-bound tasks
  • Use CachedThreadPool: Short-lived tasks, I/O-bound, unpredictable workloads

Q3: “How does work stealing work in ForkJoinPool?”


Answer:

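  • Each worker thread has its own deque (double-ended queue)
  • A worker pushes and pops tasks at one end of its own deque (LIFO)
  • When a worker’s deque is empty, it steals from the opposite end of another worker’s deque, taking the oldest (typically largest) tasks and reducing contention with the owner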
  • Benefits: Better load balancing, higher CPU utilization
  • Ideal for: Divide-and-conquer algorithms, recursive tasks

Q4: “When would you use ProcessPoolExecutor vs ThreadPoolExecutor in Python?”


Answer:

  • ProcessPoolExecutor: CPU-bound tasks; each worker process has its own GIL, giving true parallelism
  • ThreadPoolExecutor: I/O-bound tasks (the GIL is released while waiting on I/O), lower overhead
  • Trade-off: processes cost more to start, and arguments and results must be pickled to cross process boundaries



Mastering thread pools is essential for building efficient concurrent systems! ⚙️