redb Implementation
The query-based RTDB uses redb, a lightweight embedded database, to provide Firebase-like functionality with minimal overhead. This approach is ideal for structured data, complex queries, and traditional database operations.
Why redb?
redb was chosen for the following characteristics:
- Tiny footprint: 171 KiB package size (vs. 1+ MB for most databases)
- Pure Rust: Memory-safe, no unsafe code
- ACID transactions: Full transactional guarantees
- Zero-copy reads: Excellent performance
- Embedded: No separate database server
- LMDB-inspired: Proven B-tree architecture
Architecture
Layered Design
┌──────────────────────────────────────┐
│  Client Application (JavaScript)     │
│    - Query API                       │
│    - Subscriptions                   │
│    - Transactions                    │
└──────────────────────────────────────┘
                   ↓ WebSocket
┌──────────────────────────────────────┐
│  WebSocket Handler                   │
│    - Message parsing                 │
│    - Authentication                  │
│    - Subscription tracking           │
└──────────────────────────────────────┘
                   ↓
┌──────────────────────────────────────┐
│  RtdbAdapter Trait                   │
│    - query()                         │
│    - create()                        │
│    - update()                        │
│    - delete()                        │
│    - transaction()                   │
└──────────────────────────────────────┘
                   ↓
┌──────────────────────────────────────┐
│  redb Implementation                 │
│    - Key-value storage               │
│    - Secondary indexes               │
│    - Query execution                 │
└──────────────────────────────────────┘
                   ↓
┌──────────────────────────────────────┐
│  Real-Time Layer                     │
│    - tokio::sync::broadcast channels │
│    - Change event propagation        │
│    - Subscription filtering          │
└──────────────────────────────────────┘
RtdbAdapter Trait
The core interface for database operations:
#[async_trait]
pub trait RtdbAdapter: Send + Sync {
/// Query documents
async fn query(
&self,
db_id: &str,
path: &str,
options: QueryOptions,
) -> Result<Vec<Document>>;
/// Create document
async fn create(
&self,
db_id: &str,
path: &str,
data: Value,
) -> Result<String>; // Returns document ID
/// Update document
async fn update(
&self,
db_id: &str,
path: &str,
doc_id: &str,
data: Value,
) -> Result<()>;
/// Delete document
async fn delete(
&self,
db_id: &str,
path: &str,
doc_id: &str,
) -> Result<()>;
/// Execute transaction
async fn transaction(
&self,
db_id: &str,
operations: Vec<Operation>,
) -> Result<TransactionResult>;
/// Subscribe to changes
fn subscribe(
&self,
db_id: &str,
path: &str,
filter: Option<QueryFilter>,
) -> broadcast::Receiver<ChangeEvent>;
}
Data Model
Collections and Documents
Data is organized into collections containing JSON documents:
database/
├── users/
│   ├── user_001
│   ├── user_002
│   └── ...
├── posts/
│   ├── post_abc
│   ├── post_def
│   └── ...
└── comments/
    └── ...
Document Structure
Documents are JSON objects with auto-generated IDs:
{
"_id": "user_001",
"_createdAt": 1738483200,
"_updatedAt": 1738486800,
"name": "Alice",
"email": "alice@example.com",
"age": 30,
"active": true
}
System Fields (auto-managed):
- _id: Unique document identifier
- _createdAt: Creation timestamp (Unix)
- _updatedAt: Last modification timestamp
Path Syntax
Paths use slash-separated segments:
users // Collection
users/user_001 // Specific document
posts/post_abc/comments // Sub-collection
Query System
QueryOptions
pub struct QueryOptions {
pub filter: Option<QueryFilter>,
pub sort: Option<Sort>,
pub limit: Option<u32>,
pub offset: Option<u32>,
}
pub struct Sort {
pub field: String,
pub direction: SortDirection, // Asc | Desc
}
QueryFilter
Filters support various comparison operators:
pub enum QueryFilter {
Equals { field: String, value: Value },
NotEquals { field: String, value: Value },
GreaterThan { field: String, value: Value },
GreaterThanOrEqual { field: String, value: Value },
LessThan { field: String, value: Value },
LessThanOrEqual { field: String, value: Value },
Contains { field: String, value: String },
In { field: String, values: Vec<Value> },
And(Vec<QueryFilter>),
Or(Vec<QueryFilter>),
}
Query Examples
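To make the filter semantics concrete, here is a minimal sketch of how a QueryFilter could be evaluated against a single document. It is not the actual implementation: the Value enum is a simplified stand-in for a JSON value, Document is reduced to a flat field map, filter_matches and compare are hypothetical names, only a subset of the operators is covered, and the handling of missing fields and mixed types is an assumption.

```rust
use std::cmp::Ordering;
use std::collections::HashMap;

/// Simplified stand-in for a JSON value (assumption: the server uses a richer type).
#[derive(Debug, Clone, PartialEq)]
pub enum Value {
    Null,
    Bool(bool),
    Number(f64),
    Text(String),
}

/// A document reduced to a flat map of field name to value.
pub type Document = HashMap<String, Value>;

/// Subset of the QueryFilter variants listed above.
pub enum QueryFilter {
    Equals { field: String, value: Value },
    NotEquals { field: String, value: Value },
    GreaterThan { field: String, value: Value },
    LessThan { field: String, value: Value },
    Contains { field: String, value: String },
    In { field: String, values: Vec<Value> },
    And(Vec<QueryFilter>),
    Or(Vec<QueryFilter>),
}

/// Orders two values of the same kind; values of different kinds do not compare.
fn compare(a: &Value, b: &Value) -> Option<Ordering> {
    match (a, b) {
        (Value::Number(x), Value::Number(y)) => x.partial_cmp(y),
        (Value::Text(x), Value::Text(y)) => Some(x.cmp(y)),
        _ => None,
    }
}

/// Returns true when `doc` satisfies `filter`.
/// Assumption: a missing field fails every test except NotEquals.
pub fn filter_matches(filter: &QueryFilter, doc: &Document) -> bool {
    match filter {
        QueryFilter::Equals { field, value } => doc.get(field) == Some(value),
        QueryFilter::NotEquals { field, value } => doc.get(field) != Some(value),
        QueryFilter::GreaterThan { field, value } => {
            doc.get(field).and_then(|v| compare(v, value)) == Some(Ordering::Greater)
        }
        QueryFilter::LessThan { field, value } => {
            doc.get(field).and_then(|v| compare(v, value)) == Some(Ordering::Less)
        }
        QueryFilter::Contains { field, value } => match doc.get(field) {
            Some(Value::Text(text)) => text.contains(value.as_str()),
            _ => false,
        },
        QueryFilter::In { field, values } => {
            doc.get(field).map_or(false, |v| values.contains(v))
        }
        // Composite filters recurse into their children.
        QueryFilter::And(filters) => filters.iter().all(|f| filter_matches(f, doc)),
        QueryFilter::Or(filters) => filters.iter().any(|f| filter_matches(f, doc)),
    }
}
```

With this sketch, an And filter combining Equals on published with GreaterThan on views matches a document only when both conditions hold, which mirrors the complex query shown in this section.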
Simple query:
{
"type": "query",
"id": 1,
"path": "users",
"filter": {
"equals": { "field": "active", "value": true }
},
"sort": { "field": "name", "direction": "asc" },
"limit": 50
}
Complex query:
{
"type": "query",
"id": 2,
"path": "posts",
"filter": {
"and": [
{ "equals": { "field": "published", "value": true } },
{ "greaterThan": { "field": "views", "value": 100 } }
]
},
"sort": { "field": "createdAt", "direction": "desc" },
"limit": 20
}
WebSocket Protocol
Message Types
Client → Server
1. Query
{
"type": "query",
"id": 1,
"path": "users",
"filter": { "equals": { "field": "active", "value": true } },
"limit": 50
}
2. Subscribe
{
"type": "subscribe",
"id": 2,
"path": "posts",
"filter": { "equals": { "field": "published", "value": true } }
}
3. Create
{
"type": "create",
"id": 3,
"path": "users",
"data": {
"name": "Bob",
"email": "bob@example.com",
"age": 25
}
}
4. Update
{
"type": "update",
"id": 4,
"path": "users",
"docId": "user_001",
"data": {
"age": 31,
"lastLogin": 1738486800
}
}
5. Delete
{
"type": "delete",
"id": 5,
"path": "users",
"docId": "user_001"
}
6. Transaction
{
"type": "transaction",
"id": 6,
"operations": [
{
"type": "create",
"path": "posts",
"ref": "$post",
"data": { "title": "Hello", "author": "alice" }
},
{
"type": "update",
"path": "users/alice",
"data": { "postCount": { "$op": "increment", "by": 1 } }
}
]
}
Server → Client
1. Query Result
{
"type": "queryResult",
"id": 1,
"data": [
{ "_id": "user_001", "name": "Alice", "active": true },
{ "_id": "user_002", "name": "Bob", "active": true }
],
"total": 2
}
2. Subscribe Result
{
"type": "subscribeResult",
"id": 2,
"subscriptionId": "sub_abc123",
"data": [
{ "_id": "post_001", "title": "Hello", "published": true }
]
}
3. Change Event
{
"type": "change",
"subscriptionId": "sub_abc123",
"changes": [
{
"type": "added",
"path": "posts",
"docId": "post_002",
"data": { "title": "New Post", "published": true }
}
]
}
4. Create Result
{
"type": "createResult",
"id": 3,
"docId": "user_003"
}
5. Error
{
"type": "error",
"id": 4,
"code": "permission_denied",
"message": "Insufficient permissions to update this document"
}
Real-Time Subscriptions
Subscription Flow
Client sends subscribe message
↓
Server validates permissions
↓
Server creates broadcast channel
↓
Server executes initial query
↓
Server sends subscribeResult with data
↓
Server watches for changes matching filter
↓
On change: Server sends change event
↓
Client updates local state
Implementation
Subscription Structure:
- id: Unique subscription identifier
- path: Collection path being subscribed to
- filter: Optional query filter to match changes
- sender: Broadcast channel for sending change events
Notification Algorithm:
Algorithm: Notify Subscribers on Change
Input: db_id, path, change_event
Output: None (side effect: sends to all matching subscribers)
1. For each active subscription:
a. Check if subscription.path matches change path
b. If path matches:
- Evaluate if change data matches subscription filter
- If matches: send change_event through subscriber's broadcast channel
- If no match: skip this subscriber
2. Return
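The notification steps above can be sketched as follows. This is an illustration under stated assumptions rather than the server's code: std::sync::mpsc stands in for the tokio broadcast channel so the example has no dependencies, path matching is exact string equality, the filter is reduced to a plain predicate function, the event payload is a string instead of a JSON value, and SubscriptionRegistry, subscribe, and notify are hypothetical names. Pruning subscribers whose receiver has gone away is also an addition that the algorithm does not specify.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

#[derive(Debug, Clone, PartialEq)]
pub enum ChangeType {
    Added,
    Modified,
    Removed,
}

#[derive(Debug, Clone, PartialEq)]
pub struct ChangeEvent {
    pub change_type: ChangeType,
    pub path: String,
    pub doc_id: String,
    /// Simplified payload; the real event carries a JSON value.
    pub data: Option<String>,
}

/// One active subscription (id, path, filter, sender), as listed above.
pub struct Subscription {
    pub id: String,
    pub path: String,
    pub filter: Option<fn(&ChangeEvent) -> bool>,
    pub sender: Sender<ChangeEvent>,
}

#[derive(Default)]
pub struct SubscriptionRegistry {
    subscriptions: Vec<Subscription>,
}

impl SubscriptionRegistry {
    /// Registers a subscription and hands back the receiving end of its channel.
    pub fn subscribe(
        &mut self,
        id: &str,
        path: &str,
        filter: Option<fn(&ChangeEvent) -> bool>,
    ) -> Receiver<ChangeEvent> {
        let (sender, receiver) = channel();
        self.subscriptions.push(Subscription {
            id: id.to_string(),
            path: path.to_string(),
            filter,
            sender,
        });
        receiver
    }

    /// Visits every subscription: path check first, then the filter, then send.
    /// Returns how many subscribers received the event.
    pub fn notify(&mut self, event: &ChangeEvent) -> usize {
        let mut delivered = 0;
        self.subscriptions.retain(|sub| {
            if sub.path != event.path {
                return true; // different collection: skip, keep the subscription
            }
            if let Some(filter) = sub.filter {
                if !filter(event) {
                    return true; // filter does not match: skip this subscriber
                }
            }
            match sub.sender.send(event.clone()) {
                Ok(()) => {
                    delivered += 1;
                    true
                }
                Err(_) => false, // receiver dropped: prune the subscription
            }
        });
        delivered
    }
}
```

A subscriber to "posts" with a filter receives only matching events on that path, while a subscriber to "users" sees nothing for the same change. A real broadcast channel would let several receivers share one sender; the per-subscription channel here is only a dependency-free approximation.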
This ensures:
- Only subscribed collections receive notifications
- Filter conditions prevent unnecessary updates
- All matching subscribers notified in parallel via broadcast channels
Change Event Types
pub enum ChangeType {
Added, // New document created
Modified, // Existing document updated
Removed, // Document deleted
}
pub struct ChangeEvent {
pub change_type: ChangeType,
pub path: String,
pub doc_id: String,
pub data: Option<Value>, // None for Removed
}
Transactions
Atomic Operations
Transactions ensure multiple operations execute atomically:
{
"type": "transaction",
"id": 10,
"operations": [
{
"type": "update",
"path": "accounts/alice",
"data": { "balance": { "$op": "increment", "by": -100 } }
},
{
"type": "update",
"path": "accounts/bob",
"data": { "balance": { "$op": "increment", "by": 100 } }
}
]
}
Guarantees:
- All operations succeed or all fail
- Intermediate states never visible
- Sequential consistency
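The all-or-nothing guarantee can be illustrated with a small in-memory model. In the real system this presumably maps onto a redb write transaction that is either committed or aborted; the sketch below only mimics that behaviour by applying operations to a private copy and publishing it once every operation has succeeded. Store, Operation, and TxError are hypothetical names, and each document is reduced to a single integer such as an account balance.

```rust
use std::collections::HashMap;

/// Simplified operations (assumption: a flattened form of the JSON messages above).
pub enum Operation {
    /// Adds `by` to the value stored at `key`, like `{ "$op": "increment" }`.
    Increment { key: String, by: i64 },
    /// Creates a new value; fails if the key already exists.
    Create { key: String, value: i64 },
}

#[derive(Debug, PartialEq)]
pub enum TxError {
    NotFound(String),
    AlreadyExists(String),
}

#[derive(Default)]
pub struct Store {
    data: HashMap<String, i64>,
}

impl Store {
    /// Applies all operations or none of them.
    pub fn transaction(&mut self, ops: &[Operation]) -> Result<(), TxError> {
        // Work on a private copy so intermediate states are never visible.
        let mut working = self.data.clone();
        for op in ops {
            match op {
                Operation::Increment { key, by } => {
                    let slot = working
                        .get_mut(key)
                        .ok_or_else(|| TxError::NotFound(key.clone()))?;
                    *slot += *by;
                }
                Operation::Create { key, value } => {
                    if working.contains_key(key) {
                        return Err(TxError::AlreadyExists(key.clone()));
                    }
                    working.insert(key.clone(), *value);
                }
            }
        }
        // Commit: every operation succeeded, so publish the new state in one step.
        self.data = working;
        Ok(())
    }

    pub fn get(&self, key: &str) -> Option<i64> {
        self.data.get(key).copied()
    }
}
```

If the second increment of a transfer targets a missing account, the early return discards the working copy and the first account's balance stays untouched. Cloning the whole map is only acceptable in a toy model; a storage engine with native transactions avoids that cost.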
Temporary References
Reference documents created within the same transaction:
{
"type": "transaction",
"id": 11,
"operations": [
{
"type": "create",
"path": "posts",
"ref": "$post",
"data": { "title": "My Post", "author": "alice" }
},
{
"type": "create",
"path": "comments",
"data": {
"postId": { "$ref": "$post" },
"text": "First comment!",
"author": "alice"
}
}
]
}
How it works:
- First operation creates the post and saves its ID as $post
- Second operation references $post, which is replaced with the actual ID
- The comment gets the correct post ID even though it wasn't known initially
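The reference substitution described above can be sketched as a single pass over the operations. This is a minimal illustration, not the server's code: CreateOp, FieldValue, CreatedDoc, and resolve_refs are hypothetical names, field values are reduced to strings, and the ID scheme (collection name plus position) is invented for the example because the real IDs are generated by the server.

```rust
use std::collections::HashMap;

/// A field value in a create operation: a literal or a `{ "$ref": "$name" }` placeholder.
pub enum FieldValue {
    Literal(String),
    Ref(String),
}

/// Simplified create operation; `ref_name` mirrors the optional "ref" key.
pub struct CreateOp {
    pub path: String,
    pub ref_name: Option<String>,
    pub data: Vec<(String, FieldValue)>,
}

/// A created document with every placeholder replaced by a concrete ID.
pub struct CreatedDoc {
    pub path: String,
    pub doc_id: String,
    pub fields: HashMap<String, String>,
}

/// Processes operations in order, remembering the ID behind each temporary name.
pub fn resolve_refs(ops: &[CreateOp]) -> Result<Vec<CreatedDoc>, String> {
    let mut known: HashMap<String, String> = HashMap::new();
    let mut created = Vec::new();
    for (index, op) in ops.iter().enumerate() {
        let mut fields = HashMap::new();
        for (name, value) in &op.data {
            let resolved = match value {
                FieldValue::Literal(text) => text.clone(),
                // A reference must point at an operation that already ran.
                FieldValue::Ref(reference) => known
                    .get(reference)
                    .cloned()
                    .ok_or_else(|| format!("unknown reference {}", reference))?,
            };
            fields.insert(name.clone(), resolved);
        }
        // Hypothetical ID scheme, used only to keep the example deterministic.
        let doc_id = format!("{}_{}", op.path, index + 1);
        if let Some(name) = &op.ref_name {
            known.insert(name.clone(), doc_id.clone());
        }
        created.push(CreatedDoc { path: op.path.clone(), doc_id, fields });
    }
    Ok(created)
}
```

Because names are registered only after their operation has run, a reference to a later or nonexistent operation is rejected, which in a transactional setting would abort the whole batch.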
Computed Values
Field Operations
Modify field values with special operations:
Increment:
{
"views": { "$op": "increment", "by": 1 }
}
Append (to array):
{
"tags": { "$op": "append", "value": "javascript" }
}
Remove (from array):
{
"tags": { "$op": "remove", "value": "draft" }
}
Set if not exists:
{
"createdAt": { "$op": "setIfNotExists", "value": 1738483200 }
}
Query Operations
Aggregate data within queries:
Count:
{
"type": "query",
"id": 12,
"path": "posts",
"query": { "$query": "count" },
"filter": { "equals": { "field": "published", "value": true } }
}
Sum:
{
"type": "query",
"id": 13,
"path": "orders",
"query": { "$query": "sum", "field": "total" }
}
Average:
{
"type": "query",
"id": 14,
"path": "reviews",
"query": { "$query": "avg", "field": "rating" }
}
Function Operations
Server-side functions for computed values:
Now (current timestamp):
{
"createdAt": { "$fn": "now" }
}
UUID (generate unique ID):
{
"trackingId": { "$fn": "uuid" }
}
Slugify (URL-safe string):
{
"slug": { "$fn": "slugify", "input": "Hello World!" }
// Results in: "hello-world"
}
Hash (SHA256):
{
"passwordHash": { "$fn": "hash", "input": "password123" }
}
Indexing
Secondary Indexes
Improve query performance:
Index Definition:
- name: Unique name for the index (e.g., "idx_email")
- fields: Vector of field names to index (single or compound)
- unique: Boolean flag ensuring no duplicate values (for unique constraints)
Create Index Algorithm:
Algorithm: Create Index
Input: db_id, collection_path, index_definition
Output: Result<()>
1. Validate index definition:
- Check name uniqueness (not duplicate of existing index)
- Verify all fields exist in collection schema
- If unique=true: verify collection has no duplicate values for these fields
2. Build index structure:
- Scan existing documents in collection
- For each document, extract values for indexed fields
- Build index data structure (B-tree for efficient lookups)
3. Store index metadata:
- Save index definition in metadata adapter
- Record index name, fields, and unique flag
4. Return success
This ensures:
- Efficient lookups on indexed fields
- Query optimizer can use index automatically
- Unique constraints enforced at index level
Index Usage
Queries automatically use indexes when available:
{
"type": "query",
"path": "users",
"filter": { "equals": { "field": "email", "value": "alice@example.com" } }
}
// Uses idx_email if available, otherwise full scan
Index Strategies
Single-field indexes:
fields: vec!["email"] // For email lookups
fields: vec!["createdAt"] // For sorting by date
Compound indexes:
fields: vec!["category", "price"] // For category + price queries
Unique indexes:
unique: true // Ensures no duplicates (e.g., email, username)
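The index definition and the build step of the Create Index algorithm can be sketched with a BTreeMap keyed by the indexed field values. This is an illustration under assumptions, not the actual redb-backed implementation: documents are reduced to string fields, create_index, Index, and lookup are hypothetical names, documents that lack an indexed field are simply left out instead of failing schema validation, and the index-name uniqueness check and metadata storage are omitted.

```rust
use std::collections::{BTreeMap, HashMap};

pub struct IndexDefinition {
    pub name: String,
    pub fields: Vec<String>,
    pub unique: bool,
}

/// A document reduced to string fields for the purpose of this sketch.
pub type Document = HashMap<String, String>;

pub struct Index {
    pub definition: IndexDefinition,
    /// Composite key (one value per indexed field) mapped to matching document IDs.
    entries: BTreeMap<Vec<String>, Vec<String>>,
}

/// Scans existing documents and builds the index, enforcing `unique` along the way.
pub fn create_index(
    definition: IndexDefinition,
    docs: &[(String, Document)],
) -> Result<Index, String> {
    let mut entries: BTreeMap<Vec<String>, Vec<String>> = BTreeMap::new();
    for (doc_id, doc) in docs {
        // Extract the indexed values in field order; None if any field is missing.
        let key: Option<Vec<String>> = definition
            .fields
            .iter()
            .map(|field| doc.get(field).cloned())
            .collect();
        let key = match key {
            Some(key) => key,
            None => continue, // assumption: unindexable documents are skipped
        };
        let ids = entries.entry(key).or_default();
        if definition.unique && !ids.is_empty() {
            return Err(format!("duplicate value for unique index {}", definition.name));
        }
        ids.push(doc_id.clone());
    }
    Ok(Index { definition, entries })
}

impl Index {
    /// Returns the IDs of documents whose indexed fields equal `key`.
    pub fn lookup(&self, key: &[&str]) -> &[String] {
        let key: Vec<String> = key.iter().map(|part| part.to_string()).collect();
        self.entries.get(&key).map(|ids| ids.as_slice()).unwrap_or(&[])
    }
}
```

An equality query on email then becomes a single map lookup instead of a scan over every document, and a compound index works the same way with a two-element key. The ordered map also keeps keys sorted, which is what makes range scans and sorted reads on the indexed fields cheap.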
Client SDK Example
JavaScript/TypeScript
import { CloudilloRtdb } from 'cloudillo-rtdb-client';
// Connect to database
const db = await CloudilloRtdb.connect(fileId, {
authToken: accessToken,
serverUrl: 'wss://cl-o.example.com'
});
// Query data
const users = await db.collection('users')
.where('active', '==', true)
.orderBy('name', 'asc')
.limit(50)
.get();
console.log(users.docs);
// Subscribe to changes
const unsubscribe = db.collection('posts')
.where('published', '==', true)
.onSnapshot((snapshot) => {
snapshot.docChanges().forEach((change) => {
if (change.type === 'added') {
console.log('New post:', change.doc.data());
}
if (change.type === 'modified') {
console.log('Modified post:', change.doc.data());
}
if (change.type === 'removed') {
console.log('Removed post:', change.doc.id);
}
});
});
// Create document
const docId = await db.collection('users').add({
name: 'Charlie',
email: 'charlie@example.com',
age: 28
});
// Update document
await db.collection('users').doc(docId).update({
age: 29,
lastLogin: new Date()
});
// Transaction
await db.runTransaction(async (transaction) => {
const postRef = db.collection('posts').doc();
transaction.create(postRef, {
title: 'My Post',
author: 'alice',
createdAt: Date.now()
});
transaction.update(db.collection('users').doc('alice'), {
postCount: { $op: 'increment', by: 1 }
});
});
// Cleanup
unsubscribe();
await db.close();
React Hook Example
import { useRtdbQuery, useRtdbSubscription } from 'cloudillo-react';
function TaskList() {
// Query tasks
const { data: tasks, loading, error } = useRtdbQuery(
db.collection('tasks')
.where('completed', '==', false)
.orderBy('priority', 'desc')
);
// Subscribe to changes
const { data: liveTasks } = useRtdbSubscription(
db.collection('tasks').where('assignee', '==', userId)
);
if (loading) return <div>Loading...</div>;
if (error) return <div>Error: {error.message}</div>;
return (
<ul>
{tasks.map(task => (
<li key={task._id}>{task.title}</li>
))}
</ul>
);
}
Performance Optimization
Query Optimization
Use indexes:
// Create index for frequently queried fields
await db.createIndex('users', { fields: ['email'], unique: true });
Limit results:
// Always use limit for large collections
const recent = await db.collection('posts')
.orderBy('createdAt', 'desc')
.limit(20)
.get();
Use pagination:
const page1 = await db.collection('posts').limit(20).get();
const page2 = await db.collection('posts').limit(20).offset(20).get();
Subscription Optimization
Filter subscriptions:
// Only subscribe to relevant data
db.collection('messages')
.where('conversationId', '==', conversationId)
.onSnapshot(handler); // Not all messages
Cleanup subscriptions:
// Always unsubscribe when component unmounts
useEffect(() => {
const unsubscribe = db.collection('...').onSnapshot(handler);
return () => unsubscribe();
}, []);
Memory Management
Database eviction:
pub struct EvictionPolicy {
max_instances: usize, // Max databases in memory
idle_timeout: Duration, // Evict after N minutes idle
lru_eviction: bool, // Use LRU when at max
}
Connection limits:
pub struct ConnectionLimits {
max_sessions_per_db: usize, // Per database
max_total_sessions: usize, // Server-wide
max_sessions_per_user: usize,
}
Error Handling
Error Types
pub enum RtdbError {
PermissionDenied,
DocumentNotFound,
InvalidQuery,
TransactionFailed,
DatabaseNotFound,
ConnectionClosed,
RateLimitExceeded,
}
Client Handling
try {
await db.collection('users').doc(id).update(data);
} catch (error) {
if (error.code === 'permission_denied') {
console.error('No permission to update');
} else if (error.code === 'document_not_found') {
console.error('Document does not exist');
} else {
console.error('Unknown error:', error);
}
}
Security Best Practices
Permission Rules
// Create database with strict permissions
await fetch('/api/db', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
name: 'Private Data',
type: 'redb',
permissions: {
public_read: false,
public_write: false,
readers: [], // Only owner can read
writers: [] // Only owner can write
}
})
});
Input Validation
// Validate on client before sending
function validateUser(data) {
if (!data.email || !/^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(data.email)) {
throw new Error('Invalid email');
}
if (!data.name || data.name.length < 2) {
throw new Error('Name too short');
}
return true;
}
if (validateUser(userData)) {
await db.collection('users').add(userData);
}
Rate Limiting
Server-Side Rate Limits:
To prevent abuse and ensure fair resource allocation, the server enforces per-user/per-IP rate limits:
Query Limits:
- MAX_QUERIES_PER_SECOND: 100 queries per user per second
- Prevents overwhelming the database with excessive read queries
- Typical usage: <10 queries/second for normal applications
Write Limits:
- MAX_WRITES_PER_SECOND: 10 writes per user per second
- More restrictive than reads to protect data consistency
- Typical usage: <1 write/second for normal applications
Implementation Pattern:
- Sliding window counter: Track requests in last 1 second
- Per-user tracking: Rate limits apply to authenticated users
- Per-IP fallback: Anonymous requests rate-limited by IP address
- When exceeded: Return HTTP 429 (Too Many Requests) with a Retry-After header
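The sliding window pattern above can be sketched as a per-key queue of request timestamps. This is a minimal illustration, not the server's limiter: RateLimiter and check are hypothetical names, the key is assumed to be a user ID or an IP address for anonymous requests, and the current time is passed in explicitly so the behaviour is easy to test. The returned duration is the kind of value a caller could place in a Retry-After header.

```rust
use std::collections::{HashMap, VecDeque};
use std::time::{Duration, Instant};

pub struct RateLimiter {
    max_requests: usize,
    window: Duration,
    /// Timestamps of accepted requests, keyed by user ID or IP address.
    history: HashMap<String, VecDeque<Instant>>,
}

impl RateLimiter {
    pub fn new(max_requests: usize, window: Duration) -> Self {
        RateLimiter {
            max_requests,
            window,
            history: HashMap::new(),
        }
    }

    /// Returns Ok(()) when the request is allowed, or Err(wait) with the time
    /// until the oldest request in the window expires.
    pub fn check(&mut self, key: &str, now: Instant) -> Result<(), Duration> {
        let window = self.window;
        let max_requests = self.max_requests;
        let recent = self.history.entry(key.to_string()).or_default();

        // Drop timestamps that have slid out of the window.
        while let Some(&oldest) = recent.front() {
            if now.duration_since(oldest) >= window {
                recent.pop_front();
            } else {
                break;
            }
        }

        if recent.len() >= max_requests {
            // The oldest remaining request decides when a slot frees up.
            let wait = recent.front().map_or(window, |&oldest| {
                window.saturating_sub(now.duration_since(oldest))
            });
            return Err(wait);
        }

        recent.push_back(now);
        Ok(())
    }
}
```

With a limit of 2 requests per second, a third request 200 ms after the first is rejected with a wait of 800 ms, while a different key is unaffected. Rejected requests are not recorded, so a client that keeps retrying does not extend its own penalty; whether the real server counts rejected requests is not stated in this document.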
See Also
- RTDB Overview - Introduction to RTDB system
- CRDT Collaborative Editing - Alternative approach for concurrent editing
- File Storage - Database-as-file implementation
- WebSocket Protocol - WebSocket infrastructure