Worker Pool Architecture

Cloudillo’s Worker Pool is a three-tier priority thread pool for executing CPU-intensive and blocking operations. It complements the async runtime by taking on work that would otherwise block async tasks, keeping the system responsive even under heavy computational load.

Architecture Overview

Three-Tier Priority System

┌───────────────────────────────────────────┐
│ High Priority Queue (1 dedicated thread)  │
│  - User-facing operations                 │
│  - Time-sensitive tasks                   │
│  - Cryptographic signing                  │
└───────────────────────────────────────────┘
                ↓
┌───────────────────────────────────────────┐
│ Medium Priority Queue (2 threads)         │
│  - Processes: High + Medium               │
│  - Image resizing for posts               │
│  - File compression                       │
│  - Normal background work                 │
└───────────────────────────────────────────┘
                ↓
┌───────────────────────────────────────────┐
│ Low Priority Queue (1 thread)             │
│  - Processes: High + Medium + Low         │
│  - Batch operations                       │
│  - Cleanup tasks                          │
│  - Non-urgent processing                  │
└───────────────────────────────────────────┘

Default Configuration:

  • High priority: 1 dedicated thread
  • Medium priority: 2 threads (also process High)
  • Low priority: 1 thread (also processes High + Medium)
  • Total threads: 4

Priority Cascading

Workers on lower-priority queues also drain higher-priority queues, checking them first and falling back to their own work:

Thread 1 (High):     High tasks only
Threads 2-3 (Med):   High → Medium (if no High)
Thread 4 (Low):      High → Medium → Low (if none above)

Benefits:

  • High-priority work always has dedicated capacity
  • Lower-priority work still gets done when system is idle
  • Automatic load balancing

Core Components

WorkerPool Structure

pub struct WorkerPool {
    high_tx: flume::Sender<Job>,
    medium_tx: flume::Sender<Job>,
    low_tx: flume::Sender<Job>,
    handles: Vec<JoinHandle<()>>,
}

type Job = Box<dyn FnOnce() -> ClResult<Box<dyn Any + Send>> + Send>;
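The Job alias erases the closure’s result type behind Box<dyn Any + Send>, so one queue can carry jobs with any return type; the caller later downcasts back to the concrete type. A hedged sketch, with a plain Result standing in for ClResult:

```rust
use std::any::Any;

// Stand-in for Cloudillo's ClResult; the real alias carries its own error type.
type ClResult<T> = Result<T, String>;
type Job = Box<dyn FnOnce() -> ClResult<Box<dyn Any + Send>> + Send>;

// Wrap a typed closure into a type-erased Job, then recover the value.
fn run_job() -> u32 {
    let job: Job = Box::new(|| Ok(Box::new(21u32 * 2) as Box<dyn Any + Send>));
    let boxed = job().expect("job failed");
    match boxed.downcast::<u32>() {
        Ok(v) => *v,
        Err(_) => panic!("unexpected result type"),
    }
}
```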

Initialization

Algorithm: Initialize Worker Pool

1. Create three unbounded message channels (high, medium, low)
2. Create thread handles vector
3. For each high-priority worker:
   - Clone high-priority receiver
   - Spawn worker thread (processes high priority only)
   - Store thread handle
4. For each medium-priority worker:
   - Clone high and medium receivers
   - Spawn cascading worker (processes high first, then medium)
   - Store thread handle
5. For each low-priority worker:
   - Clone all three receivers
   - Spawn cascading worker (processes high → medium → low)
   - Store thread handle
6. Return WorkerPool struct with senders and handles
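The six steps can be sketched in Rust. This is a minimal sketch, not the real implementation: std::sync::mpsc stands in for flume (whose receivers clone cheaply; Arc<Mutex<_>> approximates shared receivers here), Job is simplified to an infallible closure, and the dedicated high worker reuses the polling loop rather than a blocking recv():

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::Duration;

// Simplified Job; the real type returns ClResult<Box<dyn Any + Send>>.
type Job = Box<dyn FnOnce() + Send>;
// flume receivers are cloneable; Arc<Mutex<_>> approximates that with std.
type SharedRx = Arc<Mutex<mpsc::Receiver<Job>>>;

pub struct WorkerPool {
    high_tx: mpsc::Sender<Job>,
    medium_tx: mpsc::Sender<Job>,
    low_tx: mpsc::Sender<Job>,
    handles: Vec<JoinHandle<()>>,
}

impl WorkerPool {
    pub fn new(high: usize, medium: usize, low: usize) -> Self {
        // 1. One unbounded channel per priority.
        let (high_tx, high_rx) = mpsc::channel();
        let (medium_tx, medium_rx) = mpsc::channel();
        let (low_tx, low_rx) = mpsc::channel();
        let high_rx: SharedRx = Arc::new(Mutex::new(high_rx));
        let medium_rx: SharedRx = Arc::new(Mutex::new(medium_rx));
        let low_rx: SharedRx = Arc::new(Mutex::new(low_rx));
        let mut handles = Vec::new();
        // 3. High workers drain the high queue only.
        for _ in 0..high {
            handles.push(spawn_worker(vec![high_rx.clone()]));
        }
        // 4. Medium workers cascade: high first, then medium.
        for _ in 0..medium {
            handles.push(spawn_worker(vec![high_rx.clone(), medium_rx.clone()]));
        }
        // 5. Low workers cascade over all three queues.
        for _ in 0..low {
            handles.push(spawn_worker(vec![
                high_rx.clone(),
                medium_rx.clone(),
                low_rx.clone(),
            ]));
        }
        // 6. Hand back the senders plus thread handles.
        WorkerPool { high_tx, medium_tx, low_tx, handles }
    }
}

// Cascading worker: poll queues in priority order, idle briefly when empty.
fn spawn_worker(rxs: Vec<SharedRx>) -> JoinHandle<()> {
    thread::spawn(move || loop {
        let mut ran = false;
        for rx in &rxs {
            let msg = rx.lock().unwrap().try_recv();
            match msg {
                Ok(job) => { job(); ran = true; break; }
                Err(mpsc::TryRecvError::Empty) => continue,
                // Simplification: exit as soon as any queue closes.
                Err(mpsc::TryRecvError::Disconnected) => return,
            }
        }
        if !ran {
            thread::sleep(Duration::from_millis(10));
        }
    })
}
```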

Worker Thread

Basic Worker Spawning:

Algorithm: Spawn Single-Priority Worker

1. Spawn OS thread with receiver
2. Loop:
   a. Receive job from channel (blocking)
   b. Execute job
   c. Log success or error
   d. If channel disconnected, exit thread

Cascading Worker Spawning:

Algorithm: Spawn Multi-Priority Cascading Worker

1. Spawn OS thread with multiple receivers (high → medium → low)
2. Continuous loop:
   a. For each receiver in priority order:
      - Try to receive job (non-blocking)
      - If job available: execute, log result, restart loop
      - If empty: try next priority
      - If disconnected: exit thread
   b. If all queues empty: sleep 10ms and retry

This ensures high priority work is processed first, then medium, then low.

API Usage

Execute Method

Algorithm: Execute Work on Worker Pool

Input: priority level, CPU-bound function F
Output: Result<T>

1. Create async oneshot channel (tx, rx)
2. Wrap function F:
   - Execute F on worker thread
   - Send result through tx
   - Return success boxed as Any
3. Send wrapped job to appropriate priority queue:
   - High priority → high_tx
   - Medium priority → medium_tx
   - Low priority → low_tx
4. Await result through rx (async, doesn't block executor)
5. Return result to caller

This pattern allows async code to offload CPU work without blocking the async runtime.
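A sketch of the wrapping step, with caveats: a blocking std channel stands in for the async oneshot (so this version blocks the caller where the real method awaits), and execute is written as a free function rather than a method on WorkerPool:

```rust
use std::any::Any;
use std::sync::mpsc;

// Stand-ins for Cloudillo's types.
type ClResult<T> = Result<T, String>;
type Job = Box<dyn FnOnce() -> ClResult<Box<dyn Any + Send>> + Send>;

// Wrap F so its typed result travels back through a per-call channel,
// then submit the type-erased job to the chosen priority queue.
fn execute<T, F>(queue_tx: &mpsc::Sender<Job>, f: F) -> ClResult<T>
where
    T: Send + 'static,
    F: FnOnce() -> ClResult<T> + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    let job: Job = Box::new(move || {
        let result = f();        // runs on a worker thread
        let _ = tx.send(result); // ship the typed result back
        Ok(Box::new(()) as Box<dyn Any + Send>)
    });
    queue_tx.send(job).map_err(|e| e.to_string())?;
    rx.recv().map_err(|e| e.to_string())? // the real method awaits here
}
```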

Priority Enum

pub enum Priority {
    High,    // User-facing, time-sensitive
    Medium,  // Normal operations
    Low,     // Background, non-urgent
}

Metrics

The WorkerPoolMetrics struct tracks: queue depths per priority, total jobs completed/failed, and cumulative execution time.
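An illustrative shape for that struct, assuming atomic counters; the field names are invented here to match the description (queue depths would be updated on enqueue and dequeue):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Hypothetical layout; field names follow the description above.
#[derive(Default)]
pub struct WorkerPoolMetrics {
    pub high_queue_depth: AtomicU64,
    pub medium_queue_depth: AtomicU64,
    pub low_queue_depth: AtomicU64,
    pub jobs_completed: AtomicU64,
    pub jobs_failed: AtomicU64,
    pub total_exec_micros: AtomicU64, // cumulative execution time
}

impl WorkerPoolMetrics {
    // Record one finished job; relaxed ordering suffices for counters.
    pub fn record_job(&self, ok: bool, micros: u64) {
        if ok {
            self.jobs_completed.fetch_add(1, Ordering::Relaxed);
        } else {
            self.jobs_failed.fetch_add(1, Ordering::Relaxed);
        }
        self.total_exec_micros.fetch_add(micros, Ordering::Relaxed);
    }
}
```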


Priority Guidelines

Priority | Use For                                                                 | Characteristics
---------+-------------------------------------------------------------------------+--------------------------------------------------------
High     | Crypto during login, profile picture processing, real-time compression | User actively waiting; <100 ms typical; <100 jobs/sec
Medium   | Image variants for posts, file compression, data transforms            | Background but needed soon; 100 ms-10 s; default choice
Low      | Batch processing, cleanup, pre-generation, optimization                | No user waiting; can be delayed indefinitely

Integration Patterns

With Task Scheduler

Tasks often use the worker pool for CPU-intensive work following this pattern:

Algorithm: Task with Worker Pool Integration

1. Async I/O: Load data from blob storage
2. CPU work: Execute on worker pool:
   - Load image from memory
   - Resize using high-quality filter (Lanczos3)
   - Encode to desired format
   - Return resized buffer
3. Async I/O: Store result to blob storage

This separates I/O (async) from CPU (worker thread), allowing:

  • The async runtime to continue processing other tasks during image resizing
  • Background tasks (Low priority) to avoid starving user-facing operations
  • Efficient resource utilization
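A toy sketch of the I/O → CPU → I/O split. Everything here is a stand-in: the blob helpers are plain byte vectors, the Lanczos3 resize is replaced by a trivial transform, and a spawned thread plays the worker pool's role:

```rust
use std::thread;

// Hypothetical helpers; real code does async blob I/O here.
fn load_blob() -> Vec<u8> { vec![1, 2, 3, 4] }          // step 1: load input
fn resize(data: Vec<u8>) -> Vec<u8> {                   // step 2: CPU-bound work
    data.into_iter().map(|b| b * 2).collect()
}
fn store_blob(data: &[u8]) -> usize { data.len() }      // step 3: store result

fn process() -> usize {
    let data = load_blob();
    // Offload the CPU-bound step to another thread (the pool's execute()
    // in real code), leaving the calling context free to do other work.
    let resized = thread::spawn(move || resize(data)).join().unwrap();
    store_blob(&resized)
}
```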

Configuration

Sizing the Worker Pool

Default thread counts (balanced, recommended):

high = 1
medium = 2
low = 1
→ Suitable for mixed I/O and CPU workloads

Factors to consider:

  • CPU cores: More cores → more workers
  • Workload type: Heavy CPU → more workers
  • Memory: Each thread has its own stack (typically 2-8 MB)
  • Async runtime: Leave cores for Tokio

Environment Configuration

Configuration loading pattern:

high_workers = parse_env("WORKER_POOL_HIGH", default: 1)
medium_workers = parse_env("WORKER_POOL_MEDIUM", default: 2)
low_workers = parse_env("WORKER_POOL_LOW", default: 1)
worker_pool = WorkerPool::new(high_workers, medium_workers, low_workers)
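A sketch of such a parse_env helper (the actual helper name and signature in Cloudillo may differ): read the variable, parse it as a count, and fall back to the default when it is unset or malformed.

```rust
use std::env;

// Hedged sketch of the loading pattern shown above.
fn parse_env(name: &str, default: usize) -> usize {
    env::var(name)
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(default)
}
```

With this helper, WORKER_POOL_HIGH=2 yields high_workers = 2, while an unset or non-numeric variable falls back to the default.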

Environment variables:

WORKER_POOL_HIGH=2    # 2 high-priority threads
WORKER_POOL_MEDIUM=4  # 4 medium-priority threads
WORKER_POOL_LOW=2     # 2 low-priority threads

See Also