Worker Pool Architecture
Cloudillo’s Worker Pool provides a three-tier priority thread pool for executing CPU-intensive and blocking operations. This system complements the async runtime by handling work that shouldn’t block async tasks, ensuring responsive performance even under heavy computational load.
Why a Separate Worker Pool?
The async runtime (Tokio) is optimized for I/O-bound tasks, not CPU-bound work:
Problems with CPU Work on Async Runtime:
- ❌ Blocks other tasks: Heavy CPU work prevents other async tasks from making progress
- ❌ Thread starvation: Can exhaust async thread pool
- ❌ Poor latency: User-facing requests wait for CPU work to complete
- ❌ No prioritization: Can’t prioritize urgent CPU work over background processing
Worker Pool Solutions:
- ✅ Dedicated threads: Separate from async runtime
- ✅ Priority-based: Three tiers (High/Medium/Low) for different urgency levels
- ✅ Work stealing: Efficient load balancing across workers
- ✅ Future-based API: Async-friendly interface
- ✅ Backpressure: Prevents overwhelming the system
Architecture Overview
Three-Tier Priority System
```text
┌───────────────────────────────────────────┐
│ High Priority Queue (1 dedicated thread) │
│ - User-facing operations │
│ - Time-sensitive tasks │
│ - Cryptographic signing │
└───────────────────────────────────────────┘
↓
┌───────────────────────────────────────────┐
│ Medium Priority Queue (2 threads) │
│ - Processes: High + Medium │
│ - Image resizing for posts │
│ - File compression │
│ - Normal background work │
└───────────────────────────────────────────┘
↓
┌───────────────────────────────────────────┐
│ Low Priority Queue (1 thread) │
│ - Processes: High + Medium + Low │
│ - Batch operations │
│ - Cleanup tasks │
│ - Non-urgent processing │
└───────────────────────────────────────────┘
```

Default Configuration:
- High priority: 1 dedicated thread
- Medium priority: 2 threads (also process High)
- Low priority: 1 thread (also processes High + Medium)
- Total threads: 4
Priority Cascading
Workers assigned to lower priority tiers drain higher priority queues first, and pick up their own tier's work only when the more urgent queues are empty:
```text
Thread 1 (High): High tasks only
Threads 2-3 (Med): High → Medium (if no High)
Thread 4 (Low): High → Medium → Low (if none above)
```

Benefits:
- High-priority work always has dedicated capacity
- Lower-priority work still gets done when system is idle
- Automatic load balancing
Core Components
WorkerPool Structure
```rust
pub struct WorkerPool {
    high_tx: flume::Sender<Job>,      // submit side of the high priority queue
    medium_tx: flume::Sender<Job>,    // submit side of the medium priority queue
    low_tx: flume::Sender<Job>,       // submit side of the low priority queue
    handles: Vec<JoinHandle<()>>,     // join handles of the spawned worker threads
}

// A job is a boxed one-shot closure returning a type-erased result
type Job = Box<dyn FnOnce() -> ClResult<Box<dyn Any + Send>> + Send>;
```

Initialization
Algorithm: Initialize Worker Pool
1. Create three unbounded message channels (high, medium, low)
2. Create thread handles vector
3. For each high-priority worker:
- Clone high-priority receiver
- Spawn worker thread (processes high priority only)
- Store thread handle
4. For each medium-priority worker:
- Clone high and medium receivers
- Spawn cascading worker (processes high first, then medium)
- Store thread handle
5. For each low-priority worker:
- Clone all three receivers
- Spawn cascading worker (processes high → medium → low)
- Store thread handle
6. Return WorkerPool struct with senders and handles
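In Rust, this initialization might look roughly as follows. This is a sketch, not Cloudillo's actual implementation; the `spawn_worker` and `spawn_cascading_worker` helpers are illustrative names, sketched in the Worker Thread subsection below.

```rust
impl WorkerPool {
    pub fn new(high: usize, medium: usize, low: usize) -> Self {
        // One unbounded channel per priority tier; flume receivers are
        // clonable, so several workers can drain the same queue
        let (high_tx, high_rx) = flume::unbounded::<Job>();
        let (medium_tx, medium_rx) = flume::unbounded::<Job>();
        let (low_tx, low_rx) = flume::unbounded::<Job>();

        let mut handles = Vec::new();

        // High-priority workers drain the high queue only
        for _ in 0..high {
            handles.push(spawn_worker(high_rx.clone()));
        }
        // Medium-priority workers cascade: high first, then medium
        for _ in 0..medium {
            handles.push(spawn_cascading_worker(vec![high_rx.clone(), medium_rx.clone()]));
        }
        // Low-priority workers cascade across all three queues
        for _ in 0..low {
            handles.push(spawn_cascading_worker(vec![
                high_rx.clone(),
                medium_rx.clone(),
                low_rx.clone(),
            ]));
        }

        WorkerPool { high_tx, medium_tx, low_tx, handles }
    }
}
```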
Worker Thread
Basic Worker Spawning:
Algorithm: Spawn Single-Priority Worker
1. Spawn OS thread with receiver
2. Loop:
a. Receive job from channel (blocking)
b. Execute job
c. Log success or error
d. If channel disconnected, exit thread
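A minimal sketch of this loop, assuming the `Job` alias defined earlier and an error type that implements `Display`; the helper name `spawn_worker` is illustrative:

```rust
use std::thread::{self, JoinHandle};

fn spawn_worker(rx: flume::Receiver<Job>) -> JoinHandle<()> {
    thread::spawn(move || {
        // Blocking receive: the thread sleeps until a job arrives
        while let Ok(job) = rx.recv() {
            match job() {
                Ok(_) => log::debug!("job completed"),
                Err(err) => log::error!("job failed: {err}"),
            }
        }
        // recv() fails only once all senders are dropped: exit the thread
    })
}
```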
Cascading Worker Spawning:
Algorithm: Spawn Multi-Priority Cascading Worker
1. Spawn OS thread with multiple receivers (high → medium → low)
2. Continuous loop:
a. For each receiver in priority order:
- Try to receive job (non-blocking)
- If job available: execute, log result, restart loop
- If empty: try next priority
- If disconnected: exit thread
b. If all queues empty: sleep 10ms and retry
This ensures high priority work is processed first, then medium, then low.
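A sketch of the cascading loop under the same assumptions; `spawn_cascading_worker` is again an illustrative name:

```rust
use std::thread::{self, JoinHandle};
use std::time::Duration;

fn spawn_cascading_worker(receivers: Vec<flume::Receiver<Job>>) -> JoinHandle<()> {
    thread::spawn(move || 'scan: loop {
        let mut disconnected = 0;
        // Poll the queues in priority order (high → medium → low)
        for rx in &receivers {
            match rx.try_recv() {
                Ok(job) => {
                    match job() {
                        Ok(_) => log::debug!("job completed"),
                        Err(err) => log::error!("job failed: {err}"),
                    }
                    continue 'scan; // rescan from the highest priority queue
                }
                Err(flume::TryRecvError::Empty) => {}
                Err(flume::TryRecvError::Disconnected) => disconnected += 1,
            }
        }
        if disconnected == receivers.len() {
            return; // every queue is closed: shut the worker down
        }
        // All queues empty: back off briefly before polling again
        thread::sleep(Duration::from_millis(10));
    })
}
```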
API Usage
Execute Method
Algorithm: Execute Work on Worker Pool
Input: priority level, CPU-bound function F
Output: Result<T>
1. Create async oneshot channel (tx, rx)
2. Wrap function F:
- Execute F on worker thread
- Send result through tx
- Return success boxed as Any
3. Send wrapped job to appropriate priority queue:
- High priority → high_tx
- Medium priority → medium_tx
- Low priority → low_tx
4. Await result through rx (async, doesn't block executor)
5. Return result to caller
This pattern allows async code to offload CPU work without blocking the async runtime.
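A sketch of what `execute` might look like, assuming a Tokio oneshot channel for the reply path; error handling is reduced to panics for brevity, where real code would map these failures into `ClResult` errors:

```rust
use std::any::Any;

impl WorkerPool {
    pub async fn execute<F, T>(&self, priority: Priority, f: F) -> ClResult<T>
    where
        F: FnOnce() -> ClResult<T> + Send + 'static,
        T: Send + 'static,
    {
        // Oneshot channel carries the typed result back to the async caller
        let (tx, rx) = tokio::sync::oneshot::channel();

        // Wrap F: run it on the worker thread, ship its result through tx
        let job: Job = Box::new(move || {
            let _ = tx.send(f()); // caller may have gone away; ignore send errors
            Ok(Box::new(()) as Box<dyn Any + Send>)
        });

        // Route to the queue matching the requested priority
        let queue_tx = match priority {
            Priority::High => &self.high_tx,
            Priority::Medium => &self.medium_tx,
            Priority::Low => &self.low_tx,
        };
        if queue_tx.send(job).is_err() {
            panic!("worker pool has shut down");
        }

        // Awaiting suspends the task; it never blocks an executor thread
        rx.await.expect("worker dropped the job without replying")
    }
}
```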
Priority Enum

```rust
pub enum Priority {
    High,   // User-facing, time-sensitive
    Medium, // Normal operations
    Low,    // Background, non-urgent
}
```

When to Use Each Priority
High Priority (Dedicated Resources)
Use for:
- ✅ Cryptographic operations during login
- ✅ Image processing for profile picture uploads (user waiting)
- ✅ Real-time compression/decompression
- ✅ Time-sensitive computations
Characteristics:
- User is actively waiting
- Quick feedback required (<1 second)
- Affects UX directly
- Should be fast operations (<100ms typical)
Example Pattern:
```rust
// User uploading profile picture (waiting)
let compressed = worker.execute(Priority::High, move || {
    compress_image(image_data, quality)
}).await?;
```

Guidelines:
- Target volume: <100 jobs/second
- Typical duration: <100ms
- When not to use: Background processing, batch operations
Medium Priority (Default)
Use for:
- ✅ Image processing for posts (async to user)
- ✅ File compression for uploads
- ✅ Data transformations
- ✅ Most CPU-intensive work
Characteristics:
- User submitted request but isn’t waiting
- Processing happens in background
- Results needed “soon” (seconds to minutes)
- Default choice for most CPU work
Example Pattern:
```rust
// Generating image variants for a post
let thumbnail = worker.execute(Priority::Medium, move || {
    resize_image(image_data, 200, 200)
}).await?;
```

Guidelines:
- Target volume: <1000 jobs/second
- Typical duration: 100ms - 10 seconds
- When not to use: User actively waiting, or truly non-urgent
Low Priority (Background)
Use for:
- ✅ Batch processing
- ✅ Cleanup operations
- ✅ Pre-generation of assets
- ✅ Non-urgent optimization tasks
Characteristics:
- No user waiting
- Can take as long as needed
- Won’t impact user-facing operations
- Can be delayed indefinitely
Example Pattern:
```rust
// Batch thumbnail generation, submitted one job at a time
for image in large_image_set {
    worker.execute(Priority::Low, move || generate_all_variants(image)).await?;
}
```

Guidelines:
- Target volume: Variable (throttle if needed)
- Typical duration: Seconds to minutes
- When not to use: Anything time-sensitive
Integration Patterns
With Task Scheduler
Tasks often use the worker pool for CPU-intensive work following this pattern:
Algorithm: Task with Worker Pool Integration
1. Async I/O: Load data from blob storage
2. CPU work: Execute on worker pool:
- Load image from memory
- Resize using high-quality filter (Lanczos3)
- Encode to desired format
- Return resized buffer
3. Async I/O: Store result to blob storage
This separates I/O (async) from CPU (worker thread), allowing:
- The async runtime to continue processing other tasks during image resizing
- Background tasks (Low priority) not to starve user-facing operations
- Efficient resource utilization
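A sketch of the pattern, with hypothetical `BlobStore` and `resize_image` helpers standing in for the real storage and imaging APIs:

```rust
async fn generate_thumbnail(
    worker: &WorkerPool,
    blobs: &BlobStore, // hypothetical blob storage handle
    blob_id: &str,
) -> ClResult<()> {
    // 1. Async I/O: load the source image
    let image_data = blobs.load(blob_id).await?;

    // 2. CPU work on the worker pool: decode, resize, re-encode
    let resized = worker
        .execute(Priority::Medium, move || resize_image(&image_data, 800, 600))
        .await?;

    // 3. Async I/O: store the result
    blobs.store(&format!("{blob_id}.thumb"), resized).await?;
    Ok(())
}
```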
With HTTP Handlers
Handlers can offload CPU work using the same async/worker/async pattern:
Algorithm: HTTP Handler with Worker Pool
1. Async I/O: Load file from blob storage
2. CPU work (Priority::High): Compress data with zstd encoder:
- Create encoder with compression level
- Write file data to encoder
- Finish and return compressed buffer
3. Return compressed data as HTTP response
User is waiting (blocking on HTTP request), so High priority ensures
quick response time.
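A sketch of such a handler, assuming the `zstd` crate (`zstd::encode_all` is its real one-shot compression helper) and the same hypothetical `BlobStore`:

```rust
async fn download_compressed(
    worker: &WorkerPool,
    blobs: &BlobStore,
    file_id: &str,
) -> ClResult<Vec<u8>> {
    // Async I/O: fetch the file
    let data = blobs.load(file_id).await?;

    // CPU work, High priority: the user is blocked on this request
    let compressed = worker
        .execute(Priority::High, move || {
            // Compression level 3; assumes ClResult can absorb std::io::Error
            Ok(zstd::encode_all(data.as_slice(), 3)?)
        })
        .await?;

    // Hand the compressed bytes back to the HTTP layer
    Ok(compressed)
}
```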
Parallel Processing
Process multiple items in parallel:
Algorithm: Parallel Image Processing
1. For each image in images:
- Submit job to worker pool with Priority::Medium
- Collect resulting futures
2. Await all futures in parallel using try_join_all
3. Return all processed images
This allows multiple images to be processed concurrently across
available worker pool threads while the async runtime continues
handling other tasks.
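A sketch using `futures::future::try_join_all`; `resize_image` is again a hypothetical helper. Each `execute` future submits its job when first polled, and `try_join_all` polls them all concurrently, so the jobs land on the queue together:

```rust
use futures::future::try_join_all;

async fn process_images(
    worker: &WorkerPool,
    images: Vec<Vec<u8>>,
) -> ClResult<Vec<Vec<u8>>> {
    // Build one future per image; nothing runs until they are polled
    let jobs: Vec<_> = images
        .into_iter()
        .map(|img| worker.execute(Priority::Medium, move || resize_image(&img, 200, 200)))
        .collect();

    // Await all results concurrently; the first failure aborts the batch
    try_join_all(jobs).await
}
```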
Configuration
Sizing the Worker Pool
Determining thread counts:
Configuration Options:
Conservative (leave cores for async runtime):
high = 1
medium = max(num_cpus / 4, 1)
low = max(num_cpus / 4, 1)
→ Reserve 50% of cores for async runtime
Aggressive (for CPU-heavy workloads):
high = 2
medium = max(num_cpus / 2, 2)
low = max(num_cpus / 2, 1)
→ Use most cores for worker threads
Default (balanced, recommended):
high = 1
medium = 2
low = 1
→ Suitable for mixed I/O and CPU workloads

Factors to consider (a sizing sketch follows this list):
- CPU cores: More cores → more workers
- Workload type: Heavy CPU → more workers
- Memory: Each thread has stack (usually 2-8 MB)
- Async runtime: Leave cores for Tokio
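For illustration, the conservative rule above might be computed like this, assuming the `num_cpus` crate:

```rust
// Conservative sizing: reserve roughly half the cores for the async runtime
fn conservative_pool_sizes() -> (usize, usize, usize) {
    let cores = num_cpus::get();
    let high = 1;
    let medium = (cores / 4).max(1);
    let low = (cores / 4).max(1);
    (high, medium, low)
}
```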
Environment Configuration
Configuration loading pattern (sketched in Rust; falls back to the defaults on missing or malformed values):

```rust
let high_workers: usize = std::env::var("WORKER_POOL_HIGH")
    .ok().and_then(|v| v.parse().ok()).unwrap_or(1);
let medium_workers: usize = std::env::var("WORKER_POOL_MEDIUM")
    .ok().and_then(|v| v.parse().ok()).unwrap_or(2);
let low_workers: usize = std::env::var("WORKER_POOL_LOW")
    .ok().and_then(|v| v.parse().ok()).unwrap_or(1);

let worker_pool = WorkerPool::new(high_workers, medium_workers, low_workers);
```

Environment variables:
```sh
WORKER_POOL_HIGH=2    # 2 high-priority threads
WORKER_POOL_MEDIUM=4  # 4 medium-priority threads
WORKER_POOL_LOW=2     # 2 low-priority threads
```

Performance Characteristics
Throughput
High priority:
- Theoretical max: ~100 jobs/second (assuming 10ms each)
- Practical: 50-100 jobs/second
- Bottleneck: Single dedicated thread
Medium priority:
- Theoretical max: ~200 jobs/second (2 threads × 100 jobs/second)
- Practical: 100-200 jobs/second
- Bottleneck: Thread count × job duration
Low priority:
- Throughput: Variable (depends on high/medium load)
- Practical: 10-50 jobs/second when system busy
Latency
High priority:
- Best case: <1ms (if queue empty)
- Typical: 1-10ms
- Worst case: <100ms (queued behind one job)
Medium priority:
- Best case: <1ms (if queue empty)
- Typical: 10-100ms
- Worst case: Seconds (if many jobs queued)
Low priority:
- Best case: <1ms (if all queues empty)
- Typical: 100ms - 1s
- Worst case: Minutes (if system busy)
Examples
Example 1: Image Processing
- Load image from memory
- Resize to 800×600 using Lanczos3 filter (high quality)
- Encode to PNG format
- Use Priority::Medium (user submitted but not actively waiting)
Example 2: Compression
- Compress data buffer using zstd compression level 3
- Use Priority::Low (non-urgent background task)
- Returns compressed buffer
Example 3: Cryptographic Operations
- Sign JWT token with ES384 algorithm
- Use Priority::High (user waiting on login)
- User is blocked waiting for response
Example 4: Batch Processing
- Process 1000+ items in chunks of 100
- Use Priority::Low (background task)
- Ensures individual chunks don’t block higher priority work
- Each chunk: iterate items and process
Example 5: Parallel Execution
- Submit multiple image processing jobs to worker pool
- Collect futures from all jobs
- Await all futures in parallel
- Use Priority::Medium for concurrent image resizing
- All images process in parallel across available worker threads
Best Practices
1. Choose the Right Priority
- ✅ User waiting → High: Direct user request, waiting for result
- ❌ Background task → High: Wastes dedicated capacity, delays urgent work
- ✅ Background → Low: Non-urgent work processed when queue is empty
2. Keep Work Units Small
- ❌ Bad: One job processing 1,000,000 items (blocks that worker thread)
- ✅ Good: Split into chunks of 1,000 items (allows interleaving with other jobs)
- Benefit: Prevents starvation of higher priority work
3. Don’t Block on I/O in Worker
- ❌ Bad: Blocking file I/O inside worker (blocks OS thread)
- ✅ Good: Async I/O outside worker, pass result to worker
- Pattern:
async_io().await → worker.execute() → cpu_work
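A sketch of the correct split, using Tokio for the I/O side; `transform` is a hypothetical CPU-bound helper:

```rust
async fn process_file(worker: &WorkerPool, path: &str) -> ClResult<Vec<u8>> {
    // Async I/O outside the worker: suspends the task, blocks no OS thread
    // (assumes ClResult can absorb std::io::Error)
    let data = tokio::fs::read(path).await?;

    // Pure CPU work inside the worker: blocking an OS thread is fine here
    worker.execute(Priority::Medium, move || transform(data)).await
}
```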
4. Handle Errors Appropriately
- Wrap risky operations in match/try blocks
- Log failures with context (which job, what error)
- Match result and handle errors:
- Transient errors: Retry
- Permanent errors: Log and alert
- Propagate critical errors up
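A sketch of this handling at a call site; `risky_work`, `Input`/`Output`, and the `is_transient` classification are illustrative, not part of the real error type:

```rust
async fn run_job(worker: &WorkerPool, input: Input) -> ClResult<Output> {
    match worker.execute(Priority::Medium, move || risky_work(input)).await {
        Ok(output) => Ok(output),
        // Transient failure: log with context so the caller can retry
        Err(err) if err.is_transient() => {
            log::warn!("job failed transiently: {err}");
            Err(err)
        }
        // Permanent failure: log, alert, and propagate up
        Err(err) => {
            log::error!("job failed permanently: {err}");
            Err(err)
        }
    }
}
```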
5. Monitor Queue Depth
- Periodically check queue depths for all priorities
- Alert if High priority queue > 100 jobs (indicates bottleneck)
- Use metrics to detect:
- Capacity issues (need more workers)
- Workload classification issues (wrong priorities)
Monitoring
Queue Metrics
WorkerPoolMetrics tracks:
- high_queue_depth (AtomicUsize) - current jobs in the high priority queue
- medium_queue_depth (AtomicUsize) - current jobs in the medium priority queue
- low_queue_depth (AtomicUsize) - current jobs in the low priority queue
- jobs_completed (AtomicU64) - total successfully completed jobs
- jobs_failed (AtomicU64) - total failed jobs
- total_execution_time (AtomicU64) - cumulative job execution time in microseconds
Call metrics() to retrieve current state.
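Laid out as a struct, these fields might look as follows (a reconstruction from the list above, not Cloudillo's actual source):

```rust
use std::sync::atomic::{AtomicU64, AtomicUsize};

pub struct WorkerPoolMetrics {
    pub high_queue_depth: AtomicUsize,
    pub medium_queue_depth: AtomicUsize,
    pub low_queue_depth: AtomicUsize,
    pub jobs_completed: AtomicU64,
    pub jobs_failed: AtomicU64,
    pub total_execution_time: AtomicU64, // microseconds
}
```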
Logging
Enable debug logging:
```sh
RUST_LOG=cloudillo::worker=debug cargo run
```

Expected log output:
```text
[DEBUG] Submitting job to high priority queue
[DEBUG] High priority worker executing job
[DEBUG] Job completed in 42ms
[WARN] Medium priority queue depth: 150 jobs
```

Key log events:
- Job submission to queue (which priority)
- Worker thread starting job execution
- Job completion with duration
- Queue depth warnings when backed up
Troubleshooting
High Priority Queue Backed Up
Symptoms:
- High priority jobs taking too long
- User-facing operations slow
Causes:
- Too many high priority jobs being submitted
- Individual jobs taking >100ms (High priority jobs should stay under ~100ms)
- Insufficient high priority workers
Solutions:
- Audit usage: Review code submitting High priority jobs, downgrade non-urgent work
- Increase workers: Set WORKER_POOL_HIGH=2
- Optimize duration: Profile and optimize slow operations
Worker Thread Panicked
Symptoms:
- Worker thread count decreases
- Jobs stop being processed
Causes:
- Panic in job code (unwrap/expect on None/Err)
- Out of memory condition
- Stack overflow in recursive code
Solutions:
- Catch panics: Use std::panic::catch_unwind() to handle panics gracefully
- Panic hooks: Install a panic hook to log worker panics and optionally respawn the thread
- Increase stack: Use std::thread::Builder::new().stack_size(8 * 1024 * 1024)
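A sketch of panic isolation inside the worker loop, wrapping each job in `catch_unwind` so a panicking job is logged instead of killing the thread (`AssertUnwindSafe` is needed because the boxed closure is not statically unwind-safe):

```rust
use std::panic::{catch_unwind, AssertUnwindSafe};

fn run_job_safely(job: Job) {
    match catch_unwind(AssertUnwindSafe(job)) {
        Ok(Ok(_)) => log::debug!("job completed"),
        Ok(Err(err)) => log::error!("job failed: {err}"),
        Err(_) => log::error!("job panicked; worker thread survives"),
    }
}
```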
Memory Issues
Symptoms:
- OOM (Out of Memory) errors
- Increasing memory usage over time
Causes:
- Large job payloads being copied into closures
- Memory leaks in job code (unreleased buffers)
- Too many worker threads (each has 2-8 MB stack)
Solutions:
- Reduce worker count: Lower WORKER_POOL_MEDIUM and WORKER_POOL_LOW
- Stream instead of loading: Process data in chunks rather than loading the entire file
- Profile memory: Use valgrind or heaptrack to identify leaks
Comparison: Worker Pool vs Async Runtime
| Feature | Worker Pool | Async Runtime (Tokio) |
|---|---|---|
| Best for | CPU-intensive work | I/O-bound operations |
| Threading | OS threads | Green threads (tasks) |
| Blocking | OK to block | Must not block |
| Overhead | Higher (context switch) | Lower (cooperative) |
| Priorities | 3 tiers | No built-in priorities |
| Use case | Image processing, crypto | Network, file I/O |
When to use Worker Pool:
- ✅ CPU-heavy operations (>10ms of CPU time)
- ✅ Blocking operations (unavoidable blocking)
- ✅ Need priority control
- ✅ Parallel CPU work
When to use Async Runtime:
- ✅ I/O operations (network, disk)
- ✅ Many concurrent operations
- ✅ Quick operations (<10ms)
- ✅ High concurrency needed (1000s of tasks)
See Also
- Task Scheduler - Task scheduling system that uses worker pool
- System Architecture - Overall system design
- File Storage - Image processing with worker pool
- [Actions](/architecture/actions-federation/actions) - Cryptographic operations with worker pool