Worker Pool Architecture

Cloudillo’s Worker Pool provides a three-tier priority thread pool for executing CPU-intensive and blocking operations. This system complements the async runtime by handling work that shouldn’t block async tasks, ensuring responsive performance even under heavy computational load.

Why a Separate Worker Pool?

The async runtime (Tokio) is optimized for I/O-bound tasks, not CPU-bound work:

Problems with CPU Work on Async Runtime:

  • Blocks other tasks: Heavy CPU work prevents other async tasks from making progress
  • Thread starvation: Can exhaust the async runtime’s thread pool
  • Poor latency: User-facing requests wait for CPU work to complete
  • No prioritization: Can’t prioritize urgent CPU work over background processing

Worker Pool Solutions:

  • Dedicated threads: Separate from async runtime
  • Priority-based: Three tiers (High/Medium/Low) for different urgency levels
  • Work stealing: Efficient load balancing across workers
  • Future-based API: Async-friendly interface
  • Backpressure: Prevents overwhelming the system

Architecture Overview

Three-Tier Priority System

┌───────────────────────────────────────────┐
│ High Priority Queue (1 dedicated thread)  │
│  - User-facing operations                 │
│  - Time-sensitive tasks                   │
│  - Cryptographic signing                  │
└───────────────────────────────────────────┘
                ↓
┌───────────────────────────────────────────┐
│ Medium Priority Queue (2 threads)         │
│  - Processes: High + Medium               │
│  - Image resizing for posts               │
│  - File compression                       │
│  - Normal background work                 │
└───────────────────────────────────────────┘
                ↓
┌───────────────────────────────────────────┐
│ Low Priority Queue (1 thread)             │
│  - Processes: High + Medium + Low         │
│  - Batch operations                       │
│  - Cleanup tasks                          │
│  - Non-urgent processing                  │
└───────────────────────────────────────────┘

Default Configuration:

  • High priority: 1 dedicated thread
  • Medium priority: 2 threads (also process High)
  • Low priority: 1 thread (also processes High + Medium)
  • Total threads: 4

Priority Cascading

Workers in the lower tiers also drain the higher-priority queues, taking higher-priority work first and falling back to their own tier only when those queues are empty:

Thread 1 (High):     High tasks only
Threads 2-3 (Med):   High → Medium (if no High)
Thread 4 (Low):      High → Medium → Low (if none above)

Benefits:

  • High-priority work always has dedicated capacity
  • Lower-priority work still gets done when system is idle
  • Automatic load balancing

Core Components

WorkerPool Structure

pub struct WorkerPool {
    high_tx: flume::Sender<Job>,   // submit side of the high priority queue
    medium_tx: flume::Sender<Job>, // submit side of the medium priority queue
    low_tx: flume::Sender<Job>,    // submit side of the low priority queue
    handles: Vec<JoinHandle<()>>,  // join handles for the worker OS threads
}

// A job is a boxed one-shot closure returning a type-erased result
type Job = Box<dyn FnOnce() -> ClResult<Box<dyn Any + Send>> + Send>;

Initialization

Algorithm: Initialize Worker Pool

1. Create three unbounded message channels (high, medium, low)
2. Create thread handles vector
3. For each high-priority worker:
   - Clone high-priority receiver
   - Spawn worker thread (processes high priority only)
   - Store thread handle
4. For each medium-priority worker:
   - Clone high and medium receivers
   - Spawn cascading worker (processes high first, then medium)
   - Store thread handle
5. For each low-priority worker:
   - Clone all three receivers
   - Spawn cascading worker (processes high → medium → low)
   - Store thread handle
6. Return WorkerPool struct with senders and handles
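
A minimal sketch of this construction, assuming the flume channels and Job type above; spawn_worker and spawn_cascading_worker correspond to the worker-spawning algorithms in the next section (the cascading variant is sketched there):

impl WorkerPool {
    pub fn new(high: usize, medium: usize, low: usize) -> Self {
        let (high_tx, high_rx) = flume::unbounded::<Job>();
        let (medium_tx, medium_rx) = flume::unbounded::<Job>();
        let (low_tx, low_rx) = flume::unbounded::<Job>();
        let mut handles = Vec::new();

        // High-priority workers drain only the high queue
        for _ in 0..high {
            handles.push(spawn_worker(high_rx.clone()));
        }
        // Medium-priority workers cascade: high first, then medium
        for _ in 0..medium {
            handles.push(spawn_cascading_worker(vec![high_rx.clone(), medium_rx.clone()]));
        }
        // Low-priority workers cascade across all three queues
        for _ in 0..low {
            handles.push(spawn_cascading_worker(vec![
                high_rx.clone(),
                medium_rx.clone(),
                low_rx.clone(),
            ]));
        }

        WorkerPool { high_tx, medium_tx, low_tx, handles }
    }
}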

Worker Thread

Basic Worker Spawning:

Algorithm: Spawn Single-Priority Worker

1. Spawn OS thread with receiver
2. Loop:
   a. Receive job from channel (blocking)
   b. Execute job
   c. Log success or error
   d. If channel disconnected, exit thread

Cascading Worker Spawning:

Algorithm: Spawn Multi-Priority Cascading Worker

1. Spawn OS thread with multiple receivers (high → medium → low)
2. Continuous loop:
   a. For each receiver in priority order:
      - Try to receive job (non-blocking)
      - If job available: execute, log result, restart loop
      - If empty: try next priority
      - If disconnected: exit thread
   b. If all queues empty: sleep 10ms and retry

This ensures high priority work is processed first, then medium, then low.
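
A sketch of the cascading loop under the types above (a single-priority worker is the same loop with one blocking receiver); it assumes the log crate and that the ClResult error type implements Display:

use std::thread::{self, JoinHandle};
use std::time::Duration;

// `rxs` is ordered highest priority first
fn spawn_cascading_worker(rxs: Vec<flume::Receiver<Job>>) -> JoinHandle<()> {
    thread::spawn(move || {
        'outer: loop {
            for rx in &rxs {
                match rx.try_recv() {
                    Ok(job) => {
                        match job() {
                            Ok(_) => log::debug!("job completed"),
                            Err(err) => log::error!("job failed: {err}"),
                        }
                        continue 'outer; // restart from the highest priority queue
                    }
                    // This queue is empty: fall through to the next priority
                    Err(flume::TryRecvError::Empty) => continue,
                    // All senders dropped: the pool is shutting down
                    Err(flume::TryRecvError::Disconnected) => break 'outer,
                }
            }
            // All queues empty: back off briefly before polling again
            thread::sleep(Duration::from_millis(10));
        }
    })
}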

API Usage

Execute Method

Algorithm: Execute Work on Worker Pool

Input: priority level, CPU-bound function F
Output: ClResult<T>

1. Create async oneshot channel (tx, rx)
2. Wrap function F:
   - Execute F on worker thread
   - Send result through tx
   - Return success boxed as Any
3. Send wrapped job to appropriate priority queue:
   - High priority → high_tx
   - Medium priority → medium_tx
   - Low priority → low_tx
4. Await result through rx (async, doesn't block executor)
5. Return result to caller

This pattern allows async code to offload CPU work without blocking the async runtime.
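
A sketch of execute under these assumptions, using a Tokio oneshot channel; mapping channel failures into ClResult errors is elided with expect for brevity:

use std::any::Any;

impl WorkerPool {
    pub async fn execute<F, T>(&self, priority: Priority, f: F) -> ClResult<T>
    where
        F: FnOnce() -> ClResult<T> + Send + 'static,
        T: Send + 'static,
    {
        let (tx, rx) = tokio::sync::oneshot::channel();

        // Wrap F so its result travels back through the oneshot channel
        let job: Job = Box::new(move || {
            let _ = tx.send(f());
            Ok(Box::new(()) as Box<dyn Any + Send>)
        });

        // Route the job to the matching priority queue
        let queue = match priority {
            Priority::High => &self.high_tx,
            Priority::Medium => &self.medium_tx,
            Priority::Low => &self.low_tx,
        };
        queue.send(job).expect("worker pool has shut down");

        // Await the result without blocking the async runtime
        rx.await.expect("worker dropped the job before replying")
    }
}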

Priority Enum

pub enum Priority {
    High,    // User-facing, time-sensitive
    Medium,  // Normal operations
    Low,     // Background, non-urgent
}

When to Use Each Priority

High Priority (Dedicated Resources)

Use for:

  • ✅ Cryptographic operations during login
  • ✅ Image processing for profile picture uploads (user waiting)
  • ✅ Real-time compression/decompression
  • ✅ Time-sensitive computations

Characteristics:

  • User is actively waiting
  • Quick feedback required (<1 second)
  • Affects UX directly
  • Should be fast operations (<100ms typical)

Example Pattern:

// User uploading profile picture (waiting)
let compressed = worker.execute(Priority::High, move || {
    compress_image(image_data, quality)
}).await?;

Guidelines:

  • Target volume: <100 jobs/second
  • Typical duration: <100ms
  • When not to use: Background processing, batch operations

Medium Priority (Default)

Use for:

  • ✅ Image processing for posts (async to user)
  • ✅ File compression for uploads
  • ✅ Data transformations
  • ✅ Most CPU-intensive work

Characteristics:

  • User submitted request but isn’t waiting
  • Processing happens in background
  • Results needed “soon” (seconds to minutes)
  • Default choice for most CPU work

Example Pattern:

// Generating image variants for a post
let thumbnail = worker.execute(Priority::Medium, move || {
    resize_image(image_data, 200, 200)
}).await?;

Guidelines:

  • Target volume: <1000 jobs/second
  • Typical duration: 100ms - 10 seconds
  • When not to use: User actively waiting, or truly non-urgent

Low Priority (Background)

Use for:

  • ✅ Batch processing
  • ✅ Cleanup operations
  • ✅ Pre-generation of assets
  • ✅ Non-urgent optimization tasks

Characteristics:

  • No user waiting
  • Can take as long as needed
  • Won’t impact user-facing operations
  • Can be delayed indefinitely

Example Pattern:

// Batch thumbnail generation
for image in large_image_set {
    worker.execute(Priority::Low, move || {
        generate_all_variants(image)
    }).await?;
}

Guidelines:

  • Target volume: Variable (throttle if needed)
  • Typical duration: Seconds to minutes
  • When not to use: Anything time-sensitive

Integration Patterns

With Task Scheduler

Tasks often use the worker pool for CPU-intensive work following this pattern:

Algorithm: Task with Worker Pool Integration

1. Async I/O: Load data from blob storage
2. CPU work: Execute on worker pool:
   - Load image from memory
   - Resize using high-quality filter (Lanczos3)
   - Encode to desired format
   - Return resized buffer
3. Async I/O: Store result to blob storage

This separates I/O (async) from CPU work (worker thread), allowing:

  • The async runtime to continue processing other tasks during image resizing
  • Background tasks (Low priority) to avoid starving user-facing operations
  • Efficient resource utilization
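
A sketch of this flow, with hypothetical load_blob, store_blob, and resize_image helpers:

async fn resize_task(worker: &WorkerPool, blob_id: &str) -> ClResult<()> {
    // 1. Async I/O: load the source image from blob storage
    let data = load_blob(blob_id).await?;

    // 2. CPU work: resize on the worker pool (background task → Low)
    let resized = worker
        .execute(Priority::Low, move || resize_image(data, 800, 600))
        .await?;

    // 3. Async I/O: store the result back to blob storage
    store_blob(&format!("{blob_id}-800x600"), resized).await
}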

With HTTP Handlers

Handlers can offload CPU work using the same async/worker/async pattern:

Algorithm: HTTP Handler with Worker Pool

1. Async I/O: Load file from blob storage
2. CPU work (Priority::High): Compress data with zstd encoder:
   - Create encoder with compression level
   - Write file data to encoder
   - Finish and return compressed buffer
3. Return compressed data as HTTP response

User is waiting (blocking on HTTP request), so High priority ensures
quick response time.
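
A sketch of this handler step, with a hypothetical load_blob helper; the ClResult error type is assumed to convert from std::io::Error:

use std::io::Write;

async fn compressed_download(worker: &WorkerPool, blob_id: &str) -> ClResult<Vec<u8>> {
    // 1. Async I/O: load the file
    let data = load_blob(blob_id).await?;

    // 2. CPU work at High priority: the user is blocked on this request
    worker
        .execute(Priority::High, move || {
            let mut encoder = zstd::Encoder::new(Vec::new(), 3)?; // level 3
            encoder.write_all(&data)?;
            Ok(encoder.finish()?) // compressed buffer becomes the response body
        })
        .await
}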

Parallel Processing

Process multiple items in parallel:

Algorithm: Parallel Image Processing

1. For each image in images:
   - Submit job to worker pool with Priority::Medium
   - Collect resulting futures
2. Await all futures in parallel using try_join_all
3. Return all processed images

This allows multiple images to be processed concurrently across
available worker pool threads while the async runtime continues
handling other tasks.
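
A sketch using futures::future::try_join_all, with a hypothetical process_image function:

use futures::future::try_join_all;

async fn process_all(worker: &WorkerPool, images: Vec<Vec<u8>>) -> ClResult<Vec<Vec<u8>>> {
    // Submit one Medium-priority job per image and collect the futures
    let jobs = images
        .into_iter()
        .map(|img| worker.execute(Priority::Medium, move || process_image(img)));

    // Await them all; jobs run concurrently across the worker threads
    try_join_all(jobs).await
}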

Configuration

Sizing the Worker Pool

Determining thread counts:

Configuration Options:

Conservative (leave cores for async runtime):
  high = 1
  medium = max(num_cpus / 4, 1)
  low = max(num_cpus / 4, 1)
  → Reserve 50% of cores for async runtime

Aggressive (for CPU-heavy workloads):
  high = 2
  medium = max(num_cpus / 2, 2)
  low = max(num_cpus / 2, 1)
  → Use most cores for worker threads

Default (balanced, recommended):
  high = 1
  medium = 2
  low = 1
  → Suitable for mixed I/O and CPU workloads

Factors to consider:

  • CPU cores: More cores → more workers
  • Workload type: Heavy CPU → more workers
  • Memory: Each thread has its own stack (typically 2-8 MB)
  • Async runtime: Leave cores for Tokio
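
A sketch of the conservative profile as code, using the num_cpus crate:

// Returns (high, medium, low) thread counts, leaving roughly half
// of the cores free for the async runtime
fn conservative_sizing() -> (usize, usize, usize) {
    let cores = num_cpus::get();
    (1, (cores / 4).max(1), (cores / 4).max(1))
}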

Environment Configuration

Configuration loading pattern (a sketch, assuming the WorkerPool::new signature above):

fn env_usize(key: &str, default: usize) -> usize {
    std::env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default)
}

let high_workers = env_usize("WORKER_POOL_HIGH", 1);
let medium_workers = env_usize("WORKER_POOL_MEDIUM", 2);
let low_workers = env_usize("WORKER_POOL_LOW", 1);
let worker_pool = WorkerPool::new(high_workers, medium_workers, low_workers);

Environment variables:

WORKER_POOL_HIGH=2    # 2 high-priority threads
WORKER_POOL_MEDIUM=4  # 4 medium-priority threads
WORKER_POOL_LOW=2     # 2 low-priority threads

Performance Characteristics

Throughput

High priority:

  • Theoretical max: ~100 jobs/second (assuming 10ms each)
  • Practical: 50-100 jobs/second
  • Bottleneck: Single dedicated thread

Medium priority:

  • Theoretical max: ~200 jobs/second (2 threads × 100 jobs/second each)
  • Practical: 100-200 jobs/second
  • Bottleneck: Thread count × job duration

Low priority:

  • Throughput: Variable (depends on high/medium load)
  • Practical: 10-50 jobs/second when system busy

Latency

High priority:

  • Best case: <1ms (if queue empty)
  • Typical: 1-10ms
  • Worst case: <100ms (queued behind one job)

Medium priority:

  • Best case: <1ms (if queue empty)
  • Typical: 10-100ms
  • Worst case: Seconds (if many jobs queued)

Low priority:

  • Best case: <1ms (if all queues empty)
  • Typical: 100ms - 1s
  • Worst case: Minutes (if system busy)

Examples

Example 1: Image Processing

  • Load image from memory
  • Resize to 800×600 using Lanczos3 filter (high quality)
  • Encode to PNG format
  • Use Priority::Medium (user submitted but not actively waiting)

Example 2: Compression

  • Compress data buffer using zstd compression level 3
  • Use Priority::Low (non-urgent background task)
  • Returns compressed buffer

Example 3: Cryptographic Operations

  • Sign JWT token with ES384 algorithm
  • Use Priority::High (user waiting on login)
  • User is blocked waiting for response

Example 4: Batch Processing

  • Process 1000+ items in chunks of 100
  • Use Priority::Low (background task)
  • Ensures individual chunks don’t block higher priority work
  • Each chunk: iterate items and process

Example 5: Parallel Execution

  • Submit multiple image processing jobs to worker pool
  • Collect futures from all jobs
  • Await all futures in parallel
  • Use Priority::Medium for concurrent image resizing
  • All images process in parallel across available worker threads

Best Practices

1. Choose the Right Priority

  • User waiting → High: direct user request, blocked on the result
  • Background task → High is an anti-pattern: it wastes dedicated capacity and delays urgent work
  • Background task → Low: non-urgent work, processed when the higher queues are empty

2. Keep Work Units Small

  • Bad: One job processing 1,000,000 items (blocks that worker thread)
  • Good: Split into chunks of 1,000 items (allows interleaving with other jobs)
  • Benefit: Prevents starvation of higher priority work
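
A sketch of the chunked approach, assuming a hypothetical items vector and process_item function:

// Submit 1,000-item chunks instead of one million-item job
for chunk in items.chunks(1_000) {
    let chunk = chunk.to_vec(); // move an owned copy into the closure
    worker
        .execute(Priority::Low, move || chunk.iter().try_for_each(process_item))
        .await?;
    // Between chunks, the worker is free to pick up higher-priority jobs
}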

3. Don’t Block on I/O in Worker

  • Bad: Blocking file I/O inside worker (blocks OS thread)
  • Good: Async I/O outside worker, pass result to worker
  • Pattern: async_io().await → worker.execute() → cpu_work

4. Handle Errors Appropriately

  • Handle fallible operations with match or the ? operator
  • Log failures with context (which job, what error)
  • Match on the result and handle errors by class (see the sketch below):
    • Transient errors: Retry
    • Permanent errors: Log and alert
    • Propagate critical errors up
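
A sketch of this handling; Input, process, store, is_transient, and schedule_retry are all hypothetical:

async fn run_job(worker: &WorkerPool, job_id: u64, input: Input) -> ClResult<()> {
    match worker.execute(Priority::Medium, move || process(input)).await {
        Ok(output) => store(output).await?,
        Err(err) if is_transient(&err) => {
            log::warn!("job {job_id} failed transiently, scheduling retry: {err}");
            schedule_retry(job_id); // hypothetical retry mechanism
        }
        Err(err) => {
            log::error!("job {job_id} failed permanently: {err}");
            return Err(err); // propagate critical errors up
        }
    }
    Ok(())
}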

5. Monitor Queue Depth

  • Periodically check queue depths for all priorities
  • Alert if High priority queue > 100 jobs (indicates bottleneck)
  • Use metrics to detect:
    • Capacity issues (need more workers)
    • Workload classification issues (wrong priorities)

Monitoring

Queue Metrics

WorkerPoolMetrics tracks:

  • high_queue_depth (AtomicUsize) - Current jobs in high priority queue
  • medium_queue_depth (AtomicUsize) - Current jobs in medium priority queue
  • low_queue_depth (AtomicUsize) - Current jobs in low priority queue
  • jobs_completed (AtomicU64) - Total successfully completed jobs
  • jobs_failed (AtomicU64) - Total failed jobs
  • total_execution_time (AtomicU64) - Cumulative job execution time in microseconds

Call metrics() to retrieve current state.
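
A sketch of what this structure could look like, with a hypothetical convenience method for average job duration:

use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};

#[derive(Default)]
pub struct WorkerPoolMetrics {
    pub high_queue_depth: AtomicUsize,
    pub medium_queue_depth: AtomicUsize,
    pub low_queue_depth: AtomicUsize,
    pub jobs_completed: AtomicU64,
    pub jobs_failed: AtomicU64,
    pub total_execution_time: AtomicU64, // microseconds
}

impl WorkerPoolMetrics {
    // Hypothetical convenience: average job duration in microseconds
    pub fn avg_execution_us(&self) -> u64 {
        let completed = self.jobs_completed.load(Ordering::Relaxed);
        if completed == 0 {
            return 0;
        }
        self.total_execution_time.load(Ordering::Relaxed) / completed
    }
}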

Logging

Enable debug logging:

RUST_LOG=cloudillo::worker=debug cargo run

Expected log output:

[DEBUG] Submitting job to high priority queue
[DEBUG] High priority worker executing job
[DEBUG] Job completed in 42ms
[WARN]  Medium priority queue depth: 150 jobs

Key log events:

  • Job submission to queue (which priority)
  • Worker thread starting job execution
  • Job completion with duration
  • Queue depth warnings when backed up

Troubleshooting

High Priority Queue Backed Up

Symptoms:

  • High priority jobs taking too long
  • User-facing operations slow

Causes:

  1. Too many high priority jobs being submitted
  2. Individual jobs running long (the target for High priority is <100ms)
  3. Insufficient high priority workers

Solutions:

  1. Audit usage: Review code submitting High priority jobs, downgrade non-urgent work
  2. Increase workers: Set WORKER_POOL_HIGH=2
  3. Optimize duration: Profile and optimize slow operations

Worker Thread Panicked

Symptoms:

  • Worker thread count decreases
  • Jobs stop being processed

Causes:

  • Panic in job code (unwrap/expect on None/Err)
  • Out of memory condition
  • Stack overflow in recursive code

Solutions:

  1. Catch panics: Use std::panic::catch_unwind() to handle panics gracefully
  2. Panic hooks: Install panic hook to log worker panics and optionally respawn
  3. Increase stack: Use std::thread::Builder::new().stack_size(8 * 1024 * 1024)
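
A sketch of panic isolation inside the worker loop, wrapping each job in catch_unwind so a panicking job cannot take the thread down:

use std::panic;

// Inside the worker loop, replace the bare `job()` call with:
let outcome = panic::catch_unwind(panic::AssertUnwindSafe(|| job()));
match outcome {
    Ok(Ok(_)) => log::debug!("job completed"),
    Ok(Err(err)) => log::error!("job returned error: {err}"),
    Err(_) => log::error!("job panicked; worker thread keeps running"),
}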

Memory Issues

Symptoms:

  • OOM (Out of Memory) errors
  • Increasing memory usage over time

Causes:

  • Large job payloads being copied into closures
  • Memory leaks in job code (unreleased buffers)
  • Too many worker threads (each has 2-8 MB stack)

Solutions:

  1. Reduce worker count: Lower WORKER_POOL_MEDIUM and WORKER_POOL_LOW
  2. Stream instead of loading: Process data in chunks rather than loading entire file
  3. Profile memory: Use valgrind or heaptrack to identify leaks

Comparison: Worker Pool vs Async Runtime

Feature       Worker Pool                Async Runtime (Tokio)
Best for      CPU-intensive work         I/O-bound operations
Threading     OS threads                 Green threads (tasks)
Blocking      OK to block                Must not block
Overhead      Higher (context switch)    Lower (cooperative)
Priorities    3 tiers                    No built-in priorities
Use case      Image processing, crypto   Network, file I/O

When to use Worker Pool:

  • ✅ CPU-heavy operations (>10ms of CPU time)
  • ✅ Blocking operations (unavoidable blocking)
  • ✅ Need priority control
  • ✅ Parallel CPU work

When to use Async Runtime:

  • ✅ I/O operations (network, disk)
  • ✅ Many concurrent operations
  • ✅ Quick operations (<10ms)
  • ✅ High concurrency needed (1000s of tasks)

See Also

  • Task Scheduler - Task scheduling system that uses worker pool
  • System Architecture - Overall system design
  • File Storage - Image processing with worker pool
  • [Actions](/architecture/actions-federation/actions) - Cryptographic operations with worker pool