Task Scheduler System

Cloudillo’s Task Scheduler is a persistent task-execution system that provides reliable asynchronous background processing with dependency management, automatic retries, and cron-style scheduling. It is critical for federation, file processing, and any operation that must survive a server restart.

Why Task-Based Processing?

Fire-and-forget async tasks (for example, anything spawned with tokio::spawn) have significant limitations in production systems:

Problems with Simple Async:

  • Lost on restart: Server restarts lose all pending tasks
  • No retry logic: Failures require manual handling
  • No dependencies: Can’t wait for other tasks to complete
  • No scheduling: Can’t run tasks at specific times
  • No observability: Hard to track task progress/failures

Task Scheduler Solutions:

  • Persistent: Tasks survive server restarts via MetaAdapter
  • Automatic Retry: Exponential backoff with configurable limits
  • Dependency Tracking: Tasks wait for dependencies (DAG)
  • Cron Scheduling: Recurring tasks (daily, hourly, etc.)
  • Observable: Track task status, retries, and failures
  • Priority Queues: High/medium/low priority execution

Architecture Overview

Components

┌────────────────────────────────────────────┐
│ Task Scheduler                             │
│  ┌──────────────────────────────────────┐  │
│  │ Task Registry                        │  │
│  │  - TaskBuilder functions             │  │
│  │  - Task type mapping                 │  │
│  └──────────────────────────────────────┘  │
│  ┌──────────────────────────────────────┐  │
│  │ Task Queue                           │  │
│  │  - Pending tasks                     │  │
│  │  - Running tasks                     │  │
│  └──────────────────────────────────────┘  │
│  ┌──────────────────────────────────────┐  │
│  │ Dependency Tracker                   │  │
│  │  - DAG (Directed Acyclic Graph)      │  │
│  │  - Waiting tasks                     │  │
│  │  - Completion notifications          │  │
│  └──────────────────────────────────────┘  │
│  ┌──────────────────────────────────────┐  │
│  │ Retry Manager                        │  │
│  │  - Exponential backoff               │  │
│  │  - Retry counters                    │  │
│  │  - Backoff timers                    │  │
│  └──────────────────────────────────────┘  │
│  ┌──────────────────────────────────────┐  │
│  │ Cron Scheduler                       │  │
│  │  - Cron expressions                  │  │
│  │  - Next execution time               │  │
│  │  - Recurring task tracking           │  │
│  └──────────────────────────────────────┘  │
└────────────────────────────────────────────┘
                    ↓
┌────────────────────────────────────────────┐
│ MetaAdapter (Persistence)                  │
│  - Task metadata storage                   │
│  - Task status tracking                    │
│  - Dependency relationships                │
└────────────────────────────────────────────┘
                    ↓
┌────────────────────────────────────────────┐
│ Worker Pool (Execution)                    │
│  - High priority workers                   │
│  - Medium priority workers                 │
│  - Low priority workers                    │
└────────────────────────────────────────────┘

Execution Flow

1. Task Created
   ↓
2. Store in MetaAdapter (persistence)
   ↓
3. Check Dependencies / scheduling
   ├─ Has dependencies? → Wait
   └─ No dependencies? → Queue
   ↓
4. Add to Priority Queue
   ↓
5. Execute Task
   ├─ Success → Mark complete, notify dependents
   ├─ Failure (transient) → Schedule retry with backoff
   └─ Failure (permanent) → Mark failed, notify dependents
   ↓
6. Trigger Dependent Tasks

Task Trait System

All tasks implement the Task<S> trait:

use std::{fmt::Debug, sync::Arc};
use async_trait::async_trait;
// TaskId and ClResult are Cloudillo's own task-handle and result types.

#[async_trait]
pub trait Task<S: Clone>: Send + Sync + Debug {
    /// Task type identifier (e.g., "ActionCreator", "ImageResizer")
    fn kind() -> &'static str where Self: Sized;

    /// Build task from serialized context
    fn build(id: TaskId, context: &str) -> ClResult<Arc<dyn Task<S>>>
    where Self: Sized;

    /// Serialize task for persistence
    fn serialize(&self) -> String;

    /// Execute the task
    async fn run(&self, state: &S) -> ClResult<()>;

    /// Get task type of this instance
    fn kind_of(&self) -> &'static str;
}

Generic State Parameter

The S parameter is the application state (typically Arc<AppState>):

// Task receives app state for accessing adapters
async fn run(&self, state: &Arc<AppState>) -> ClResult<()> {
    // Access adapters through state
    state.meta_adapter.create_action(...).await?;
    state.blob_adapter.create_blob(...).await?;
    Ok(())
}

Task Serialization

Tasks must serialize to strings for persistence:

MyTask.serialize():
    return JSON.stringify(self)

MyTask.build(id, context):
    task = JSON.parse(context)
    return Arc(task)
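
In the Rust codebase this is typically a serde round-trip. A minimal sketch, assuming serde and serde_json; the struct and its field are illustrative, and the real build also threads through the TaskId and ClResult types from the trait:

use std::sync::Arc;
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
struct MyTask {
    target: String, // illustrative payload field
}

impl MyTask {
    /// Serialize for persistence; task payloads should always be JSON-safe.
    fn serialize(&self) -> String {
        serde_json::to_string(self).expect("task payload serializes")
    }

    /// Rebuild the task from its persisted context string.
    fn build(context: &str) -> Result<Arc<MyTask>, serde_json::Error> {
        Ok(Arc::new(serde_json::from_str::<MyTask>(context)?))
    }
}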

Key Features

1. Dependency Tracking

Tasks can depend on other tasks, forming a Directed Acyclic Graph (DAG):

# Create file processing task
file_task_id = scheduler
    .task(FileIdGeneratorTask(temp_path))
    .schedule()

# Create image resizing task that depends on file task
resize_task_id = scheduler
    .task(ImageResizerTask(file_id, "hd"))
    .depend_on([file_task_id])    # Wait for file task
    .schedule()

# Create action that depends on both
action_task_id = scheduler
    .task(ActionCreatorTask(action_data))
    .depend_on([file_task_id, resize_task_id])  # Wait for both
    .schedule()

Dependency Resolution:

FileIdGeneratorTask (no dependencies)
  ↓ completes
ImageResizerTask (depends on file task)
  ↓ completes
ActionCreatorTask (depends on both)
  ↓ executes

Cycle Detection: The scheduler detects and rejects circular dependencies:

# This would be rejected
task_a.depend_on([task_b])
task_b.depend_on([task_a])  # Error: cycle detected
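
One way to implement the check (a sketch, not the scheduler’s actual code): before a new dependency edge is accepted, walk the existing graph and reject the edge if the depending task is reachable from any of its proposed dependencies:

use std::collections::{HashMap, HashSet};

type TaskId = u64; // stand-in for the real handle type

/// Would adding `new_deps` to `task` close a loop?
/// `deps` maps each task to the tasks it waits on.
fn creates_cycle(
    deps: &HashMap<TaskId, Vec<TaskId>>,
    task: TaskId,
    new_deps: &[TaskId],
) -> bool {
    // Depth-first walk: a cycle exists iff `task` is reachable
    // from any of the proposed dependencies.
    let mut stack: Vec<TaskId> = new_deps.to_vec();
    let mut seen = HashSet::new();
    while let Some(t) = stack.pop() {
        if t == task {
            return true;
        }
        if seen.insert(t) {
            if let Some(next) = deps.get(&t) {
                stack.extend(next.iter().copied());
            }
        }
    }
    false
}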

2. Exponential Backoff Retry

Tasks can automatically retry on failure with increasing delays:

retry_policy = RetryPolicy(
    min_wait: 10s
    max_wait: 3600s (1 hour)
    max_attempts: 5
)

scheduler
    .task(ActionDeliveryTask(action_token, recipient))
    .with_retry(retry_policy)
    .schedule()

Retry Schedule:

Attempt 1: Immediate
  ↓ fails
Attempt 2: 10s later (min wait)
  ↓ fails
Attempt 3: 20s later (2x backoff)
  ↓ fails
Attempt 4: 40s later (2x backoff)
  ↓ fails
Attempt 5: 80s later (2x backoff; waits are capped at max_wait, 3600s here)
  ↓ fails
Attempt 6: Not attempted (max retries reached)
  ↓
Task marked as failed

Retry Logic:

struct RetryPolicy {
    min_wait_secs: u64      # Minimum wait between retries
    max_wait_secs: u64      # Maximum wait between retries
    max_attempts: u32       # Maximum retry attempts
}

Wait calculation:

calculate_wait(attempt):
    wait = min_wait_secs * 2^(attempt - 1)
    wait = min(wait, max_wait_secs)
    return wait
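
The same calculation as a Rust sketch, mirroring the pseudocode above with saturating arithmetic so large attempt counts cannot overflow:

struct RetryPolicy {
    min_wait_secs: u64,
    max_wait_secs: u64,
    max_attempts: u32,
}

impl RetryPolicy {
    /// Wait before a retry; `attempt` is the 1-based retry number,
    /// so the first retry waits `min_wait_secs`.
    fn calculate_wait(&self, attempt: u32) -> u64 {
        // 2^(attempt - 1), saturating to u64::MAX on overflow
        let factor = 1u64
            .checked_shl(attempt.saturating_sub(1))
            .unwrap_or(u64::MAX);
        self.min_wait_secs
            .saturating_mul(factor)
            .min(self.max_wait_secs)
    }
}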

3. Cron Scheduling

Tasks can run on a schedule using cron expressions:

# Every day at 2:30 AM
scheduler
    .task(CleanupTask(temp_dir))
    .daily_at(2, 30)
    .schedule()

# Every hour at :00
scheduler
    .task(BackupTask(db_path))
    .hourly_at(0)
    .schedule()

# Every Monday at 9:00 AM
scheduler
    .task(WeeklyReportTask())
    .weekly_on(Weekday::Monday, 9, 0)
    .schedule()

# 1st of every month at midnight
scheduler
    .task(MonthlyTask())
    .monthly_on(1, 0, 0)
    .schedule()

Cron Expression Format:

minute hour day month weekday

* * * * *  (every minute)
0 * * * *  (every hour at :00)
0 0 * * *  (every day at midnight)
0 0 * * 1  (every Monday at midnight)
0 0 1 * *  (1st of month at midnight)

Fields:

  • minute: 0-59
  • hour: 0-23
  • day: 1-31
  • month: 1-12
  • weekday: 0-6 (0 = Sunday)
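
Helpers like daily_at reduce to computing the next matching instant. A simplified sketch of that calculation for the daily case, assuming the chrono crate (the real scheduler may instead evaluate full cron expressions):

use chrono::{DateTime, Duration, NaiveTime, Utc};

/// Next occurrence of `hour:minute` (UTC), strictly after `now`.
fn next_daily_at(now: DateTime<Utc>, hour: u32, minute: u32) -> DateTime<Utc> {
    let target = NaiveTime::from_hms_opt(hour, minute, 0).expect("valid wall-clock time");
    let next = now.date_naive().and_time(target).and_utc();
    if next <= now {
        next + Duration::days(1) // already passed today, run tomorrow
    } else {
        next
    }
}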

4. Persistence

Tasks are persisted to MetaAdapter and survive server restarts:

start_scheduler(app):
    scheduler = Scheduler(app)

    # Load unfinished tasks from database
    pending_tasks = app.meta_adapter.list_pending_tasks()

    for task_meta in pending_tasks:
        # Rebuild task from serialized context
        task = TaskRegistry.build(
            task_meta.kind,
            task_meta.id,
            task_meta.context
        )

        # Re-queue task
        scheduler.enqueue(task_meta.id, task)

    # Start processing
    scheduler.start()

Task States:

TaskStatus:
    Pending     → Created, not yet run
    Completed   → Successfully finished
    Failed      → Permanently failed
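
The restore loop above leans on a registry that maps each persisted kind string back to a builder function. One plausible shape for it, reusing the Task<S> trait and TaskId from earlier (ClResult is simplified to a string error for this sketch):

use std::collections::HashMap;
use std::sync::Arc;

type ClResult<T> = Result<T, String>; // stand-in for the real error type
type BuildFn<S> = fn(TaskId, &str) -> ClResult<Arc<dyn Task<S>>>;

struct TaskRegistry<S: Clone> {
    builders: HashMap<&'static str, BuildFn<S>>,
}

impl<S: Clone> TaskRegistry<S> {
    /// Register a task type so persisted instances can be rebuilt by kind.
    fn register<T: Task<S>>(&mut self) {
        self.builders.insert(T::kind(), T::build);
    }

    /// Rebuild a task from its persisted (kind, id, context) triple.
    fn build(&self, kind: &str, id: TaskId, context: &str) -> ClResult<Arc<dyn Task<S>>> {
        match self.builders.get(kind) {
            Some(builder) => builder(id, context),
            None => Err(format!("unknown task kind: {kind}")),
        }
    }
}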

Built-in Task Types

ActionCreatorTask

Creates and signs action tokens for federation.

Purpose: Generate cryptographically signed action tokens

Usage:

task = ActionCreatorTask {
    tn_id: alice_tn_id
    action_type: "POST"
    content: post_data
    attachments: ["f1~abc123"]
    audience: null
}

task_id = scheduler
    .task(task)
    .depend_on([file_task_id])  # Wait for file upload
    .schedule()

What it does:

  1. Loads private signing key from AuthAdapter
  2. Builds JWT claims (iss, iat, t, c, p, a, etc.)
  3. Signs JWT with ES384 (P384 key)
  4. Computes action ID (SHA256 hash of token)
  5. Stores action in MetaAdapter
  6. Returns action ID

Location: server/src/action/action.rs


ActionVerifierTask

Validates incoming federated action tokens.

Purpose: Verify action tokens from remote instances

Usage:

# Triggered when receiving action at /api/inbox
task = ActionVerifierTask {
    token: incoming_token
}

scheduler
    .task(task)
    .schedule()

What it does:

  1. Decodes JWT without verification
  2. Fetches issuer’s public key from their /api/me/keys
  3. Verifies ES384 signature
  4. Checks expiration (if present)
  5. Validates permissions (following/connected status)
  6. Downloads missing file attachments
  7. Stores verified action in MetaAdapter

Location: server/src/action/action.rs


ActionDeliveryTask

Delivers action tokens to remote instances with retry.

Purpose: Federate actions to other Cloudillo instances

Usage:

task = ActionDeliveryTask {
    action_token: signed_token
    recipient: "bob.example.com"
}

retry_policy = RetryPolicy(min: 10s, max: 3600s, max_attempts: 5)

scheduler
    .task(task)
    .with_retry(retry_policy)
    .schedule()

What it does:

  1. POSTs action token to https://cl-o.{recipient}/api/inbox
  2. On success: Mark as delivered
  3. On temporary failure: Schedule retry with exponential backoff
  4. On permanent failure: Log and mark as failed

Retry behavior:

  • Network errors → Retry
  • 5xx errors → Retry
  • 4xx errors → Don’t retry (permanent failure)
  • Timeout → Retry

Location: server/src/action/action.rs


FileIdGeneratorTask

Generates content-addressed file IDs using SHA256.

Purpose: Create immutable file IDs for blob storage

Usage:

task = FileIdGeneratorTask {
    tn_id: alice_tn_id
    temp_file_path: "/tmp/upload-xyz"
    original_filename: "photo.jpg"
}

task_id = scheduler
    .task(task)
    .schedule()

What it does:

  1. Opens file from temp path
  2. Computes SHA256 hash (streaming for large files)
  3. Encodes hash as base64url
  4. Formats as f1~{hash}
  5. Moves file to permanent BlobAdapter storage
  6. Stores file metadata in MetaAdapter
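
A hypothetical standalone version of steps 1-4, assuming the sha2 and base64 crates (the unpadded encoding is an assumption here; the real task also moves the blob and records metadata):

use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine as _};
use sha2::{Digest, Sha256};
use std::fs::File;
use std::io::{self, Read};

/// Hash a file in 64 KiB chunks and format the digest as "f1~{base64url}".
fn compute_file_id(path: &str) -> io::Result<String> {
    let mut file = File::open(path)?;
    let mut hasher = Sha256::new();
    let mut buf = [0u8; 64 * 1024];
    loop {
        let n = file.read(&mut buf)?;
        if n == 0 {
            break;
        }
        hasher.update(&buf[..n]);
    }
    Ok(format!("f1~{}", URL_SAFE_NO_PAD.encode(hasher.finalize())))
}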

Location: server/src/file/file.rs


ImageResizerTask

Generates image variants (thumbnails, different sizes).

Purpose: Create optimized image variants for different use cases

Usage:

task = ImageResizerTask {
    tn_id: alice_tn_id
    source_file_id: "f1~abc123"
    variant: "hd"  # tn, sd, md, hd, xd
    target_width: 1920
    target_height: 1080
    format: "avif"
    quality: 85
}

scheduler
    .task(task)
    .depend_on([file_id_task])  # Wait for original upload
    .priority(Priority::Low)    # CPU-intensive, low priority
    .schedule()

What it does:

  1. Loads source image from BlobAdapter
  2. Resizes with Lanczos3 filter (high quality)
  3. Encodes to target format (AVIF/WebP/JPEG)
  4. Uses worker pool for CPU-intensive work
  5. Computes variant ID (SHA256)
  6. Stores variant in BlobAdapter
  7. Updates file metadata with variant info

Location: server/src/file/image.rs


Builder Pattern API

The scheduler uses a fluent builder API for task configuration:

Basic Usage

scheduler
    .task(MyTask(data))
    .schedule()

With Key (Idempotency)

scheduler
    .task(MyTask(data))
    .key("unique-key")  # For task identification
    .schedule()

If a task with the same key already exists, scheduling is skipped.

With Dependencies

scheduler
    .task(MyTask(data))
    .depend_on([task_id_1, task_id_2])
    .schedule()

With Retry Policy

retry = RetryPolicy(min: 10s, max: 3600s, max_attempts: 5)

scheduler
    .task(MyTask(data))
    .with_retry(retry)
    .schedule()

With Cron Schedule

# Daily at 2:30 AM
scheduler.task(MyTask(data)).daily_at(2, 30).schedule()

# Hourly at :00
scheduler.task(MyTask(data)).hourly_at(0).schedule()

# Weekly on Monday at 9:00 AM
scheduler.task(MyTask(data)).weekly_on(Weekday::Monday, 9, 0).schedule()

# Monthly on 1st at midnight
scheduler.task(MyTask(data)).monthly_on(1, 0, 0).schedule()

# Custom cron expression
scheduler.task(MyTask(data)).cron("0 0 * * 1").schedule()  # Monday midnight

Combining Multiple Options

retry = RetryPolicy(min: 10s, max: 3600s, max_attempts: 5)

scheduler
    .task(ActionDeliveryTask(token, recipient))
    .key(format("deliver-{}-{}", action_id, recipient))
    .depend_on([action_creator_task])
    .with_retry(retry)
    .priority(Priority::Medium)
    .schedule()

Creating Custom Task Types

Step 1: Define Task Struct

struct EmailTask {
    to: String
    subject: String
    body: String
}

impl EmailTask {
    new(to, subject, body):
        return EmailTask { to, subject, body }
}

Step 2: Implement Task Trait

impl Task<App> for EmailTask:
    kind() → "EmailTask"
    kind_of() → "EmailTask"
    serialize() → JSON.stringify(self)
    build(id, context) → JSON.parse(context)

    run(state):
        client = new HttpClient()
        client.post("https://api.emailservice.com/send")
              .json({"to": self.to, "subject": self.subject, "body": self.body})
              .send()
        log("Email sent to {}", self.to)

Step 3: Register Task Type

# In scheduler initialization
scheduler.register(EmailTask, "EmailTask", EmailTask.build)

Step 4: Use the Task

task = EmailTask(
    to: "user@example.com"
    subject: "Welcome!"
    body: "Welcome to Cloudillo!"
)

retry = RetryPolicy(min: 30s, max: 3600s, max_attempts: 3)

scheduler
    .task(task)
    .with_retry(retry)
    .schedule()

Integration with Worker Pool

CPU-intensive tasks should use the worker pool:

impl Task<App> for ImageResizerTask:
    run(state):
        # Load image (async I/O)
        image_data = state.blob_adapter.read_blob_buf(
            self.tn_id, self.source_file_id
        )

        # Resize image (CPU-intensive - use worker pool)
        resized = state.worker.run(closure:
            img = image.load_from_memory(image_data)
            resized = img.resize(
                self.target_width,
                self.target_height,
                FilterType::Lanczos3
            )
            # Encode to format
            buffer = resized.encode_to(self.format)
            return buffer
        )

        # Store variant (async I/O)
        variant_id = compute_file_id(resized)
        state.blob_adapter.create_blob_buf(
            self.tn_id, variant_id, resized
        )
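
Handing work to the pool looks roughly like tokio’s spawn_blocking (a sketch of the pattern, not Cloudillo’s actual worker API; cpu_intensive_resize is a placeholder):

use tokio::task;

/// Run a CPU-bound closure off the async runtime so it cannot
/// stall the scheduler's task-processing threads.
async fn resize_off_thread(
    image_data: Vec<u8>,
    width: u32,
    height: u32,
) -> Result<Vec<u8>, String> {
    task::spawn_blocking(move || cpu_intensive_resize(&image_data, width, height))
        .await
        .map_err(|e| format!("worker panicked or was cancelled: {e}"))?
}

fn cpu_intensive_resize(data: &[u8], _w: u32, _h: u32) -> Result<Vec<u8>, String> {
    // Stand-in: a real implementation would decode, resize (Lanczos3),
    // and re-encode here, e.g. with the `image` crate.
    Ok(data.to_vec())
}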

When to use worker pool:

  • Image/video processing
  • Compression/decompression
  • Cryptographic operations
  • Complex computations
  • Any blocking I/O

When NOT to use worker pool:

  • Simple async I/O (database, network)
  • Quick operations (<1ms)

Examples

Example 1: Simple Task

# Create a simple cleanup task
task = TempFileCleanupTask {
    directory: "/tmp/uploads"
    older_than_hours: 24
}

scheduler.task(task).schedule()

Example 2: Task with Dependencies

# Upload file
file_task = FileIdGeneratorTask(temp_path)
file_task_id = scheduler.task(file_task).schedule()

# Generate thumbnails (depends on file upload)
thumb_task = ImageResizerTask(file_id, "tn")
scheduler
    .task(thumb_task)
    .depend_on([file_task_id])
    .schedule()

Example 3: Task with Retry

# Federation delivery with retry
task = ActionDeliveryTask {
    action_token: signed_token
    recipient: "remote.example.com"
}

retry = RetryPolicy(min: 10s, max: 3600s, max_attempts: 5)

scheduler
    .task(task)
    .with_retry(retry)
    .schedule()

Example 4: Scheduled Task

# Daily backup at 2:00 AM
task = BackupTask {
    source_dir: "/data"
    dest_dir: "/backups"
}

scheduler.task(task).daily_at(2, 0).schedule()

Example 5: Complex Workflow

# 1. Upload file
upload_task = FileIdGeneratorTask(temp_path)
upload_id = scheduler.task(upload_task).schedule()

# 2. Generate variants (parallel, all depend on upload)
variant_ids = []
for variant in ["tn", "sd", "md", "hd"]:
    task = ImageResizerTask(file_id, variant)
    id = scheduler
        .task(task)
        .depend_on([upload_id])
        .priority(Priority::Low)
        .schedule()
    variant_ids.push(id)

# 3. Create action (depends on all variants)
action_task = ActionCreatorTask(post_data)
scheduler
    .task(action_task)
    .depend_on(variant_ids)
    .schedule()

Monitoring & Debugging

Task Status Queries

status = scheduler.get_task_status(task_id)

switch status:
    case Pending: print("Task waiting to run")
    case Running: print("Task currently executing")
    case WaitingDeps: print("Task waiting for dependencies")
    case Retrying: print("Task failed, will retry")
    case Completed: print("Task finished successfully")
    case Failed: print("Task permanently failed")

List Tasks

# List all pending tasks
pending = scheduler.list_tasks(TaskStatus::Pending)

for task in pending:
    print("{}: {} ({})", task.id, task.kind, task.created_at)

Task Logs

# Enable task logging
RUST_LOG=cloudillo::scheduler=debug cargo run

# Logs show:
# [DEBUG] Scheduling task TaskId(123) of type ActionCreatorTask
# [DEBUG] Task TaskId(123) waiting for dependencies: [TaskId(122)]
# [INFO]  Task TaskId(122) completed successfully
# [DEBUG] Task TaskId(123) dependencies satisfied, queueing
# [INFO]  Executing task TaskId(123) (ActionCreatorTask)
# [INFO]  Task TaskId(123) completed in 42ms

Error Handling

Transient vs Permanent Failures

Tasks should distinguish between transient (retry) and permanent (fail) errors:

run(state):
    result = deliver_action(self.recipient)

    if result == Ok:
        return Ok

    # Transient errors - will retry
    if is_network_error(result):
        return error (will retry)
    if is_timeout(result):
        return error (will retry)
    if is_5xx_error(result):
        return error (will retry)

    # Permanent errors - won't retry
    if is_4xx_error(result):
        log_warn("Permanent delivery failure: {}", result)
        return PermanentFailure(result)

    return error
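
A sketch of one way to encode that split in Rust; the enum and the status mapping are illustrative, not the crate’s actual error type:

/// Whether a failure is worth retrying.
enum TaskError {
    /// Network errors, timeouts, 5xx: retry with backoff.
    Transient(String),
    /// 4xx, validation errors: mark the task failed immediately.
    Permanent(String),
}

/// Map an HTTP status to a retry decision; `None` means success.
fn classify_status(status: u16) -> Option<TaskError> {
    match status {
        200..=299 => None,
        500..=599 => Some(TaskError::Transient(format!("server error {status}"))),
        400..=499 => Some(TaskError::Permanent(format!("client error {status}"))),
        other => Some(TaskError::Transient(format!("unexpected status {other}"))),
    }
}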

Handling Dependencies Failure

When a dependency fails, dependent tasks are notified:

# Task A fails
# Tasks B, C, D depend on A

# B can handle failure:
run(state):
    dep_error = check_dep_failures()

    if dep_error exists:
        # Use fallback
        return run_with_fallback(state)

    return run_normal(state)

# Or task can fail immediately when dependency fails

Performance Considerations

Task Granularity

Too Fine-Grained (Bad):

# Creating 1000 tiny tasks
for i in 0..1000:
    scheduler.task(ProcessItemTask(item_id: i)).schedule()
# Overhead: 1000 DB writes, 1000 queue operations

Appropriate Granularity (Good):

# One task processes batch
scheduler.task(ProcessBatchTask(item_ids: 0..1000)).schedule()
# Overhead: 1 DB write, 1 queue operation

Dependency Depth

Avoid deep dependency chains:

// Bad: Deep chain (slow to start)
A → B → C → D → E → F

// Good: Wide graph (parallel execution)
    ┌─ B
A ──┼─ C
    ├─ D
    └─ E → F

Priority Usage

Use priorities appropriately:

  • High: User-facing operations (<100/second)
  • Medium: Normal operations (default)
  • Low: Background tasks, cleanups

Too many high-priority tasks defeat the purpose.


Troubleshooting

Task Stuck in “Pending”

Possible causes:

  1. Waiting for dependencies

    deps = scheduler.get_task_dependencies(task_id)
    for dep in deps:
        print("Waiting for: {} ({})", dep.id, dep.status)
  2. Circular dependency (should be caught, but check)

    scheduler.check_for_cycles(task_id)
  3. Scheduler not running

    # Check if scheduler started
    scheduler.is_running()

Task Keeps Retrying

Causes:

  • Transient error not resolving
  • Wrong error classification (should be permanent)
  • Retry policy too aggressive

Solutions:

# 1. Check retry count
retry_count = scheduler.get_retry_count(task_id)

# 2. Check last error
last_error = scheduler.get_last_error(task_id)

# 3. Cancel task if stuck
scheduler.cancel_task(task_id)

# 4. Adjust retry policy
new_retry = RetryPolicy(min: 60s, max: 3600s, max_attempts: 3)

High Memory Usage

Causes:

  • Too many tasks in memory
  • Large task payloads
  • Not cleaning completed tasks

Solutions:

# 1. Limit concurrent tasks
MAX_CONCURRENT_TASKS = 100

# 2. Clean old completed tasks
scheduler.cleanup_completed_tasks(older_than_hours: 24)

# 3. Use pagination for large data
# Instead of embedding 10MB in task:
ProcessFileTask:
    file_id: String  # Reference, not data

Best Practices

1. Make Tasks Idempotent

Tasks may run multiple times (retries, restarts):

# Bad: Not idempotent
run(state):
    state.counter.increment()  # Runs twice = double increment

# Good: Idempotent
run(state):
    state.counter.set(42)  # Runs twice = same result

# Good: Check before acting
run(state):
    if NOT state.already_processed(self.id):
        state.counter.increment()
        state.mark_processed(self.id)

2. Use Task Keys for Deduplication

# Prevent duplicate tasks
key = format("deliver-{}-{}", action_id, recipient)

scheduler
    .task(ActionDeliveryTask(token, recipient))
    .key(key)  # Won't schedule if already exists
    .schedule()

3. Keep Task Payloads Small

# Bad: Large payload stored in DB
ProcessDataTask:
    data: Vec<u8>  # 10 MB!

# Good: Reference to data
ProcessDataTask:
    file_id: String  # Load from blob when needed

run(state):
    data = state.blob_adapter.read_blob(self.file_id)
    # Process data...

4. Clean Up Temporary Data

run(state):
    # Create temp file
    temp_path = create_temp_file()

    # Use temp file
    result = process_file(temp_path)

    # Always clean up, even when processing failed
    remove_file(temp_path)  # Ignore errors from the cleanup itself

    return result

5. Log Progress for Long Tasks

run(state):
    items = load_items()
    total = items.length

    for (i, item) in enumerate(items):
        process_item(item)

        if i % 100 == 0:
            log("Progress: {}/{} ({}%)", i, total, i * 100 / total)

See Also

  • Worker Pool - CPU-intensive task execution
  • System Architecture - Overall system design
  • [Actions](/architecture/actions-federation/actions) - Action token creation/verification tasks
  • File Storage - File processing tasks