Task Scheduler System

Cloudillo’s Task Scheduler is a persistent task execution system for reliable asynchronous background processing, with dependency management, automatic retries, and cron-style scheduling. Federation, file processing, and any other operation that must survive a server restart depend on it.

Why Task-Based Processing?

Simple async handlers cannot provide the guarantees needed for federation and file processing:

Problems with plain async handlers:

  • ✗ Lost on server restart (no persistence)
  • ✗ No dependency ordering (e.g., a file must finish processing before the action that references it is created)
  • ✗ No automatic retry on transient failures
  • ✗ No progress tracking or observability
  • ✗ No priority management for resource allocation

Task scheduler solutions:

  • ✅ Tasks survive server restarts (persisted via MetaAdapter)
  • ✅ Dependency resolution (DAG-based ordering)
  • ✅ Automatic retry with exponential backoff
  • ✅ Task lifecycle tracking (pending → running → complete/failed)
  • ✅ Priority-based execution via worker pool

Architecture Overview

Components

┌────────────────────────────────────────────┐
│ Task Scheduler                             │
│  ┌──────────────────────────────────────┐  │
│  │ Task Registry                        │  │
│  │  - TaskBuilder functions             │  │
│  │  - Task type mapping                 │  │
│  └──────────────────────────────────────┘  │
│  ┌──────────────────────────────────────┐  │
│  │ Task Queue                           │  │
│  │  - Pending tasks                     │  │
│  │  - Running tasks                     │  │
│  └──────────────────────────────────────┘  │
│  ┌──────────────────────────────────────┐  │
│  │ Dependency Tracker                   │  │
│  │  - DAG (Directed Acyclic Graph)      │  │
│  │  - Waiting tasks                     │  │
│  │  - Completion notifications          │  │
│  └──────────────────────────────────────┘  │
│  ┌──────────────────────────────────────┐  │
│  │ Retry Manager                        │  │
│  │  - Exponential backoff               │  │
│  │  - Retry counters                    │  │
│  │  - Backoff timers                    │  │
│  └──────────────────────────────────────┘  │
│  ┌──────────────────────────────────────┐  │
│  │ Cron Scheduler                       │  │
│  │  - Cron expressions                  │  │
│  │  - Next execution time               │  │
│  │  - Recurring task tracking           │  │
│  └──────────────────────────────────────┘  │
└────────────────────────────────────────────┘
                    ↓
┌────────────────────────────────────────────┐
│ MetaAdapter (Persistence)                  │
│  - Task metadata storage                   │
│  - Task status tracking                    │
│  - Dependency relationships                │
└────────────────────────────────────────────┘
                    ↓
┌────────────────────────────────────────────┐
│ Worker Pool (Execution)                    │
│  - High priority workers                   │
│  - Medium priority workers                 │
│  - Low priority workers                    │
└────────────────────────────────────────────┘

Task Trait System

All tasks implement the Task<S> trait:

use std::{fmt::Debug, sync::Arc};
use async_trait::async_trait;

// TaskId and ClResult are Cloudillo's own types (not shown here).
#[async_trait]
pub trait Task<S: Clone>: Send + Sync + Debug {
    /// Task type identifier (e.g., "ActionCreator", "ImageResizer")
    fn kind() -> &'static str where Self: Sized;

    /// Build task from serialized context
    fn build(id: TaskId, context: &str) -> ClResult<Arc<dyn Task<S>>>
    where Self: Sized;

    /// Serialize task for persistence
    fn serialize(&self) -> String;

    /// Execute the task
    async fn run(&self, state: &S) -> ClResult<()>;

    /// Get task type of this instance
    fn kind_of(&self) -> &'static str;
}

Generic State Parameter

The S parameter is the application state (typically Arc<AppState>):

// Task receives app state for accessing adapters
async fn run(&self, state: &Arc<AppState>) -> ClResult<()> {
    // Access adapters through state
    state.meta_adapter.create_action(...).await?;
    state.blob_adapter.create_blob(...).await?;
    Ok(())
}

Task Serialization

Tasks must serialize to strings for persistence:

MyTask.serialize():
    return JSON.stringify(self)

MyTask.build(id, context):
    task = JSON.parse(context)
    return Arc(task)
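The serialize/build round trip above can be sketched in Rust with only the standard library. This is a hypothetical illustration: the ResizeSpec struct, its pipe-delimited "file_id|variant" context format, and the String error type are assumptions chosen to keep the example self-contained, not Cloudillo's actual encoding.

```rust
/// Hypothetical task state: only the fields needed to rebuild the task are persisted.
#[derive(Debug, Clone, PartialEq)]
struct ResizeSpec {
    file_id: String,
    variant: String,
}

impl ResizeSpec {
    /// Serialize to the context string stored with the task.
    /// Assumed format: "file_id|variant" (file_id must not contain '|').
    fn serialize(&self) -> String {
        format!("{}|{}", self.file_id, self.variant)
    }

    /// Rebuild from a persisted context string, rejecting malformed input
    /// instead of panicking, since the string comes from storage.
    fn build(context: &str) -> Result<ResizeSpec, String> {
        let (file_id, variant) = context
            .split_once('|')
            .ok_or_else(|| format!("malformed context: {context:?}"))?;
        Ok(ResizeSpec {
            file_id: file_id.to_string(),
            variant: variant.to_string(),
        })
    }
}

fn main() {
    let task = ResizeSpec { file_id: "f1abc".to_string(), variant: "hd".to_string() };
    let context = task.serialize();
    let rebuilt = ResizeSpec::build(&context).expect("valid context");
    assert_eq!(task, rebuilt);
    println!("persisted context: {context}");
}
```

In a real task, deriving serde's Serialize and Deserialize and calling serde_json::to_string and serde_json::from_str would do the same job with a self-describing format, which is closer to the JSON pseudocode above.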

Key Features

1. Dependency Tracking

Tasks can depend on other tasks forming a Directed Acyclic Graph (DAG):

# Create file processing task
file_task_id = scheduler
    .task(FileIdGeneratorTask(temp_path))
    .schedule()

# Create image resizing task that depends on file task
resize_task_id = scheduler
    .task(ImageResizerTask(file_id, "hd"))
    .depend_on([file_task_id])    # Wait for file task
    .schedule()

# Create action that depends on both
action_task_id = scheduler
    .task(ActionCreatorTask(action_data))
    .depend_on([file_task_id, resize_task_id])  # Wait for both
    .schedule()

Dependency Resolution:

FileIdGeneratorTask (no dependencies)
  ↓ completes
ImageResizerTask (depends on file task)
  ↓ completes
ActionCreatorTask (depends on both)
  ↓ executes
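The resolution order above is a topological sort of the dependency DAG. A minimal sketch using Kahn's algorithm follows; it illustrates the general technique rather than Cloudillo's Dependency Tracker. Task IDs are plain u64 values (1, 2 and 3 stand for the three tasks above), and returning None on a cycle is an assumption about how a scheduler could reject a graph that is not acyclic.

```rust
use std::collections::{BTreeMap, VecDeque};

/// Return an execution order in which every task runs after all of its
/// dependencies, or None if the dependency graph contains a cycle.
fn execution_order(deps: &BTreeMap<u64, Vec<u64>>) -> Option<Vec<u64>> {
    // Count unfinished dependencies per task and record reverse edges.
    let mut pending: BTreeMap<u64, usize> = BTreeMap::new();
    let mut dependents: BTreeMap<u64, Vec<u64>> = BTreeMap::new();
    for (&task, task_deps) in deps {
        pending.entry(task).or_insert(0);
        for &dep in task_deps {
            *pending.entry(task).or_insert(0) += 1;
            pending.entry(dep).or_insert(0);
            dependents.entry(dep).or_default().push(task);
        }
    }
    // Tasks with no unfinished dependencies are ready immediately.
    let mut ready: VecDeque<u64> = pending
        .iter()
        .filter(|&(_, &n)| n == 0)
        .map(|(&t, _)| t)
        .collect();
    let mut order = Vec::new();
    while let Some(task) = ready.pop_front() {
        order.push(task);
        // "Complete" the task: release dependents whose last dependency this was.
        for &next in dependents.get(&task).into_iter().flatten() {
            let n = pending.get_mut(&next).unwrap();
            *n -= 1;
            if *n == 0 {
                ready.push_back(next);
            }
        }
    }
    // Tasks that never became ready are stuck in a cycle.
    (order.len() == pending.len()).then_some(order)
}

fn main() {
    // 1 = FileIdGeneratorTask, 2 = ImageResizerTask, 3 = ActionCreatorTask
    let mut deps: BTreeMap<u64, Vec<u64>> = BTreeMap::new();
    deps.insert(1, vec![]);
    deps.insert(2, vec![1]);
    deps.insert(3, vec![1, 2]);
    println!("{:?}", execution_order(&deps));
}
```

A live scheduler releases dependents as completion notifications arrive instead of computing the whole order up front, but the bookkeeping (a per-task count of unfinished dependencies plus reverse edges) is the same.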

2. Exponential Backoff Retry

Tasks can automatically retry on failure with increasing delays:

retry_policy = RetryPolicy(
    min_wait: 10s
    max_wait: 3600s (1 hour)
    max_attempts: 5
)

scheduler
    .task(ActionDeliveryTask(action_token, recipient))
    .with_retry(retry_policy)
    .schedule()

Retry Schedule:

Attempt 1: Immediate
  ↓ fails
Attempt 2: 10s later (min wait)
  ↓ fails
Attempt 3: 20s later (2x backoff)
  ↓ fails
Attempt 4: 40s later (2x backoff)
  ↓ fails
Attempt 5: 80s later (2x backoff, still well under the 3600s cap)
  ↓ fails
Attempt 6: Not attempted (max_attempts reached)
  ↓
Task marked as failed
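The delay rule in this schedule is delay(n) = min(min_wait × 2^(n - 2), max_wait) for attempt n ≥ 2, with attempt 1 running immediately. The sketch below implements that formula; RetryPolicy here is a hypothetical mirror of the fields shown above, not Cloudillo's type.

```rust
use std::time::Duration;

/// Hypothetical mirror of the RetryPolicy fields described above.
struct RetryPolicy {
    min_wait: Duration,
    max_wait: Duration,
    max_attempts: u32,
}

impl RetryPolicy {
    /// Delay before the given attempt (1-based), or None if the attempt
    /// should not run. Attempt 1 runs immediately; each later attempt
    /// doubles the previous wait, capped at max_wait.
    fn delay_before(&self, attempt: u32) -> Option<Duration> {
        if attempt == 0 || attempt > self.max_attempts {
            return None;
        }
        if attempt == 1 {
            return Some(Duration::ZERO);
        }
        // min_wait * 2^(attempt - 2), saturating instead of overflowing.
        let factor = 1u32.checked_shl(attempt - 2).unwrap_or(u32::MAX);
        let wait = self.min_wait.saturating_mul(factor);
        Some(wait.min(self.max_wait))
    }
}

fn main() {
    let policy = RetryPolicy {
        min_wait: Duration::from_secs(10),
        max_wait: Duration::from_secs(3600),
        max_attempts: 5,
    };
    for attempt in 1..=6 {
        println!("attempt {attempt}: {:?}", policy.delay_before(attempt));
    }
}
```

With this policy the delays for attempts 1 to 5 are 0s, 10s, 20s, 40s and 80s, and attempt 6 returns None, matching the schedule above. The 3600s cap would only bind if max_attempts were raised to 11 or more (10s × 2^9 = 5120s).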

3. Cron Scheduling

Tasks can also run on a recurring, cron-style schedule. The builder provides helpers for common cadences:

# Every day at 2:30 AM
scheduler
    .task(CleanupTask(temp_dir))
    .daily_at(2, 30)
    .schedule()

# Every hour at :00
scheduler
    .task(BackupTask(db_path))
    .hourly_at(0)
    .schedule()

# Every Monday at 9:00 AM
scheduler
    .task(WeeklyReportTask())
    .weekly_on(Weekday::Monday, 9, 0)
    .schedule()

# 1st of every month at midnight
scheduler
    .task(MonthlyTask())
    .monthly_on(1, 0, 0)
    .schedule()
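If these helpers correspond to standard five-field cron expressions (minute, hour, day of month, month, day of week), which is an assumption rather than something the API above documents, the four examples map to the strings produced by this sketch. The Schedule enum and to_cron function are hypothetical, the (day, hour, minute) argument order for monthly_on is inferred from the example, and no range validation is done.

```rust
/// Hypothetical description of the builder helpers shown above.
#[derive(Debug, Clone, Copy)]
enum Schedule {
    HourlyAt { minute: u8 },
    DailyAt { hour: u8, minute: u8 },
    /// weekday uses cron numbering: 0 = Sunday, 1 = Monday, ..., 6 = Saturday.
    WeeklyOn { weekday: u8, hour: u8, minute: u8 },
    MonthlyOn { day: u8, hour: u8, minute: u8 },
}

/// Render a schedule as "minute hour day-of-month month day-of-week".
fn to_cron(s: Schedule) -> String {
    match s {
        Schedule::HourlyAt { minute } => format!("{minute} * * * *"),
        Schedule::DailyAt { hour, minute } => format!("{minute} {hour} * * *"),
        Schedule::WeeklyOn { weekday, hour, minute } => {
            format!("{minute} {hour} * * {weekday}")
        }
        Schedule::MonthlyOn { day, hour, minute } => format!("{minute} {hour} {day} * *"),
    }
}

fn main() {
    let examples = [
        Schedule::DailyAt { hour: 2, minute: 30 },
        Schedule::HourlyAt { minute: 0 },
        Schedule::WeeklyOn { weekday: 1, hour: 9, minute: 0 },
        Schedule::MonthlyOn { day: 1, hour: 0, minute: 0 },
    ];
    for s in examples {
        println!("{s:?} => {}", to_cron(s));
    }
}
```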

4. Persistence

Tasks are persisted to MetaAdapter and survive server restarts:

start_scheduler(app):
    scheduler = Scheduler(app)

    # Load unfinished tasks from database
    pending_tasks = app.meta_adapter.list_pending_tasks()

    for task_meta in pending_tasks:
        # Rebuild task from serialized context
        task = TaskRegistry.build(
            task_meta.kind,
            task_meta.id,
            task_meta.context
        )

        # Re-queue task
        scheduler.enqueue(task_meta.id, task)

    # Start processing
    scheduler.start()
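The rebuild step relies on a registry that maps each persisted kind string to that task type's build function. A minimal std-only sketch follows; the synchronous Task trait, the TaskMeta row, the Builder alias, and the two example task types are hypothetical stand-ins for Cloudillo's async Task<S> trait and MetaAdapter records.

```rust
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;

type TaskId = u64;

/// Simplified, synchronous stand-in for the async Task<S> trait.
trait Task: Debug {
    fn kind_of(&self) -> &'static str;
}

/// Rebuilds a task from its id and persisted context string.
type Builder = fn(TaskId, &str) -> Result<Arc<dyn Task>, String>;

#[derive(Debug)]
struct CleanupTask { id: TaskId, dir: String }

impl Task for CleanupTask {
    fn kind_of(&self) -> &'static str { "CleanupTask" }
}

#[derive(Debug)]
struct EmailSenderTask { id: TaskId, to: String }

impl Task for EmailSenderTask {
    fn kind_of(&self) -> &'static str { "EmailSenderTask" }
}

fn build_cleanup(id: TaskId, ctx: &str) -> Result<Arc<dyn Task>, String> {
    Ok(Arc::new(CleanupTask { id, dir: ctx.to_string() }))
}

fn build_email(id: TaskId, ctx: &str) -> Result<Arc<dyn Task>, String> {
    if ctx.is_empty() {
        return Err("empty recipient".to_string());
    }
    Ok(Arc::new(EmailSenderTask { id, to: ctx.to_string() }))
}

/// Persisted row, shaped like a hypothetical list_pending_tasks() result.
struct TaskMeta { id: TaskId, kind: String, context: String }

/// Map each persisted kind string to its builder.
fn registry() -> HashMap<&'static str, Builder> {
    let mut r: HashMap<&'static str, Builder> = HashMap::new();
    r.insert("CleanupTask", build_cleanup);
    r.insert("EmailSenderTask", build_email);
    r
}

/// Rebuild every row; unknown kinds and bad contexts are reported, not fatal.
fn rebuild_all(rows: &[TaskMeta]) -> (Vec<(TaskId, Arc<dyn Task>)>, Vec<String>) {
    let reg = registry();
    let mut tasks = Vec::new();
    let mut errors = Vec::new();
    for row in rows {
        match reg.get(row.kind.as_str()) {
            Some(build) => match build(row.id, &row.context) {
                Ok(task) => tasks.push((row.id, task)),
                Err(e) => errors.push(format!("task {}: {e}", row.id)),
            },
            None => errors.push(format!("task {}: unknown kind {}", row.id, row.kind)),
        }
    }
    (tasks, errors)
}

fn main() {
    let rows = vec![
        TaskMeta { id: 1, kind: "CleanupTask".into(), context: "/tmp/uploads".into() },
        TaskMeta { id: 2, kind: "EmailSenderTask".into(), context: "alice@example.com".into() },
        TaskMeta { id: 3, kind: "RemovedTask".into(), context: String::new() },
    ];
    let (tasks, errors) = rebuild_all(&rows);
    for (id, task) in &tasks {
        println!("re-queue {id}: {}", task.kind_of());
    }
    for e in &errors {
        eprintln!("skipped {e}");
    }
}
```

Collecting failures instead of aborting means a task type removed in an upgrade cannot block the rest of the queue at startup; whether Cloudillo handles unknown kinds this way is not stated above.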

Built-in Task Types

  • ActionCreatorTask (cloudillo-action/src/task.rs): creates and signs action tokens (JWT/ES384) for federation
  • ActionVerifierTask (cloudillo-action/src/task.rs): validates incoming federated action tokens (signature, permissions, attachments)
  • ActionDeliveryTask (cloudillo-action/src/task.rs): delivers actions to remote instances with retry (POST to /api/inbox)
  • FileIdGeneratorTask (cloudillo-file/src/descriptor.rs): computes SHA256 content-addressed file IDs and moves files to BlobAdapter
  • ImageResizerTask (cloudillo-file/src/image.rs): generates image variants (tn/sd/md/hd/xd) using the Lanczos3 filter
  • VideoTranscoderTask (cloudillo-file/src/video.rs): transcodes video to web formats via FFmpeg
  • PdfProcessorTask (cloudillo-file/src/pdf.rs): extracts text, metadata, and page thumbnails from PDFs
  • AudioExtractorTask (cloudillo-file/src/audio.rs): extracts ID3 tags, duration, and waveform previews
  • EmailSenderTask (cloudillo-email/src/task.rs): sends emails asynchronously via SMTP with retry
  • CertRenewalTask (cloudillo-core/src/acme.rs): automatic TLS certificate renewal via ACME (daily check)
  • ProfileRefreshBatchTask (cloudillo-profile/src/sync.rs): batch-refreshes cached remote profile data
  • TenantImageUpdaterTask (cloudillo-profile/src/media.rs): processes and stores tenant avatar/banner images

Builder Pattern API

The scheduler uses a fluent builder API for task configuration:

retry = RetryPolicy(min_wait: 10s, max_wait: 3600s, max_attempts: 5)

scheduler
    .task(ActionDeliveryTask(token, recipient))
    .key(format("deliver-{}-{}", action_id, recipient))
    .depend_on([action_creator_task])
    .with_retry(retry)
    .priority(Priority::Medium)
    .schedule()
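A fluent builder like this usually accumulates options and only hands the finished specification over in schedule(). The sketch below is hypothetical: TaskBuilder, TaskSpec, Priority, and the Medium default priority are assumptions, and schedule() simply returns the collected configuration instead of persisting and enqueuing it.

```rust
use std::time::Duration;

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum Priority { Low, Medium, High }

#[derive(Debug, Clone, PartialEq)]
struct RetryPolicy { min_wait: Duration, max_wait: Duration, max_attempts: u32 }

/// Everything the builder collected; a real scheduler would persist this.
#[derive(Debug, Clone, PartialEq)]
struct TaskSpec {
    kind: &'static str,
    key: Option<String>,
    deps: Vec<u64>,
    retry: Option<RetryPolicy>,
    priority: Priority,
}

struct TaskBuilder { spec: TaskSpec }

impl TaskBuilder {
    fn new(kind: &'static str) -> Self {
        let spec = TaskSpec { kind, key: None, deps: Vec::new(), retry: None, priority: Priority::Medium };
        TaskBuilder { spec }
    }
    // Each option takes self by value and returns it, which enables chaining.
    fn key(mut self, key: impl Into<String>) -> Self { self.spec.key = Some(key.into()); self }
    fn depend_on(mut self, deps: &[u64]) -> Self { self.spec.deps.extend_from_slice(deps); self }
    fn with_retry(mut self, retry: RetryPolicy) -> Self { self.spec.retry = Some(retry); self }
    fn priority(mut self, p: Priority) -> Self { self.spec.priority = p; self }
    /// Consume the builder; here it only returns the finished spec.
    fn schedule(self) -> TaskSpec { self.spec }
}

fn main() {
    let (action_id, recipient) = ("a1", "bob.example");
    let retry = RetryPolicy {
        min_wait: Duration::from_secs(10),
        max_wait: Duration::from_secs(3600),
        max_attempts: 5,
    };
    let spec = TaskBuilder::new("ActionDeliveryTask")
        .key(format!("deliver-{}-{}", action_id, recipient))
        .depend_on(&[42])
        .with_retry(retry)
        .priority(Priority::Medium)
        .schedule();
    println!("{spec:?}");
}
```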

Integration with Worker Pool

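CPU-intensive work inside a task (such as decoding and resizing images) should be offloaded to the worker pool so it does not tie up the async runtime, while I/O stays in the async task: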

impl Task<App> for ImageResizerTask:
    run(state):
        # Load image (async I/O)
        image_data = state.blob_adapter.read_blob_buf(
            self.tn_id, self.source_file_id
        )

        # Resize and encode image (CPU-intensive - use worker pool)
        encoded = state.worker.run(closure:
            img = image.load_from_memory(image_data)
            resized = img.resize(
                self.target_width,
                self.target_height,
                FilterType::Lanczos3
            )
            # Encode to the target format and return the bytes
            return resized.encode_to(self.format)
        )

        # Store variant (async I/O)
        variant_id = compute_file_id(encoded)
        state.blob_adapter.create_blob_buf(
            self.tn_id, variant_id, encoded
        )
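The architecture diagram splits the worker pool into high, medium, and low priority workers. One common way to get that dispatch order is a binary heap keyed by priority, with a sequence number as a first-in-first-out tie-breaker. The sketch below shows only that ordering; JobQueue is hypothetical, it assumes strict priority with no aging (so low-priority jobs can starve under constant high-priority load), the priorities given to the example task names are made up, and it is not a description of Cloudillo's actual pool.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum Priority { Low, Medium, High }

/// Jobs ordered by priority first; among equal priorities the older
/// job (smaller sequence number) wins, because Reverse flips the order.
struct JobQueue {
    heap: BinaryHeap<(Priority, Reverse<u64>, &'static str)>,
    next_seq: u64,
}

impl JobQueue {
    fn new() -> Self {
        JobQueue { heap: BinaryHeap::new(), next_seq: 0 }
    }

    fn push(&mut self, priority: Priority, job: &'static str) {
        self.heap.push((priority, Reverse(self.next_seq), job));
        self.next_seq += 1;
    }

    /// Next job a free worker should run, if any.
    fn pop(&mut self) -> Option<&'static str> {
        self.heap.pop().map(|(_, _, job)| job)
    }
}

fn main() {
    let mut q = JobQueue::new();
    q.push(Priority::Low, "ProfileRefreshBatchTask");
    q.push(Priority::High, "ActionDeliveryTask");
    q.push(Priority::Medium, "ImageResizerTask");
    q.push(Priority::High, "ActionVerifierTask");
    while let Some(job) = q.pop() {
        println!("{job}");
    }
}
```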

See Also