CRDT Overview
Cloudillo’s CRDT system uses Yrs, the Rust implementation of the Yjs CRDT (Conflict-free Replicated Data Type) library, to enable real-time collaborative editing with automatic conflict resolution. It is a separate API from RTDB, optimized for concurrent editing where multiple users modify the same data simultaneously.
What are CRDTs?
Conflict-free Replicated Data Types (CRDTs) are data structures that can be replicated across multiple nodes and modified independently, then merged automatically without conflicts.
Key Properties
- Eventual Consistency: All replicas converge to the same state
- No Central Authority: No server needed to resolve conflicts
- Deterministic Merging: Same operations always produce same result
- Commutative: Concurrent updates can be applied in any order with the same result
- Idempotent: Applying same operation twice has no extra effect
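To make these properties concrete, here is a toy state-based CRDT, a grow-only counter (G-Counter), in plain JavaScript. It is not how Yjs represents text; it only illustrates why a merge that takes per-replica maxima converges regardless of merge order and tolerates duplicate delivery.

```javascript
// Toy state-based CRDT: a grow-only counter (G-Counter).
// Each replica increments only its own slot; merge takes the per-replica maximum.
class GCounter {
  constructor(replicaId) {
    this.replicaId = replicaId
    this.counts = new Map() // replicaId -> count
  }

  increment(by = 1) {
    const current = this.counts.get(this.replicaId) ?? 0
    this.counts.set(this.replicaId, current + by)
  }

  value() {
    let sum = 0
    for (const c of this.counts.values()) sum += c
    return sum
  }

  // Element-wise max is commutative, associative and idempotent
  merge(other) {
    for (const [id, c] of other.counts) {
      this.counts.set(id, Math.max(this.counts.get(id) ?? 0, c))
    }
    return this
  }
}

const alice = new GCounter('alice')
const bob = new GCounter('bob')
alice.increment(2) // independent edits, no coordination
bob.increment(3)

// Merge in either direction: both replicas converge
alice.merge(bob)
bob.merge(alice)
console.log(alice.value(), bob.value()) // 5 5

// Re-applying the same state has no extra effect (idempotence)
alice.merge(bob)
console.log(alice.value()) // 5
```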
Example: Concurrent Editing
Alice's editor:
"Hello"
↓ (adds " World")
"Hello World"
Bob's editor (at same time):
"Hello"
↓ (adds "!")
"Hello!"
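After sync (both converge to the same text):
"Hello World!"
↑ The CRDT algorithm merges automatically. Both edits were inserted at the same position, so their relative order is settled by a deterministic tie-break (based on client IDs); "Hello! World" would be an equally valid outcome, but every replica picks the same one.
Why Yrs?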
Yrs is the Rust port of Yjs, one of the most widely used and battle-tested CRDT implementations:
Advantages
✅ Production-Ready: Used in Google Docs-like applications
✅ Pure Rust: Memory-safe, high performance
✅ Rich Data Types: Text, Map, Array, XML
✅ Ecosystem Compatible: Works with Yjs clients (JavaScript)
✅ Battle-Tested Protocol: Proven sync algorithm
✅ Awareness API: Presence, cursors, selections
✅ Time-Travel: Built-in versioning support
Performance
- Sync speed: ~50 ms for typical documents (local network)
- Memory efficiency: Compact binary encoding
- Conflict resolution: Deterministic and local (no extra round-trip is needed to resolve conflicts)
- Scalability: Tested with 100+ concurrent editors
Architecture
Components
┌──────────────────────────────────────────────┐
│ Client Application                           │
│ - Yjs Document (Y.Doc)                       │
│ - Shared types (Y.Text, Y.Map, Y.Array)      │
│ - Awareness (presence, cursors)              │
└──────────────────────────────────────────────┘
                       ↓ WebSocket (binary protocol)
┌──────────────────────────────────────────────┐
│ Cloudillo Server                             │
│ ┌──────────────────────────────────────────┐ │
│ │ WebSocket Handler                        │ │
│ │ - Yrs sync protocol                      │ │
│ │ - Binary message parsing                 │ │
│ │ - Authentication                         │ │
│ └──────────────────────────────────────────┘ │
│                      ↓                       │
│ ┌──────────────────────────────────────────┐ │
│ │ Database Manager                         │ │
│ │ - Y.Doc instances (in-memory)            │ │
│ │ - Session tracking                       │ │
│ │ - Snapshot management                    │ │
│ └──────────────────────────────────────────┘ │
│                      ↓                       │
│ ┌──────────────────────────────────────────┐ │
│ │ Yrs Document (yrs::Doc)                  │ │
│ │ - CRDT state                             │ │
│ │ - Update log                             │ │
│ │ - Awareness state                        │ │
│ └──────────────────────────────────────────┘ │
│                      ↓                       │
│ ┌──────────────────────────────────────────┐ │
│ │ Storage Layer                            │ │
│ │ - CrdtAdapter (snapshots, updates)       │ │
│ │ - Compressed binary format               │ │
│ └──────────────────────────────────────────┘ │
└──────────────────────────────────────────────┘
Database Instance
Each collaborative document is a DatabaseInstance:
pub struct DatabaseInstance {
file_id: Box<str>, // Document identifier
tn_id: TnId, // Owning tenant
doc: Arc<RwLock<yrs::Doc>>, // Yrs document
sessions: RwLock<HashMap<SessionId, Session>>, // Connected clients
awareness: Arc<RwLock<yrs::sync::Awareness>>, // Presence info
last_accessed: AtomicTimestamp, // Read by the eviction policy
auto_save_handle: Option<TaskHandle>, // Periodic snapshot task
}
Database Manager
Manages the lifecycle of database instances.
DatabaseManager Structure:
- instances: RwLock-protected HashMap of file_id → DatabaseInstance
- blob_adapter: Persistent storage for snapshots and incremental updates
- meta_adapter: Metadata storage for database information
- max_instances: Maximum number of documents held in memory at once
- eviction_policy: Strategy for removing inactive instances
- snapshot_interval: Duration between automatic snapshots
Core Methods:
get_or_load(file_id, tn_id): Retrieve or load a database instance
- Check if instance exists in memory cache
- If cached: refresh last_accessed and return it immediately
- If not cached: load latest snapshot from blob storage, apply incremental updates, cache in memory
- Return loaded instance
create_database(tn_id, options): Create new collaborative document
- Generate unique file_id
- Initialize empty Y.Doc with Yrs
- Store metadata in meta_adapter
- Register instance in memory cache
- Return new file_id
snapshot(file_id): Save full document state
- Retrieve document from cache
- Encode full state as compressed binary update
- Store snapshot with versioning
- Prune old snapshots based on retention policy
- Return success
evict(file_id): Remove instance from memory
- Get write lock on instances map
- Remove instance from cache
- Instance data persists in blob storage for later reload
- Release memory
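A minimal in-memory sketch of get_or_load and evict, assuming a hypothetical async `loadFn(fileId, tnId)` that performs the snapshot-plus-updates load. One detail goes beyond the steps above and is an assumption: concurrent callers for the same file_id share a single in-flight load, so a document is not loaded twice.

```javascript
// Sketch of DatabaseManager.get_or_load / evict as an in-memory cache.
// loadFn(fileId, tnId) is a hypothetical async loader (snapshot + updates).
class InstanceCache {
  constructor(loadFn) {
    this.loadFn = loadFn
    this.instances = new Map() // fileId -> { instance, lastAccessed }
    this.loading = new Map()   // fileId -> in-flight load Promise
  }

  async getOrLoad(fileId, tnId) {
    const cached = this.instances.get(fileId)
    if (cached) {
      cached.lastAccessed = Date.now() // refresh for the eviction policy
      return cached.instance
    }
    // Assumption: concurrent callers share one in-flight load
    let pending = this.loading.get(fileId)
    if (!pending) {
      pending = this.loadFn(fileId, tnId)
        .then(instance => {
          this.instances.set(fileId, { instance, lastAccessed: Date.now() })
          return instance
        })
        .finally(() => this.loading.delete(fileId))
      this.loading.set(fileId, pending)
    }
    return pending
  }

  // Drops the in-memory copy only; persisted state stays in storage
  evict(fileId) {
    return this.instances.delete(fileId)
  }
}
```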
Yrs Data Types
Y.Text
For collaborative text editing:
import * as Y from 'yjs'
const doc = new Y.Doc()
const text = doc.getText('content')
// Insert text
text.insert(0, 'Hello World!')
// Format text
text.format(0, 5, { bold: true }) // Bold "Hello"
// Delete text
text.delete(6, 6) // Remove "World!"
// Observe changes
text.observe(event => {
event.changes.delta.forEach(change => {
if (change.insert) {
console.log('Inserted:', change.insert)
}
if (change.delete) {
console.log('Deleted:', change.delete, 'characters')
}
})
})
Y.Map
For key-value data structures:
const map = doc.getMap('config')
// Set values
map.set('theme', 'dark')
map.set('fontSize', 14)
map.set('notifications', { email: true, push: false })
// Get values
const theme = map.get('theme') // 'dark'
// Delete keys
map.delete('fontSize')
// Iterate
map.forEach((value, key) => {
console.log(key, value)
})
// Observe changes
map.observe(event => {
event.changes.keys.forEach((change, key) => {
if (change.action === 'add') {
console.log('Added:', key, map.get(key))
} else if (change.action === 'update') {
console.log('Updated:', key, map.get(key))
} else if (change.action === 'delete') {
console.log('Deleted:', key)
}
})
})
Y.Array
For ordered lists:
const array = doc.getArray('tasks')
// Push items
array.push([
{ title: 'Task 1', completed: false },
{ title: 'Task 2', completed: false }
])
// Insert at position
array.insert(0, [{ title: 'Urgent Task', completed: false }])
// Delete
array.delete(1, 1) // Delete 1 item at index 1
// Iterate
array.forEach((item, index) => {
console.log(index, item)
})
// Observe changes
array.observe(event => {
event.changes.delta.forEach(change => {
if (change.insert) {
console.log('Inserted:', change.insert)
}
if (change.delete) {
console.log('Deleted:', change.delete, 'items')
}
})
})
Y.XML
For structured document editing:
const xmlFragment = doc.getXmlFragment('document')
// Create element
const paragraph = new Y.XmlElement('p')
paragraph.setAttribute('class', 'intro')
paragraph.insert(0, [new Y.XmlText('Hello World!')])
// Add to document
xmlFragment.push([paragraph])
// Serialize to a string (toDOM() returns a DocumentFragment, which has no outerHTML)
const html = xmlFragment.toString()
WebSocket Sync Protocol
Connection Flow
Client                        Server
|                               |
|--- GET /ws/db/:fileId ------->|
|  (Authorization: Bearer...)   |
|                               |--- Validate token
|                               |--- Load database instance
|                               |--- Create session
|<-- 101 Switching Protocols ---|
|                               |
|<====== WebSocket Open =======>|
|                               |
|<-- Sync Step 1 (State Vec) ---|
|--- Sync Step 2 (missing) ---->|
|--- Sync Step 1 (State Vec) -->|
|<-- Sync Step 2 (missing) -----|
|                               |
|<====== Synchronized =========>|
|                               |
|--- Update (user edits) ------>|
|                               |--- Broadcast to sessions
|<-- Update (remote edits) -----|
|                               |
|--- Awareness Update --------->|
|<-- Awareness Update ----------|
Message Types
All messages use binary format (not JSON):
Sync Step 1 (Server → Client and Client → Server)
Each side sends its state vector:
[message_type: u8, state_vector: Vec<u8>]
The state vector summarizes which updates the sender already has: for each client ID, how far it has received that client's operations.
Sync Step 2 (reply to Sync Step 1)
The receiver of a Sync Step 1 answers with the updates the sender is missing, computed from the sender's state vector:
[message_type: u8, update: Vec<u8>]
Update (Bidirectional)
Document updates (changes):
[message_type: u8, update: Vec<u8>]
Updates are binary-encoded CRDT operations.
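The layouts above show only the logical fields. Assuming Cloudillo speaks the y-protocols framing that y-websocket clients use, each frame starts with a varUint message type (0 = sync, 1 = awareness); sync frames add a varUint sub-type (0 = step 1, 1 = step 2, 2 = update), and the payload is length-prefixed. This is a minimal stdlib-only sketch of that encoding, not lib0 itself; the function names are illustrative.

```javascript
// Sketch of y-protocols framing (as used by y-websocket); not lib0 itself.
const MESSAGE_SYNC = 0
const MESSAGE_AWARENESS = 1
const SYNC_STEP_1 = 0 // payload: state vector
const SYNC_STEP_2 = 1 // payload: updates the peer is missing
const SYNC_UPDATE = 2 // payload: incremental update

// varUint: 7 bits per byte, high bit set while more bytes follow
function writeVarUint(out, n) {
  while (n > 0x7f) {
    out.push(0x80 | (n & 0x7f))
    n = Math.floor(n / 128)
  }
  out.push(n)
}

function readVarUint(bytes, pos) {
  let value = 0
  let shift = 0
  let b
  do {
    b = bytes[pos++]
    value += (b & 0x7f) * 2 ** shift
    shift += 7
  } while (b & 0x80)
  return { value, pos }
}

// Frame: varUint type, [varUint syncType], varUint length, payload bytes
function encodeMessage(type, payload, syncType = SYNC_UPDATE) {
  const head = []
  writeVarUint(head, type)
  if (type === MESSAGE_SYNC) writeVarUint(head, syncType)
  writeVarUint(head, payload.length)
  const out = new Uint8Array(head.length + payload.length)
  out.set(head, 0)
  out.set(payload, head.length)
  return out
}

function decodeMessage(bytes) {
  let r = readVarUint(bytes, 0)
  const type = r.value
  let syncType = null
  if (type === MESSAGE_SYNC) {
    r = readVarUint(bytes, r.pos)
    syncType = r.value
  }
  const len = readVarUint(bytes, r.pos)
  return { type, syncType, payload: bytes.subarray(len.pos, len.pos + len.value) }
}
```

For values below 128 a varUint occupies a single byte, which is why the layouts above can describe the type field as a u8.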
Awareness Update
Presence information (cursors, selections):
[message_type: u8, awareness_update: Vec<u8>]
WebSocket Connection Handler
Algorithm: Handle WebSocket Database Connection
Input: WebSocket, DatabaseInstance, Auth context
Output: Result<()>
1. Session Setup:
- Generate unique session_id
- Add session to db_instance.sessions map
- Associate auth context with session
2. Send Sync Step 1:
- Read document from db_instance
- Encode the document's state vector as binary (a compact summary of what the server has, not the full state)
- Send binary message to client
3. Message Loop (while socket connected):
- Receive binary message from client
- Extract message type from first byte
a. SYNC_STEP_1 (client state vector):
- Decode state vector from bytes [1..]
- Acquire a read lock on the document
- Compute the updates the client is missing
- Send them to the client as a SYNC_STEP_2 message
b. SYNC_STEP_2 or UPDATE (document changes):
- Decode update from bytes [1..]
- Acquire write lock on document
- Apply update to Yrs document
- Release lock
- Broadcast update to all other active sessions
c. AWARENESS_UPDATE (presence/cursor):
- Decode awareness update from bytes [1..]
- Acquire write lock on awareness state
- Apply update to awareness
- Release lock
- Broadcast awareness update to all other sessions
d. Other message types:
- Ignore or log unknown types
4. Connection Close:
- Exit message loop on close frame
- Remove session from sessions map
- Return success (cleanup implicit via Arc drop)
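The session bookkeeping in steps 1, 3 and 4 can be sketched in a few lines. This is a minimal single-threaded JavaScript illustration, not Cloudillo's Rust handler: `DocRoom`, the message kinds and the `doc` methods `applyUpdate` / `diffSince` are hypothetical names, messages are plain objects instead of binary frames, and the server-side awareness state is omitted.

```javascript
// Hypothetical stand-in for the handler's session logic. `doc` must provide
// applyUpdate(update) and diffSince(stateVector); messages are plain objects.
class DocRoom {
  constructor(doc) {
    this.doc = doc
    this.sessions = new Map() // sessionId -> send(message)
    this.nextId = 1
  }

  // Step 1: register a session and hand back its id
  join(send) {
    const id = this.nextId++
    this.sessions.set(id, send)
    return id
  }

  // Step 4: forget the session on close
  leave(id) {
    this.sessions.delete(id)
  }

  broadcast(fromId, message) {
    for (const [id, send] of this.sessions) {
      if (id !== fromId) send(message) // never echo back to the sender
    }
  }

  // Step 3: dispatch one decoded message
  handle(fromId, kind, payload) {
    switch (kind) {
      case 'stateVector': {
        // Reply only to the sender with what it is missing
        const send = this.sessions.get(fromId)
        if (send) send({ kind: 'update', payload: this.doc.diffSince(payload) })
        break
      }
      case 'update':
        this.doc.applyUpdate(payload) // JS is single-threaded, so no lock here
        this.broadcast(fromId, { kind: 'update', payload })
        break
      case 'awareness':
        this.broadcast(fromId, { kind: 'awareness', payload })
        break
      default:
        console.warn(`ignoring unknown message kind: ${kind}`)
    }
  }
}
```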
This pattern ensures:
- Efficient binary protocol (smaller than JSON)
- State vector sync for minimal data transfer
- Broadcasting to multiple concurrent editors
- Clean session cleanup on disconnect
Awareness (Presence)
What is Awareness?
Awareness tracks ephemeral state like:
- User presence (online/offline)
- Cursor positions
- Text selections
- Custom user state (name, color, etc.)
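Unlike document content, awareness is not merged as a CRDT history: each client owns a single state object that it overwrites, tagged with an increasing clock. The sketch below assumes rules modeled on y-protocols' awareness (a newer clock wins, and peers not heard from for 30 s are treated as offline, so live clients periodically re-send their state); `AwarenessStore` and its methods are hypothetical and are not the y-protocols API.

```javascript
// Hypothetical awareness store: one overwritable state per client, tagged with a clock.
class AwarenessStore {
  constructor(timeoutMs = 30000) {
    this.timeoutMs = timeoutMs
    this.entries = new Map() // clientId -> { clock, state, lastSeen }
  }

  // Accept an update only if its clock is newer; state === null means "went offline"
  apply(clientId, clock, state, now) {
    const current = this.entries.get(clientId)
    if (current && clock <= current.clock) return false // stale or duplicate
    this.entries.set(clientId, { clock, state, lastSeen: now })
    return true
  }

  // Mark peers that have been quiet for longer than the timeout as offline
  expire(now) {
    for (const [clientId, entry] of this.entries) {
      if (entry.state !== null && now - entry.lastSeen > this.timeoutMs) {
        this.entries.set(clientId, { ...entry, state: null })
      }
    }
  }

  online() {
    return [...this.entries]
      .filter(([, entry]) => entry.state !== null)
      .map(([clientId]) => clientId)
  }
}
```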
Setting Local State
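import * as Y from 'yjs'
import { WebsocketProvider } from 'y-websocket'
const doc = new Y.Doc()
// y-websocket connects to `${serverUrl}/${roomname}`, here .../ws/db/file_id
const provider = new WebsocketProvider(
'wss://cl-o.example.com/ws/db', // serverUrl
'file_id', // roomname: the document's file ID
doc
)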
// Set local user state
provider.awareness.setLocalStateField('user', {
name: 'Alice',
color: '#ff0000',
avatar: 'https://...'
})
// Update cursor position
provider.awareness.setLocalStateField('cursor', {
anchor: 42,
head: 50
})
Observing Awareness
provider.awareness.on('change', ({ added, updated, removed }) => {
// New users joined (a peer may not have set its `user` field yet)
added.forEach(clientId => {
const state = provider.awareness.getStates().get(clientId)
console.log('User joined:', state?.user?.name ?? clientId)
})
// Existing users updated state
updated.forEach(clientId => {
const state = provider.awareness.getStates().get(clientId)
console.log('User updated:', state?.user?.name ?? clientId)
})
// Users left (their state is already gone, so only the ID is available)
removed.forEach(clientId => {
console.log('User left:', clientId)
})
})
Rendering Cursors
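function renderCursors() {
// Clear cursors drawn by the previous render
document.querySelectorAll('.remote-cursor').forEach(el => el.remove())
const states = provider.awareness.getStates()
states.forEach((state, clientId) => {
// Skip our own client and peers without cursor or user info
if (clientId === provider.awareness.clientID || !state.cursor || !state.user) return
const cursorEl = document.createElement('div')
cursorEl.className = 'remote-cursor'
cursorEl.style.backgroundColor = state.user.color
// getCursorPosition: app-specific helper mapping a text offset to pixels
cursorEl.style.left = `${getCursorPosition(state.cursor.anchor)}px`
cursorEl.textContent = state.user.name
document.body.appendChild(cursorEl)
})
}
// Redraw whenever any peer's presence changes
provider.awareness.on('change', renderCursors)
Storage Strategy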
Snapshots
Periodic full-state saves reduce sync time for new connections:
Snapshot Strategy Configuration:
- update_threshold: Trigger a snapshot after N updates (e.g., 1000)
- time_threshold: Trigger a snapshot after a duration (e.g., every 5 minutes)
- compression: zstd compression level (standard levels run from 1 to 22; the default of 3 balances speed and ratio)
- retention: Keep N historical snapshots (e.g., the 5 most recent)
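The two thresholds combine as an either/or trigger. A sketch with the example values above, assuming update counts and millisecond timestamps are tracked per document; `shouldSnapshot` is a hypothetical helper, and skipping the snapshot when nothing has changed is an added assumption.

```javascript
// Hypothetical trigger check using the example thresholds above.
const snapshotPolicy = {
  updateThreshold: 1000,          // update_threshold
  timeThresholdMs: 5 * 60 * 1000, // time_threshold: 5 minutes
}

function shouldSnapshot(updatesSinceLast, lastSnapshotAtMs, nowMs, policy = snapshotPolicy) {
  if (updatesSinceLast === 0) return false // assumption: nothing new, nothing to save
  return (
    updatesSinceLast >= policy.updateThreshold ||
    nowMs - lastSnapshotAtMs >= policy.timeThresholdMs
  )
}
```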
Algorithm: Create Snapshot
Input: file_id, database_instance
Output: Result<()>
1. Get Write Lock:
- Acquire exclusive lock on document
- (Prevents new updates during snapshot)
2. Encode Full State:
- Create default state vector (represents empty state)
- Encode entire Y.Doc as binary update
- This captures all document content
3. Compress (CPU-intensive work):
- Offload to worker pool with Priority::Low
- Use the configured zstd compression level (default 3)
- Results in binary blob
4. Store in Persistent Storage:
- Generate version string (e.g., "v001", "v002")
- Store compressed snapshot in BlobAdapter:
* key format: "{file_id}~snapshot~{version}"
* value: compressed binary
- This persists document state across restarts
5. Manage Retention:
- List all existing snapshots for this file_id
- If count > retention_limit:
* Delete oldest snapshots
* Keep most recent N snapshots
6. Release Write Lock
7. Return success
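Steps 4 and 5 can be sketched as a pure function that picks the next key and the keys to delete. It assumes the "{file_id}~snapshot~{version}" layout with zero-padded "vNNN" versions as in the example; `snapshotKey`, `parseVersion` and `planSnapshot` are hypothetical helpers, and the caller writes the new snapshot before deleting old ones.

```javascript
// Hypothetical helpers for the "{file_id}~snapshot~{version}" layout.
// Zero-padding keeps lexicographic key order equal to numeric order up to v999.
function snapshotKey(fileId, version) {
  return `${fileId}~snapshot~v${String(version).padStart(3, '0')}`
}

function parseVersion(key) {
  const match = /~snapshot~v(\d+)$/.exec(key)
  return match ? Number(match[1]) : null
}

// Next key to write, plus the keys to delete so at most `retention` snapshots remain
function planSnapshot(fileId, existingKeys, retention) {
  const prefix = `${fileId}~snapshot~`
  const versions = existingKeys
    .filter(key => key.startsWith(prefix))
    .map(key => parseVersion(key))
    .filter(v => v !== null)
    .sort((a, b) => a - b)
  const next = versions.length > 0 ? versions[versions.length - 1] + 1 : 1
  const kept = new Set([...versions, next].slice(-retention))
  const toDelete = versions.filter(v => !kept.has(v)).map(v => snapshotKey(fileId, v))
  return { newKey: snapshotKey(fileId, next), toDelete }
}
```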
Benefits:
- New connections load snapshot instead of replaying all updates
- Significantly faster initial sync for large documents
- Binary format is compact and efficient
Loading from Snapshot
Algorithm: Load Database from Storage
Input: file_id, tn_id (tenant)
Output: Result<Arc<DatabaseInstance>>
1. Retrieve Latest Snapshot:
- Query BlobAdapter for the most recent "{file_id}~snapshot~{version}" entry
- If found: proceed to step 2
- If not found: create empty document (new database)
2. Decompress Snapshot:
- Use zstd decompression on snapshot bytes
- Produces binary Yrs update data
3. Initialize Yrs Document:
- Create new Y.Doc instance
- Decode the decompressed bytes as a Yrs update
- Apply update to empty document
- Document now contains snapshot state
4. Load Incremental Updates:
- Query all updates created since snapshot timestamp
- Sorted in chronological order
- For each update:
* Decode binary update
* Apply to document (applies changes on top of snapshot)
- Document now contains latest state
5. Create DatabaseInstance:
- Wrap Y.Doc in Arc<RwLock> (shared, thread-safe access)
- Initialize empty sessions HashMap
- Create Awareness for presence tracking
- Record current timestamp (for eviction tracking)
- Set auto_save_handle (optional optimization)
6. Start Auto-Save Task:
- Schedule background task for periodic snapshots
- Uses snapshot_interval from DatabaseManager config
- Task runs in background without blocking
7. Register in Memory Cache:
- Add instance to DatabaseManager.instances map
- file_id → Arc<DatabaseInstance>
8. Return Arc-wrapped instance
- Callers can clone Arc for shared access
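Steps 1 to 4 reduce to "start from the newest snapshot, then replay only what came after it". In this sketch a plain key/value object stands in for the yrs::Doc and `Object.assign` stands in for applying a Yrs update; `MemoryStore`, `loadDocument` and the millisecond `ts` field are hypothetical stand-ins for BlobAdapter, the loader and the snapshot timestamp.

```javascript
// Hypothetical in-memory stand-in for BlobAdapter: latest snapshot plus an update log.
class MemoryStore {
  constructor() {
    this.snapshots = new Map() // fileId -> { ts, state }
    this.updates = new Map()   // fileId -> [{ ts, changes }]
  }
  appendUpdate(fileId, ts, changes) {
    if (!this.updates.has(fileId)) this.updates.set(fileId, [])
    this.updates.get(fileId).push({ ts, changes })
  }
  saveSnapshot(fileId, ts, state) {
    this.snapshots.set(fileId, { ts, state: { ...state } })
  }
  latestSnapshot(fileId) {
    return this.snapshots.get(fileId) ?? null
  }
  updatesSince(fileId, ts) {
    return (this.updates.get(fileId) ?? [])
      .filter(u => u.ts > ts)
      .sort((a, b) => a.ts - b.ts) // chronological
  }
}

// Steps 1-4: start from the snapshot (or an empty doc), then replay newer updates only
function loadDocument(store, fileId) {
  const snap = store.latestSnapshot(fileId)
  const state = snap ? { ...snap.state } : {}
  for (const u of store.updatesSince(fileId, snap ? snap.ts : -Infinity)) {
    Object.assign(state, u.changes) // stands in for applying a Yrs update
  }
  return state
}
```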
Optimization:
- Snapshot avoids replaying thousands of individual updates
- Can reduce load time for large documents from seconds to milliseconds
Client Integration
Basic Setup
import * as Y from 'yjs'
import { WebsocketProvider } from 'y-websocket'
// Create document
const doc = new Y.Doc()
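// Connect to Cloudillo. y-websocket opens `${serverUrl}/${roomname}?token=...`,
// so the room name must be the file ID: wss://cl-o.example.com/ws/db/<fileId>
const provider = new WebsocketProvider(
'wss://cl-o.example.com/ws/db', // serverUrl
fileId, // roomname (appended to the URL, not arbitrary)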
doc,
{
params: {
token: accessToken // Auth token as query param
}
}
)
// Get shared types
const text = doc.getText('content')
const map = doc.getMap('metadata')
// Make changes
text.insert(0, 'Hello World!')
map.set('title', 'My Document')
// Changes automatically sync!
Editor Integration: ProseMirror
import { ySyncPlugin, yCursorPlugin, yUndoPlugin } from 'y-prosemirror'
import { EditorState } from 'prosemirror-state'
import { EditorView } from 'prosemirror-view'
import { schema } from 'prosemirror-schema-basic'
// ySyncPlugin binds to a Y.XmlFragment, not a Y.Text
const yXmlFragment = doc.getXmlFragment('prosemirror')
const state = EditorState.create({
schema,
plugins: [
ySyncPlugin(yXmlFragment),
yCursorPlugin(provider.awareness),
yUndoPlugin(),
]
})
const view = new EditorView(document.querySelector('#editor'), {
state
})
Editor Integration: Monaco (VS Code)
import * as monaco from 'monaco-editor'
import { MonacoBinding } from 'y-monaco'
const ytext = doc.getText('monaco')
const editor = monaco.editor.create(document.getElementById('monaco'), {
value: '',
language: 'javascript'
})
const binding = new MonacoBinding(
ytext,
editor.getModel(),
new Set([editor]),
provider.awareness
)
Editor Integration: Quill
import Quill from 'quill'
import { QuillBinding } from 'y-quill'
const ytext = doc.getText('quill')
const quill = new Quill('#editor', {
theme: 'snow'
})
const binding = new QuillBinding(ytext, quill, provider.awareness)
Use Cases
Collaborative Text Editor
const doc = new Y.Doc()
const provider = new WebsocketProvider(url, fileId, doc) // url: WebSocket base, e.g. 'wss://cl-o.example.com/ws/db'
const text = doc.getText('content')
// Bind to editor (ProseMirrorEditor is a placeholder for your editor binding)
const editor = new ProseMirrorEditor(text, provider.awareness)
Shared Whiteboard
const doc = new Y.Doc()
const provider = new WebsocketProvider(url, fileId, doc)
const shapes = doc.getArray('shapes')
// Add shape
shapes.push([{
type: 'rectangle',
x: 100,
y: 100,
width: 200,
height: 150,
color: '#ff0000'
}])
// Observe changes
shapes.observe(event => {
redrawCanvas(shapes.toArray())
})
Collaborative Form
const doc = new Y.Doc()
const provider = new WebsocketProvider(url, fileId, doc)
const form = doc.getMap('form')
// Update fields
form.set('name', 'Alice')
form.set('email', 'alice@example.com')
form.set('preferences', { newsletter: true, notifications: false })
// Observe changes
form.observe(event => {
event.changes.keys.forEach((change, key) => {
updateFormField(key, form.get(key))
})
})
Performance Optimization
Connection Pooling
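Reuse one provider (and therefore one WebSocket) per document instead of opening duplicates:
class CloudilloProvider {
static connections = new Map() // fileId → WebsocketProvider
static getProvider(fileId, doc) {
if (!this.connections.has(fileId)) {
// url: WebSocket base, e.g. 'wss://cl-o.example.com/ws/db'
const provider = new WebsocketProvider(url, fileId, doc)
this.connections.set(fileId, provider)
}
// A cached provider stays bound to the Y.Doc it was created with
return this.connections.get(fileId)
}
}
Batching Updates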
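Group rapid changes into a single transaction:
let pending = [] // queued functions that mutate shared types
let timeout = null
function batchUpdate(update) {
pending.push(update)
clearTimeout(timeout)
timeout = setTimeout(() => {
const batch = pending
pending = []
// One transaction: observers fire once and a single update is sent
doc.transact(() => {
batch.forEach(u => u())
})
}, 50) // Flush 50 ms after the most recent queued change (debounce)
}
Memory Management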
Manage in-memory database instances efficiently:
EvictionPolicy Configuration:
- max_instances: Maximum number of documents in memory (e.g., 100)
- idle_timeout: Evict documents unused for this duration (e.g., 30 minutes)
- lru_eviction: Use a Least Recently Used strategy when at max capacity
- pinned: HashSet of file_ids that are never evicted (for critical documents)
Eviction Algorithm:
- Track the last_accessed timestamp for each instance
- On a timer (every 1 minute):
- Mark every instance that has exceeded idle_timeout for eviction
- If the remaining count still exceeds max_instances (and lru_eviction is enabled): mark the least recently accessed instances until the count is within max_instances
- Never mark pinned instances
- For each instance marked for eviction:
- Trigger a snapshot (save state to persistent storage)
- Remove it from the instances map
- The document can be reloaded later from its snapshot
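A minimal sketch of the candidate-selection step, assuming instances are tracked as a Map of file_id to last-access time in milliseconds; `selectForEviction` and the camelCase policy fields (`maxInstances`, `idleTimeoutMs`, `lruEviction`, `pinned`) are hypothetical renderings of the EvictionPolicy fields. Snapshotting and removal stay with the caller, matching the last step above.

```javascript
// Hypothetical candidate selection for one eviction pass.
// instances: Map of fileId -> lastAccessed (ms); policy mirrors EvictionPolicy.
function selectForEviction(instances, policy, nowMs) {
  const evict = new Set()
  // 1. Idle instances
  for (const [fileId, lastAccessed] of instances) {
    if (!policy.pinned.has(fileId) && nowMs - lastAccessed > policy.idleTimeoutMs) {
      evict.add(fileId)
    }
  }
  // 2. Still over capacity: least recently used first
  let remaining = instances.size - evict.size
  if (policy.lruEviction && remaining > policy.maxInstances) {
    const byAge = [...instances]
      .filter(([fileId]) => !evict.has(fileId) && !policy.pinned.has(fileId))
      .sort((a, b) => a[1] - b[1]) // oldest access first
    for (const [fileId] of byAge) {
      if (remaining <= policy.maxInstances) break
      evict.add(fileId)
      remaining--
    }
  }
  return [...evict] // caller snapshots, then removes each one
}
```

Pinned documents count toward max_instances but are never selected, so a large pinned set can keep the cache above its limit.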
Security Considerations
Authentication
Algorithm: Authenticate WebSocket Connection
Input: HTTP headers from WebSocket upgrade request
Output: Result<Auth>
1. Extract Authorization Header:
- Look for "authorization" header
- If missing: Return Unauthorized error
2. Parse Bearer Token:
- Convert header value to string
- Check for "Bearer " prefix
- Extract token (everything after "Bearer ")
- If format invalid: Return InvalidToken error
3. Validate Token:
- Call token validation function
- Verifies signature using profile key
- Checks expiration
- Extracts claims (tn_id, id_tag, scope)
4. Return Auth context
- Contains user identity and permissions
Permission Enforcement
Algorithm: Check Database Permission
Input: Auth context, DatabaseMetadata, Action (read/write)
Output: Result<()>
1. Check Ownership:
- If auth.id_tag == db_meta.owner: Return Ok
- Owner has all permissions
2. Check Action-Specific Permissions:
- For Read action:
* If auth.id_tag in db_meta.permissions.readers: Return Ok
- For Write action:
* If auth.id_tag in db_meta.permissions.writers: Return Ok
3. Default Deny:
- If no permission found: Return PermissionDenied
Rate Limiting
Server-Side Rate Limits:
MAX_UPDATES_PER_SECOND: 100 updates per user per second
- Prevents the server from being overwhelmed with document changes
- Typical usage: 5-20 updates/second for collaborative editing
MAX_AWARENESS_UPDATES_PER_SECOND: 10 awareness updates per user per second
- Controls cursor/presence update frequency
- Prevents spam of position updates
Implementation:
- Per-user sliding window counter
- Track requests in last 1 second
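- When exceeded: reject the excess messages. An HTTP 429 with a Retry-After header is only possible on the upgrade request; once the WebSocket is open, the limit has to be enforced in-band, for example by dropping messages or closing the connection.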
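The three checks in this section are small enough to sketch together. This is an illustrative JavaScript version, not Cloudillo's server code: `parseBearer`, `checkPermission`, `SlidingWindowLimiter` and the camelCase field names (`idTag`, `permissions.readers`) are hypothetical, and token signature and expiry validation are left out. Writers are not implicitly readers here, mirroring the steps above.

```javascript
// Steps 1-2 of authentication: extract the token from the Authorization header
function parseBearer(headers) {
  const value = headers['authorization'] // Node lowercases incoming header names
  if (value === undefined) return { error: 'Unauthorized' }
  const match = /^Bearer (.+)$/.exec(String(value))
  if (!match) return { error: 'InvalidToken' }
  return { token: match[1] }
}

// Permission check: owner first, then the action-specific list, else deny
function checkPermission(auth, meta, action) {
  if (auth.idTag === meta.owner) return true // owner has all permissions
  const allowed = action === 'read' ? meta.permissions.readers
    : action === 'write' ? meta.permissions.writers
    : []
  return allowed.includes(auth.idTag) // default deny
}

// Per-user sliding window: at most `limit` accepted events in the last `windowMs`
class SlidingWindowLimiter {
  constructor(limit, windowMs = 1000) {
    this.limit = limit
    this.windowMs = windowMs
    this.events = new Map() // userId -> accepted timestamps (ms)
  }

  allow(userId, now) {
    const recent = (this.events.get(userId) ?? []).filter(t => now - t < this.windowMs)
    const ok = recent.length < this.limit
    if (ok) recent.push(now)
    this.events.set(userId, recent)
    return ok
  }
}

const updateLimiter = new SlidingWindowLimiter(100)   // MAX_UPDATES_PER_SECOND
const awarenessLimiter = new SlidingWindowLimiter(10) // MAX_AWARENESS_UPDATES_PER_SECOND
```

Because rejected events are not recorded, each user's list never holds more than `limit` timestamps.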
See Also
- Yjs Documentation - Official Yjs CRDT documentation
- Yrs on GitHub - Rust implementation of Yjs
- CRDT redb Implementation - How CRDT updates are persisted
- RTDB Overview - Introduction to RTDB system
- System Architecture - Overall architecture
- WebSocket Protocol - WebSocket infrastructure