CRDT Overview
Cloudillo’s CRDT system uses Yrs, a Rust implementation of the Yjs CRDT (Conflict-free Replicated Data Type), to enable true collaborative editing with automatic conflict resolution. This is a separate API from RTDB, optimized specifically for concurrent editing scenarios where multiple users modify the same data simultaneously.
What are CRDTs?
Conflict-free Replicated Data Types (CRDTs) are data structures that can be replicated across multiple nodes and modified independently, then merged automatically without conflicts.
Key Properties
- Eventual Consistency: All replicas converge to the same state
- No Central Authority: No server needed to resolve conflicts
- Deterministic Merging: The same set of operations always produces the same result
- Commutative: Updates can be applied in any order with the same outcome
- Idempotent: Applying the same update twice has no extra effect
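These properties are easiest to see on a grow-only counter (G-Counter), one of the simplest state-based CRDTs. It is a swapped-in illustration, not how Yjs works internally (Yjs uses a sequence CRDT for text and arrays). Its merge function still shows the same guarantees: merging in either order gives the same state, and merging the same state twice changes nothing.

```rust
use std::collections::BTreeMap;

/// Grow-only counter: each replica increments only its own slot,
/// and merging keeps the per-replica maximum.
#[derive(Clone, Debug, Default, PartialEq)]
struct GCounter {
    counts: BTreeMap<String, u64>,
}

impl GCounter {
    /// Local operation: bump this replica's own slot.
    fn increment(&mut self, replica: &str) {
        *self.counts.entry(replica.to_string()).or_insert(0) += 1;
    }

    /// Observed value: the sum over all replicas.
    fn value(&self) -> u64 {
        self.counts.values().sum()
    }

    /// Merge another replica's state. Taking the max (not the sum) is what
    /// makes the merge commutative and idempotent.
    fn merge(&mut self, other: &GCounter) {
        for (replica, &n) in &other.counts {
            let slot = self.counts.entry(replica.clone()).or_insert(0);
            *slot = (*slot).max(n);
        }
    }
}
```

Because merge keeps the maximum per replica, re-applying a state that has already been merged cannot inflate the count.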
Example: Concurrent Editing
Alice's editor:
"Hello"
↓ (adds " World")
"Hello World"
Bob's editor (at same time):
"Hello"
↓ (adds "!")
"Hello!"
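After sync (both converge to the same text, for example):
"Hello World!"
↑ CRDT algorithm automatically merges
(The two inserts are concurrent at the same position, so their order is settled by a deterministic tie-break based on client IDs. The shared result could just as well be "Hello! World", but every replica ends up with the same one.)
Why Yrs?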
Yrs is the Rust port of Yjs, one of the most widely deployed and battle-tested CRDT implementations:
Advantages
✅ Production-Ready: Used in Google Docs-like applications
✅ Pure Rust: Memory-safe, high performance
✅ Rich Data Types: Text, Map, Array, XML
✅ Ecosystem Compatible: Works with Yjs clients (JavaScript)
✅ Battle-Tested Protocol: Proven sync algorithm
✅ Awareness API: Presence, cursors, selections
✅ Time-Travel: Built-in versioning support
Performance
- Sync speed: ~50 ms for typical documents (local network)
- Memory efficiency: Compact binary encoding
- Conflict resolution: Deterministic and local; merging needs no coordination round-trip
- Scalability: Tested with 100+ concurrent editors
Architecture
Components
┌──────────────────────────────────────────────┐
│ Client Application │
│ - Yjs Document (Y.Doc) │
│ - Shared types (Y.Text, Y.Map, Y.Array) │
│ - Awareness (presence, cursors) │
└──────────────────────────────────────────────┘
↓ WebSocket (binary protocol)
┌──────────────────────────────────────────────┐
│ Cloudillo Server (stateless — no Y.Doc) │
│ ┌────────────────────────────────────────┐ │
│ │ WebSocket Handler │ │
│ │ - Yrs message decoding │ │
│ │ - Binary message parsing │ │
│ │ - Authentication (at connection) │ │
│ └────────────────────────────────────────┘ │
│ ↓ │
│ ┌────────────────────────────────────────┐ │
│ │ Document Registry (CRDT_DOCS) │ │
│ │ - Broadcast channels per document │ │
│ │ - Connection tracking │ │
│ │ - Cleanup on last disconnect │ │
│ └────────────────────────────────────────┘ │
│ ↓ │
│ ┌─────────────────────────────────────────┐ │
│ │ Storage Layer │ │
│ │ - CrdtAdapter (binary updates) │ │
│ │ - Optimization on last disconnect │ │
│ └─────────────────────────────────────────┘ │
└──────────────────────────────────────────────┘
Connection Tracking
Each connected client is tracked with a CrdtConnection:
struct CrdtConnection {
    conn_id: String,  // Unique connection ID (distinguishes multiple tabs)
    user_id: String,  // Authenticated user
    doc_id: String,   // Document being edited
    tn_id: TnId,      // Tenant ID
    awareness_tx: Arc<broadcast::Sender<(String, Vec<u8>)>>, // Awareness channel: (conn_id, data)
    sync_tx: Arc<broadcast::Sender<(String, Vec<u8>)>>,      // Sync channel: (conn_id, data)
    last_access_update: Mutex<Option<Instant>>, // Throttled access tracking
    last_modify_update: Mutex<Option<Instant>>, // Throttled modification tracking
    has_modified: AtomicBool,                   // Whether this session made changes
}
Info
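The server is stateless: it does not keep a Y.Doc in memory while clients are connected. It stores raw binary updates via the CrdtAdapter and relays them to other clients through broadcast channels, so merged document state is only materialized by the connected clients (and briefly in a temporary Y.Doc when the server creates a new document or optimizes stored updates).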
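The last_access_update and last_modify_update fields point to a simple time-based throttle: record an access only if the previous record is older than some interval. The sketch below is an assumption about how such a check could look. The should_record helper is hypothetical, and the throttle interval is not stated in this document, so the value used in the usage example is arbitrary.

```rust
use std::sync::Mutex;
use std::time::{Duration, Instant};

/// Hypothetical throttle check. Returns true, and stores `now` as the new
/// timestamp, if nothing was recorded within the last `interval`.
fn should_record(last: &Mutex<Option<Instant>>, now: Instant, interval: Duration) -> bool {
    let mut last = last.lock().unwrap();
    // A previous record that is still inside the window means "skip".
    let recent = matches!(*last, Some(prev) if now.saturating_duration_since(prev) < interval);
    if recent {
        return false;
    }
    *last = Some(now);
    true
}
```

Taking the lock for the whole check-and-set keeps two concurrent messages on the same connection from both passing the throttle.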
Document Registry
Active documents are tracked in a global registry:
type DocChannels = (
    Arc<broadcast::Sender<(String, Vec<u8>)>>, // Awareness: (conn_id, data)
    Arc<broadcast::Sender<(String, Vec<u8>)>>, // Sync: (conn_id, data)
);
static CRDT_DOCS: LazyLock<RwLock<HashMap<String, DocChannels>>> =
    LazyLock::new(|| RwLock::new(HashMap::new()));
When a client connects, the server gets or creates broadcast channels for the document. When a client disconnects, if both channels have zero receivers, the entry is removed from the registry.
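The get-or-create and cleanup-on-zero-receivers logic can be sketched with the standard library alone. In this sketch tokio's broadcast::Sender is replaced by a hypothetical Channel type whose subscribers each hold a clone of an Arc token, so receiver_count() is derived from the Arc's strong count. Registry, get_or_create and remove_if_idle are illustrative names, not Cloudillo's API.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

/// Stand-in for a broadcast sender: each subscriber holds a clone of `token`,
/// so the receiver count is the strong count minus the channel's own reference.
#[derive(Default)]
struct Channel {
    token: Arc<()>,
}

impl Channel {
    fn subscribe(&self) -> Arc<()> {
        Arc::clone(&self.token)
    }
    fn receiver_count(&self) -> usize {
        Arc::strong_count(&self.token) - 1
    }
}

/// (awareness, sync), mirroring the DocChannels tuple.
type DocChannels = (Arc<Channel>, Arc<Channel>);

#[derive(Default)]
struct Registry {
    docs: RwLock<HashMap<String, DocChannels>>,
}

impl Registry {
    /// Returns the channels for `doc_id`, creating them on first use.
    fn get_or_create(&self, doc_id: &str) -> DocChannels {
        // Fast path: joining an already active document needs only a read lock.
        if let Some((aw, sy)) = self.docs.read().unwrap().get(doc_id) {
            return (Arc::clone(aw), Arc::clone(sy));
        }
        // Slow path: `entry` covers the race where another connection
        // created the same document between the two locks.
        let mut docs = self.docs.write().unwrap();
        let (aw, sy) = docs
            .entry(doc_id.to_string())
            .or_insert_with(|| (Arc::new(Channel::default()), Arc::new(Channel::default())));
        (Arc::clone(aw), Arc::clone(sy))
    }

    /// Removes the entry when neither channel has receivers; returns true if removed.
    fn remove_if_idle(&self, doc_id: &str) -> bool {
        let mut docs = self.docs.write().unwrap();
        let idle = match docs.get(doc_id) {
            Some((aw, sy)) => aw.receiver_count() == 0 && sy.receiver_count() == 0,
            None => return false,
        };
        if idle {
            docs.remove(doc_id);
        }
        idle
    }
}
```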
Yrs Data Types
Y.Text
For collaborative text editing:
import * as Y from 'yjs'
const doc = new Y.Doc()
const text = doc.getText('content')
// Insert text
text.insert(0, 'Hello World!')
// Format text
text.format(0, 5, { bold: true }) // Bold "Hello"
// Delete text
text.delete(6, 6) // Remove "World!"
// Observe changes
text.observe(event => {
  event.changes.delta.forEach(change => {
    if (change.insert) {
      console.log('Inserted:', change.insert)
    }
    if (change.delete) {
      console.log('Deleted:', change.delete, 'characters')
    }
  })
})
Y.Map
For key-value data structures:
const map = doc.getMap('config')
// Set values
map.set('theme', 'dark')
map.set('fontSize', 14)
map.set('notifications', { email: true, push: false })
// Get values
const theme = map.get('theme') // 'dark'
// Delete keys
map.delete('fontSize')
// Iterate
map.forEach((value, key) => {
  console.log(key, value)
})
// Observe changes
map.observe(event => {
  event.changes.keys.forEach((change, key) => {
    if (change.action === 'add') {
      console.log('Added:', key, map.get(key))
    } else if (change.action === 'update') {
      console.log('Updated:', key, map.get(key))
    } else if (change.action === 'delete') {
      console.log('Deleted:', key)
    }
  })
})
Y.Array
For ordered lists:
const array = doc.getArray('tasks')
// Push items
array.push([
  { title: 'Task 1', completed: false },
  { title: 'Task 2', completed: false }
])
// Insert at position
array.insert(0, [{ title: 'Urgent Task', completed: false }])
// Delete
array.delete(1, 1) // Delete 1 item at index 1
// Iterate
array.forEach((item, index) => {
  console.log(index, item)
})
// Observe changes
array.observe(event => {
  event.changes.delta.forEach(change => {
    if (change.insert) {
      console.log('Inserted:', change.insert)
    }
    if (change.delete) {
      console.log('Deleted:', change.delete, 'items')
    }
  })
})
Y.XML
For structured document editing:
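const xmlFragment = doc.getXmlFragment('document')
// Create element
const paragraph = new Y.XmlElement('p')
paragraph.setAttribute('class', 'intro')
paragraph.insert(0, [new Y.XmlText('Hello World!')])
// Add to document
xmlFragment.push([paragraph])
// Convert to HTML (browser only): toDOM() returns a DocumentFragment,
// which has no outerHTML, so append it to a container and read innerHTML
const container = document.createElement('div')
container.appendChild(xmlFragment.toDOM())
const html = container.innerHTML
WebSocket Sync Protocol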
Connection Flow
Client Server
| |
|--- GET /ws/crdt/:docId ------>|
| (Authorization: Bearer...) |
| |--- Validate token
| |--- Load database instance
| |--- Create session
|<-- 101 Switching Protocols ---|
| |
|<====== WebSocket Open =======>|
| |
|<-- Update (stored update 1) --| Server sends all stored
|<-- Update (stored update 2) --| updates from CrdtAdapter
|<-- Update (stored update N) --| as SyncMessage::Update
| |
|<====== Synchronized =========>|
| |
|--- Update (user edits) ------>|--- Store via CrdtAdapter
|<-- Update (echo back) --------|--- Echo to sender
| |--- Broadcast to other clients
|<-- Update (remote edits) -----|
| |
|--- Awareness Update --------->|--- Echo to sender
|<-- Awareness Update ----------|--- Broadcast to others
Message Types
All messages use the Yjs sync protocol binary format (lib0 encoding, not JSON):
- MSG_SYNC (0): Sync protocol messages (SyncStep1, SyncStep2, Update)
- MSG_AWARENESS (1): User presence/cursor updates
Messages are encoded/decoded using yrs::sync::Message.
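The server relies on yrs::sync::Message for this. To show what actually travels over the socket, here is a hand-written, std-only decoder for the outer framing. It assumes the y-protocols layout: a lib0 variable-length unsigned integer for the message type (0 = sync, 1 = awareness), then for sync messages a second varuint for the sub-type (0 = SyncStep1, 1 = SyncStep2, 2 = Update), then a varuint length-prefixed payload. The Incoming enum and classify function are hypothetical, and payloads are returned undecoded.

```rust
/// lib0 varuint: little-endian base-128, high bit set on all but the last byte.
fn read_var_uint(buf: &[u8], pos: &mut usize) -> Option<u64> {
    let mut result: u64 = 0;
    let mut shift = 0;
    loop {
        let byte = *buf.get(*pos)?;
        *pos += 1;
        result |= u64::from(byte & 0x7f) << shift;
        if byte & 0x80 == 0 {
            return Some(result);
        }
        shift += 7;
        if shift > 63 {
            return None; // malformed: too many continuation bytes
        }
    }
}

/// Reads a varuint length followed by that many bytes.
fn read_var_bytes<'a>(buf: &'a [u8], pos: &mut usize) -> Option<&'a [u8]> {
    let len = usize::try_from(read_var_uint(buf, pos)?).ok()?;
    let end = (*pos).checked_add(len)?;
    let bytes = buf.get(*pos..end)?;
    *pos = end;
    Some(bytes)
}

#[derive(Debug, PartialEq)]
enum Incoming<'a> {
    SyncStep1(&'a [u8]),
    SyncStep2(&'a [u8]),
    Update(&'a [u8]),
    Awareness(&'a [u8]),
}

/// Classifies one binary WebSocket frame; None for unknown or truncated input.
fn classify(buf: &[u8]) -> Option<Incoming<'_>> {
    let mut pos = 0;
    match read_var_uint(buf, &mut pos)? {
        0 => {
            let sub = read_var_uint(buf, &mut pos)?;
            let payload = read_var_bytes(buf, &mut pos)?;
            match sub {
                0 => Some(Incoming::SyncStep1(payload)),
                1 => Some(Incoming::SyncStep2(payload)),
                2 => Some(Incoming::Update(payload)),
                _ => None,
            }
        }
        1 => Some(Incoming::Awareness(read_var_bytes(buf, &mut pos)?)),
        _ => None,
    }
}
```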
Update (Primary message type)
Document updates (changes), used for both initial sync and live editing:
YMessage::Sync(SyncMessage::Update(update_data))
The server sends stored updates as SyncMessage::Update messages during initial sync. It does not use state vector exchange: since the server has no Y.Doc in memory, it cannot compute a state vector.
Awareness Update
Presence information (cursors, selections):
YMessage::Awareness(awareness_update)
WebSocket Connection Handler
Algorithm: Handle CRDT WebSocket Connection
Input: WebSocket, user_id, doc_id, app, tn_id, read_only
Output: ()
1. Connection Setup:
- Generate unique conn_id
- Get or create broadcast channels for this document (CRDT_DOCS registry)
- Create CrdtConnection struct
- Record initial file access (throttled)
2. Send Initial Sync:
- Load all stored updates via CrdtAdapter.get_updates()
- If no updates exist: create initial Y.Doc with meta map, store it
- Send each stored update as SyncMessage::Update to client
3. Spawn Concurrent Tasks:
- Heartbeat task: sends ping frames every 15 seconds
- Receive task: processes incoming WebSocket messages
- Sync broadcast task: forwards CRDT updates from other clients
- Awareness broadcast task: forwards awareness updates from other clients
4. Message Loop (receive task):
For each binary WebSocket message:
a. SYNC Update:
- If read_only: silently drop the update and continue with the next message
- Validate update with Update::decode_v1() (catches corruption)
- Store via CrdtAdapter.store_update()
- If store fails: skip broadcasting (other clients never receive an update that was not persisted)
- Broadcast to other clients via sync_tx channel
- Echo back to sender
b. AWARENESS Update:
- Broadcast to other clients via awareness_tx channel
- Echo back to sender
5. Broadcast Tasks (the sync and awareness tasks spawned in step 3, per client):
- Receive from channel, skip messages from own conn_id
- Forward to client via WebSocket
6. Connection Close:
- Record final file access/modification
- Abort heartbeat, sync, and awareness tasks
- Check if last connection: if so, wait 2s grace period
- If still no connections after grace: optimize document
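The per-message rules for a sync Update condense into one function. In this sketch UpdateStore is a hypothetical trait standing in for CrdtAdapter (its signature is assumed), and the is_valid closure stands in for the Update::decode_v1() check. Dropping an invalid update without a reply, and sending no echo when storing fails, are assumptions: the algorithm only says that broadcasting is skipped.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for CrdtAdapter's store_update (signature assumed).
trait UpdateStore {
    fn store_update(&mut self, doc_id: &str, update: &[u8]) -> Result<(), String>;
}

/// What the receive task should do with one incoming sync Update.
#[derive(Debug, PartialEq)]
enum UpdateOutcome {
    IgnoredReadOnly,     // read-only connection: nothing stored or sent
    Invalid,             // failed validation (decode_v1 stand-in)
    StoreFailed(String), // persisting failed: do not broadcast
    Relay,               // stored: broadcast to others and echo to sender
}

fn handle_sync_update<S: UpdateStore>(
    store: &mut S,
    doc_id: &str,
    read_only: bool,
    update: &[u8],
    is_valid: impl Fn(&[u8]) -> bool,
) -> UpdateOutcome {
    if read_only {
        return UpdateOutcome::IgnoredReadOnly;
    }
    if !is_valid(update) {
        return UpdateOutcome::Invalid;
    }
    // Persist first; only a successfully stored update may be relayed.
    match store.store_update(doc_id, update) {
        Ok(()) => UpdateOutcome::Relay,
        Err(e) => UpdateOutcome::StoreFailed(e),
    }
}

/// In-memory store used for the usage example; `fail` simulates a storage error.
#[derive(Default)]
struct MemStore {
    docs: HashMap<String, Vec<Vec<u8>>>,
    fail: bool,
}

impl UpdateStore for MemStore {
    fn store_update(&mut self, doc_id: &str, update: &[u8]) -> Result<(), String> {
        if self.fail {
            return Err("disk full".into());
        }
        self.docs.entry(doc_id.to_string()).or_default().push(update.to_vec());
        Ok(())
    }
}
```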
This pattern ensures:
- Stateless server (no Y.Doc in memory)
- Echo + broadcast (sender gets echo, others get broadcast)
- Persistence before broadcasting (no data loss on crash)
- Automatic optimization when all clients disconnect
Awareness (Presence)
What is Awareness?
Awareness tracks ephemeral state like:
- User presence (online/offline)
- Cursor positions
- Text selections
- Custom user state (name, color, etc.)
Setting Local State
import { WebsocketProvider } from 'y-websocket'
// y-websocket connects to `${serverUrl}/${roomname}`, so the file ID is passed as the room name
const provider = new WebsocketProvider(
  'wss://cl-o.example.com/ws/crdt',
  fileId,
  doc,
  { params: { token: accessToken } }
)
// Set local user state
provider.awareness.setLocalStateField('user', {
  name: 'Alice',
  color: '#ff0000',
  avatar: 'https://...'
})
// Update cursor position
provider.awareness.setLocalStateField('cursor', {
  anchor: 42,
  head: 50
})
Observing Awareness
provider.awareness.on('change', ({ added, updated, removed }) => {
  // New users joined
  added.forEach(clientId => {
    const state = provider.awareness.getStates().get(clientId)
    console.log('User joined:', state?.user?.name)
  })
  // Existing users updated state
  updated.forEach(clientId => {
    const state = provider.awareness.getStates().get(clientId)
    console.log('User updated:', state?.user?.name)
  })
  // Users left (their state is no longer available)
  removed.forEach(clientId => {
    console.log('User left:', clientId)
  })
})
Rendering Cursors
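function renderCursors() {
  // Remove cursors drawn by the previous call before redrawing
  document.querySelectorAll('.remote-cursor').forEach(el => el.remove())
  const states = provider.awareness.getStates()
  states.forEach((state, clientId) => {
    if (state.cursor && state.user && clientId !== provider.awareness.clientID) {
      const cursorEl = document.createElement('div')
      cursorEl.className = 'remote-cursor'
      cursorEl.style.backgroundColor = state.user.color
      // getCursorPosition() is an app-specific helper mapping a text offset to pixels
      cursorEl.style.left = `${getCursorPosition(state.cursor.anchor)}px`
      cursorEl.textContent = state.user.name
      document.body.appendChild(cursorEl)
    }
  })
}
// Redraw whenever presence changes
provider.awareness.on('change', renderCursors)
Storage Strategy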
Update Storage
All CRDT changes are stored as individual binary updates via the CrdtAdapter. When a new client connects, all stored updates are sent one by one. There is no snapshot system — updates accumulate and are merged during optimization.
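Sending the stored updates one by one amounts to wrapping each stored blob in a sync Update frame. Below is a std-only sketch, assuming the y-protocols framing that yrs::sync::Message produces (varuint message type 0, varuint sync sub-type 2, varuint payload length, payload). encode_update_message and initial_sync_frames are hypothetical helper names.

```rust
/// lib0 varuint encoding: 7 bits per byte, high bit marks "more bytes follow".
fn write_var_uint(out: &mut Vec<u8>, mut n: u64) {
    while n > 0x7f {
        out.push(0x80 | (n & 0x7f) as u8);
        n >>= 7;
    }
    out.push(n as u8);
}

/// Wraps one stored update as a sync Update message.
fn encode_update_message(update: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(update.len() + 4);
    write_var_uint(&mut out, 0); // message type: sync
    write_var_uint(&mut out, 2); // sync sub-type: Update
    write_var_uint(&mut out, update.len() as u64);
    out.extend_from_slice(update);
    out
}

/// One frame per stored update, in storage order.
fn initial_sync_frames(stored: &[Vec<u8>]) -> Vec<Vec<u8>> {
    stored.iter().map(|u| encode_update_message(u)).collect()
}
```

Because every stored update becomes its own frame, initial sync cost grows with the number of stored updates, which is what the optimization step below addresses.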
Optimization (Update Merging)
When the last client disconnects from a document, the server merges all stored updates into a single compacted update:
Algorithm: Optimize Document
Input: app, tn_id, doc_id
Output: ()
1. Wait Grace Period:
- After last disconnect, wait 2 seconds
- Re-check that no new connections were established
- If new client connected: skip optimization
2. Load All Updates:
- Get all stored updates via CrdtAdapter.get_updates()
- If 0 or 1 updates: skip (nothing to optimize)
3. Merge Updates (CPU-bound, runs in spawn_blocking):
- Decode all updates with Update::decode_v1()
- Skip corrupted updates (log warnings)
- Create a temporary Y.Doc
- Apply all decoded updates to the Doc
- Encode full state as a single update via encode_state_as_update_v1()
4. Size Check:
- Compare merged size vs total original size
- If no size reduction: skip (optimization would not help)
5. Replace Stored Updates:
- Delete all old updates via CrdtAdapter.delete_doc()
- Store the single merged update via CrdtAdapter.store_update()
- Mark as system-generated (client_id = "system")
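Steps 2 to 4 can be expressed as a pure decision function. The sketch keeps the CRDT work abstract: is_valid stands in for Update::decode_v1(), and merge stands in for applying the decoded updates to a temporary Y.Doc and calling encode_state_as_update_v1(). plan_optimization is a hypothetical name; an Ok result means the caller should go on to step 5 with the returned blob.

```rust
/// Decides whether stored updates should be replaced by one merged update.
fn plan_optimization<V, M>(updates: &[Vec<u8>], is_valid: V, merge: M) -> Result<Vec<u8>, &'static str>
where
    V: Fn(&[u8]) -> bool,
    M: FnOnce(&[&[u8]]) -> Vec<u8>,
{
    // Step 2: zero or one update leaves nothing to merge.
    if updates.len() <= 1 {
        return Err("nothing to optimize");
    }
    // Step 3: skip updates that fail validation, merge the rest.
    let valid: Vec<&[u8]> = updates.iter().map(|u| u.as_slice()).filter(|u| is_valid(*u)).collect();
    let merged = merge(valid.as_slice());
    // Step 4: only replace when the merged form is actually smaller.
    let original: usize = updates.iter().map(Vec::len).sum();
    if merged.len() >= original {
        return Err("no size reduction");
    }
    Ok(merged)
}
```

An update skipped as corrupt is not part of the merged result, so it disappears once the old updates are deleted in step 5.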
Benefits:
- Reduces initial sync time for subsequent connections
- Reduces storage usage
- Happens automatically when document is idle
Client Integration
Basic Setup
import * as Y from 'yjs'
import { WebsocketProvider } from 'y-websocket'
// Create document
const doc = new Y.Doc()
// Connect to Cloudillo
// y-websocket opens `${serverUrl}/${roomname}?${params}`, so the file ID is the room name
const provider = new WebsocketProvider(
  'wss://cl-o.example.com/ws/crdt', // Server URL without the file ID
  fileId,                           // Room name: becomes the last path segment
  doc,
  {
    params: {
      token: accessToken // Auth token as query param
    }
  }
)
// Get shared types
const text = doc.getText('content')
const map = doc.getMap('metadata')
// Make changes
text.insert(0, 'Hello World!')
map.set('title', 'My Document')
// Changes automatically sync!
Editor Integration: ProseMirror
import { ySyncPlugin, yCursorPlugin, yUndoPlugin } from 'y-prosemirror'
import { EditorState } from 'prosemirror-state'
import { EditorView } from 'prosemirror-view'
import { schema } from 'prosemirror-schema-basic'
// y-prosemirror binds to a Y.XmlFragment, not a Y.Text
const yXmlFragment = doc.getXmlFragment('prosemirror')
const state = EditorState.create({
  schema,
  plugins: [
    ySyncPlugin(yXmlFragment),
    yCursorPlugin(provider.awareness),
    yUndoPlugin(),
  ]
})
const view = new EditorView(document.querySelector('#editor'), {
  state
})
Editor Integration: Monaco (VS Code)
import * as monaco from 'monaco-editor'
import { MonacoBinding } from 'y-monaco'
const ytext = doc.getText('monaco')
const editor = monaco.editor.create(document.getElementById('monaco'), {
  value: '',
  language: 'javascript'
})
const binding = new MonacoBinding(
  ytext,
  editor.getModel(),
  new Set([editor]),
  provider.awareness
)
Editor Integration: Quill
import Quill from 'quill'
import { QuillBinding } from 'y-quill'
const ytext = doc.getText('quill')
const quill = new Quill('#editor', {
  theme: 'snow'
})
const binding = new QuillBinding(ytext, quill, provider.awareness)
Use Cases
Collaborative Text Editor
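const doc = new Y.Doc()
// serverUrl is e.g. 'wss://cl-o.example.com/ws/crdt'; the file ID is the room name
const provider = new WebsocketProvider(serverUrl, fileId, doc, { params: { token: accessToken } })
const fragment = doc.getXmlFragment('content')
// Bind to editor (ProseMirrorEditor is a placeholder for your own editor wrapper)
const editor = new ProseMirrorEditor(fragment, provider.awareness)
Shared Whiteboard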
const doc = new Y.Doc()
const provider = new WebsocketProvider(serverUrl, fileId, doc, { params: { token: accessToken } })
const shapes = doc.getArray('shapes')
// Add shape
shapes.push([{
  type: 'rectangle',
  x: 100,
  y: 100,
  width: 200,
  height: 150,
  color: '#ff0000'
}])
// Observe changes (redrawCanvas is an app-specific function)
shapes.observe(event => {
  redrawCanvas(shapes.toArray())
})
Collaborative Form
const doc = new Y.Doc()
const provider = new WebsocketProvider(serverUrl, fileId, doc, { params: { token: accessToken } })
const form = doc.getMap('form')
// Update fields
form.set('name', 'Alice')
form.set('email', 'alice@example.com')
form.set('preferences', { newsletter: true, notifications: false })
// Observe changes (updateFormField is an app-specific function;
// form.get(key) returns undefined for deleted keys)
form.observe(event => {
  event.changes.keys.forEach((change, key) => {
    updateFormField(key, form.get(key))
  })
})
Performance Optimization
Connection Pooling
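Reuse one provider (and therefore one WebSocket connection) per file:
class CloudilloProvider {
  static connections = new Map()
  // `doc` is only used when the provider is first created,
  // so callers must keep reusing the same Y.Doc for a given file
  static getProvider(fileId, doc) {
    if (!this.connections.has(fileId)) {
      const provider = new WebsocketProvider(serverUrl, fileId, doc, { params: { token: accessToken } })
      this.connections.set(fileId, provider)
    }
    return this.connections.get(fileId)
  }
}
Batching Updates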
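Group rapid local changes into one transaction, so they are sent as a single update:
let pending = []
let timeout = null
// Queue a change function; the queue is flushed 50 ms after the last call
function batchUpdate(update) {
  pending.push(update)
  clearTimeout(timeout)
  timeout = setTimeout(() => {
    const batch = pending
    pending = []
    // Apply all pending changes in one transaction
    doc.transact(() => {
      batch.forEach(u => u())
    })
  }, 50) // Batch for 50ms
}
Memory Management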
Document channel cleanup is based on receiver count. When a client disconnects, the server checks if both broadcast channels (awareness and sync) have zero receivers. If so, the document entry is removed from the CRDT_DOCS registry.
Cleanup Algorithm:
1. Client disconnects
2. Check awareness_tx.receiver_count() and sync_tx.receiver_count()
3. If both are 0:
- Remove entry from CRDT_DOCS registry
- Wait 2s grace period
- Re-check for new connections
- If still no connections: run optimization
4. If receivers remain: no cleanup needed
Since the server holds no Y.Doc in memory, there is no document eviction needed: only the lightweight broadcast channels are held per active document.
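The grace period is a double check around a pause: bail out if anyone is connected, wait, then check again before running the comparatively expensive merge. The sketch below uses a blocking thread::sleep and an AtomicUsize connection counter as simplifications; the server presumably waits asynchronously and checks broadcast receiver counts instead. optimize_after_grace is a hypothetical name.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use std::time::Duration;

/// Runs `optimize` only if no connection exists before and after the grace period.
/// Returns true if optimization ran.
fn optimize_after_grace(active: &AtomicUsize, grace: Duration, optimize: impl FnOnce()) -> bool {
    if active.load(Ordering::SeqCst) != 0 {
        return false; // someone is still connected
    }
    thread::sleep(grace); // e.g. 2 s in the algorithm above
    if active.load(Ordering::SeqCst) != 0 {
        return false; // a client reconnected during the grace period
    }
    optimize();
    true
}
```

The second check is what keeps a quick page reload from triggering a merge while the user is still editing.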
Security Considerations
Authentication
WebSocket connections are authenticated with the OptionalAuth axum extractor:
- Extract auth context from the WebSocket upgrade request
- If no auth context: reject with close code 4401 (“Unauthorized”)
- Auth context provides: id_tag, tn_id, roles, and an optional scope
Permission Enforcement
Permissions are checked once at connection time using file_access::check_file_access_with_scope(). This function evaluates:
- Scoped tokens: Share links with restricted access (read-only or read-write)
- Ownership: File owner has full access
- Tenant roles: Role-based access within the tenant
- FSHR action tokens: Federation-based file sharing permissions
The result determines whether the connection is read_only or read_write. Clients can also request a specific access level via the ?access=read or ?access=write query parameter.
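How the ?access= parameter combines with the permission check is not spelled out here. A natural reading is that a client may narrow, but never widen, what check_file_access_with_scope() granted; the sketch below encodes that assumption. The Access enum and effective_access function are hypothetical.

```rust
/// Access levels, ordered so that Read < Write.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum Access {
    Read,
    Write,
}

/// Combines the granted level with the optional ?access= value.
/// Assumption: a requested level can only narrow the granted one.
fn effective_access(granted: Option<Access>, requested: Option<&str>) -> Option<Access> {
    // No access at all: refuse regardless of what was requested.
    let granted = granted?;
    let requested = match requested {
        Some("read") => Access::Read,
        Some("write") => Access::Write,
        // Missing or unrecognised value: fall back to what was granted.
        _ => granted,
    };
    Some(granted.min(requested))
}
```

Under this rule read_only is simply effective_access(...) == Some(Access::Read), and a client holding only read access cannot upgrade itself with ?access=write.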
Warning
Access level is checked once at connection time but not re-validated during the session. If a user’s access is revoked (e.g., an FSHR action is deleted), they retain their original access level until they reconnect.
Read-Only Enforcement
Read-only connections (determined at connection time) are enforced at the message handler level. When a read-only client sends an Update message, the server silently drops it: the update is neither stored nor broadcast, and the client is not notified. The client's local Y.Doc keeps the edit, but no other client receives it, and it is not part of the document the server sends on the next connection.
See Also
- Yjs Documentation - Official Yjs CRDT documentation
- Yrs on GitHub - Rust implementation of Yjs
- CRDT redb Implementation - How CRDT updates are persisted
- RTDB Overview - Introduction to RTDB system
- System Architecture - Overall architecture
- WebSocket Protocol - WebSocket infrastructure