redb Implementation

The query-based RTDB uses redb, a lightweight embedded database, to provide Firebase-like functionality with minimal overhead. This approach is well suited to structured data, complex queries, and conventional database workloads.

Why redb?

redb was chosen for the following characteristics:

  • Tiny footprint: 171 KiB package size (vs. 1+ MB for most databases)
  • Pure Rust: Memory-safe, no unsafe code
  • ACID transactions: Full transactional guarantees
  • Zero-copy reads: data is read without extra allocation or copying, keeping read overhead low
  • Embedded: No separate database server
  • LMDB-inspired: Proven B-tree architecture

Architecture

Layered Design

┌──────────────────────────────────────┐
│ Client Application (JavaScript)      │
│  - Query API                         │
│  - Subscriptions                     │
│  - Transactions                      │
└──────────────────────────────────────┘
                ↓ WebSocket
┌──────────────────────────────────────┐
│ WebSocket Handler                    │
│  - Message parsing                   │
│  - Authentication                    │
│  - Subscription tracking             │
└──────────────────────────────────────┘
                ↓
┌──────────────────────────────────────┐
│ RtdbAdapter Trait                    │
│  - query()                           │
│  - create()                          │
│  - update()                          │
│  - delete()                          │
│  - transaction()                     │
└──────────────────────────────────────┘
                ↓
┌──────────────────────────────────────┐
│ redb Implementation                  │
│  - Key-value storage                 │
│  - Secondary indexes                 │
│  - Query execution                   │
└──────────────────────────────────────┘
                ↓
┌──────────────────────────────────────┐
│ Real-Time Layer                      │
│  - tokio::broadcast channels         │
│  - Change event propagation          │
│  - Subscription filtering            │
└──────────────────────────────────────┘

RtdbAdapter Trait

The core interface for database operations:

#[async_trait]
pub trait RtdbAdapter: Send + Sync {
    /// Query documents
    async fn query(
        &self,
        db_id: &str,
        path: &str,
        options: QueryOptions,
    ) -> Result<Vec<Document>>;

    /// Create document
    async fn create(
        &self,
        db_id: &str,
        path: &str,
        data: Value,
    ) -> Result<String>;  // Returns document ID

    /// Update document
    async fn update(
        &self,
        db_id: &str,
        path: &str,
        doc_id: &str,
        data: Value,
    ) -> Result<()>;

    /// Delete document
    async fn delete(
        &self,
        db_id: &str,
        path: &str,
        doc_id: &str,
    ) -> Result<()>;

    /// Execute transaction
    async fn transaction(
        &self,
        db_id: &str,
        operations: Vec<Operation>,
    ) -> Result<TransactionResult>;

    /// Subscribe to changes
    fn subscribe(
        &self,
        db_id: &str,
        path: &str,
        filter: Option<QueryFilter>,
    ) -> broadcast::Receiver<ChangeEvent>;
}

Data Model

Collections and Documents

Data is organized into collections containing JSON documents:

database/
├── users/
│   ├── user_001
│   ├── user_002
│   └── ...
├── posts/
│   ├── post_abc
│   ├── post_def
│   └── ...
└── comments/
    └── ...

Document Structure

Documents are JSON objects with auto-generated IDs:

{
  "_id": "user_001",
  "_createdAt": 1738483200,
  "_updatedAt": 1738486800,
  "name": "Alice",
  "email": "alice@example.com",
  "age": 30,
  "active": true
}

System Fields (auto-managed):

  • _id: Unique document identifier
  • _createdAt: Creation timestamp (Unix)
  • _updatedAt: Last modification timestamp
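
On the server side, a document like this might be represented with serde, keeping the system fields explicit and collecting the user-defined fields into a flattened map. This is a sketch rather than the actual Cloudillo type; the field names simply mirror the wire format above.

use serde::{Deserialize, Serialize};
use serde_json::Value;

/// Sketch of a stored document: system fields plus arbitrary user fields.
#[derive(Debug, Serialize, Deserialize)]
pub struct Document {
    #[serde(rename = "_id")]
    pub id: String,
    #[serde(rename = "_createdAt")]
    pub created_at: i64, // Unix timestamp (seconds)
    #[serde(rename = "_updatedAt")]
    pub updated_at: i64,
    /// All remaining user-defined fields ("name", "email", ...) land here.
    #[serde(flatten)]
    pub fields: serde_json::Map<String, Value>,
}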

Path Syntax

Paths use slash-separated segments:

users                       // Collection
users/user_001              // Specific document
posts/post_abc/comments     // Sub-collection
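
The examples above follow a Firestore-style alternation: an odd number of segments names a collection, an even number names a document. Assuming that convention holds, a hypothetical helper for classifying paths could look like this:

/// Hypothetical helper: classify a slash-separated path.
/// "users" -> Collection, "users/user_001" -> Document,
/// "posts/post_abc/comments" -> Collection (sub-collection).
#[derive(Debug, PartialEq)]
enum PathKind {
    Collection,
    Document,
}

fn path_kind(path: &str) -> Result<PathKind, String> {
    let segments: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect();
    if segments.is_empty() {
        return Err("empty path".into());
    }
    // Odd segment count = collection, even = document.
    Ok(if segments.len() % 2 == 1 {
        PathKind::Collection
    } else {
        PathKind::Document
    })
}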

Query System

QueryOptions

pub struct QueryOptions {
    pub filter: Option<QueryFilter>,
    pub sort: Option<Sort>,
    pub limit: Option<u32>,
    pub offset: Option<u32>,
}

pub struct Sort {
    pub field: String,
    pub direction: SortDirection,  // Asc | Desc
}

QueryFilter

Filters support various comparison operators:

pub enum QueryFilter {
    Equals { field: String, value: Value },
    NotEquals { field: String, value: Value },
    GreaterThan { field: String, value: Value },
    GreaterThanOrEqual { field: String, value: Value },
    LessThan { field: String, value: Value },
    LessThanOrEqual { field: String, value: Value },
    Contains { field: String, value: String },
    In { field: String, values: Vec<Value> },
    And(Vec<QueryFilter>),
    Or(Vec<QueryFilter>),
}
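
A filter like this can be evaluated recursively against a document. The sketch below assumes the QueryFilter enum above, documents represented as serde_json::Value, and numeric comparisons performed as f64; it illustrates the intended semantics rather than the engine's actual evaluator.

use serde_json::Value;

/// Returns true if `doc` satisfies `filter`. Missing fields never match.
pub fn matches(filter: &QueryFilter, doc: &Value) -> bool {
    match filter {
        QueryFilter::Equals { field, value } => doc.get(field) == Some(value),
        QueryFilter::NotEquals { field, value } => doc.get(field) != Some(value),
        QueryFilter::GreaterThan { field, value } => cmp_num(doc, field, value, |a, b| a > b),
        QueryFilter::GreaterThanOrEqual { field, value } => cmp_num(doc, field, value, |a, b| a >= b),
        QueryFilter::LessThan { field, value } => cmp_num(doc, field, value, |a, b| a < b),
        QueryFilter::LessThanOrEqual { field, value } => cmp_num(doc, field, value, |a, b| a <= b),
        QueryFilter::Contains { field, value } => doc
            .get(field)
            .and_then(Value::as_str)
            .map_or(false, |s| s.contains(value.as_str())),
        QueryFilter::In { field, values } => doc
            .get(field)
            .map_or(false, |v| values.contains(v)),
        QueryFilter::And(filters) => filters.iter().all(|f| matches(f, doc)),
        QueryFilter::Or(filters) => filters.iter().any(|f| matches(f, doc)),
    }
}

/// Compare a numeric field against a numeric filter value as f64.
fn cmp_num(doc: &Value, field: &str, value: &Value, op: impl Fn(f64, f64) -> bool) -> bool {
    match (doc.get(field).and_then(Value::as_f64), value.as_f64()) {
        (Some(a), Some(b)) => op(a, b),
        _ => false,
    }
}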

Query Examples

Simple query:

{
  "type": "query",
  "id": 1,
  "path": "users",
  "filter": {
    "equals": { "field": "active", "value": true }
  },
  "sort": { "field": "name", "direction": "asc" },
  "limit": 50
}

Complex query:

{
  "type": "query",
  "id": 2,
  "path": "posts",
  "filter": {
    "and": [
      { "equals": { "field": "published", "value": true } },
      { "greaterThan": { "field": "views", "value": 100 } }
    ]
  },
  "sort": { "field": "createdAt", "direction": "desc" },
  "limit": 20
}

WebSocket Protocol

Message Types

Client → Server

1. Query

{
  "type": "query",
  "id": 1,
  "path": "users",
  "filter": { "equals": { "field": "active", "value": true } },
  "limit": 50
}

2. Subscribe

{
  "type": "subscribe",
  "id": 2,
  "path": "posts",
  "filter": { "equals": { "field": "published", "value": true } }
}

3. Create

{
  "type": "create",
  "id": 3,
  "path": "users",
  "data": {
    "name": "Bob",
    "email": "bob@example.com",
    "age": 25
  }
}

4. Update

{
  "type": "update",
  "id": 4,
  "path": "users",
  "docId": "user_001",
  "data": {
    "age": 31,
    "lastLogin": 1738486800
  }
}

5. Delete

{
  "type": "delete",
  "id": 5,
  "path": "users",
  "docId": "user_001"
}

6. Transaction

{
  "type": "transaction",
  "id": 6,
  "operations": [
    {
      "type": "create",
      "path": "posts",
      "ref": "$post",
      "data": { "title": "Hello", "author": "alice" }
    },
    {
      "type": "update",
      "path": "users/alice",
      "data": { "postCount": { "$op": "increment", "by": 1 } }
    }
  ]
}
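
On the server, these client messages map naturally onto an internally tagged serde enum. The sketch below is an assumption about the wire format (the "type" field selects the variant, camelCase field names as shown above); transaction operations are left as raw JSON values for brevity.

use serde::Deserialize;
use serde_json::Value;

/// Sketch of the client -> server messages above.
#[derive(Debug, Deserialize)]
#[serde(tag = "type", rename_all = "camelCase")]
pub enum ClientMessage {
    Query {
        id: u64,
        path: String,
        filter: Option<Value>,
        sort: Option<Value>,
        limit: Option<u32>,
        offset: Option<u32>,
    },
    Subscribe { id: u64, path: String, filter: Option<Value> },
    Create { id: u64, path: String, data: Value },
    #[serde(rename_all = "camelCase")]
    Update { id: u64, path: String, doc_id: String, data: Value },
    #[serde(rename_all = "camelCase")]
    Delete { id: u64, path: String, doc_id: String },
    // Operations kept as raw JSON here; a real parser would use a typed enum.
    Transaction { id: u64, operations: Vec<Value> },
}

// Parsing an incoming WebSocket text frame:
// let msg: ClientMessage = serde_json::from_str(&text)?;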

Server → Client

1. Query Result

{
  "type": "queryResult",
  "id": 1,
  "data": [
    { "_id": "user_001", "name": "Alice", "active": true },
    { "_id": "user_002", "name": "Bob", "active": true }
  ],
  "total": 2
}

2. Subscribe Result

{
  "type": "subscribeResult",
  "id": 2,
  "subscriptionId": "sub_abc123",
  "data": [
    { "_id": "post_001", "title": "Hello", "published": true }
  ]
}

3. Change Event

{
  "type": "change",
  "subscriptionId": "sub_abc123",
  "changes": [
    {
      "type": "added",
      "path": "posts",
      "docId": "post_002",
      "data": { "title": "New Post", "published": true }
    }
  ]
}

4. Create Result

{
  "type": "createResult",
  "id": 3,
  "docId": "user_003"
}

5. Error

{
  "type": "error",
  "id": 4,
  "code": "permission_denied",
  "message": "Insufficient permissions to update this document"
}

Real-Time Subscriptions

Subscription Flow

Client sends subscribe message
  ↓
Server validates permissions
  ↓
Server creates broadcast channel
  ↓
Server executes initial query
  ↓
Server sends subscribeResult with data
  ↓
Server watches for changes matching filter
  ↓
On change: Server sends change event
  ↓
Client updates local state

Implementation

Subscription Structure:

  • id: Unique subscription identifier
  • path: Collection path being subscribed to
  • filter: Optional query filter to match changes
  • sender: Broadcast channel for sending change events

Notification Algorithm:

Algorithm: Notify Subscribers on Change

Input: db_id, path, change_event
Output: None (side effect: sends to all matching subscribers)

1. For each active subscription:
   a. Check if subscription.path matches change path
   b. If path matches:
      - Evaluate if change data matches subscription filter
      - If matches: send change_event through subscriber's broadcast channel
      - If no match: skip this subscriber
2. Return

This ensures:
- Only subscribed collections receive notifications
- Filter conditions prevent unnecessary updates
- All matching subscribers are notified in parallel via broadcast channels
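
A minimal sketch of this notification step, assuming the ChangeEvent and QueryFilter types described in this document (with ChangeEvent deriving Clone) and a matches() helper like the one sketched in the Query System section:

use tokio::sync::broadcast;

/// One active subscription, mirroring the structure listed above.
pub struct Subscription {
    pub id: String,
    pub path: String,
    pub filter: Option<QueryFilter>,
    pub sender: broadcast::Sender<ChangeEvent>,
}

pub fn notify_subscribers(subs: &[Subscription], event: &ChangeEvent) {
    for sub in subs {
        // 1. The subscription path must match the changed collection.
        if sub.path != event.path {
            continue;
        }
        // 2. If a filter is set, the changed data must satisfy it.
        let matches_filter = match (&sub.filter, &event.data) {
            (Some(filter), Some(data)) => matches(filter, data),
            (Some(_), None) => true, // removals carry no data; this sketch still notifies
            (None, _) => true,
        };
        if matches_filter {
            // send() only fails when no receiver is listening; ignore that case.
            let _ = sub.sender.send(event.clone());
        }
    }
}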

Change Event Types

pub enum ChangeType {
    Added,    // New document created
    Modified, // Existing document updated
    Removed,  // Document deleted
}

pub struct ChangeEvent {
    pub change_type: ChangeType,
    pub path: String,
    pub doc_id: String,
    pub data: Option<Value>,  // None for Removed
}

Transactions

Atomic Operations

Transactions ensure multiple operations execute atomically:

{
  "type": "transaction",
  "id": 10,
  "operations": [
    {
      "type": "update",
      "path": "accounts/alice",
      "data": { "balance": { "$op": "increment", "by": -100 } }
    },
    {
      "type": "update",
      "path": "accounts/bob",
      "data": { "balance": { "$op": "increment", "by": 100 } }
    }
  ]
}

Guarantees:

  • All operations succeed or all fail
  • Intermediate states never visible
  • Sequential consistency
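
Under the hood, this maps naturally onto a single redb write transaction: either commit() persists every change, or dropping the transaction on error discards them all. The sketch below simplifies storage to one documents table keyed by "collection/doc_id" (the real on-disk layout is not documented here) purely to illustrate the atomicity boundary.

use redb::{Database, TableDefinition};

// Simplified layout for the sketch: "collection/doc_id" -> JSON bytes.
const DOCS: TableDefinition<&str, &[u8]> = TableDefinition::new("documents");

/// Apply a batch of writes atomically: all of them commit, or none do.
fn apply_batch(db: &Database, writes: &[(String, Vec<u8>)]) -> Result<(), redb::Error> {
    let txn = db.begin_write()?;
    {
        let mut table = txn.open_table(DOCS)?;
        for (key, value) in writes {
            // insert() returns the previous value for the key, if any.
            let _prev = table.insert(key.as_str(), value.as_slice())?;
        }
    } // the table borrow must end before commit()
    txn.commit()?; // an early return via `?` above drops `txn`, aborting the batch
    Ok(())
}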

Temporary References

Reference documents created within the same transaction:

{
  "type": "transaction",
  "id": 11,
  "operations": [
    {
      "type": "create",
      "path": "posts",
      "ref": "$post",
      "data": { "title": "My Post", "author": "alice" }
    },
    {
      "type": "create",
      "path": "comments",
      "data": {
        "postId": { "$ref": "$post" },
        "text": "First comment!",
        "author": "alice"
      }
    }
  ]
}

How it works:

  1. The first operation creates the post and saves its generated ID under the reference name $post
  2. The second operation references $post, which is replaced with the actual document ID
  3. The comment gets the correct post ID even though it wasn’t known when the transaction was submitted (see the sketch below)
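
A rough sketch of that substitution step, assuming documents are serde_json::Value and the server keeps a map from reference names to the IDs generated earlier in the transaction (names here are illustrative):

use std::collections::HashMap;
use serde_json::Value;

/// Replace { "$ref": "$post" } placeholders with IDs of documents
/// created earlier in the same transaction.
fn resolve_refs(data: &mut Value, refs: &HashMap<String, String>) {
    // A one-key object of the form { "$ref": "<name>" } is a placeholder.
    let replacement = match data {
        Value::Object(map) if map.len() == 1 => match map.get("$ref") {
            Some(Value::String(name)) => refs.get(name).cloned(),
            _ => None,
        },
        _ => None,
    };
    if let Some(id) = replacement {
        *data = Value::String(id);
        return;
    }
    // Otherwise recurse into nested objects and arrays.
    match data {
        Value::Object(map) => {
            for value in map.values_mut() {
                resolve_refs(value, refs);
            }
        }
        Value::Array(items) => {
            for item in items {
                resolve_refs(item, refs);
            }
        }
        _ => {}
    }
}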

Computed Values

Field Operations

Modify field values with special operations:

Increment:

{
  "views": { "$op": "increment", "by": 1 }
}

Append (to array):

{
  "tags": { "$op": "append", "value": "javascript" }
}

Remove (from array):

{
  "tags": { "$op": "remove", "value": "draft" }
}

Set if not exists:

{
  "createdAt": { "$op": "setIfNotExists", "value": 1738483200 }
}
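
These operators are applied when merging an update into an existing document. The sketch below shows one plausible interpretation over serde_json values; the helper name and exact fallback behaviour are assumptions, not the documented implementation.

use serde_json::{json, Value};

/// Apply one field update, interpreting { "$op": ... } objects.
/// `current` is the field's existing value (if any); returns the new value.
fn apply_field_op(current: Option<&Value>, update: &Value) -> Value {
    match update.get("$op").and_then(Value::as_str) {
        Some("increment") => {
            let by = update.get("by").and_then(Value::as_f64).unwrap_or(0.0);
            let base = current.and_then(Value::as_f64).unwrap_or(0.0);
            json!(base + by)
        }
        Some("append") => {
            let mut arr = current.and_then(Value::as_array).cloned().unwrap_or_default();
            if let Some(v) = update.get("value") {
                arr.push(v.clone());
            }
            Value::Array(arr)
        }
        Some("remove") => {
            let mut arr = current.and_then(Value::as_array).cloned().unwrap_or_default();
            if let Some(v) = update.get("value") {
                arr.retain(|item| item != v);
            }
            Value::Array(arr)
        }
        Some("setIfNotExists") => match current {
            Some(existing) => existing.clone(),
            None => update.get("value").cloned().unwrap_or(Value::Null),
        },
        // No recognised $op: treat the update as a plain value.
        _ => update.clone(),
    }
}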

Query Operations

Aggregate data within queries:

Count:

{
  "type": "query",
  "id": 12,
  "path": "posts",
  "query": { "$query": "count" },
  "filter": { "equals": { "field": "published", "value": true } }
}

Sum:

{
  "type": "query",
  "id": 13,
  "path": "orders",
  "query": { "$query": "sum", "field": "total" }
}

Average:

{
  "type": "query",
  "id": 14,
  "path": "reviews",
  "query": { "$query": "avg", "field": "rating" }
}
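
A rough sketch of how these aggregates could be computed over the already-filtered result set (the surrounding query pipeline is omitted):

use serde_json::Value;

/// Compute count / sum / avg over documents that already passed the filter.
/// `field` is only consulted for sum and avg.
fn aggregate(docs: &[Value], kind: &str, field: &str) -> Value {
    match kind {
        "count" => Value::from(docs.len() as u64),
        "sum" | "avg" => {
            let nums: Vec<f64> = docs
                .iter()
                .filter_map(|d| d.get(field).and_then(Value::as_f64))
                .collect();
            let sum: f64 = nums.iter().sum();
            if kind == "sum" {
                Value::from(sum)
            } else if nums.is_empty() {
                Value::Null // avg of an empty set is undefined
            } else {
                Value::from(sum / nums.len() as f64)
            }
        }
        _ => Value::Null,
    }
}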

Function Operations

Server-side functions for computed values:

Now (current timestamp):

{
  "createdAt": { "$fn": "now" }
}

UUID (generate unique ID):

{
  "trackingId": { "$fn": "uuid" }
}

Slugify (URL-safe string):

{
  "slug": { "$fn": "slugify", "input": "Hello World!" }
  // Results in: "hello-world"
}

Hash (SHA256):

{
  "passwordHash": { "$fn": "hash", "input": "password123" }
}
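
now, uuid, and hash would typically delegate to the system clock, a UUID generator, and a SHA-256 digest respectively; slugify can be sketched with the standard library alone. A hypothetical helper matching the example above:

/// Hypothetical slugify helper: "Hello World!" -> "hello-world".
fn slugify(input: &str) -> String {
    let mut out = String::with_capacity(input.len());
    let mut prev_dash = true; // avoids a leading dash
    for ch in input.chars() {
        if ch.is_ascii_alphanumeric() {
            out.push(ch.to_ascii_lowercase());
            prev_dash = false;
        } else if !prev_dash {
            out.push('-');
            prev_dash = true;
        }
    }
    // Trim any trailing dash left by punctuation at the end.
    while out.ends_with('-') {
        out.pop();
    }
    out
}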

Indexing

Secondary Indexes

Secondary indexes speed up queries that filter or sort on specific fields:

Index Definition:

  • name: Unique name for the index (e.g., “idx_email”)
  • fields: Vector of field names to index (single or compound)
  • unique: Boolean flag ensuring no duplicate values (for unique constraints)

Create Index Algorithm:

Algorithm: Create Index

Input: db_id, collection_path, index_definition
Output: Result<()>

1. Validate index definition:
   - Check name uniqueness (not duplicate of existing index)
   - Verify all fields exist in collection schema
   - If unique=true: verify collection has no duplicate values for these fields

2. Build index structure:
   - Scan existing documents in collection
   - For each document, extract values for indexed fields
   - Build index data structure (B-tree for efficient lookups)

3. Store index metadata:
   - Save index definition in metadata adapter
   - Record index name, fields, and unique flag

4. Return success

This ensures:
- Efficient lookups on indexed fields
- The query optimizer can use the index automatically
- Unique constraints are enforced at the index level
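
As an illustration of step 2, a secondary index can be thought of as an ordered map from the indexed field's value to the IDs of the documents holding it. The in-memory sketch below assumes documents as serde_json::Value and uses the value's JSON text as the index key; the real implementation would persist this in a redb table.

use std::collections::BTreeMap;
use serde_json::Value;

/// Build an index for one field over (doc_id, document) pairs.
/// A unique index rejects a second document with the same key.
fn build_index(
    docs: &[(String, Value)],
    field: &str,
    unique: bool,
) -> Result<BTreeMap<String, Vec<String>>, String> {
    let mut index: BTreeMap<String, Vec<String>> = BTreeMap::new();
    for (doc_id, doc) in docs {
        // Documents without the field are simply not indexed.
        let Some(value) = doc.get(field) else { continue };
        let key = value.to_string(); // canonical JSON text as the index key
        let entry = index.entry(key).or_default();
        if unique && !entry.is_empty() {
            return Err(format!("unique index violated on field '{field}'"));
        }
        entry.push(doc_id.clone());
    }
    Ok(index)
}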

Index Usage

Queries automatically use indexes when available:

{
  "type": "query",
  "path": "users",
  "filter": { "equals": { "field": "email", "value": "alice@example.com" } }
}
// Uses idx_email if available, otherwise full scan

Index Strategies

Single-field indexes:

fields: vec!["email"]           // For email lookups
fields: vec!["createdAt"]       // For sorting by date

Compound indexes:

fields: vec!["category", "price"]  // For category + price queries

Unique indexes:

unique: true  // Ensures no duplicates (e.g., email, username)

Client SDK Example

JavaScript/TypeScript

import { CloudilloRtdb } from 'cloudillo-rtdb-client';

// Connect to database
const db = await CloudilloRtdb.connect(fileId, {
  authToken: accessToken,
  serverUrl: 'wss://cl-o.example.com'
});

// Query data
const users = await db.collection('users')
  .where('active', '==', true)
  .orderBy('name', 'asc')
  .limit(50)
  .get();

console.log(users.docs);

// Subscribe to changes
const unsubscribe = db.collection('posts')
  .where('published', '==', true)
  .onSnapshot((snapshot) => {
    snapshot.docChanges().forEach((change) => {
      if (change.type === 'added') {
        console.log('New post:', change.doc.data());
      }
      if (change.type === 'modified') {
        console.log('Modified post:', change.doc.data());
      }
      if (change.type === 'removed') {
        console.log('Removed post:', change.doc.id);
      }
    });
  });

// Create document
const docId = await db.collection('users').add({
  name: 'Charlie',
  email: 'charlie@example.com',
  age: 28
});

// Update document
await db.collection('users').doc(docId).update({
  age: 29,
  lastLogin: new Date()
});

// Transaction
await db.runTransaction(async (transaction) => {
  const postRef = db.collection('posts').doc();

  transaction.create(postRef, {
    title: 'My Post',
    author: 'alice',
    createdAt: Date.now()
  });

  transaction.update(db.collection('users').doc('alice'), {
    postCount: { $increment: 1 }
  });
});

// Cleanup
unsubscribe();
await db.close();

React Hook Example

import { useRtdbQuery, useRtdbSubscription } from 'cloudillo-react';

function TaskList() {
  // Query tasks
  const { data: tasks, loading, error } = useRtdbQuery(
    db.collection('tasks')
      .where('completed', '==', false)
      .orderBy('priority', 'desc')
  );

  // Subscribe to changes
  const { data: liveTasks } = useRtdbSubscription(
    db.collection('tasks').where('assignee', '==', userId)
  );

  if (loading) return <div>Loading...</div>;
  if (error) return <div>Error: {error.message}</div>;

  return (
    <ul>
      {tasks.map(task => (
        <li key={task._id}>{task.title}</li>
      ))}
    </ul>
  );
}

Performance Optimization

Query Optimization

Use indexes:

// Create index for frequently queried fields
await db.createIndex('users', { fields: ['email'], unique: true });

Limit results:

// Always use limit for large collections
const recent = await db.collection('posts')
  .orderBy('createdAt', 'desc')
  .limit(20)
  .get();

Use pagination:

const page1 = await db.collection('posts').limit(20).get();
const page2 = await db.collection('posts').limit(20).offset(20).get();

Subscription Optimization

Filter subscriptions:

// Only subscribe to relevant data
db.collection('messages')
  .where('conversationId', '==', conversationId)
  .onSnapshot(handler);  // Not all messages

Cleanup subscriptions:

// Always unsubscribe when component unmounts
useEffect(() => {
  const unsubscribe = db.collection('...').onSnapshot(handler);
  return () => unsubscribe();
}, []);

Memory Management

Database eviction:

pub struct EvictionPolicy {
    max_instances: usize,        // Max databases in memory
    idle_timeout: Duration,      // Evict after N minutes idle
    lru_eviction: bool,          // Use LRU when at max
}
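
A minimal sketch of the idle-timeout part of this policy, assuming the server keeps a last-access timestamp per open database (names are illustrative):

use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Remove tracking entries for databases idle longer than the timeout
/// and return their IDs so the caller can drop the redb handles.
fn evict_idle(
    last_access: &mut HashMap<String, Instant>,
    idle_timeout: Duration,
) -> Vec<String> {
    let now = Instant::now();
    let expired: Vec<String> = last_access
        .iter()
        .filter(|(_, last)| now.duration_since(**last) > idle_timeout)
        .map(|(id, _)| id.clone())
        .collect();
    for id in &expired {
        // The caller would also close the corresponding database instance here.
        last_access.remove(id);
    }
    expired
}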

Connection limits:

pub struct ConnectionLimits {
    max_sessions_per_db: usize,  // Per database
    max_total_sessions: usize,   // Server-wide
    max_sessions_per_user: usize,
}

Error Handling

Error Types

pub enum RtdbError {
    PermissionDenied,
    DocumentNotFound,
    InvalidQuery,
    TransactionFailed,
    DatabaseNotFound,
    ConnectionClosed,
    RateLimitExceeded,
}

Client Handling

try {
  await db.collection('users').doc(id).update(data);
} catch (error) {
  if (error.code === 'permission_denied') {
    console.error('No permission to update');
  } else if (error.code === 'document_not_found') {
    console.error('Document does not exist');
  } else {
    console.error('Unknown error:', error);
  }
}

Security Best Practices

Permission Rules

// Create database with strict permissions
await fetch('/api/db', {
  method: 'POST',
  body: JSON.stringify({
    name: 'Private Data',
    type: 'redb',
    permissions: {
      public_read: false,
      public_write: false,
      readers: [],  // Only owner can read
      writers: []   // Only owner can write
    }
  })
});

Input Validation

// Validate on client before sending
function validateUser(data) {
  if (!data.email || !/^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(data.email)) {
    throw new Error('Invalid email');
  }
  if (!data.name || data.name.length < 2) {
    throw new Error('Name too short');
  }
  return true;
}

if (validateUser(userData)) {
  await db.collection('users').add(userData);
}

Rate Limiting

Server-Side Rate Limits:

To prevent abuse and ensure fair resource allocation, the server enforces per-user/per-IP rate limits:

Query Limits:

  • MAX_QUERIES_PER_SECOND: 100 queries per user per second
    • Prevents overwhelming the database with excessive read queries
    • Typical usage: <10 queries/second for normal applications

Write Limits:

  • MAX_WRITES_PER_SECOND: 10 writes per user per second
    • More restrictive than reads to protect data consistency
    • Typical usage: <1 write/second for normal applications

Implementation Pattern:

  • Sliding window counter: Track requests over the last second
  • Per-user tracking: Rate limits apply per authenticated user
  • Per-IP fallback: Anonymous requests are rate-limited by IP address
  • When exceeded: Return HTTP 429 (Too Many Requests) with a Retry-After header
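
A minimal sketch of such a sliding-window limiter, keyed by user ID (or IP address for anonymous requests). The struct and limits below are illustrative, not the server's actual types.

use std::collections::HashMap;
use std::time::{Duration, Instant};

pub struct RateLimiter {
    window: Duration,
    max_requests: usize,
    requests: HashMap<String, Vec<Instant>>, // key -> recent request times
}

impl RateLimiter {
    pub fn new(window: Duration, max_requests: usize) -> Self {
        Self { window, max_requests, requests: HashMap::new() }
    }

    /// Returns true if the request is allowed, false if it should be
    /// rejected with 429 Too Many Requests.
    pub fn check(&mut self, key: &str) -> bool {
        let window = self.window;
        let max = self.max_requests;
        let now = Instant::now();
        let entry = self.requests.entry(key.to_string()).or_default();
        // Drop timestamps that have fallen out of the window.
        entry.retain(|t| now.duration_since(*t) < window);
        if entry.len() >= max {
            return false;
        }
        entry.push(now);
        true
    }
}

// Usage: one limiter per operation class, e.g.
// let mut queries = RateLimiter::new(Duration::from_secs(1), 100);
// let mut writes  = RateLimiter::new(Duration::from_secs(1), 10);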

See Also