Task Scheduler System
Cloudillo’s Task Scheduler is a persistent task-execution system that provides reliable asynchronous background processing with dependency management, automatic retries, and cron-style scheduling. It is critical for federation, file processing, and any operation that must survive a server restart.
Why Task-Based Processing?
Plain async tasks (e.g., futures spawned with tokio::spawn) have significant limitations in production systems:
Problems with Simple Async:
- ❌ Lost on restart: Server restarts lose all pending tasks
- ❌ No retry logic: Failures require manual handling
- ❌ No dependencies: Can’t wait for other tasks to complete
- ❌ No scheduling: Can’t run tasks at specific times
- ❌ No observability: Hard to track task progress/failures
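For contrast, here is the fire-and-forget pattern the list above describes: if the process exits before the future completes, the work is silently lost (a minimal Rust sketch; deliver_action is a hypothetical function):
// Fire-and-forget: nothing is persisted, so a restart silently drops this work
tokio::spawn(async move {
    let _ = deliver_action(token, recipient).await; // hypothetical delivery call
});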
Task Scheduler Solutions:
- ✅ Persistent: Tasks survive server restarts via MetaAdapter
- ✅ Automatic Retry: Exponential backoff with configurable limits
- ✅ Dependency Tracking: Tasks wait for dependencies (DAG)
- ✅ Cron Scheduling: Recurring tasks (daily, hourly, etc.)
- ✅ Observable: Track task status, retries, and failures
- ✅ Priority Queues: High/medium/low priority execution
Architecture Overview
Components
┌────────────────────────────────────────────┐
│ Task Scheduler │
│ ┌──────────────────────────────────────┐ │
│ │ Task Registry │ │
│ │ - TaskBuilder functions │ │
│ │ - Task type mapping │ │
│ └──────────────────────────────────────┘ │
│ ┌──────────────────────────────────────┐ │
│ │ Task Queue │ │
│ │ - Pending tasks │ │
│ │ - Running tasks │ │
│ └──────────────────────────────────────┘ │
│ ┌──────────────────────────────────────┐ │
│ │ Dependency Tracker │ │
│ │ - DAG (Directed Acyclic Graph) │ │
│ │ - Waiting tasks │ │
│ │ - Completion notifications │ │
│ └──────────────────────────────────────┘ │
│ ┌──────────────────────────────────────┐ │
│ │ Retry Manager │ │
│ │ - Exponential backoff │ │
│ │ - Retry counters │ │
│ │ - Backoff timers │ │
│ └──────────────────────────────────────┘ │
│ ┌──────────────────────────────────────┐ │
│ │ Cron Scheduler │ │
│ │ - Cron expressions │ │
│ │ - Next execution time │ │
│ │ - Recurring task tracking │ │
│ └──────────────────────────────────────┘ │
└────────────────────────────────────────────┘
↓
┌────────────────────────────────────────────┐
│ MetaAdapter (Persistence) │
│ - Task metadata storage │
│ - Task status tracking │
│ - Dependency relationships │
└────────────────────────────────────────────┘
↓
┌────────────────────────────────────────────┐
│ Worker Pool (Execution) │
│ - High priority workers │
│ - Medium priority workers │
│ - Low priority workers │
└────────────────────────────────────────────┘
Execution Flow
1. Task Created
↓
2. Store in MetaAdapter (persistence)
↓
3. Check Dependencies / scheduling
├─ Has dependencies? → Wait
└─ No dependencies? → Queue
↓
4. Add to Priority Queue
↓
5. Execute Task
├─ Success → Mark complete, notify dependents
├─ Failure (transient) → Schedule retry with backoff
└─ Failure (permanent) → Mark failed, notify dependents
↓
6. Trigger Dependent Tasks
Task Trait System
All tasks implement the Task<S> trait:
#[async_trait]
pub trait Task<S: Clone>: Send + Sync + Debug {
/// Task type identifier (e.g., "ActionCreator", "ImageResizer")
fn kind() -> &'static str where Self: Sized;
/// Build task from serialized context
fn build(id: TaskId, context: &str) -> ClResult<Arc<dyn Task<S>>>
where Self: Sized;
/// Serialize task for persistence
fn serialize(&self) -> String;
/// Execute the task
async fn run(&self, state: &S) -> ClResult<()>;
/// Get task type of this instance
fn kind_of(&self) -> &'static str;
}
Generic State Parameter
The S parameter is the application state (typically Arc<AppState>):
// Task receives app state for accessing adapters
async fn run(&self, state: &Arc<AppState>) -> ClResult<()> {
// Access adapters through state
state.meta_adapter.create_action(...).await?;
state.blob_adapter.create_blob(...).await?;
Ok(())
}
Task Serialization
Tasks must serialize to strings for persistence:
MyTask.serialize():
return JSON.stringify(self)
MyTask.build(id, context):
task = JSON.parse(context)
return Arc(task)
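In Rust, this round-trip is typically a thin wrapper over serde_json (a sketch, assuming the task derives Serialize/Deserialize):
use std::sync::Arc;
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
struct MyTask { file_id: String }

impl MyTask {
    // Backs Task::serialize: produces the stored "context" string
    fn to_context(&self) -> String {
        serde_json::to_string(self).expect("MyTask is always serializable")
    }
    // Backs Task::build: rebuilds the task from the stored context
    fn from_context(context: &str) -> serde_json::Result<Arc<MyTask>> {
        Ok(Arc::new(serde_json::from_str(context)?))
    }
}
Key Features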
1. Dependency Tracking
Tasks can depend on other tasks forming a Directed Acyclic Graph (DAG):
# Create file processing task
file_task_id = scheduler
.task(FileIdGeneratorTask(temp_path))
.schedule()
# Create image resizing task that depends on file task
resize_task_id = scheduler
.task(ImageResizerTask(file_id, "hd"))
.depend_on([file_task_id]) # Wait for file task
.schedule()
# Create action that depends on both
action_task_id = scheduler
.task(ActionCreatorTask(action_data))
.depend_on([file_task_id, resize_task_id]) # Wait for both
.schedule()
Dependency Resolution:
FileIdGeneratorTask (no dependencies)
↓ completes
ImageResizerTask (depends on file task)
↓ completes
ActionCreatorTask (depends on both)
↓ executes
Cycle Detection: The scheduler detects and rejects circular dependencies:
# This would be rejected
task_a.depend_on([task_b])
task_b.depend_on([task_a]) # Error: cycle detected
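One common way to implement such a check is a depth-first search over the dependency map (a sketch with a stand-in TaskId type; not the scheduler's actual code):
use std::collections::{HashMap, HashSet};

type TaskId = u64; // stand-in for the scheduler's real TaskId type

// True if following dependencies from `start` revisits a task on the current path
fn has_cycle(deps: &HashMap<TaskId, Vec<TaskId>>, start: TaskId) -> bool {
    fn visit(deps: &HashMap<TaskId, Vec<TaskId>>, node: TaskId, path: &mut HashSet<TaskId>) -> bool {
        if !path.insert(node) {
            return true; // node is already on the current path: cycle detected
        }
        let cyclic = deps
            .get(&node)
            .into_iter()
            .flatten()
            .any(|&next| visit(deps, next, path));
        path.remove(&node);
        cyclic
    }
    visit(deps, start, &mut HashSet::new())
}
2. Exponential Backoff Retry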
Tasks can automatically retry on failure with increasing delays:
retry_policy = RetryPolicy(
min_wait: 10s
max_wait: 3600s (1 hour)
max_attempts: 5
)
scheduler
.task(ActionDeliveryTask(action_token, recipient))
.with_retry(retry_policy)
.schedule()
Retry Schedule:
Attempt 1: Immediate
↓ fails
Attempt 2: 10s later (min wait)
↓ fails
Attempt 3: 20s later (2x backoff)
↓ fails
Attempt 4: 40s later (2x backoff)
↓ fails
Attempt 5: 80s later (2x backoff; waits cap at max_wait)
↓ fails
Attempt 6: Not attempted (max_attempts reached)
↓
Task marked as failed
Retry Logic:
struct RetryPolicy {
min_wait_secs: u64 # Minimum wait between retries
max_wait_secs: u64 # Maximum wait between retries
max_attempts: u32 # Maximum retry attempts
}
Wait calculation:
calculate_wait(attempt):
wait = min_wait_secs * 2^(attempt - 1)
wait = min(wait, max_wait_secs)
return wait
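The same calculation in Rust (a sketch; field names follow the RetryPolicy struct above):
struct RetryPolicy { min_wait_secs: u64, max_wait_secs: u64, max_attempts: u32 }

impl RetryPolicy {
    /// Wait before retry `attempt` (1-based): min_wait * 2^(attempt-1), capped at max_wait
    fn calculate_wait(&self, attempt: u32) -> u64 {
        let exp = attempt.saturating_sub(1).min(63); // avoid shift overflow
        self.min_wait_secs
            .saturating_mul(1u64 << exp)
            .min(self.max_wait_secs)
    }
}
3. Cron Scheduling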
Tasks can run on a schedule using cron expressions:
# Every day at 2:30 AM
scheduler
.task(CleanupTask(temp_dir))
.daily_at(2, 30)
.schedule()
# Every hour at :00
scheduler
.task(BackupTask(db_path))
.hourly_at(0)
.schedule()
# Every Monday at 9:00 AM
scheduler
.task(WeeklyReportTask())
.weekly_on(Weekday::Monday, 9, 0)
.schedule()
# 1st of every month at midnight
scheduler
.task(MonthlyTask())
.monthly_on(1, 0, 0)
.schedule()
Cron Expression Format:
minute hour day month weekday
* * * * * (every minute)
0 * * * * (every hour at :00)
0 0 * * * (every day at midnight)
0 0 * * 1 (every Monday at midnight)
0 0 1 * * (1st of month at midnight)
Fields:
- minute: 0-59
- hour: 0-23
- day: 1-31
- month: 1-12
- weekday: 0-6 (0 = Sunday)
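Given this five-field format, the convenience builders shown earlier plausibly reduce to simple expression templates (hypothetical helpers; the real builders may construct schedules differently):
// daily_at(2, 30) → "30 2 * * *" (every day at 02:30)
fn daily_at(hour: u8, minute: u8) -> String {
    format!("{minute} {hour} * * *")
}

// weekly_on(1, 9, 0) → "0 9 * * 1" (every Monday at 09:00; weekday 0 = Sunday)
fn weekly_on(weekday: u8, hour: u8, minute: u8) -> String {
    format!("{minute} {hour} * * {weekday}")
}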
4. Persistence
Tasks are persisted to MetaAdapter and survive server restarts:
start_scheduler(app):
scheduler = Scheduler(app)
# Load unfinished tasks from database
pending_tasks = app.meta_adapter.list_pending_tasks()
for task_meta in pending_tasks:
# Rebuild task from serialized context
task = TaskRegistry.build(
task_meta.kind,
task_meta.id,
task_meta.context
)
# Re-queue task
scheduler.enqueue(task_meta.id, task)
# Start processing
scheduler.start()
Task States:
TaskStatus:
Pending → Created, not yet run
WaitingDeps → Waiting for dependencies to finish
Running → Currently executing
Retrying → Failed, waiting for the next retry attempt
Completed → Successfully finished
Failed → Permanently failed
Built-in Task Types
ActionCreatorTask
Creates and signs action tokens for federation.
Purpose: Generate cryptographically signed action tokens
Usage:
task = ActionCreatorTask {
tn_id: alice_tn_id
action_type: "POST"
content: post_data
attachments: ["f1~abc123"]
audience: null
}
task_id = scheduler
.task(task)
.depend_on([file_task_id]) # Wait for file upload
.schedule()
What it does:
- Loads private signing key from AuthAdapter
- Builds JWT claims (iss, iat, t, c, p, a, etc.)
- Signs JWT with ES384 (P384 key)
- Computes action ID (SHA256 hash of token)
- Stores action in MetaAdapter
- Returns action ID
Location: server/src/action/action.rs
ActionVerifierTask
Validates incoming federated action tokens.
Purpose: Verify action tokens from remote instances
Usage:
# Triggered when receiving action at /api/inbox
task = ActionVerifierTask {
token: incoming_token
}
scheduler
.task(task)
.schedule()
What it does:
- Decodes JWT without verification
- Fetches issuer’s public key from their /api/me/keys
- Verifies ES384 signature
- Checks expiration (if present)
- Validates permissions (following/connected status)
- Downloads missing file attachments
- Stores verified action in MetaAdapter
Location: server/src/action/action.rs
ActionDeliveryTask
Delivers action tokens to remote instances with retry.
Purpose: Federate actions to other Cloudillo instances
Usage:
task = ActionDeliveryTask {
action_token: signed_token
recipient: "bob.example.com"
}
retry_policy = RetryPolicy(min: 10s, max: 3600s, max_attempts: 5)
scheduler
.task(task)
.with_retry(retry_policy)
.schedule()
What it does:
- POSTs action token to https://cl-o.{recipient}/api/inbox
- On success: Mark as delivered
- On temporary failure: Schedule retry with exponential backoff
- On permanent failure: Log and mark as failed
Retry behavior:
- Network errors → Retry
- 5xx errors → Retry
- 4xx errors → Don’t retry (permanent failure)
- Timeout → Retry
Location: server/src/action/action.rs
FileIdGeneratorTask
Generates content-addressed file IDs using SHA256.
Purpose: Create immutable file IDs for blob storage
Usage:
task = FileIdGeneratorTask {
tn_id: alice_tn_id
temp_file_path: "/tmp/upload-xyz"
original_filename: "photo.jpg"
}
task_id = scheduler
.task(task)
.schedule()
What it does:
- Opens file from temp path
- Computes SHA256 hash (streaming for large files)
- Encodes hash as base64url
- Formats as f1~{hash}
- Moves file to permanent BlobAdapter storage
- Stores file metadata in MetaAdapter
Location: server/src/file/file.rs
ImageResizerTask
Generates image variants (thumbnails, different sizes).
Purpose: Create optimized image variants for different use cases
Usage:
task = ImageResizerTask {
tn_id: alice_tn_id
source_file_id: "f1~abc123"
variant: "hd" # tn, sd, md, hd, xd
target_width: 1920
target_height: 1080
format: "avif"
quality: 85
}
scheduler
.task(task)
.depend_on([file_id_task]) # Wait for original upload
.priority(Priority::Low) # CPU-intensive, low priority
.schedule()
What it does:
- Loads source image from BlobAdapter
- Resizes with Lanczos3 filter (high quality)
- Encodes to target format (AVIF/WebP/JPEG)
- Uses worker pool for CPU-intensive work
- Computes variant ID (SHA256)
- Stores variant in BlobAdapter
- Updates file metadata with variant info
Location: server/src/file/image.rs
Builder Pattern API
The scheduler uses a fluent builder API for task configuration:
Basic Usage
scheduler
.task(MyTask(data))
.schedule()
With Key (Idempotency)
scheduler
.task(MyTask(data))
.key("unique-key") # For task identification
.schedule()
If a task with the same key already exists, scheduling is skipped.
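For example, scheduling twice with the same key creates only one task:
scheduler.task(MyTask(data)).key("sync-feed").schedule() # creates the task
scheduler.task(MyTask(data)).key("sync-feed").schedule() # skipped: key already exists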
With Dependencies
scheduler
.task(MyTask(data))
.depend_on([task_id_1, task_id_2])
.schedule()
With Retry Policy
retry = RetryPolicy(min: 10s, max: 3600s, max_attempts: 5)
scheduler
.task(MyTask(data))
.with_retry(retry)
.schedule()
With Cron Schedule
# Daily at 2:30 AM
scheduler.task(MyTask(data)).daily_at(2, 30).schedule()
# Hourly at :00
scheduler.task(MyTask(data)).hourly_at(0).schedule()
# Weekly on Monday at 9:00 AM
scheduler.task(MyTask(data)).weekly_on(Weekday::Monday, 9, 0).schedule()
# Monthly on 1st at midnight
scheduler.task(MyTask(data)).monthly_on(1, 0, 0).schedule()
# Custom cron expression
scheduler.task(MyTask(data)).cron("0 0 * * 1").schedule() # Monday midnight
Combining Multiple Options
retry = RetryPolicy(min: 10s, max: 3600s, max_attempts: 5)
scheduler
.task(ActionDeliveryTask(token, recipient))
.key(format("deliver-{}-{}", action_id, recipient))
.depend_on([action_creator_task])
.with_retry(retry)
.priority(Priority::Medium)
.schedule()
Creating Custom Task Types
Step 1: Define Task Struct
struct EmailTask {
to: String
subject: String
body: String
}
impl EmailTask {
new(to, subject, body):
return EmailTask { to, subject, body }
}
Step 2: Implement Task Trait
impl Task<App> for EmailTask:
kind() → "EmailTask"
kind_of() → "EmailTask"
serialize() → JSON.stringify(self)
build(id, context) → JSON.parse(context)
run(state):
client = new HttpClient()
client.post("https://api.emailservice.com/send")
.json({"to": self.to, "subject": self.subject, "body": self.body})
.send()
log("Email sent to {}", self.to)Step 3: Register Task Type
# In scheduler initialization
scheduler.register(EmailTask, "EmailTask", EmailTask.build)
Step 4: Use the Task
task = EmailTask(
to: "user@example.com"
subject: "Welcome!"
body: "Welcome to Cloudillo!"
)
retry = RetryPolicy(min: 30s, max: 3600s, max_attempts: 3)
scheduler
.task(task)
.with_retry(retry)
.schedule()
Integration with Worker Pool
CPU-intensive tasks should use the worker pool:
impl Task<App> for ImageResizerTask:
run(state):
# Load image (async I/O)
image_data = state.blob_adapter.read_blob_buf(
self.tn_id, self.source_file_id
)
# Resize image (CPU-intensive - use worker pool)
resized = state.worker.run(closure:
img = image.load_from_memory(image_data)
resized = img.resize(
self.target_width,
self.target_height,
FilterType::Lanczos3
)
# Encode to format
buffer = resized.encode_to(self.format)
return buffer
)
# Store variant (async I/O)
variant_id = compute_file_id(resized)
state.blob_adapter.create_blob_buf(
self.tn_id, variant_id, resized
)
When to use worker pool:
- Image/video processing
- Compression/decompression
- Cryptographic operations
- Complex computations
- Any blocking I/O
When NOT to use worker pool:
- Simple async I/O (database, network)
- Quick operations (<1ms)
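For reference, Tokio's spawn_blocking illustrates the same offloading idea the worker pool provides: move CPU-bound work onto a thread where blocking is acceptable (a sketch; resize_image is a hypothetical helper):
// Keeps the async executor responsive while the CPU-heavy work runs elsewhere
let resized: Vec<u8> = tokio::task::spawn_blocking(move || {
    resize_image(image_data, target_width, target_height)
})
.await?; // a JoinError here means the blocking task panicked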
Examples
Example 1: Simple Task
# Create a simple cleanup task
task = TempFileCleanupTask {
directory: "/tmp/uploads"
older_than_hours: 24
}
scheduler.task(task).schedule()
Example 2: Task with Dependencies
# Upload file
file_task = FileIdGeneratorTask(temp_path)
file_task_id = scheduler.task(file_task).schedule()
# Generate thumbnails (depends on file upload)
thumb_task = ImageResizerTask(file_id, "tn")
scheduler
.task(thumb_task)
.depend_on([file_task_id])
.schedule()
Example 3: Task with Retry
# Federation delivery with retry
task = ActionDeliveryTask {
action_token: signed_token
recipient: "remote.example.com"
}
retry = RetryPolicy(min: 10s, max: 3600s, max_attempts: 5)
scheduler
.task(task)
.with_retry(retry)
.schedule()
Example 4: Scheduled Task
# Daily backup at 2:00 AM
task = BackupTask {
source_dir: "/data"
dest_dir: "/backups"
}
scheduler.task(task).daily_at(2, 0).schedule()
Example 5: Complex Workflow
# 1. Upload file
upload_task = FileIdGeneratorTask(temp_path)
upload_id = scheduler.task(upload_task).schedule()
# 2. Generate variants (parallel, all depend on upload)
variant_ids = []
for variant in ["tn", "sd", "md", "hd"]:
task = ImageResizerTask(file_id, variant)
id = scheduler
.task(task)
.depend_on([upload_id])
.priority(Priority::Low)
.schedule()
variant_ids.push(id)
# 3. Create action (depends on all variants)
action_task = ActionCreatorTask(post_data)
scheduler
.task(action_task)
.depend_on(variant_ids)
.schedule()
Monitoring & Debugging
Task Status Queries
status = scheduler.get_task_status(task_id)
switch status:
case Pending: print("Task waiting to run")
case Running: print("Task currently executing")
case WaitingDeps: print("Task waiting for dependencies")
case Retrying: print("Task failed, will retry")
case Completed: print("Task finished successfully")
case Failed: print("Task permanently failed")
List Tasks
# List all pending tasks
pending = scheduler.list_tasks(TaskStatus::Pending)
for task in pending:
print("{}: {} ({})", task.id, task.kind, task.created_at)Task Logs
# Enable task logging
RUST_LOG=cloudillo::scheduler=debug cargo run
# Logs show:
# [DEBUG] Scheduling task TaskId(123) of type ActionCreatorTask
# [DEBUG] Task TaskId(123) waiting for dependencies: [TaskId(122)]
# [INFO] Task TaskId(122) completed successfully
# [DEBUG] Task TaskId(123) dependencies satisfied, queueing
# [INFO] Executing task TaskId(123) (ActionCreatorTask)
# [INFO] Task TaskId(123) completed in 42ms
Error Handling
Transient vs Permanent Failures
Tasks should distinguish between transient (retry) and permanent (fail) errors:
run(state):
result = deliver_action(self.recipient)
if result == Ok:
return Ok
# Transient errors - will retry
if is_network_error(result):
return error (will retry)
if is_timeout(result):
return error (will retry)
if is_5xx_error(result):
return error (will retry)
# Permanent errors - won't retry
if is_4xx_error(result):
log_warn("Permanent delivery failure: {}", result)
return PermanentFailure(result)
return error
Handling Dependencies Failure
When a dependency fails, dependent tasks are notified:
# Task A fails
# Tasks B, C, D depend on A
# B can handle failure:
run(state):
dep_error = check_dep_failures()
if dep_error exists:
# Use fallback
return run_with_fallback(state)
return run_normal(state)
# Or task can fail immediately when dependency fails
Performance Considerations
Task Granularity
Too Fine-Grained (Bad):
# Creating 1000 tiny tasks
for i in 0..1000:
scheduler.task(ProcessItemTask(item_id: i)).schedule()
# Overhead: 1000 DB writes, 1000 queue operations
Appropriate Granularity (Good):
# One task processes batch
scheduler.task(ProcessBatchTask(item_ids: 0..1000)).schedule()
# Overhead: 1 DB write, 1 queue operation
Dependency Depth
Avoid deep dependency chains:
// Bad: Deep chain (slow to start)
A → B → C → D → E → F
// Good: Wide graph (parallel execution)
    ┌─ B
A ──┼─ C
    ├─ D
    └─ E → F
Priority Usage
Use priorities appropriately:
- High: User-facing operations (<100/second)
- Medium: Normal operations (default)
- Low: Background tasks, cleanups
Too many high-priority tasks defeat the purpose.
Troubleshooting
Task Stuck in “Pending”
Possible causes:
- Waiting for dependencies:
  deps = scheduler.get_task_dependencies(task_id)
  for dep in deps:
      print("Waiting for: {} ({})", dep.id, dep.status)
- Circular dependency (should be caught at scheduling time, but check):
  scheduler.check_for_cycles(task_id)
- Scheduler not running:
  # Check if the scheduler started
  scheduler.is_running()
Task Keeps Retrying
Causes:
- Transient error not resolving
- Wrong error classification (should be permanent)
- Retry policy too aggressive
Solutions:
# 1. Check retry count
retry_count = scheduler.get_retry_count(task_id)
# 2. Check last error
last_error = scheduler.get_last_error(task_id)
# 3. Cancel task if stuck
scheduler.cancel_task(task_id)
# 4. Adjust retry policy
new_retry = RetryPolicy(min: 60s, max: 3600s, max_attempts: 3)
High Memory Usage
Causes:
- Too many tasks in memory
- Large task payloads
- Not cleaning completed tasks
Solutions:
# 1. Limit concurrent tasks
MAX_CONCURRENT_TASKS = 100
# 2. Clean old completed tasks
scheduler.cleanup_completed_tasks(older_than_hours: 24)
# 3. Use pagination for large data
# Instead of embedding 10MB in task:
ProcessFileTask:
file_id: String # Reference, not data
Best Practices
1. Make Tasks Idempotent
Tasks may run multiple times (retries, restarts):
# Bad: Not idempotent
run(state):
state.counter.increment() # Runs twice = double increment
# Good: Idempotent
run(state):
state.counter.set(42) # Runs twice = same result
# Good: Check before acting
run(state):
if NOT state.already_processed(self.id):
state.counter.increment()
state.mark_processed(self.id)
2. Use Task Keys for Deduplication
# Prevent duplicate tasks
key = format("deliver-{}-{}", action_id, recipient)
scheduler
.task(ActionDeliveryTask(token, recipient))
.key(key) # Won't schedule if already exists
.schedule()
3. Keep Task Payloads Small
# Bad: Large payload stored in DB
ProcessDataTask:
data: Vec<u8> # 10 MB!
# Good: Reference to data
ProcessDataTask:
file_id: String # Load from blob when needed
run(state):
data = state.blob_adapter.read_blob(self.file_id)
# Process data...
4. Clean Up Temporary Data
run(state):
# Create temp file
temp_path = create_temp_file()
# Use temp file; capture the result instead of returning early
result = process_file(temp_path)
# Always clean up, even when processing failed
remove_file(temp_path) # ignore cleanup errors
return result
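In Rust, RAII makes this pattern hard to get wrong: for example, the tempfile crate's NamedTempFile deletes the file when dropped, even on early returns or errors (a sketch assuming the tempfile crate; process_file is hypothetical):
let tmp = tempfile::NamedTempFile::new()?; // file is removed automatically on drop
let result = process_file(tmp.path());
// `tmp` drops here; the temp file is deleted whether result is Ok or Err
result
5. Log Progress for Long Tasks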
run(state):
items = load_items()
total = items.length
for (i, item) in enumerate(items):
process_item(item)
if i % 100 == 0:
log("Progress: {}/{} ({}%)", i, total, i * 100 / total)See Also
- Worker Pool - CPU-intensive task execution
- System Architecture - Overall system design
- [Actions](/architecture/actions-federation/actions) - Action token creation/verification tasks
- File Storage - File processing tasks