redb Implementation
The query-based RTDB uses redb, a lightweight embedded database, to provide Firebase-like functionality with minimal overhead. This approach is ideal for structured data, complex queries, and traditional database operations.
Why redb?
redb is chosen for its exceptional characteristics:
- Tiny footprint: 171 KiB package size (vs. 1+ MB for most databases)
- Pure Rust: Memory-safe, no unsafe code
- ACID transactions: Full transactional guarantees
- Zero-copy reads: Excellent performance
- Embedded: No separate database server
- LMDB-inspired: Proven B-tree architecture
Architecture
Layered Design
┌──────────────────────────────────────┐
│ Client Application (JavaScript) │
│ - Query API │
│ - Subscriptions │
│ - Transactions │
└──────────────────────────────────────┘
↓ WebSocket
┌──────────────────────────────────────┐
│ WebSocket Handler │
│ - Message parsing │
│ - Authentication │
│ - Subscription tracking │
└──────────────────────────────────────┘
↓
┌──────────────────────────────────────┐
│ RtdbAdapter Trait │
│ - transaction() → Transaction │
│ - query(), get() │
│ - subscribe() │
│ - acquire_lock(), release_lock() │
│ - create_index(), stats() │
└──────────────────────────────────────┘
↓
┌──────────────────────────────────────┐
│ redb Implementation │
│ - Key-value storage │
│ - Secondary indexes │
│ - Query execution │
└──────────────────────────────────────┘
↓
┌──────────────────────────────────────┐
│ Real-Time Layer │
│ - tokio::broadcast channels │
│ - Change event propagation │
│ - Subscription filtering │
└──────────────────────────────────────┘
RtdbAdapter Trait
The core interface for database operations. All methods are tenant-aware (tn_id parameter). Write operations go through the separate Transaction trait.
#[async_trait]
pub trait RtdbAdapter: Debug + Send + Sync {
/// Begin a new transaction for write operations
async fn transaction(&self, tn_id: TnId, db_id: &str) -> ClResult<Box<dyn Transaction>>;
/// Close a database instance, flushing pending changes
async fn close_db(&self, tn_id: TnId, db_id: &str) -> ClResult<()>;
/// Query documents with optional filtering, sorting, and pagination
async fn query(&self, tn_id: TnId, db_id: &str, path: &str, opts: QueryOptions)
-> ClResult<Vec<Value>>;
/// Get a single document at a specific path
async fn get(&self, tn_id: TnId, db_id: &str, path: &str) -> ClResult<Option<Value>>;
/// Subscribe to real-time changes (returns a stream of ChangeEvents)
async fn subscribe(&self, tn_id: TnId, db_id: &str, opts: SubscriptionOptions)
-> ClResult<Pin<Box<dyn Stream<Item = ChangeEvent> + Send>>>;
/// Create an index on a field for query performance
async fn create_index(&self, tn_id: TnId, db_id: &str, path: &str, field: &str)
-> ClResult<()>;
/// Get database statistics (size, record count, table count)
async fn stats(&self, tn_id: TnId, db_id: &str) -> ClResult<DbStats>;
/// Export all documents from a database
async fn export_all(&self, tn_id: TnId, db_id: &str) -> ClResult<Vec<(Box<str>, Value)>>;
/// Acquire a lock on a document path
async fn acquire_lock(&self, tn_id: TnId, db_id: &str, path: &str,
user_id: &str, mode: LockMode, conn_id: &str) -> ClResult<Option<LockInfo>>;
/// Release a lock on a document path
async fn release_lock(&self, tn_id: TnId, db_id: &str, path: &str,
user_id: &str, conn_id: &str) -> ClResult<()>;
/// Check if a path has an active lock
async fn check_lock(&self, tn_id: TnId, db_id: &str, path: &str)
-> ClResult<Option<LockInfo>>;
/// Release all locks held by a specific user (on disconnect)
async fn release_all_locks(&self, tn_id: TnId, db_id: &str,
user_id: &str, conn_id: &str) -> ClResult<()>;
}
Transaction Trait
All write operations (create, update, delete) are performed within a transaction:
#[async_trait]
pub trait Transaction: Send + Sync {
/// Create a new document with auto-generated ID
async fn create(&mut self, path: &str, data: Value) -> ClResult<Box<str>>;
/// Update an existing document (full replacement)
async fn update(&mut self, path: &str, data: Value) -> ClResult<()>;
/// Delete a document at a path
async fn delete(&mut self, path: &str) -> ClResult<()>;
/// Read a document (with read-your-own-writes semantics)
async fn get(&self, path: &str) -> ClResult<Option<Value>>;
/// Commit all changes atomically
async fn commit(&mut self) -> ClResult<()>;
/// Rollback all changes
async fn rollback(&mut self) -> ClResult<()>;
}
Data Model
Collections and Documents
Data is organized into collections containing JSON documents:
database/
├── users/
│ ├── user_001
│ ├── user_002
│ └── ...
├── posts/
│ ├── post_abc
│ ├── post_def
│ └── ...
└── comments/
└── ...
Document Structure
Documents are JSON objects with auto-generated IDs:
{
"_id": "user_001",
"_createdAt": 1738483200,
"_updatedAt": 1738486800,
"name": "Alice",
"email": "alice@example.com",
"age": 30,
"active": true
}
System Fields (auto-managed):
- _id: Unique document identifier
- _createdAt: Creation timestamp (Unix)
- _updatedAt: Last modification timestamp
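As a minimal sketch, the system-field stamping described above could look like the following. The helper names (`stampCreate`, `stampUpdate`) and the `Document` type are ours for illustration; only the field names and semantics come from this document.

```typescript
type Document = Record<string, unknown>

// On create: the server assigns _id and both timestamps.
// System fields are written last so client data cannot override them.
function stampCreate(data: Document, id: string, now: number): Document {
  return { ...data, _id: id, _createdAt: now, _updatedAt: now }
}

// On update: _id and _createdAt are preserved, _updatedAt is refreshed.
function stampUpdate(existing: Document, data: Document, now: number): Document {
  return {
    ...existing,
    ...data,
    _id: existing._id,
    _createdAt: existing._createdAt,
    _updatedAt: now,
  }
}
```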
Path Syntax
Paths use slash-separated segments:
users // Collection
users/user_001 // Specific document
posts/post_abc/comments // Sub-collection
Query System
QueryOptions
pub struct QueryOptions {
pub filter: Option<QueryFilter>,
pub sort: Option<Vec<SortField>>, // Multiple sort fields supported
pub limit: Option<u32>,
pub offset: Option<u32>,
pub aggregate: Option<AggregateOptions>,
}
pub struct SortField {
pub field: String,
pub ascending: bool, // true for ascending, false for descending
}
QueryFilter
QueryFilter is a flat struct (not an enum) where each field is a HashMap<String, Value>. Multiple conditions within the struct are ANDed implicitly — a document must satisfy all specified constraints. All field names use camelCase serialization.
#[serde(rename_all = "camelCase")]
pub struct QueryFilter {
pub equals: HashMap<String, Value>,
pub not_equals: HashMap<String, Value>,
pub greater_than: HashMap<String, Value>,
pub greater_than_or_equal: HashMap<String, Value>,
pub less_than: HashMap<String, Value>,
pub less_than_or_equal: HashMap<String, Value>,
pub in_array: HashMap<String, Vec<Value>>,
pub array_contains: HashMap<String, Value>,
pub not_in_array: HashMap<String, Vec<Value>>,
pub array_contains_any: HashMap<String, Vec<Value>>,
pub array_contains_all: HashMap<String, Vec<Value>>,
}
Info
There are no And/Or combinators — multiple conditions are ANDed implicitly. Each HashMap maps field names to expected values.
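The implicit-AND semantics can be sketched in TypeScript. This is an illustrative evaluator over a simplified subset of `QueryFilter`, not the client SDK's API; the names here are ours.

```typescript
// Simplified subset of QueryFilter: each map is field name → expected value.
type Filter = {
  equals?: Record<string, unknown>
  notEquals?: Record<string, unknown>
  greaterThan?: Record<string, number>
  lessThan?: Record<string, number>
  inArray?: Record<string, unknown[]>
}

// A document matches only if it satisfies every condition in every map.
function matchesFilter(doc: Record<string, unknown>, f: Filter): boolean {
  for (const [k, v] of Object.entries(f.equals ?? {}))
    if (doc[k] !== v) return false
  for (const [k, v] of Object.entries(f.notEquals ?? {}))
    if (doc[k] === v) return false
  for (const [k, v] of Object.entries(f.greaterThan ?? {}))
    if (!(typeof doc[k] === 'number' && (doc[k] as number) > v)) return false
  for (const [k, v] of Object.entries(f.lessThan ?? {}))
    if (!(typeof doc[k] === 'number' && (doc[k] as number) < v)) return false
  for (const [k, vs] of Object.entries(f.inArray ?? {}))
    if (!vs.includes(doc[k])) return false
  return true
}
```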
Query Examples
Simple query:
{
"type": "query",
"id": 1,
"path": "users",
"filter": {
"equals": { "active": true }
},
"sort": [{ "field": "name", "ascending": true }],
"limit": 50
}
Complex query (multiple conditions are ANDed):
{
"type": "query",
"id": 2,
"path": "posts",
"filter": {
"equals": { "published": true },
"greaterThan": { "views": 100 }
},
"sort": [{ "field": "createdAt", "ascending": false }],
"limit": 20
}
WebSocket Protocol
Message Types
Client → Server
1. Query
{
"type": "query",
"id": 1,
"path": "users",
"filter": { "equals": { "active": true } },
"limit": 50
}
2. Subscribe
{
"type": "subscribe",
"id": 2,
"path": "posts",
"filter": { "equals": { "published": true } }
}
3. Get (single document)
{
"type": "get",
"id": 3,
"path": "users/user_001"
}
4. Transaction (wraps create/update/delete operations)
{
"type": "transaction",
"id": 4,
"operations": [
{
"type": "create",
"path": "posts",
"ref": "$post",
"data": { "title": "Hello", "author": "alice" }
},
{
"type": "update",
"path": "users/alice",
"data": { "postCount": { "$op": "increment", "by": 1 } }
},
{
"type": "replace",
"path": "users/alice",
"data": { "name": "Alice", "role": "admin" }
},
{
"type": "delete",
"path": "posts/post_old"
}
]
}
Info
All write operations (create, update, replace, delete) must be wrapped in a transaction message. There are no standalone write message types. The update operation merges fields into the existing document, while replace does a full document replacement.
5. Lock
{
"type": "lock",
"id": 5,
"path": "todos/task_123",
"mode": "hard"
}
6. Unlock
{
"type": "unlock",
"id": 6,
"path": "todos/task_123"
}
7. Create Index
{
"type": "createIndex",
"id": 7,
"path": "users",
"field": "email"
}
8. Ping
{
"type": "ping",
"id": 8
}
Server → Client
1. Query Result
{
"type": "queryResult",
"id": 1,
"data": [
{ "_id": "user_001", "name": "Alice", "active": true },
{ "_id": "user_002", "name": "Bob", "active": true }
],
"total": 2
}
2. Get Result
{
"type": "getResult",
"id": 3,
"data": { "_id": "user_001", "name": "Alice", "active": true }
}
3. Subscribe Result
{
"type": "subscribeResult",
"id": 2,
"subscriptionId": "sub_abc123",
"data": [
{ "_id": "post_001", "title": "Hello", "published": true }
]
}
4. Change Event
Change events use a single event object with an action field, not a changes array:
{
"type": "change",
"subscriptionId": "sub_abc123",
"event": {
"action": "create",
"path": "posts",
"data": { "_id": "post_002", "title": "New Post", "published": true }
}
}
Possible action values: create, update, delete, lock, unlock, ready
The ready action is sent after the initial subscription data has been delivered.
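A client handling these events typically folds them into a local cache keyed by document ID. The sketch below is illustrative: the event shapes follow the protocol above, but deriving the deleted document's ID from the path's last segment is an assumption of ours, and the cache helper is not part of the SDK.

```typescript
type Doc = { _id: string } & Record<string, unknown>

// Event shapes follow the "change" messages described above
type ChangeEvent =
  | { action: 'create'; path: string; data: Doc }
  | { action: 'update'; path: string; data: Doc }
  | { action: 'delete'; path: string }
  | { action: 'ready'; path: string }

function applyChange(cache: Map<string, Doc>, ev: ChangeEvent): void {
  switch (ev.action) {
    case 'create':
    case 'update':
      cache.set(ev.data._id, ev.data)
      break
    case 'delete': {
      // Assumption: the document ID is the last path segment
      const id = ev.path.split('/').pop()!
      cache.delete(id)
      break
    }
    case 'ready':
      break // initial snapshot delivered; nothing to apply
  }
}
```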
5. Transaction Result
{
"type": "transactionResult",
"id": 4,
"results": [
{ "ref": "$post", "id": "post_new_001" },
{ "id": "users/alice" },
{ "id": "posts/post_old" }
]
}
6. Lock Result
{
"type": "lockResult",
"id": 5,
"locked": true
}
If the lock is denied:
{
"type": "lockResult",
"id": 5,
"locked": false,
"holder": "bob@example.com",
"mode": "hard"
}
7. Error
{
"type": "error",
"id": 4,
"code": "permission_denied",
"message": "Insufficient permissions to update this document"
}
8. Pong
{
"type": "pong",
"id": 8
}
Real-Time Subscriptions
Subscription Flow
Client sends subscribe message
↓
Server validates permissions
↓
Server creates broadcast channel
↓
Server executes initial query
↓
Server sends subscribeResult with data
↓
Server watches for changes matching filter
↓
On change: Server sends change event
↓
Client updates local state
Implementation
Subscription Structure:
- id: Unique subscription identifier
- path: Collection path being subscribed to
- filter: Optional query filter to match changes
- sender: Broadcast channel for sending change events
Notification Algorithm:
Algorithm: Notify Subscribers on Change
Input: db_id, path, change_event
Output: None (side effect: sends to all matching subscribers)
1. For each active subscription:
a. Check if subscription.path matches change path
b. If path matches:
- Evaluate if change data matches subscription filter
- If matches: send change_event through subscriber's broadcast channel
- If no match: skip this subscriber
2. Return
This ensures:
- Only subscribed collections receive notifications
- Filter conditions prevent unnecessary updates
- All matching subscribers notified in parallel via broadcast channels
Change Event Types
ChangeEvent is a tagged enum with #[serde(tag = "action")] serialization:
#[serde(tag = "action", rename_all = "camelCase")]
pub enum ChangeEvent {
Create { path: Box<str>, data: Value },
Update { path: Box<str>, data: Value, old_data: Option<Value> },
Delete { path: Box<str>, old_data: Option<Value> },
Lock { path: Box<str>, data: Value },
Unlock { path: Box<str>, data: Value },
Ready { path: Box<str>, data: Option<Value> },
}
This serializes as {"action": "create", "path": "...", "data": {...}} — the action field determines the variant.
Transactions
Atomic Operations
Transactions ensure multiple operations execute atomically:
{
"type": "transaction",
"id": 10,
"operations": [
{
"type": "update",
"path": "accounts/alice",
"data": { "balance": { "$op": "increment", "by": -100 } }
},
{
"type": "update",
"path": "accounts/bob",
"data": { "balance": { "$op": "increment", "by": 100 } }
}
]
}
Guarantees:
- All operations succeed or all fail
- Intermediate states never visible
- Sequential consistency
Temporary References
Reference documents created within the same transaction:
{
"type": "transaction",
"id": 11,
"operations": [
{
"type": "create",
"path": "posts",
"ref": "$post",
"data": { "title": "My Post", "author": "alice" }
},
{
"type": "create",
"path": "comments",
"data": {
"postId": { "$ref": "$post" },
"text": "First comment!",
"author": "alice"
}
}
]
}
How it works:
- First operation creates the post and saves its ID as $post
- Second operation references $post, which is replaced with the actual ID
- The comment gets the correct post ID even though it wasn't known when the transaction was built
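The substitution step can be sketched as a recursive walk that collapses `{ "$ref": "$post" }` placeholders into the IDs recorded by earlier operations. This is an illustrative sketch of the mechanism, not the server's actual implementation; the function name is ours.

```typescript
type Value = null | boolean | number | string | Value[] | { [k: string]: Value }

// Replace every { "$ref": "<name>" } object with the ID recorded for that ref.
// `refs` maps temporary reference names (e.g. "$post") to real document IDs.
function resolveRefs(value: Value, refs: Record<string, string>): Value {
  if (Array.isArray(value)) return value.map(v => resolveRefs(v, refs))
  if (value !== null && typeof value === 'object') {
    const r = value['$ref']
    if (typeof r === 'string' && r in refs) return refs[r]
    const out: { [k: string]: Value } = {}
    for (const [k, v] of Object.entries(value)) out[k] = resolveRefs(v, refs)
    return out
  }
  return value
}
```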
Document Locking
The RTDB supports document-level locking for exclusive or advisory editing access.
Lock Modes
- Soft lock (advisory): Other clients can still write but receive a notification that the document is locked. Useful for signaling editing intent.
- Hard lock (enforced): The server rejects writes from other clients while the lock is held. Only the lock holder (identified by conn_id) can modify the document.
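The write-admission rule implied by the two modes can be stated compactly. The following is a sketch under assumed type and function names; the actual server check is not shown in this document.

```typescript
interface LockInfo {
  holder: string           // user who holds the lock
  mode: 'soft' | 'hard'
  connId: string           // connection that acquired the lock
}

// Hard locks block writes from every connection except the holder's;
// soft locks are advisory and never block.
function writeAllowed(lock: LockInfo | null, connId: string): boolean {
  if (lock === null) return true         // no lock: anyone may write
  if (lock.mode === 'soft') return true  // advisory only
  return lock.connId === connId          // hard lock: holder only
}
```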
Lock/Unlock Messages
Client → Server:
{ "type": "lock", "id": 1, "path": "todos/task_123", "mode": "soft" }
{ "type": "unlock", "id": 2, "path": "todos/task_123" }Server → Client:
{ "type": "lockResult", "id": 1, "locked": true }
{ "type": "lockResult", "id": 1, "locked": false, "holder": "bob@example.com", "mode": "hard" }
TTL-Based Expiration
Locks expire automatically after a TTL (time-to-live) period. This prevents permanently locked documents when clients disconnect unexpectedly or crash without releasing their locks. The server cleans up expired locks during its periodic maintenance cycle.
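The maintenance sweep can be sketched as follows. The TTL value, field names, and helper functions are assumptions for illustration; the document only states that expired locks are cleaned up periodically.

```typescript
interface Lock {
  path: string
  acquiredAt: number  // ms timestamp when the lock was granted
  ttlMs: number       // time-to-live for this lock
}

function isExpired(lock: Lock, now: number): boolean {
  return now - lock.acquiredAt >= lock.ttlMs
}

// Periodic maintenance cycle: keep only the locks that are still live
function sweepLocks(locks: Lock[], now: number): Lock[] {
  return locks.filter(l => !isExpired(l, now))
}
```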
Connection-Based Echo Suppression
The server tracks lock ownership by conn_id. When a lock change event is broadcast to subscribers, the originating connection is excluded from the notification (echo suppression), similar to how write operations suppress echoes. This prevents the client that acquired the lock from receiving its own lock notification.
Lock Status in Change Events
Active subscriptions receive lock/unlock events as part of the change stream:
{
"type": "change",
"subscriptionId": "sub_abc123",
"event": {
"action": "lock",
"path": "todos/task_123",
"data": { "holder": "alice@example.com", "mode": "hard" }
}
}
Aggregate Queries
The RTDB supports server-side aggregate computations on query results.
Aggregate Request
Add the aggregate option to a query or subscribe message:
{
"type": "query",
"id": 1,
"path": "tasks",
"filter": { "equals": { "completed": false } },
"aggregate": {
"groupBy": "status",
"ops": [
{ "op": "sum", "field": "hours" },
{ "op": "avg", "field": "hours" }
]
}
}
Aggregate Operations
| Operation | Description |
|---|---|
| sum | Sum of a numeric field |
| avg | Average of a numeric field |
| min | Minimum value of a field |
| max | Maximum value of a field |
Each group always includes a count of matching documents.
Aggregate Response
{
"type": "queryResult",
"id": 1,
"aggregate": {
"groups": [
{ "group": "todo", "count": 12, "sum_hours": 36, "avg_hours": 3.0 },
{ "group": "in_progress", "count": 5, "sum_hours": 20, "avg_hours": 4.0 }
]
}
}
Incremental Aggregate Subscriptions
When aggregate is used with a subscribe message, the server computes aggregates incrementally. On each change event that affects the subscribed path and filter, the server recalculates the affected groups and sends an updated aggregate snapshot rather than the full document set. This keeps aggregate subscriptions efficient even for large collections.
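For reference, a grouped sum/avg computation over a result set can be sketched as below. The output key names (`sum_<field>`, `avg_<field>`) mirror the response example above; the function itself is illustrative, not the server's code, and the incremental recalculation is omitted.

```typescript
// Group documents by `groupBy`, then compute count, sum and avg of `field`.
function aggregate(
  docs: Record<string, unknown>[],
  groupBy: string,
  field: string,
) {
  const groups = new Map<string, { count: number; sum: number }>()
  for (const doc of docs) {
    const key = String(doc[groupBy])
    const g = groups.get(key) ?? { count: 0, sum: 0 }
    g.count += 1
    g.sum += Number(doc[field] ?? 0)
    groups.set(key, g)
  }
  // Every group always includes its document count
  return [...groups.entries()].map(([group, g]) => ({
    group,
    count: g.count,
    [`sum_${field}`]: g.sum,
    [`avg_${field}`]: g.sum / g.count,
  }))
}
```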
Computed Values
Field Operations
Modify field values with special operations:
Increment:
{
"views": { "$op": "increment", "by": 1 }
}
Append (to array):
{
"tags": { "$op": "append", "value": "javascript" }
}
Remove (from array):
{
"tags": { "$op": "remove", "value": "draft" }
}
Decrement:
{
"stock": { "$op": "decrement", "by": 1 }
}
Multiply:
{
"price": { "$op": "multiply", "by": 1.1 }
}
Concat (string concatenation):
{
"fullName": { "$op": "concat", "values": ["firstName", " ", "lastName"] }
}
Min (set to minimum of current and given value):
{
"lowestScore": { "$op": "min", "value": 42 }
}
Max (set to maximum of current and given value):
{
"highScore": { "$op": "max", "value": 99 }
}
Set if not exists:
{
"createdAt": { "$op": "setIfNotExists", "value": 1738483200 }
}
Query Operations
Aggregate data within queries:
Count:
{
"type": "query",
"id": 12,
"path": "posts",
"query": { "$query": "count" },
"filter": { "equals": { "published": true } }
}
Sum:
{
"type": "query",
"id": 13,
"path": "orders",
"query": { "$query": "sum", "field": "total" }
}
Average:
{
"type": "query",
"id": 14,
"path": "reviews",
"query": { "$query": "avg", "field": "rating" }
}
Function Operations
Server-side functions for computed values:
Now (current timestamp):
{
"createdAt": { "$fn": "now" }
}
Slugify (URL-safe string):
{
"slug": { "$fn": "slugify", "input": "Hello World!" }
// Results in: "hello-world"
}
Lowercase (convert string to lowercase):
{
"emailNormalized": { "$fn": "lowercase", "input": "Alice@Example.COM" }
// Results in: "alice@example.com"
}
Hash (SHA256):
{
"passwordHash": { "$fn": "hash", "input": "password123" }
}
Indexing
Secondary Indexes
Improve query performance:
Index Definition:
- name: Unique name for the index (e.g., "idx_email")
- fields: Vector of field names to index (single or compound)
- unique: Boolean flag ensuring no duplicate values (for unique constraints)
Create Index Algorithm:
Algorithm: Create Index
Input: db_id, collection_path, index_definition
Output: Result<()>
1. Validate index definition:
- Check name uniqueness (not duplicate of existing index)
- Verify all fields exist in collection schema
- If unique=true: verify collection has no duplicate values for these fields
2. Build index structure:
- Scan existing documents in collection
- For each document, extract values for indexed fields
- Build index data structure (B-tree for efficient lookups)
3. Store index metadata:
- Save index definition in metadata adapter
- Record index name, fields, and unique flag
4. Return success
This ensures:
- Efficient lookups on indexed fields
- Query optimizer can use index automatically
- Unique constraints enforced at index level
Index Usage
Queries automatically use indexes when available:
{
"type": "query",
"path": "users",
"filter": { "equals": { "email": "alice@example.com" } }
}
// Uses idx_email if available, otherwise full scan
Index Strategies
Single-field indexes:
fields: vec!["email"] // For email lookups
fields: vec!["createdAt"] // For sorting by date
Compound indexes:
fields: vec!["category", "price"] // For category + price queries
Unique indexes:
unique: true // Ensures no duplicates (e.g., email, username)
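One way to see why compound index order matters is to look at how an index key might be encoded. The exact key format below is our assumption for illustration (the document only says indexes are B-tree based); but any such encoding sorts by the first field, then the second, which is why `["category", "price"]` serves category + price queries.

```typescript
// Build a composite index key from the indexed field values.
// NUL is used as a separator so B-tree ordering sorts by
// field1 first, then field2, and so on. (Illustrative encoding only.)
function indexKey(doc: Record<string, unknown>, fields: string[]): string {
  return fields.map(f => String(doc[f] ?? '')).join('\u0000')
}
```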
Client SDK Example
JavaScript/TypeScript
import { RtdbClient } from '@cloudillo/rtdb'
import { getRtdbUrl } from '@cloudillo/core'
// Create RTDB client
const rtdb = new RtdbClient({
dbId: 'my-database-id',
auth: {
getToken: () => bus.accessToken
},
serverUrl: getRtdbUrl(bus.idTag!, 'my-database-id', bus.accessToken!)
})
// Connect
await rtdb.connect()
// Query data
const users = await rtdb.collection('users')
.where('active', '==', true)
.get()
console.log(users.docs.map(doc => doc.data()))
// Subscribe to changes
const unsubscribe = rtdb.collection('posts')
.where('published', '==', true)
.onSnapshot((snapshot) => {
snapshot.docChanges().forEach((change) => {
if (change.type === 'added') {
console.log('New post:', change.doc.data())
}
if (change.type === 'modified') {
console.log('Modified post:', change.doc.data())
}
if (change.type === 'removed') {
console.log('Removed post:', change.doc.id)
}
})
})
// Create document via batch
const batch = rtdb.batch()
batch.create(rtdb.collection('users'), {
name: 'Charlie',
email: 'charlie@example.com',
age: 28
})
const results = await batch.commit()
// Update document via batch
const batch2 = rtdb.batch()
batch2.update(rtdb.ref('users/' + results[0].id), {
age: 29
})
await batch2.commit()
// Cleanup
unsubscribe()
await rtdb.disconnect()
React Example
import { useEffect, useState } from 'react'
import { useAuth } from '@cloudillo/react'
import { RtdbClient } from '@cloudillo/rtdb'
import { getRtdbUrl } from '@cloudillo/core'
interface Task {
title: string
completed: boolean
priority: number
}
function TaskList({ dbId }: { dbId: string }) {
const [auth] = useAuth()
const [tasks, setTasks] = useState<(Task & { id: string })[]>([])
const [rtdb, setRtdb] = useState<RtdbClient | null>(null)
// Initialize RTDB client
useEffect(() => {
if (!auth?.token || !auth?.idTag) return
const client = new RtdbClient({
dbId,
auth: { getToken: () => auth.token },
serverUrl: getRtdbUrl(auth.idTag, dbId, auth.token!)
})
client.connect()
setRtdb(client)
return () => { client.disconnect() }
}, [auth?.token, auth?.idTag, dbId])
// Subscribe to incomplete tasks
useEffect(() => {
if (!rtdb) return
const unsubscribe = rtdb.collection<Task>('tasks')
.where('completed', '==', false)
.onSnapshot((snapshot) => {
setTasks(snapshot.docs.map(doc => ({
id: doc.id,
...doc.data()
})))
})
return () => unsubscribe()
}, [rtdb])
return (
<ul>
{tasks.map(task => (
<li key={task.id}>{task.title} (priority: {task.priority})</li>
))}
</ul>
)
}
Performance Optimization
Query Optimization
Use indexes:
// Create index for frequently queried fields
await db.createIndex('users', { fields: ['email'], unique: true });
Limit results:
// Always use limit for large collections
const recent = await db.collection('posts')
.orderBy('createdAt', 'desc')
.limit(20)
.get();
Use pagination:
const page1 = await db.collection('posts').limit(20).get();
const page2 = await db.collection('posts').limit(20).offset(20).get();
Subscription Optimization
Filter subscriptions:
// Only subscribe to relevant data
db.collection('messages')
.where('conversationId', '==', conversationId)
.onSnapshot(handler); // Not all messages
Cleanup subscriptions:
// Always unsubscribe when component unmounts
useEffect(() => {
const unsubscribe = db.collection('...').onSnapshot(handler);
return () => unsubscribe();
}, []);
Memory Management
Database instances are managed by the adapter implementation. The RTDB adapter provides a close_db() method to flush pending changes and release resources. Database instances are closed automatically when no more WebSocket connections reference them.
Error Handling
Errors use the common ClResult/ClError system shared across all Cloudillo adapters. The server translates these into WebSocket error messages with appropriate error codes.
Client Handling
try {
await db.collection('users').doc(id).update(data);
} catch (error) {
if (error.code === 'permission_denied') {
console.error('No permission to update');
} else if (error.code === 'document_not_found') {
console.error('Document does not exist');
} else {
console.error('Unknown error:', error);
}
}
Security Best Practices
Permission Rules
// Create database with strict permissions
await fetch('/api/db', {
method: 'POST',
body: JSON.stringify({
name: 'Private Data',
type: 'redb',
permissions: {
public_read: false,
public_write: false,
readers: [], // Only owner can read
writers: [] // Only owner can write
}
})
});
Input Validation
// Validate on client before sending
function validateUser(data) {
if (!data.email || !/^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(data.email)) {
throw new Error('Invalid email');
}
if (!data.name || data.name.length < 2) {
throw new Error('Name too short');
}
return true;
}
if (validateUser(userData)) {
await db.collection('users').add(userData);
}
Data Integrity
Write operations enforce several integrity guarantees:
- Hard locks: When a document has a hard lock, only the lock holder (identified by conn_id) can modify it
- ACID transactions: All operations within a transaction succeed or fail atomically
- Read-your-own-writes: Transaction-local reads see uncommitted changes from the same transaction
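Read-your-own-writes is typically implemented as a transaction-local write set consulted before the committed state. A minimal sketch, with class and method names of our choosing (not the actual implementation):

```typescript
type Doc = Record<string, unknown>

class TxnView {
  // Uncommitted writes from this transaction; null marks a delete.
  private writes = new Map<string, Doc | null>()

  constructor(private committed: Map<string, Doc>) {}

  update(path: string, data: Doc): void { this.writes.set(path, data) }
  delete(path: string): void { this.writes.set(path, null) }

  // Transaction-local reads see the write set first,
  // then fall back to committed state.
  get(path: string): Doc | null {
    if (this.writes.has(path)) return this.writes.get(path)!
    return this.committed.get(path) ?? null
  }
}
```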
See Also
- RTDB Overview - Introduction to RTDB system
- CRDT Collaborative Editing - Alternative approach for concurrent editing
- File Storage - Database-as-file implementation
- WebSocket Protocol - WebSocket infrastructure