Task Scheduler System
Cloudillo’s Task Scheduler is a persistent task execution system that provides reliable asynchronous background processing with dependency management, automatic retries, and cron-style scheduling. It underpins federation, file processing, and any other operation that must survive a server restart.
Why Task-Based Processing?
Simple async handlers cannot provide the guarantees needed for federation and file processing:
Problems with simple async:
- ✗ Lost on server restart (no persistence)
- ✗ No dependency ordering (e.g., a file must finish processing before the action that references it)
- ✗ No automatic retry on transient failures
- ✗ No progress tracking or observability
- ✗ No priority management for resource allocation
Task scheduler solutions:
- ✅ Tasks survive server restarts (persisted via MetaAdapter)
- ✅ Dependency resolution (DAG-based ordering)
- ✅ Automatic retry with exponential backoff
- ✅ Task lifecycle tracking (pending → running → complete/failed)
- ✅ Priority-based execution via worker pool
Architecture Overview
Components
┌────────────────────────────────────────────┐
│  Task Scheduler                            │
│  ┌──────────────────────────────────────┐  │
│  │  Task Registry                       │  │
│  │  - TaskBuilder functions             │  │
│  │  - Task type mapping                 │  │
│  └──────────────────────────────────────┘  │
│  ┌──────────────────────────────────────┐  │
│  │  Task Queue                          │  │
│  │  - Pending tasks                     │  │
│  │  - Running tasks                     │  │
│  └──────────────────────────────────────┘  │
│  ┌──────────────────────────────────────┐  │
│  │  Dependency Tracker                  │  │
│  │  - DAG (Directed Acyclic Graph)      │  │
│  │  - Waiting tasks                     │  │
│  │  - Completion notifications          │  │
│  └──────────────────────────────────────┘  │
│  ┌──────────────────────────────────────┐  │
│  │  Retry Manager                       │  │
│  │  - Exponential backoff               │  │
│  │  - Retry counters                    │  │
│  │  - Backoff timers                    │  │
│  └──────────────────────────────────────┘  │
│  ┌──────────────────────────────────────┐  │
│  │  Cron Scheduler                      │  │
│  │  - Cron expressions                  │  │
│  │  - Next execution time               │  │
│  │  - Recurring task tracking           │  │
│  └──────────────────────────────────────┘  │
└────────────────────────────────────────────┘
                      ↓
┌────────────────────────────────────────────┐
│  MetaAdapter (Persistence)                 │
│  - Task metadata storage                   │
│  - Task status tracking                    │
│  - Dependency relationships                │
└────────────────────────────────────────────┘
                      ↓
┌────────────────────────────────────────────┐
│  Worker Pool (Execution)                   │
│  - High priority workers                   │
│  - Medium priority workers                 │
│  - Low priority workers                    │
└────────────────────────────────────────────┘
Task Trait System
All tasks implement the Task<S> trait:
#[async_trait]
pub trait Task<S: Clone>: Send + Sync + Debug {
    /// Task type identifier (e.g., "ActionCreator", "ImageResizer")
    fn kind() -> &'static str where Self: Sized;
    /// Build task from serialized context
    fn build(id: TaskId, context: &str) -> ClResult<Arc<dyn Task<S>>>
    where Self: Sized;
    /// Serialize task for persistence
    fn serialize(&self) -> String;
    /// Execute the task
    async fn run(&self, state: &S) -> ClResult<()>;
    /// Get task type of this instance
    fn kind_of(&self) -> &'static str;
}
Generic State Parameter
The S parameter is the application state (typically Arc<AppState>):
// Task receives app state for accessing adapters
async fn run(&self, state: &Arc<AppState>) -> ClResult<()> {
    // Access adapters through state
    state.meta_adapter.create_action(...).await?;
    state.blob_adapter.create_blob(...).await?;
    Ok(())
}
Task Serialization
Tasks must serialize to strings for persistence:
MyTask.serialize():
    return JSON.stringify(self)
MyTask.build(id, context):
    task = JSON.parse(context)
    return Arc(task)
Key Features
1. Dependency Tracking
Tasks can depend on other tasks, forming a Directed Acyclic Graph (DAG):
# Create file processing task
file_task_id = scheduler
    .task(FileIdGeneratorTask(temp_path))
    .schedule()
# Create image resizing task that depends on file task
resize_task_id = scheduler
    .task(ImageResizerTask(file_id, "hd"))
    .depend_on([file_task_id]) # Wait for file task
    .schedule()
# Create action that depends on both
action_task_id = scheduler
    .task(ActionCreatorTask(action_data))
    .depend_on([file_task_id, resize_task_id]) # Wait for both
    .schedule()
Dependency Resolution:
FileIdGeneratorTask (no dependencies)
↓ completes
ImageResizerTask (depends on file task)
↓ completes
ActionCreatorTask (depends on both)
↓ executes
2. Exponential Backoff Retry
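The heart of exponential backoff is a wait that doubles after each failed attempt until it hits a ceiling. The sketch below shows one way to compute that wait. It is a minimal illustration, not Cloudillo's implementation: the `RetryPolicy` struct, its field names, and the `delay_before_attempt` helper are assumptions made for this example.

```rust
use std::time::Duration;

/// Hypothetical retry policy (names are assumptions, not a confirmed API).
#[derive(Debug, Clone, Copy)]
pub struct RetryPolicy {
    pub min_wait: Duration,
    pub max_wait: Duration,
    pub max_attempts: u32,
}

impl RetryPolicy {
    /// Wait before the given 1-based attempt.
    /// Attempt 1 runs immediately; `None` means no further attempt is allowed.
    pub fn delay_before_attempt(&self, attempt: u32) -> Option<Duration> {
        if attempt == 0 || attempt > self.max_attempts {
            return None;
        }
        if attempt == 1 {
            return Some(Duration::ZERO);
        }
        // The first retry waits min_wait; each later retry doubles the wait.
        let factor = 2u32.checked_pow(attempt - 2).unwrap_or(u32::MAX);
        // On arithmetic overflow, fall back to the ceiling.
        let delay = self.min_wait.checked_mul(factor).unwrap_or(self.max_wait);
        // Never wait longer than max_wait.
        Some(delay.min(self.max_wait))
    }
}
```

With a `min_wait` of 10s and `max_attempts` of 5, this gives waits of 10s, 20s, 40s, and 80s before attempts 2 through 5, and `None` for a sixth attempt. The `max_wait` ceiling only matters for policies that allow enough attempts for the doubled delay to exceed it.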
Tasks can automatically retry on failure with increasing delays:
retry_policy = RetryPolicy(
    min_wait: 10s
    max_wait: 3600s (1 hour)
    max_attempts: 5
)
scheduler
    .task(ActionDeliveryTask(action_token, recipient))
    .with_retry(retry_policy)
    .schedule()
Retry Schedule:
Attempt 1: Immediate
↓ fails
Attempt 2: 10s later (min wait)
↓ fails
Attempt 3: 20s later (2x backoff)
↓ fails
Attempt 4: 40s later (2x backoff)
↓ fails
Attempt 5: 80s later (2x backoff, still below the 3600s max)
↓ fails
Attempt 6: Not attempted (max retries reached)
↓
Task marked as failed
3. Cron Scheduling
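A standard five-field cron expression lists minute, hour, day of month, month, and day of week, in that order. The sketch below shows how fixed-time convenience helpers could translate into such expressions. The free functions and the `Weekday` enum are hypothetical stand-ins written for this example; how the scheduler stores schedules internally is not specified here.

```rust
/// Day of week, numbered as in standard cron (0 = Sunday).
#[derive(Debug, Clone, Copy)]
pub enum Weekday {
    Sunday = 0,
    Monday = 1,
    Tuesday = 2,
    Wednesday = 3,
    Thursday = 4,
    Friday = 5,
    Saturday = 6,
}

/// Every day at hour:minute.
pub fn daily_at(hour: u8, minute: u8) -> String {
    format!("{} {} * * *", minute, hour)
}

/// Every hour at the given minute.
pub fn hourly_at(minute: u8) -> String {
    format!("{} * * * *", minute)
}

/// Every week on the given day at hour:minute.
pub fn weekly_on(day: Weekday, hour: u8, minute: u8) -> String {
    format!("{} {} * * {}", minute, hour, day as u8)
}

/// Every month on the given day of month at hour:minute.
pub fn monthly_on(day_of_month: u8, hour: u8, minute: u8) -> String {
    format!("{} {} {} * *", minute, hour, day_of_month)
}
```

For example, `daily_at(2, 30)` yields `30 2 * * *` and `weekly_on(Weekday::Monday, 9, 0)` yields `0 9 * * 1`.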
Tasks can run on a schedule using cron expressions:
# Every day at 2:30 AM
scheduler
    .task(CleanupTask(temp_dir))
    .daily_at(2, 30)
    .schedule()
# Every hour at :00
scheduler
    .task(BackupTask(db_path))
    .hourly_at(0)
    .schedule()
# Every Monday at 9:00 AM
scheduler
    .task(WeeklyReportTask())
    .weekly_on(Weekday::Monday, 9, 0)
    .schedule()
# 1st of every month at midnight
scheduler
    .task(MonthlyTask())
    .monthly_on(1, 0, 0)
    .schedule()
4. Persistence
Tasks are persisted to MetaAdapter and survive server restarts:
start_scheduler(app):
    scheduler = Scheduler(app)
    # Load unfinished tasks from database
    pending_tasks = app.meta_adapter.list_pending_tasks()
    for task_meta in pending_tasks:
        # Rebuild task from serialized context
        task = TaskRegistry.build(
            task_meta.kind,
            task_meta.id,
            task_meta.context
        )
        # Re-queue task
        scheduler.enqueue(task_meta.id, task)
    # Start processing
    scheduler.start()
Built-in Task Types
| Task | Purpose | Location |
|---|---|---|
| ActionCreatorTask | Creates and signs action tokens (JWT/ES384) for federation | cloudillo-action/src/task.rs |
| ActionVerifierTask | Validates incoming federated action tokens (signature, permissions, attachments) | cloudillo-action/src/task.rs |
| ActionDeliveryTask | Delivers actions to remote instances with retry (POST to /api/inbox) | cloudillo-action/src/task.rs |
| FileIdGeneratorTask | Computes SHA256 content-addressed file IDs, moves files to BlobAdapter | cloudillo-file/src/descriptor.rs |
| ImageResizerTask | Generates image variants (tn/sd/md/hd/xd) using Lanczos3 filter | cloudillo-file/src/image.rs |
| VideoTranscoderTask | Transcodes video to web formats via FFmpeg | cloudillo-file/src/video.rs |
| PdfProcessorTask | Extracts text, metadata, and page thumbnails from PDFs | cloudillo-file/src/pdf.rs |
| AudioExtractorTask | Extracts ID3 tags, duration, and waveform previews | cloudillo-file/src/audio.rs |
| EmailSenderTask | Sends emails asynchronously via SMTP with retry | cloudillo-email/src/task.rs |
| CertRenewalTask | Automatic TLS certificate renewal via ACME (daily check) | cloudillo-core/src/acme.rs |
| ProfileRefreshBatchTask | Batch-refreshes cached remote profile data | cloudillo-profile/src/sync.rs |
| TenantImageUpdaterTask | Processes and stores tenant avatar/banner images | cloudillo-profile/src/media.rs |
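The `kind` string stored with each persisted task is what lets the scheduler pick the right type to rebuild after a restart. A minimal sketch of such a kind-to-builder registry follows. It assumes a simplified synchronous trait, a `u64` task id, and a comma-separated context format chosen to keep the example dependency-free; the real system uses the async `Task<S>` trait and `TaskId`, and the names and signatures below are illustrative only.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Simplified stand-in for the `Task<S>` trait (no async, no app state).
pub trait Task: Send + Sync + std::fmt::Debug {
    fn kind_of(&self) -> &'static str;
    fn serialize(&self) -> String;
}

/// Rebuilds a task from its id and persisted context string.
type TaskBuilder = fn(u64, &str) -> Result<Arc<dyn Task>, String>;

#[derive(Debug)]
pub struct ImageResizerTask {
    pub id: u64,
    pub file_id: String,
    pub variant: String,
}

impl ImageResizerTask {
    pub const KIND: &'static str = "ImageResizer";

    /// Parses the "file_id,variant" format produced by `serialize`.
    pub fn build(id: u64, context: &str) -> Result<Arc<dyn Task>, String> {
        let (file_id, variant) = context
            .split_once(',')
            .ok_or_else(|| format!("malformed context: {}", context))?;
        Ok(Arc::new(ImageResizerTask {
            id,
            file_id: file_id.to_string(),
            variant: variant.to_string(),
        }))
    }
}

impl Task for ImageResizerTask {
    fn kind_of(&self) -> &'static str {
        Self::KIND
    }
    fn serialize(&self) -> String {
        format!("{},{}", self.file_id, self.variant)
    }
}

/// Maps task kind strings to their builder functions.
#[derive(Default)]
pub struct TaskRegistry {
    builders: HashMap<&'static str, TaskBuilder>,
}

impl TaskRegistry {
    pub fn register(&mut self, kind: &'static str, builder: TaskBuilder) {
        self.builders.insert(kind, builder);
    }

    /// Looks up the builder for `kind` and rebuilds the task from `context`.
    pub fn build(&self, kind: &str, id: u64, context: &str) -> Result<Arc<dyn Task>, String> {
        let builder = self
            .builders
            .get(kind)
            .ok_or_else(|| format!("unknown task kind: {}", kind))?;
        builder(id, context)
    }
}
```

Registering `ImageResizerTask::build` under `ImageResizerTask::KIND` lets a stored pair such as kind `ImageResizer` with context `f1,hd` be turned back into a task object, while an unregistered kind produces an error instead of a panic.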
Builder Pattern API
The scheduler uses a fluent builder API for task configuration:
retry = RetryPolicy(min: 10s, max: 3600s, max_attempts: 5)
scheduler
    .task(ActionDeliveryTask(token, recipient))
    .key(format("deliver-{}-{}", action_id, recipient))
    .depend_on([action_creator_task])
    .with_retry(retry)
    .priority(Priority::Medium)
    .schedule()
Integration with Worker Pool
CPU-intensive tasks should use the worker pool:
impl Task<App> for ImageResizerTask:
    run(state):
        # Load image (async I/O)
        image_data = state.blob_adapter.read_blob_buf(
            self.tn_id, self.source_file_id
        )
        # Resize image (CPU-intensive - use worker pool)
        resized = state.worker.run(closure:
            img = image.load_from_memory(image_data)
            resized = img.resize(
                self.target_width,
                self.target_height,
                FilterType::Lanczos3
            )
            # Encode to format
            buffer = resized.encode_to(self.format)
            return buffer
        )
        # Store variant (async I/O)
        variant_id = compute_file_id(resized)
        state.blob_adapter.create_blob_buf(
            self.tn_id, variant_id, resized
        )
See Also
- Worker Pool - CPU-intensive task execution
- System Architecture - Overall system design
- Actions - Action token creation/verification tasks
- File Storage - File processing tasks