redb Implementation

The query-based RTDB is built on redb, a lightweight pure-Rust embedded database with ACID transactions and zero-copy reads. It provides Firebase-like functionality with minimal overhead and suits structured data, complex queries, and traditional database operations.

Architecture

Layered Design

┌──────────────────────────────────────┐
│ Client Application (JavaScript)      │
│  - Query API                         │
│  - Subscriptions                     │
│  - Transactions                      │
└──────────────────────────────────────┘
                ↓ WebSocket
┌──────────────────────────────────────┐
│ WebSocket Handler                    │
│  - Message parsing                   │
│  - Authentication                    │
│  - Subscription tracking             │
└──────────────────────────────────────┘
                ↓
┌──────────────────────────────────────┐
│ RtdbAdapter Trait                    │
│  - transaction() → Transaction       │
│  - query(), get()                    │
│  - subscribe()                       │
│  - acquire_lock(), release_lock()    │
│  - create_index(), stats()           │
└──────────────────────────────────────┘
                ↓
┌──────────────────────────────────────┐
│ redb Implementation                  │
│  - Key-value storage                 │
│  - Secondary indexes                 │
│  - Query execution                   │
└──────────────────────────────────────┘
                ↓
┌──────────────────────────────────────┐
│ Real-Time Layer                      │
│  - tokio::sync::broadcast channels   │
│  - Change event propagation          │
│  - Subscription filtering            │
└──────────────────────────────────────┘

RtdbAdapter Trait

The core interface for database operations. All methods are tenant-aware (tn_id parameter). Write operations go through the separate Transaction trait.

#[async_trait]
pub trait RtdbAdapter: Debug + Send + Sync {
    /// Begin a new transaction for write operations
    async fn transaction(&self, tn_id: TnId, db_id: &str) -> ClResult<Box<dyn Transaction>>;

    /// Close a database instance, flushing pending changes
    async fn close_db(&self, tn_id: TnId, db_id: &str) -> ClResult<()>;

    /// Query documents with optional filtering, sorting, and pagination
    async fn query(&self, tn_id: TnId, db_id: &str, path: &str, opts: QueryOptions)
        -> ClResult<Vec<Value>>;

    /// Get a single document at a specific path
    async fn get(&self, tn_id: TnId, db_id: &str, path: &str) -> ClResult<Option<Value>>;

    /// Subscribe to real-time changes (returns a stream of ChangeEvents)
    async fn subscribe(&self, tn_id: TnId, db_id: &str, opts: SubscriptionOptions)
        -> ClResult<Pin<Box<dyn Stream<Item = ChangeEvent> + Send>>>;

    /// Create an index on a field for query performance
    async fn create_index(&self, tn_id: TnId, db_id: &str, path: &str, field: &str)
        -> ClResult<()>;

    /// Get database statistics (size, record count, table count)
    async fn stats(&self, tn_id: TnId, db_id: &str) -> ClResult<DbStats>;

    /// Export all documents from a database
    async fn export_all(&self, tn_id: TnId, db_id: &str) -> ClResult<Vec<(Box<str>, Value)>>;

    /// Acquire a lock on a document path
    async fn acquire_lock(&self, tn_id: TnId, db_id: &str, path: &str,
        user_id: &str, mode: LockMode, conn_id: &str) -> ClResult<Option<LockInfo>>;

    /// Release a lock on a document path
    async fn release_lock(&self, tn_id: TnId, db_id: &str, path: &str,
        user_id: &str, conn_id: &str) -> ClResult<()>;

    /// Check if a path has an active lock
    async fn check_lock(&self, tn_id: TnId, db_id: &str, path: &str)
        -> ClResult<Option<LockInfo>>;

    /// Release all locks held by a specific user (on disconnect)
    async fn release_all_locks(&self, tn_id: TnId, db_id: &str,
        user_id: &str, conn_id: &str) -> ClResult<()>;
}

Transaction Trait

All write operations (create, update, delete) are performed within a transaction:

#[async_trait]
pub trait Transaction: Send + Sync {
    /// Create a new document with auto-generated ID
    async fn create(&mut self, path: &str, data: Value) -> ClResult<Box<str>>;

    /// Update an existing document (full replacement)
    async fn update(&mut self, path: &str, data: Value) -> ClResult<()>;

    /// Delete a document at a path
    async fn delete(&mut self, path: &str) -> ClResult<()>;

    /// Read a document (with read-your-own-writes semantics)
    async fn get(&self, path: &str) -> ClResult<Option<Value>>;

    /// Commit all changes atomically
    async fn commit(&mut self) -> ClResult<()>;

    /// Rollback all changes
    async fn rollback(&mut self) -> ClResult<()>;
}
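
The read-your-own-writes behaviour of get() can be illustrated with a small in-memory model. This is only a sketch under stated assumptions: SketchTxn is a hypothetical, synchronous stand-in (not the redb-backed Transaction), documents are plain strings instead of JSON values, and staged changes live in a HashMap overlay.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory stand-in for a transaction (not the real trait object).
pub struct SketchTxn<'a> {
    committed: &'a mut HashMap<String, String>,
    // Some(doc) = staged write, None = staged delete.
    pending: HashMap<String, Option<String>>,
}

impl<'a> SketchTxn<'a> {
    pub fn begin(committed: &'a mut HashMap<String, String>) -> Self {
        SketchTxn { committed, pending: HashMap::new() }
    }

    pub fn update(&mut self, path: &str, doc: &str) {
        self.pending.insert(path.to_string(), Some(doc.to_string()));
    }

    pub fn delete(&mut self, path: &str) {
        self.pending.insert(path.to_string(), None);
    }

    /// Reads consult staged changes first, then fall back to committed data.
    pub fn get(&self, path: &str) -> Option<String> {
        match self.pending.get(path) {
            Some(staged) => staged.clone(),
            None => self.committed.get(path).cloned(),
        }
    }

    /// Apply every staged change to the committed map.
    pub fn commit(self) {
        let SketchTxn { committed, pending } = self;
        for (path, staged) in pending {
            match staged {
                Some(doc) => {
                    committed.insert(path, doc);
                }
                None => {
                    committed.remove(&path);
                }
            }
        }
    }

    /// Discard staged changes; committed data is left untouched.
    pub fn rollback(self) {}
}
```

Within the sketch, a value written by update() is visible to get() on the same transaction before commit(), and rollback() leaves the committed map exactly as it was.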

Data Model

Collections and Documents

Data is organized into collections containing JSON documents:

database/
├── users/
│   ├── user_001
│   ├── user_002
│   └── ...
├── posts/
│   ├── post_abc
│   ├── post_def
│   └── ...
└── comments/
    └── ...

Document Structure

Documents are JSON objects with auto-generated IDs:

{
  "_id": "user_001",
  "_createdAt": 1738483200,
  "_updatedAt": 1738486800,
  "name": "Alice",
  "email": "alice@example.com",
  "age": 30,
  "active": true
}

System Fields (auto-managed):

  • _id: Unique document identifier
  • _createdAt: Creation timestamp (Unix time in seconds, as in the example above)
  • _updatedAt: Last modification timestamp (same format)

Path Syntax

Paths use slash-separated segments:

users                     // Collection
users/user_001            // Specific document
posts/post_abc/comments   // Sub-collection
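
The three example paths suggest a simple rule: an odd number of segments names a collection and an even number names a document. The sketch below encodes that rule. The parity rule itself and the names PathKind and classify_path are assumptions drawn from the examples, not a documented API.

```rust
#[derive(Debug, PartialEq)]
pub enum PathKind {
    Collection,
    Document,
}

/// Assumption: odd segment counts name collections, even counts name documents.
/// Returns None for an empty path, a leading or trailing slash, or "//".
pub fn classify_path(path: &str) -> Option<PathKind> {
    let segments: Vec<&str> = path.split('/').collect();
    if segments.iter().any(|s| s.is_empty()) {
        return None;
    }
    if segments.len() % 2 == 1 {
        Some(PathKind::Collection)
    } else {
        Some(PathKind::Document)
    }
}
```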

Query System

QueryOptions

pub struct QueryOptions {
    pub filter: Option<QueryFilter>,
    pub sort: Option<Vec<SortField>>,  // Multiple sort fields supported
    pub limit: Option<u32>,
    pub offset: Option<u32>,
    pub aggregate: Option<AggregateOptions>,
}

pub struct SortField {
    pub field: String,
    pub ascending: bool,  // true for ascending, false for descending
}
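
To make the interplay of sort, offset, and limit concrete, here is a minimal sketch of how they could be applied to an already filtered result set. It assumes sort fields are compared in the order given, that documents missing a field sort first, and that documents are simplified to integer-valued maps; sort_and_page is a hypothetical helper, and the real query executor may behave differently.

```rust
use std::cmp::Ordering;
use std::collections::HashMap;

pub struct SortField {
    pub field: String,
    pub ascending: bool,
}

/// Simplified document: field name -> integer value (the real type is JSON).
pub type Doc = HashMap<String, i64>;

/// Sort by each SortField in turn, then skip `offset` rows and keep `limit` rows.
pub fn sort_and_page(
    mut docs: Vec<Doc>,
    sort: &[SortField],
    offset: usize,
    limit: usize,
) -> Vec<Doc> {
    docs.sort_by(|a, b| {
        for key in sort {
            // Option<&i64> orders None before Some, so missing fields sort first.
            let ord = a.get(&key.field).cmp(&b.get(&key.field));
            let ord = if key.ascending { ord } else { ord.reverse() };
            if ord != Ordering::Equal {
                return ord;
            }
        }
        Ordering::Equal
    });
    docs.into_iter().skip(offset).take(limit).collect()
}
```

Later sort fields only break ties left by earlier ones, which is why their order in the list matters.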

QueryFilter

QueryFilter is a flat struct (not an enum) where each field is a HashMap<String, Value>. Multiple conditions within the struct are ANDed implicitly — a document must satisfy all specified constraints. All field names use camelCase serialization.

#[serde(rename_all = "camelCase")]
pub struct QueryFilter {
    pub equals: HashMap<String, Value>,
    pub not_equals: HashMap<String, Value>,
    pub greater_than: HashMap<String, Value>,
    pub greater_than_or_equal: HashMap<String, Value>,
    pub less_than: HashMap<String, Value>,
    pub less_than_or_equal: HashMap<String, Value>,
    pub in_array: HashMap<String, Vec<Value>>,
    pub array_contains: HashMap<String, Value>,
    pub not_in_array: HashMap<String, Vec<Value>>,
    pub array_contains_any: HashMap<String, Vec<Value>>,
    pub array_contains_all: HashMap<String, Vec<Value>>,
}

Info: There are no And/Or combinators, so conditions can only be combined through the implicit AND. Each HashMap maps a field name to the value (or list of values) it is compared against.
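
The implicit AND can be sketched as "every entry of every map must hold". The example below covers only three of the filter maps and uses a tiny Val enum in place of a JSON value. Val, SketchFilter, and doc_matches are hypothetical names, and the rule that mismatched or missing values never satisfy greater_than is an assumption.

```rust
use std::collections::HashMap;

/// Minimal stand-in for a JSON value, just enough for this sketch.
#[derive(Debug, Clone, PartialEq)]
pub enum Val {
    Bool(bool),
    Int(i64),
    Str(String),
}

pub type Doc = HashMap<String, Val>;

/// Three of the QueryFilter maps; the others would follow the same shape.
#[derive(Default)]
pub struct SketchFilter {
    pub equals: HashMap<String, Val>,
    pub greater_than: HashMap<String, Val>,
    pub in_array: HashMap<String, Vec<Val>>,
}

/// A document matches only if every entry in every map is satisfied.
pub fn doc_matches(doc: &Doc, f: &SketchFilter) -> bool {
    let eq = f.equals.iter().all(|(k, want)| doc.get(k) == Some(want));
    let gt = f.greater_than.iter().all(|(k, bound)| match (doc.get(k), bound) {
        // Only compare like with like; mixed or missing values never match.
        (Some(Val::Int(have)), Val::Int(min)) => have > min,
        (Some(Val::Str(have)), Val::Str(min)) => have > min,
        _ => false,
    });
    let within = f.in_array.iter().all(|(k, allowed)| {
        doc.get(k).map_or(false, |have| allowed.contains(have))
    });
    eq && gt && within
}
```

An empty filter matches every document in this sketch, because all() over an empty map is true.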

Query Examples

Simple query:

{
  "type": "query",
  "id": 1,
  "path": "users",
  "filter": {
    "equals": { "active": true }
  },
  "sort": [{ "field": "name", "ascending": true }],
  "limit": 50
}

Complex query (multiple conditions are ANDed):

{
  "type": "query",
  "id": 2,
  "path": "posts",
  "filter": {
    "equals": { "published": true },
    "greaterThan": { "views": 100 }
  },
  "sort": [{ "field": "createdAt", "ascending": false }],
  "limit": 20
}

WebSocket Protocol

Message Types

Client → Server

1. Query

{
  "type": "query",
  "id": 1,
  "path": "users",
  "filter": { "equals": { "active": true } },
  "limit": 50
}

2. Subscribe

{
  "type": "subscribe",
  "id": 2,
  "path": "posts",
  "filter": { "equals": { "published": true } }
}

3. Get (single document)

{
  "type": "get",
  "id": 3,
  "path": "users/user_001"
}

4. Transaction (wraps create/update/delete operations)

{
  "type": "transaction",
  "id": 4,
  "operations": [
    {
      "type": "create",
      "path": "posts",
      "ref": "$post",
      "data": { "title": "Hello", "author": "alice" }
    },
    {
      "type": "update",
      "path": "users/alice",
      "data": { "postCount": { "$op": "increment", "by": 1 } }
    },
    {
      "type": "replace",
      "path": "users/alice",
      "data": { "name": "Alice", "role": "admin" }
    },
    {
      "type": "delete",
      "path": "posts/post_old"
    }
  ]
}

Info: All write operations (create, update, replace, delete) must be wrapped in a transaction message; there are no standalone write message types. The update operation merges fields into the existing document, while replace performs a full document replacement.

5. Lock

{
  "type": "lock",
  "id": 5,
  "path": "todos/task_123",
  "mode": "hard"
}

6. Unlock

{
  "type": "unlock",
  "id": 6,
  "path": "todos/task_123"
}

7. Create Index

{
  "type": "createIndex",
  "id": 7,
  "path": "users",
  "field": "email"
}

8. Ping

{
  "type": "ping",
  "id": 8
}

Server → Client

1. Query Result

{
  "type": "queryResult",
  "id": 1,
  "data": [
    { "_id": "user_001", "name": "Alice", "active": true },
    { "_id": "user_002", "name": "Bob", "active": true }
  ],
  "total": 2
}

2. Get Result

{
  "type": "getResult",
  "id": 3,
  "data": { "_id": "user_001", "name": "Alice", "active": true }
}

3. Subscribe Result

{
  "type": "subscribeResult",
  "id": 2,
  "subscriptionId": "sub_abc123",
  "data": [
    { "_id": "post_001", "title": "Hello", "published": true }
  ]
}

4. Change Event

Change events use a single event object with an action field, not a changes array:

{
  "type": "change",
  "subscriptionId": "sub_abc123",
  "event": {
    "action": "create",
    "path": "posts",
    "data": { "_id": "post_002", "title": "New Post", "published": true }
  }
}

Possible action values: create, update, delete, lock, unlock, ready

The ready action is sent after the initial subscription data has been delivered.

5. Transaction Result

{
  "type": "transactionResult",
  "id": 4,
  "results": [
    { "ref": "$post", "id": "post_new_001" },
    { "id": "users/alice" },
    { "id": "posts/post_old" }
  ]
}

6. Lock Result

{
  "type": "lockResult",
  "id": 5,
  "locked": true
}

If the lock is denied:

{
  "type": "lockResult",
  "id": 5,
  "locked": false,
  "holder": "bob@example.com",
  "mode": "hard"
}

7. Error

{
  "type": "error",
  "id": 4,
  "code": "permission_denied",
  "message": "Insufficient permissions to update this document"
}

8. Pong

{
  "type": "pong",
  "id": 8
}

Real-Time Subscriptions

Subscription Flow

Client sends subscribe message
  ↓
Server validates permissions
  ↓
Server creates broadcast channel
  ↓
Server executes initial query
  ↓
Server sends subscribeResult with data
  ↓
Server watches for changes matching filter
  ↓
On change: Server sends change event
  ↓
Client updates local state

Implementation

Subscription Structure:

  • id: Unique subscription identifier
  • path: Collection path being subscribed to
  • filter: Optional query filter to match changes
  • sender: Broadcast channel for sending change events
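
A rough sketch of how a change could be fanned out to subscriptions of this shape follows. It uses std::sync::mpsc as a stand-in for the broadcast channel and assumes that a subscription on a path receives changes at that path or anywhere below it; that matching rule, SketchSubscription, path_matches, and publish are all hypothetical. Filter evaluation is left out.

```rust
use std::sync::mpsc::{channel, Sender};

pub struct SketchSubscription {
    pub id: String,
    pub path: String,
    pub sender: Sender<String>,
}

/// Assumption: a subscription covers its own path and everything below it.
pub fn path_matches(sub_path: &str, change_path: &str) -> bool {
    change_path == sub_path
        || change_path
            .strip_prefix(sub_path)
            .map_or(false, |rest| rest.starts_with('/'))
}

/// Send the payload to every subscription whose path matches; returns how many got it.
pub fn publish(subs: &[SketchSubscription], change_path: &str, payload: &str) -> usize {
    let mut delivered = 0;
    for sub in subs {
        if path_matches(&sub.path, change_path)
            && sub.sender.send(payload.to_string()).is_ok()
        {
            delivered += 1;
        }
    }
    delivered
}
```

Checking for the '/' after the prefix keeps a subscription on posts from matching an unrelated path such as postscript/x.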

Change Event Types

ChangeEvent is a tagged enum with #[serde(tag = "action")] serialization:

#[serde(tag = "action", rename_all = "camelCase")]
pub enum ChangeEvent {
    Create { path: Box<str>, data: Value },
    Update { path: Box<str>, data: Value, old_data: Option<Value> },
    Delete { path: Box<str>, old_data: Option<Value> },
    Lock   { path: Box<str>, data: Value },
    Unlock { path: Box<str>, data: Value },
    Ready  { path: Box<str>, data: Option<Value> },
}

This serializes as {"action": "create", "path": "...", "data": {...}} — the action field determines the variant.

Transactions

Atomic Operations

Transactions ensure multiple operations execute atomically:

{
  "type": "transaction",
  "id": 10,
  "operations": [
    {
      "type": "update",
      "path": "accounts/alice",
      "data": { "balance": { "$op": "increment", "by": -100 } }
    },
    {
      "type": "update",
      "path": "accounts/bob",
      "data": { "balance": { "$op": "increment", "by": 100 } }
    }
  ]
}

Guarantees:

  • Atomicity: either all operations succeed or none are applied
  • Isolation: intermediate states are never visible
  • Sequential consistency
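
One common way to obtain all-or-nothing behaviour is to stage changes on a copy and publish the copy only if every operation succeeded. The sketch below shows that idea for the balance transfer above; it is not how redb implements transactions, and Op and apply_atomically are hypothetical names with documents reduced to integer balances.

```rust
use std::collections::HashMap;

pub enum Op {
    Increment { path: String, by: i64 },
    Delete { path: String },
}

/// Apply all operations or none: work on a copy, publish it only on success.
pub fn apply_atomically(
    store: &mut HashMap<String, i64>,
    ops: &[Op],
) -> Result<(), String> {
    let mut staged = store.clone();
    for op in ops {
        match op {
            Op::Increment { path, by } => {
                let balance = staged
                    .get_mut(path)
                    .ok_or_else(|| format!("missing document: {path}"))?;
                *balance += by;
            }
            Op::Delete { path } => {
                staged
                    .remove(path)
                    .ok_or_else(|| format!("missing document: {path}"))?;
            }
        }
    }
    // Reached only if every operation succeeded; the store changes in one assignment.
    *store = staged;
    Ok(())
}
```

If the second increment targets a missing account, the function returns early and the first increment is never visible in the store.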

Temporary References

Reference documents created within the same transaction:

{
  "type": "transaction",
  "id": 11,
  "operations": [
    {
      "type": "create",
      "path": "posts",
      "ref": "$post",
      "data": { "title": "My Post", "author": "alice" }
    },
    {
      "type": "create",
      "path": "comments",
      "data": {
        "postId": { "$ref": "$post" },
        "text": "First comment!",
        "author": "alice"
      }
    }
  ]
}

How it works:

  1. The first operation creates the post and records its generated ID under the name $post.
  2. The second operation refers to $post, which the server replaces with the actual ID.
  3. The comment receives the correct post ID even though it wasn’t known when the request was sent.
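
The resolution steps above can be sketched as a single pass over the operations with a name-to-ID map. Everything here is illustrative: Field, CreateOp, and run_creates are hypothetical, field values are plain strings, and ID generation is delegated to a caller-supplied closure.

```rust
use std::collections::HashMap;

/// A field value is either a literal or a placeholder such as {"$ref": "$post"}.
pub enum Field {
    Literal(String),
    Ref(String),
}

pub struct CreateOp {
    pub collection: String,
    pub ref_name: Option<String>,
    pub fields: Vec<(String, Field)>,
}

/// Run creates in order, replacing each Ref with the ID recorded under that name.
/// Returns (generated id, resolved fields) per operation.
pub fn run_creates(
    ops: Vec<CreateOp>,
    mut next_id: impl FnMut(&str) -> String,
) -> Result<Vec<(String, Vec<(String, String)>)>, String> {
    let mut refs: HashMap<String, String> = HashMap::new();
    let mut created = Vec::new();
    for op in ops {
        let mut resolved = Vec::new();
        for (name, field) in op.fields {
            let value = match field {
                Field::Literal(v) => v,
                Field::Ref(r) => refs
                    .get(&r)
                    .cloned()
                    .ok_or_else(|| format!("unknown reference {r}"))?,
            };
            resolved.push((name, value));
        }
        let id = next_id(&op.collection);
        if let Some(name) = op.ref_name {
            refs.insert(name, id.clone());
        }
        created.push((id, resolved));
    }
    Ok(created)
}
```

Because references are resolved in order, an operation can only refer to names defined by earlier operations in the same transaction in this sketch.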

Document Locking

The RTDB supports document-level locking for exclusive or advisory editing access.

Lock Modes

  • Soft lock (advisory): Other clients can still write but receive a notification that the document is locked. Useful for signaling editing intent.
  • Hard lock (enforced): The server rejects writes from other clients while the lock is held. Only the lock holder (identified by conn_id) can modify the document.
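
The difference between the two modes comes down to what happens when a non-holder tries to write. A minimal sketch of that decision follows, assuming the holder is identified by conn_id as described; SketchLockMode, SketchLock, WriteDecision, and check_write are hypothetical and do not mirror the actual LockMode or LockInfo types.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum SketchLockMode {
    Soft,
    Hard,
}

pub struct SketchLock {
    pub holder_conn_id: String,
    pub mode: SketchLockMode,
}

#[derive(Debug, PartialEq)]
pub enum WriteDecision {
    /// No lock, or the writer holds the lock.
    Allow,
    /// Soft lock held by someone else: the write proceeds, the writer is notified.
    AllowWithNotice,
    /// Hard lock held by someone else: the write is rejected.
    Reject,
}

pub fn check_write(lock: Option<&SketchLock>, conn_id: &str) -> WriteDecision {
    match lock {
        None => WriteDecision::Allow,
        Some(l) if l.holder_conn_id == conn_id => WriteDecision::Allow,
        Some(l) => match l.mode {
            SketchLockMode::Soft => WriteDecision::AllowWithNotice,
            SketchLockMode::Hard => WriteDecision::Reject,
        },
    }
}
```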

Lock/Unlock Messages

Client → Server:

{ "type": "lock", "id": 1, "path": "todos/task_123", "mode": "soft" }
{ "type": "unlock", "id": 2, "path": "todos/task_123" }

Server → Client:

{ "type": "lockResult", "id": 1, "locked": true }
{ "type": "lockResult", "id": 1, "locked": false, "holder": "bob@example.com", "mode": "hard" }

TTL-Based Expiration

Locks expire automatically after a TTL (time-to-live) period. This prevents permanently locked documents when clients disconnect unexpectedly or crash without releasing their locks. The server cleans up expired locks during its periodic maintenance cycle.
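
A periodic cleanup pass of this kind could look like the sketch below. The TTL values, the TimedLock struct, and sweep_expired are assumptions for illustration; the source does not specify the TTL length or how the maintenance cycle is scheduled.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

pub struct TimedLock {
    pub conn_id: String,
    pub acquired_at: Instant,
    pub ttl: Duration,
}

impl TimedLock {
    /// A lock is expired once at least `ttl` has elapsed since it was acquired.
    pub fn is_expired(&self, now: Instant) -> bool {
        now.saturating_duration_since(self.acquired_at) >= self.ttl
    }
}

/// Drop every expired lock; returns the paths that were freed.
pub fn sweep_expired(
    locks: &mut HashMap<String, TimedLock>,
    now: Instant,
) -> Vec<String> {
    let expired: Vec<String> = locks
        .iter()
        .filter(|(_, lock)| lock.is_expired(now))
        .map(|(path, _)| path.clone())
        .collect();
    for path in &expired {
        locks.remove(path);
    }
    expired
}
```

Passing the current time in as a parameter keeps the sweep easy to test without sleeping.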

Connection-Based Echo Suppression

The server tracks lock ownership by conn_id. When a lock change event is broadcast to subscribers, the originating connection is excluded from the notification (echo suppression), similar to how write operations suppress echoes. This prevents the client that acquired the lock from receiving its own lock notification.
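
Echo suppression amounts to filtering the recipient list by connection ID. The helper below is a hypothetical sketch of that single step, with connections reduced to their IDs.

```rust
/// Return the connections that should receive an event, skipping its originator.
pub fn recipients<'a>(subscribers: &'a [String], origin_conn_id: &str) -> Vec<&'a str> {
    subscribers
        .iter()
        .map(|s| s.as_str())
        .filter(|conn_id| *conn_id != origin_conn_id)
        .collect()
}
```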

Lock Status in Change Events

Active subscriptions receive lock/unlock events as part of the change stream:

{
  "type": "change",
  "subscriptionId": "sub_abc123",
  "event": {
    "action": "lock",
    "path": "todos/task_123",
    "data": { "holder": "alice@example.com", "mode": "hard" }
  }
}

Aggregate Queries

The RTDB supports server-side aggregate computations on query results.

Aggregate Request

Add the aggregate option to a query or subscribe message:

{
  "type": "query",
  "id": 1,
  "path": "tasks",
  "filter": { "equals": { "completed": false } },
  "aggregate": {
    "groupBy": "status",
    "ops": [
      { "op": "sum", "field": "hours" },
      { "op": "avg", "field": "hours" }
    ]
  }
}

Aggregate Operations

Operation  Description
sum        Sum of a numeric field
avg        Average of a numeric field
min        Minimum value of a field
max        Maximum value of a field

Each group always includes a count of matching documents.

Aggregate Response

{
  "type": "queryResult",
  "id": 1,
  "aggregate": {
    "groups": [
      { "group": "todo", "count": 12, "sum_hours": 36, "avg_hours": 3.0 },
      { "group": "in_progress", "count": 5, "sum_hours": 20, "avg_hours": 4.0 }
    ]
  }
}

Incremental Aggregate Subscriptions

When aggregate is used with a subscribe message, the server computes aggregates incrementally. On each change event that affects the subscribed path and filter, the server recalculates the affected groups and sends an updated aggregate snapshot rather than the full document set. This keeps aggregate subscriptions efficient even for large collections.
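
Incremental maintenance is straightforward for count, sum, and avg, because each can be updated from a per-group running total. The sketch below shows that bookkeeping. GroupStats and RunningAggregate are hypothetical names, and treating an update as a remove followed by an add is an assumption rather than the documented behaviour.

```rust
use std::collections::HashMap;

#[derive(Debug, Default, Clone, PartialEq)]
pub struct GroupStats {
    pub count: u64,
    pub sum: f64,
}

impl GroupStats {
    pub fn avg(&self) -> Option<f64> {
        if self.count == 0 {
            None
        } else {
            Some(self.sum / self.count as f64)
        }
    }
}

#[derive(Default)]
pub struct RunningAggregate {
    pub groups: HashMap<String, GroupStats>,
}

impl RunningAggregate {
    /// A document entered the result set (created, or updated so it now matches).
    pub fn add(&mut self, group: &str, value: f64) {
        let g = self.groups.entry(group.to_string()).or_default();
        g.count += 1;
        g.sum += value;
    }

    /// A document left the result set (deleted, or updated so it no longer matches).
    pub fn remove(&mut self, group: &str, value: f64) {
        let emptied = match self.groups.get_mut(group) {
            Some(g) => {
                g.count = g.count.saturating_sub(1);
                g.sum -= value;
                g.count == 0
            }
            None => false,
        };
        if emptied {
            // Drop groups that no longer contain any documents.
            self.groups.remove(group);
        }
    }
}
```

Note that min and max cannot be maintained this way on removal: when the current extreme leaves a group, the group has to be rescanned or a sorted structure kept.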

Computed Values

Field Operations

Modify field values with special operations:

Increment:

{
  "views": { "$op": "increment", "by": 1 }
}

Append (to array):

{
  "tags": { "$op": "append", "value": "javascript" }
}

Remove (from array):

{
  "tags": { "$op": "remove", "value": "draft" }
}

Other field operations: decrement, multiply, concat, min, max, setIfNotExists — all use the {"$op": "operation_name", ...} format.
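
As an illustration of how such operations might be applied to a stored field, the sketch below handles increment, append, and remove. Val, FieldOp, and apply_field_op are hypothetical, and the handling of a missing field (increment starts from zero, append starts from an empty array) is an assumption; the source does not say what the server does in that case.

```rust
#[derive(Debug, Clone, PartialEq)]
pub enum Val {
    Num(f64),
    Str(String),
    List(Vec<Val>),
}

pub enum FieldOp {
    Increment { by: f64 },
    Append { value: Val },
    Remove { value: Val },
}

/// Apply one operation to the field's current value and return the new value.
pub fn apply_field_op(current: Option<Val>, op: FieldOp) -> Result<Val, String> {
    match (current, op) {
        // Assumption: a missing field behaves like 0 or an empty array.
        (None, FieldOp::Increment { by }) => Ok(Val::Num(by)),
        (Some(Val::Num(n)), FieldOp::Increment { by }) => Ok(Val::Num(n + by)),
        (None, FieldOp::Append { value }) => Ok(Val::List(vec![value])),
        (Some(Val::List(mut items)), FieldOp::Append { value }) => {
            items.push(value);
            Ok(Val::List(items))
        }
        (None, FieldOp::Remove { .. }) => Ok(Val::List(Vec::new())),
        (Some(Val::List(mut items)), FieldOp::Remove { value }) => {
            // Remove every element equal to the given value.
            items.retain(|item| *item != value);
            Ok(Val::List(items))
        }
        _ => Err("operation does not fit the field's current type".to_string()),
    }
}
```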

Query Operations

Aggregate data within queries using $query:

Count:

{
  "type": "query",
  "id": 12,
  "path": "posts",
  "query": { "$query": "count" },
  "filter": { "equals": { "published": true } }
}

The sum and avg operations follow the same pattern with an additional "field" parameter (e.g., { "$query": "sum", "field": "total" }).

Function Operations

Server-side functions for computed values:

Now (current timestamp):

{
  "createdAt": { "$fn": "now" }
}

Slugify (URL-safe string):

{
  "slug": { "$fn": "slugify", "input": "Hello World!" }
}
// Results in: "hello-world"

Other function operations: lowercase, hash (SHA256) — all use the {"$fn": "function_name", "input": "..."} format.
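
For a sense of what slugify does, here is one possible implementation that reproduces the "Hello World!" example. The exact rules (ASCII letters and digits only, runs of other characters collapsed into one dash, no leading or trailing dash) are assumptions; the server's function may treat non-ASCII input differently.

```rust
/// Lowercase the input, keep ASCII letters and digits, and collapse every other
/// run of characters into a single '-' with no leading or trailing dash.
pub fn slugify(input: &str) -> String {
    let mut slug = String::new();
    let mut pending_dash = false;
    for ch in input.chars() {
        if ch.is_ascii_alphanumeric() {
            // Emit a separator only between two kept runs.
            if pending_dash && !slug.is_empty() {
                slug.push('-');
            }
            pending_dash = false;
            slug.push(ch.to_ascii_lowercase());
        } else {
            pending_dash = true;
        }
    }
    slug
}
```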

Indexing

Secondary Indexes

Improve query performance:

Index Definition:

  • name: Unique name for the index (e.g., “idx_email”)
  • fields: Vector of field names to index (single or compound)
  • unique: Boolean flag ensuring no duplicate values (for unique constraints)

Index Usage

Queries automatically use indexes when available:

{
  "type": "query",
  "path": "users",
  "filter": { "equals": { "email": "alice@example.com" } }
}
// Uses idx_email if available, otherwise full scan
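
The "index if available, otherwise full scan" behaviour can be sketched with an in-memory map from field value to document IDs. SketchCollection and its methods are hypothetical, documents are simplified to string-valued maps, and the sketch does not keep the index up to date on later writes, which a real implementation would need to do.

```rust
use std::collections::{BTreeMap, HashMap};

pub type Doc = HashMap<String, String>;

#[derive(Default)]
pub struct SketchCollection {
    pub docs: HashMap<String, Doc>,
    // field name -> (field value -> document IDs)
    pub indexes: HashMap<String, BTreeMap<String, Vec<String>>>,
}

impl SketchCollection {
    /// Build an index over the documents currently in the collection.
    pub fn create_index(&mut self, field: &str) {
        let mut index: BTreeMap<String, Vec<String>> = BTreeMap::new();
        for (id, doc) in &self.docs {
            if let Some(value) = doc.get(field) {
                index.entry(value.clone()).or_default().push(id.clone());
            }
        }
        self.indexes.insert(field.to_string(), index);
    }

    /// Returns matching IDs (sorted) plus whether an index answered the query.
    pub fn find_equals(&self, field: &str, value: &str) -> (Vec<String>, bool) {
        let (mut ids, used_index) = match self.indexes.get(field) {
            Some(index) => (index.get(value).cloned().unwrap_or_default(), true),
            None => {
                // No index on this field: fall back to scanning every document.
                let scanned: Vec<String> = self
                    .docs
                    .iter()
                    .filter(|(_, doc)| doc.get(field).map(String::as_str) == Some(value))
                    .map(|(id, _)| id.clone())
                    .collect();
                (scanned, false)
            }
        };
        ids.sort();
        (ids, used_index)
    }
}
```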

Index Strategies

Single-field indexes:

fields: vec!["email"]           // For email lookups
fields: vec!["createdAt"]       // For sorting by date

Compound indexes:

fields: vec!["category", "price"]  // For category + price queries

Unique indexes:

unique: true  // Ensures no duplicates (e.g., email, username)
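
A unique constraint can be enforced by recording which document owns each indexed value and rejecting a second owner. The sketch below shows that check in isolation; UniqueIndex and claim are hypothetical names, and how the real implementation reports a violation is not specified in the source.

```rust
use std::collections::HashMap;

#[derive(Default)]
pub struct UniqueIndex {
    // indexed value -> ID of the document that owns it
    owners: HashMap<String, String>,
}

impl UniqueIndex {
    /// Claim `value` for `doc_id`; fails if a different document already holds it.
    pub fn claim(&mut self, value: &str, doc_id: &str) -> Result<(), String> {
        if let Some(owner) = self.owners.get(value) {
            if owner.as_str() != doc_id {
                return Err(format!("duplicate value, held by {owner}"));
            }
        }
        self.owners.insert(value.to_string(), doc_id.to_string());
        Ok(())
    }
}
```

Re-claiming a value for the same document succeeds, so rewriting a document without changing its indexed field does not trip the constraint.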

Client SDK

See the @cloudillo/rtdb Client SDK documentation for JavaScript and React integration examples.
