CRDT Overview

Cloudillo’s CRDT system uses Yrs, a Rust implementation of the Yjs CRDT (Conflict-free Replicated Data Type), to enable collaborative editing with automatic conflict resolution. It is a separate API from RTDB, optimized for concurrent editing, where multiple users modify the same data at the same time.

What are CRDTs?

Conflict-free Replicated Data Types (CRDTs) are data structures that can be replicated across multiple nodes and modified independently, then merged automatically without conflicts.

Key Properties

  1. Eventual Consistency: All replicas converge to the same state
  2. No Central Authority: No server needed to resolve conflicts
  3. Deterministic Merging: Same operations always produce same result
  4. Commutative: Order of operations doesn’t matter
  5. Idempotent: Applying same operation twice has no extra effect
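
To make the commutative and idempotent properties concrete, here is a minimal sketch using a grow-only counter, one of the simplest CRDTs. It is not how Yjs or Yrs represent documents (they use a sequence CRDT); the helper names `increment`, `merge`, and `value` are illustrative only.

```javascript
// Grow-only counter (G-Counter): each replica only increments its own slot.
// State is a plain object mapping replica id -> count.
function increment(state, replicaId, amount = 1) {
  return { ...state, [replicaId]: (state[replicaId] ?? 0) + amount }
}

// Merge takes the per-replica maximum, which makes the operation
// commutative, associative, and idempotent.
function merge(a, b) {
  const merged = { ...a }
  for (const [id, count] of Object.entries(b)) {
    merged[id] = Math.max(merged[id] ?? 0, count)
  }
  return merged
}

// The counter's value is the sum over all replicas.
function value(state) {
  return Object.values(state).reduce((sum, n) => sum + n, 0)
}

// Alice and Bob start from the same empty state and edit independently.
const alice = increment(increment({}, 'alice'), 'alice') // alice incremented twice
const bob = increment({}, 'bob', 3)                      // bob added 3

const ab = merge(alice, bob) // Alice receives Bob's state
const ba = merge(bob, alice) // Bob receives Alice's state

console.log(value(ab) === value(ba))              // merge order does not matter
console.log(value(merge(ab, bob)) === value(ab))  // re-merging adds nothing
```

Because `merge` uses a maximum rather than a sum, receiving the same state twice cannot double-count, which is the idempotence property from the list above.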

Example: Concurrent Editing

Alice's editor:
  "Hello"
  ↓ (adds " World")
  "Hello World"

Bob's editor (at same time):
  "Hello"
  ↓ (adds "!")
  "Hello!"

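After sync (both converge to the same text, for example):
  "Hello World!"
  ↑ CRDT algorithm merges automatically; concurrent insertions at the same
    position are ordered deterministically, so every replica agrees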

Why Yrs?

Yrs is the Rust port of Yjs, a widely used and well-tested CRDT implementation.

Advantages

  ✅ Production-Ready: Used in Google Docs-like applications
  ✅ Pure Rust: Memory-safe, high performance
  ✅ Rich Data Types: Text, Map, Array, XML
  ✅ Ecosystem Compatible: Works with Yjs clients (JavaScript)
  ✅ Battle-Tested Protocol: Proven sync algorithm
  ✅ Awareness API: Presence, cursors, selections
  ✅ Time-Travel: Built-in versioning support

Performance

  • Sync speed: ~50 ms for typical documents (local network)
  • Memory efficiency: Compact binary encoding
  • Conflict resolution: Deterministic, instantaneous
  • Scalability: Tested with 100+ concurrent editors

Architecture

Components

┌──────────────────────────────────────────────┐
│ Client Application                           │
│  - Yjs Document (Y.Doc)                      │
│  - Shared types (Y.Text, Y.Map, Y.Array)     │
│  - Awareness (presence, cursors)             │
└──────────────────────────────────────────────┘
               ↓ WebSocket (binary protocol)
┌──────────────────────────────────────────────┐
│ Cloudillo Server (stateless — no Y.Doc)      │
│  ┌────────────────────────────────────────┐  │
│  │ WebSocket Handler                      │  │
│  │  - Yrs message decoding                │  │
│  │  - Binary message parsing              │  │
│  │  - Authentication (at connection)      │  │
│  └────────────────────────────────────────┘  │
│                    ↓                         │
│  ┌────────────────────────────────────────┐  │
│  │ Document Registry (CRDT_DOCS)          │  │
│  │  - Broadcast channels per document     │  │
│  │  - Connection tracking                 │  │
│  │  - Cleanup on last disconnect          │  │
│  └────────────────────────────────────────┘  │
│                    ↓                         │
│  ┌─────────────────────────────────────────┐ │
│  │ Storage Layer                           │ │
│  │  - CrdtAdapter (binary updates)         │ │
│  │  - Optimization on last disconnect      │ │
│  └─────────────────────────────────────────┘ │
└──────────────────────────────────────────────┘

Connection Tracking

Each connected client is tracked with a CrdtConnection:

struct CrdtConnection {
    conn_id: String,       // Unique connection ID (distinguishes multiple tabs)
    user_id: String,
    doc_id: String,
    tn_id: TnId,
    awareness_tx: Arc<broadcast::Sender<(String, Vec<u8>)>>,  // Awareness channel
    sync_tx: Arc<broadcast::Sender<(String, Vec<u8>)>>,       // Sync channel
    last_access_update: Mutex<Option<Instant>>,   // Throttled access tracking
    last_modify_update: Mutex<Option<Instant>>,   // Throttled modification tracking
    has_modified: AtomicBool,                      // Whether session made changes
}

Info

The server is stateless — it does not hold a Y.Doc in memory. Instead, it stores raw binary updates via the CrdtAdapter and broadcasts them to other clients via channels. Document state only exists in the connected clients.

Document Registry

Active documents are tracked in a global registry:

type DocChannels = (
    Arc<broadcast::Sender<(String, Vec<u8>)>>,  // Awareness: (conn_id, data)
    Arc<broadcast::Sender<(String, Vec<u8>)>>,  // Sync: (conn_id, data)
);

static CRDT_DOCS: LazyLock<RwLock<HashMap<String, DocChannels>>> =
    LazyLock::new(|| RwLock::new(HashMap::new()));

When a client connects, the server gets or creates broadcast channels for the document. When a client disconnects, if both channels have zero receivers, the entry is removed from the registry.
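
The get-or-create and cleanup behavior described above can be modeled in a few lines. This is an illustrative in-memory sketch, not the server code: the real registry is the Rust `CRDT_DOCS` map shown above, and `Channel`, `connect`, and `disconnect` are hypothetical stand-ins for broadcast channels and connection handling.

```javascript
// Stand-in for a broadcast channel: it only tracks who is subscribed.
class Channel {
  constructor() {
    this.receivers = new Set()
  }
  subscribe(connId) {
    this.receivers.add(connId)
  }
  unsubscribe(connId) {
    this.receivers.delete(connId)
  }
  receiverCount() {
    return this.receivers.size
  }
}

// docId -> { awareness, sync }, mirroring the DocChannels tuple.
const crdtDocs = new Map()

// Return the existing channels for a document or create them on first use.
function getOrCreateChannels(docId) {
  let channels = crdtDocs.get(docId)
  if (!channels) {
    channels = { awareness: new Channel(), sync: new Channel() }
    crdtDocs.set(docId, channels)
  }
  return channels
}

function connect(docId, connId) {
  const channels = getOrCreateChannels(docId)
  channels.awareness.subscribe(connId)
  channels.sync.subscribe(connId)
}

// Returns true when this was the last connection and the entry was removed.
function disconnect(docId, connId) {
  const channels = crdtDocs.get(docId)
  if (!channels) return false
  channels.awareness.unsubscribe(connId)
  channels.sync.unsubscribe(connId)
  const idle =
    channels.awareness.receiverCount() === 0 &&
    channels.sync.receiverCount() === 0
  if (idle) crdtDocs.delete(docId)
  return idle
}

connect('doc-1', 'tab-a')
connect('doc-1', 'tab-b')             // second tab reuses the same channels
console.log(disconnect('doc-1', 'tab-a'), crdtDocs.has('doc-1'))
console.log(disconnect('doc-1', 'tab-b'), crdtDocs.has('doc-1'))
```

The return value of `disconnect` gives the caller a natural hook for scheduling follow-up work once a document becomes idle.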

Yrs Data Types

Y.Text

For collaborative text editing:

import * as Y from 'yjs'

const doc = new Y.Doc()
const text = doc.getText('content')

// Insert text
text.insert(0, 'Hello World!')

// Format text
text.format(0, 5, { bold: true })  // Bold "Hello"

// Delete text
text.delete(6, 6)  // Remove "World!"

// Observe changes
text.observe(event => {
  event.changes.delta.forEach(change => {
    if (change.insert) {
      console.log('Inserted:', change.insert)
    }
    if (change.delete) {
      console.log('Deleted:', change.delete, 'characters')
    }
  })
})

Y.Map

For key-value data structures:

const map = doc.getMap('config')

// Set values
map.set('theme', 'dark')
map.set('fontSize', 14)
map.set('notifications', { email: true, push: false })

// Get values
const theme = map.get('theme')  // 'dark'

// Delete keys
map.delete('fontSize')

// Iterate
map.forEach((value, key) => {
  console.log(key, value)
})

// Observe changes
map.observe(event => {
  event.changes.keys.forEach((change, key) => {
    if (change.action === 'add') {
      console.log('Added:', key, map.get(key))
    } else if (change.action === 'update') {
      console.log('Updated:', key, map.get(key))
    } else if (change.action === 'delete') {
      console.log('Deleted:', key)
    }
  })
})

Y.Array

For ordered lists:

const array = doc.getArray('tasks')

// Push items
array.push([
  { title: 'Task 1', completed: false },
  { title: 'Task 2', completed: false }
])

// Insert at position
array.insert(0, [{ title: 'Urgent Task', completed: false }])

// Delete
array.delete(1, 1)  // Delete 1 item at index 1

// Iterate
array.forEach((item, index) => {
  console.log(index, item)
})

// Observe changes
array.observe(event => {
  event.changes.delta.forEach(change => {
    if (change.insert) {
      console.log('Inserted:', change.insert)
    }
    if (change.delete) {
      console.log('Deleted:', change.delete, 'items')
    }
  })
})

Y.XML

For structured document editing:

const xmlFragment = doc.getXmlFragment('document')

// Create element
const paragraph = new Y.XmlElement('p')
paragraph.setAttribute('class', 'intro')
paragraph.insert(0, [new Y.XmlText('Hello World!')])

// Add to document
xmlFragment.push([paragraph])

// Serialize to an XML string
// (toDOM() on a fragment returns a DocumentFragment, which has no outerHTML)
const html = xmlFragment.toString()

WebSocket Sync Protocol

Connection Flow

Client                          Server
  |                               |
  |--- GET /ws/crdt/:docId ------>|
  |    (Authorization: Bearer...) |
  |                               |--- Validate token
  |                               |--- Load database instance
  |                               |--- Create session
  |<-- 101 Switching Protocols ---|
  |                               |
  |<====== WebSocket Open =======>|
  |                               |
  |<-- Update (stored update 1) --|  Server sends all stored
  |<-- Update (stored update 2) --|  updates from CrdtAdapter
  |<-- Update (stored update N) --|  as SyncMessage::Update
  |                               |
  |<====== Synchronized =========>|
  |                               |
  |--- Update (user edits) ------>|--- Store via CrdtAdapter
  |<-- Update (echo back) --------|--- Echo to sender
  |                               |--- Broadcast to other clients
  |<-- Update (remote edits) -----|
  |                               |
  |--- Awareness Update --------->|--- Echo to sender
  |<-- Awareness Update ----------|--- Broadcast to others

Message Types

All messages use the Yjs sync protocol binary format (lib0 encoding, not JSON):

  • MSG_SYNC (0): Sync protocol messages (SyncStep1, SyncStep2, Update)
  • MSG_AWARENESS (1): User presence/cursor updates

Messages are encoded/decoded using yrs::sync::Message.
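
To show what the binary framing looks like on the wire, the following sketch hand-encodes an Update message. It is for illustration only: in practice `yrs::sync::Message` (server) or the y-protocols package (client) does this work. The sub-type value 2 for Update follows the y-protocols sync module; the function names are hypothetical.

```javascript
const MSG_SYNC = 0
const MSG_AWARENESS = 1
const SYNC_UPDATE = 2 // sync sub-type for Update (SyncStep1 = 0, SyncStep2 = 1)

// lib0-style variable-length unsigned integer: 7 payload bits per byte,
// with the high bit set on every byte except the last.
function writeVarUint(out, num) {
  while (num > 0x7f) {
    out.push(0x80 | (num & 0x7f))
    num = Math.floor(num / 128)
  }
  out.push(num)
}

// Returns [value, nextPosition].
function readVarUint(bytes, pos) {
  let num = 0
  let mult = 1
  while (true) {
    const byte = bytes[pos++]
    num += (byte & 0x7f) * mult
    mult *= 128
    if (byte < 0x80) return [num, pos]
  }
}

// Layout: MSG_SYNC, SYNC_UPDATE, payload length, payload bytes.
function encodeSyncUpdate(update) {
  const out = []
  writeVarUint(out, MSG_SYNC)
  writeVarUint(out, SYNC_UPDATE)
  writeVarUint(out, update.length)
  for (const b of update) out.push(b)
  return Uint8Array.from(out)
}

function decodeMessage(bytes) {
  const [type, afterType] = readVarUint(bytes, 0)
  if (type === MSG_SYNC) {
    const [subType, afterSub] = readVarUint(bytes, afterType)
    const [len, start] = readVarUint(bytes, afterSub)
    return { type, subType, payload: bytes.slice(start, start + len) }
  }
  if (type === MSG_AWARENESS) {
    // Awareness messages carry a single length-prefixed payload.
    const [len, start] = readVarUint(bytes, afterType)
    return { type, payload: bytes.slice(start, start + len) }
  }
  throw new Error(`Unknown message type: ${type}`)
}

const frame = encodeSyncUpdate(Uint8Array.from([1, 2, 3]))
const decoded = decodeMessage(frame)
console.log(Array.from(frame), decoded.subType === SYNC_UPDATE)
```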

Update (Primary message type)

Document updates (changes), used for both initial sync and live editing:

YMessage::Sync(SyncMessage::Update(update_data))

The server sends stored updates as SyncMessage::Update messages during initial sync. It does not use state vector exchange — since the server has no Y.Doc in memory, it cannot compute a state vector.

Awareness Update

Presence information (cursors, selections):

YMessage::Awareness(awareness_update)

WebSocket Connection Handler

Algorithm: Handle CRDT WebSocket Connection

Input: WebSocket, user_id, doc_id, app, tn_id, read_only
Output: ()

1. Connection Setup:
   - Generate unique conn_id
   - Get or create broadcast channels for this document (CRDT_DOCS registry)
   - Create CrdtConnection struct
   - Record initial file access (throttled)

2. Send Initial Sync:
   - Load all stored updates via CrdtAdapter.get_updates()
   - If no updates exist: create initial Y.Doc with meta map, store it
   - Send each stored update as SyncMessage::Update to client

3. Spawn Concurrent Tasks:
   - Heartbeat task: sends ping frames every 15 seconds
   - Receive task: processes incoming WebSocket messages
   - Sync broadcast task: forwards CRDT updates from other clients
   - Awareness broadcast task: forwards awareness updates from other clients

4. Message Loop (receive task):
   For each binary WebSocket message:

   a. SYNC Update:
      - If read_only: silently reject, return
      - Validate update with Update::decode_v1() (catches corruption)
      - Store via CrdtAdapter.store_update()
      - If store fails: skip broadcasting (prevents data loss)
      - Broadcast to other clients via sync_tx channel
      - Echo back to sender

   b. AWARENESS Update:
      - Broadcast to other clients via awareness_tx channel
      - Echo back to sender

   c. Broadcast tasks (per client):
      - Receive from channel, skip messages from own conn_id
      - Forward to client via WebSocket

5. Connection Close:
   - Record final file access/modification
   - Abort heartbeat, sync, and awareness tasks
   - Check if last connection: if so, wait 2s grace period
   - If still no connections after grace: optimize document

This pattern ensures:
- Stateless server (no Y.Doc in memory)
- Echo + broadcast (sender gets echo, others get broadcast)
- Persistence before broadcasting (no data loss on crash)
- Automatic optimization when all clients disconnect
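
The ordering in step 4a (reject, validate, store, then broadcast and echo) can be sketched as a small function. This is a synchronous model under stated assumptions, not the server's Rust code: `isValidUpdate`, `store`, `broadcast`, and `echo` are hypothetical callbacks, and the validity check here is a trivial stand-in for `Update::decode_v1()`.

```javascript
// Handle one incoming SYNC Update for a connection.
// Returns a status string so the control flow is easy to follow.
function handleSyncUpdate(conn, update, deps) {
  if (conn.readOnly) return 'rejected'            // silently dropped
  if (!deps.isValidUpdate(update)) return 'invalid'
  try {
    deps.store(conn.docId, update)                // persist first
  } catch (err) {
    return 'store_failed'                         // never broadcast unstored data
  }
  deps.broadcast(conn.connId, update)             // receivers skip their own conn_id
  deps.echo(update)                               // sender gets its update back
  return 'ok'
}

// In-memory stand-ins for storage and the WebSocket side effects.
const stored = []
const sent = []
const deps = {
  isValidUpdate: (u) => u instanceof Uint8Array && u.length > 0,
  store: (docId, u) => { stored.push([docId, u]) },
  broadcast: (connId, u) => { sent.push(['broadcast', connId]) },
  echo: (u) => { sent.push(['echo']) },
}

const writer = { connId: 'c1', docId: 'doc-1', readOnly: false }
const reader = { connId: 'c2', docId: 'doc-1', readOnly: true }

const okResult = handleSyncUpdate(writer, Uint8Array.from([1, 2]), deps)
const roResult = handleSyncUpdate(reader, Uint8Array.from([3]), deps)
console.log(okResult, roResult, stored.length)
```

Storing before broadcasting means a failed write never leaves other clients holding an update that the server cannot replay to later joiners.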

Awareness (Presence)

What is Awareness?

Awareness tracks ephemeral state like:

  • User presence (online/offline)
  • Cursor positions
  • Text selections
  • Custom user state (name, color, etc.)

Setting Local State

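import { WebsocketProvider } from 'y-websocket'

// WebsocketProvider takes (serverUrl, roomname, doc) and connects to
// `${serverUrl}/${roomname}`, so the file ID is passed as the room name
const provider = new WebsocketProvider(
  'wss://cl-o.example.com/ws/crdt',
  'file_id',
  doc
)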

// Set local user state
provider.awareness.setLocalStateField('user', {
  name: 'Alice',
  color: '#ff0000',
  avatar: 'https://...'
})

// Update cursor position
provider.awareness.setLocalStateField('cursor', {
  anchor: 42,
  head: 50
})

Observing Awareness

provider.awareness.on('change', ({ added, updated, removed }) => {
  // New users joined
  added.forEach(clientId => {
    const state = provider.awareness.getStates().get(clientId)
    console.log('User joined:', state.user.name)
  })

  // Existing users updated state
  updated.forEach(clientId => {
    const state = provider.awareness.getStates().get(clientId)
    console.log('User updated:', state.user.name)
  })

  // Users left
  removed.forEach(clientId => {
    console.log('User left:', clientId)
  })
})

Rendering Cursors

function renderCursors() {
  const states = provider.awareness.getStates()

  states.forEach((state, clientId) => {
    if (state.cursor && clientId !== provider.awareness.clientID) {
      const cursorEl = document.createElement('div')
      cursorEl.className = 'remote-cursor'
      cursorEl.style.backgroundColor = state.user.color
      cursorEl.style.left = `${getCursorPosition(state.cursor.anchor)}px`
      cursorEl.textContent = state.user.name

      document.body.appendChild(cursorEl)
    }
  })
}

Storage Strategy

Update Storage

All CRDT changes are stored as individual binary updates via the CrdtAdapter. When a new client connects, all stored updates are sent one by one. There is no snapshot system — updates accumulate and are merged during optimization.

Optimization (Update Merging)

When the last client disconnects from a document, the server merges all stored updates into a single compacted update:

Algorithm: Optimize Document

Input: app, tn_id, doc_id
Output: ()

1. Wait Grace Period:
   - After last disconnect, wait 2 seconds
   - Re-check that no new connections were established
   - If new client connected: skip optimization

2. Load All Updates:
   - Get all stored updates via CrdtAdapter.get_updates()
   - If 0 or 1 updates: skip (nothing to optimize)

3. Merge Updates (CPU-bound, runs in spawn_blocking):
   - Decode all updates with Update::decode_v1()
   - Skip corrupted updates (log warnings)
   - Create a temporary Y.Doc
   - Apply all decoded updates to the Doc
   - Encode full state as a single update via encode_state_as_update_v1()

4. Size Check:
   - Compare merged size vs total original size
   - If no size reduction: skip (optimization would not help)

5. Replace Stored Updates:
   - Delete all old updates via CrdtAdapter.delete_doc()
   - Store the single merged update via CrdtAdapter.store_update()
   - Mark as system-generated (client_id = "system")

Benefits:
- Reduces initial sync time for subsequent connections
- Reduces storage usage
- Happens automatically when document is idle

Client Integration

Basic Setup

import * as Y from 'yjs'
import { WebsocketProvider } from 'y-websocket'

// Create document
const doc = new Y.Doc()

// Connect to Cloudillo. y-websocket builds the URL as `${serverUrl}/${roomname}`,
// so the file ID is passed as the room name to reach /ws/crdt/:docId
const provider = new WebsocketProvider(
  'wss://cl-o.example.com/ws/crdt',
  fileId,
  doc,
  {
    params: {
      token: accessToken  // Auth token as query param
    }
  }
)

// Get shared types
const text = doc.getText('content')
const map = doc.getMap('metadata')

// Make changes
text.insert(0, 'Hello World!')
map.set('title', 'My Document')

// Changes automatically sync!

Editor Integration: ProseMirror

import { ySyncPlugin, yCursorPlugin, yUndoPlugin } from 'y-prosemirror'
import { EditorState } from 'prosemirror-state'
import { EditorView } from 'prosemirror-view'
import { schema } from 'prosemirror-schema-basic'

// y-prosemirror syncs against a Y.XmlFragment, not a Y.Text
const yXmlFragment = doc.getXmlFragment('prosemirror')

const state = EditorState.create({
  schema,
  plugins: [
    ySyncPlugin(yXmlFragment),
    yCursorPlugin(provider.awareness),
    yUndoPlugin(),
  ]
})

const view = new EditorView(document.querySelector('#editor'), {
  state
})

Editor Integration: Monaco (VS Code)

import * as monaco from 'monaco-editor'
import { MonacoBinding } from 'y-monaco'

const ytext = doc.getText('monaco')

const editor = monaco.editor.create(document.getElementById('monaco'), {
  value: '',
  language: 'javascript'
})

const binding = new MonacoBinding(
  ytext,
  editor.getModel(),
  new Set([editor]),
  provider.awareness
)

Editor Integration: Quill

import Quill from 'quill'
import { QuillBinding } from 'y-quill'

const ytext = doc.getText('quill')

const quill = new Quill('#editor', {
  theme: 'snow'
})

const binding = new QuillBinding(ytext, quill, provider.awareness)

Use Cases

Collaborative Text Editor

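const doc = new Y.Doc()
// serverUrl: e.g. 'wss://cl-o.example.com/ws/crdt'
const provider = new WebsocketProvider(serverUrl, fileId, doc)
const text = doc.getText('content')

// Bind to editor (ProseMirrorEditor stands for your own editor wrapper)
const editor = new ProseMirrorEditor(text, provider.awareness)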

Shared Whiteboard

const doc = new Y.Doc()
const provider = new WebsocketProvider(serverUrl, fileId, doc)  // serverUrl: e.g. 'wss://cl-o.example.com/ws/crdt'
const shapes = doc.getArray('shapes')

// Add shape
shapes.push([{
  type: 'rectangle',
  x: 100,
  y: 100,
  width: 200,
  height: 150,
  color: '#ff0000'
}])

// Observe changes
shapes.observe(event => {
  redrawCanvas(shapes.toArray())
})

Collaborative Form

const doc = new Y.Doc()
const provider = new WebsocketProvider(serverUrl, fileId, doc)  // serverUrl: e.g. 'wss://cl-o.example.com/ws/crdt'
const form = doc.getMap('form')

// Update fields
form.set('name', 'Alice')
form.set('email', 'alice@example.com')
form.set('preferences', { newsletter: true, notifications: false })

// Observe changes
form.observe(event => {
  event.changes.keys.forEach((change, key) => {
    updateFormField(key, form.get(key))
  })
})

Performance Optimization

Connection Pooling

Reuse one provider (and its WebSocket connection) per file:

class CloudilloProvider {
  static connections = new Map()

  // serverUrl: e.g. 'wss://cl-o.example.com/ws/crdt'
  static getProvider(serverUrl, fileId, doc) {
    if (!this.connections.has(fileId)) {
      const provider = new WebsocketProvider(serverUrl, fileId, doc)
      this.connections.set(fileId, provider)
    }
    return this.connections.get(fileId)
  }
}

Batching Updates

Group rapid changes:

let pending = []
let timeout = null

function batchUpdate(update) {
  pending.push(update)

  clearTimeout(timeout)
  timeout = setTimeout(() => {
    // Apply all pending updates
    doc.transact(() => {
      pending.forEach(u => u())
    })
    pending = []
  }, 50)  // Batch for 50ms
}

Memory Management

Document channel cleanup is based on receiver count. When a client disconnects, the server checks if both broadcast channels (awareness and sync) have zero receivers. If so, the document entry is removed from the CRDT_DOCS registry.

Cleanup Algorithm:
1. Client disconnects
2. Check awareness_tx.receiver_count() and sync_tx.receiver_count()
3. If both are 0:
   - Remove entry from CRDT_DOCS registry
   - Wait 2s grace period
   - Re-check for new connections
   - If still no connections: run optimization
4. If receivers remain: no cleanup needed

Since the server holds no Y.Doc in memory, there is no document eviction needed — only the lightweight broadcast channels are held per active document.

Security Considerations

Authentication

WebSocket connections are authenticated through the OptionalAuth axum extractor:

  1. Extract auth context from the WebSocket upgrade request
  2. If no auth context: reject with close code 4401 (“Unauthorized”)
  3. Auth context provides: id_tag, tn_id, roles, and optional scope

Permission Enforcement

Permissions are checked once at connection time using file_access::check_file_access_with_scope(). This function evaluates:

  1. Scoped tokens: Share links with restricted access (read-only or read-write)
  2. Ownership: File owner has full access
  3. Tenant roles: Role-based access within the tenant
  4. FSHR action tokens: Federation-based file sharing permissions

The result determines whether the connection is read_only or read_write. Clients can also request a specific access level via the ?access=read or ?access=write query parameter.
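
How the requested level combines with the granted one is not spelled out above. The sketch below assumes the most conservative reading: a request can narrow the granted access but never widen it. Both that rule and the `resolveAccess` helper are assumptions made for illustration, not confirmed server behavior.

```javascript
// ASSUMPTION: `granted` is 'read' or 'write' (the result of the permission
// check), and ?access= can only downgrade it.
function resolveAccess(granted, url) {
  const requested = new URL(url).searchParams.get('access') // 'read', 'write', or null
  if (requested === 'read') return 'read'   // explicit downgrade is always allowed
  return granted                            // 'write' requests cannot exceed the grant
}

const base = 'wss://cl-o.example.com/ws/crdt/file_id'
console.log(resolveAccess('write', `${base}?access=read`))
console.log(resolveAccess('read', `${base}?access=write`))
console.log(resolveAccess('write', base))
```

Under this assumption, opening a document with `?access=read` gives a client a way to view it without any risk of sending edits, even when it holds write permission.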

Warning

Access level is checked once at connection time but not re-validated during the session. If a user’s access is revoked (e.g., an FSHR action is deleted), they retain their original access level until they reconnect.

Read-Only Enforcement

Read-only connections (determined at connection time) are enforced at the message handler level. When a read-only client sends an Update message, the server silently rejects it — the update is not stored and not broadcast. The client will see its changes rejected on the next sync cycle.

See Also