CRDT Overview

Cloudillo’s CRDT system uses Yrs, a Rust implementation of the Yjs CRDT (Conflict-free Replicated Data Type), to enable true collaborative editing with automatic conflict resolution. This is a separate API from RTDB, optimized specifically for concurrent editing scenarios where multiple users modify the same data simultaneously.

What are CRDTs?

Conflict-free Replicated Data Types (CRDTs) are data structures that can be replicated across multiple nodes and modified independently, then merged automatically without conflicts.

Key Properties

  1. Eventual Consistency: All replicas that have received the same updates converge to the same state
  2. No Central Authority: No server is needed to resolve conflicts
  3. Deterministic Merging: The same operations always produce the same result
  4. Commutative: Operations can be applied in any order with the same outcome
  5. Idempotent: Applying the same operation twice has no extra effect
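These properties are easiest to see in a much simpler CRDT than the one Yrs uses. The sketch below is a grow-only counter (G-Counter), a textbook state-based CRDT and not Yrs's sequence algorithm: each replica increments only its own entry, and merging takes the per-replica maximum, which is commutative, associative and idempotent. The `GCounter` type and the replica names are illustrative only.

```rust
use std::collections::HashMap;

/// Grow-only counter: one slot per replica, merged by element-wise maximum.
/// Illustrative only; Yrs uses a different (sequence) CRDT.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct GCounter {
    counts: HashMap<String, u64>,
}

impl GCounter {
    /// A replica only ever increments its own entry.
    pub fn increment(&mut self, replica: &str) {
        *self.counts.entry(replica.to_string()).or_insert(0) += 1;
    }

    /// The counter's value is the sum over all replicas.
    pub fn value(&self) -> u64 {
        self.counts.values().sum()
    }

    /// Merge another replica's state. `max` is commutative, associative and
    /// idempotent, so merges may arrive in any order, any number of times.
    pub fn merge(&mut self, other: &GCounter) {
        for (replica, &n) in &other.counts {
            let entry = self.counts.entry(replica.clone()).or_insert(0);
            *entry = (*entry).max(n);
        }
    }
}
```

Because `merge` only ever takes maxima, two replicas that exchange state in either direction, once or repeatedly, end up equal.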

Example: Concurrent Editing

Alice's editor:
  "Hello"
  ↓ (adds " World")
  "Hello World"

Bob's editor (at same time):
  "Hello"
  ↓ (adds "!")
  "Hello!"

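After sync (both converge to the same text, for example):
  "Hello World!"
  ↑ CRDT algorithm automatically merges; the order of the two concurrent
    insertions is fixed by a deterministic tie-break (Yjs compares the
    inserting clients' IDs), so the result could equally be "Hello! World"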
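The deterministic tie-break can be modelled with a toy that only handles concurrent insertions at one shared offset. This is a simplification for illustration, not Yjs's YATA integration algorithm: every replica sorts the concurrent inserts by a numeric client ID (lower first in this toy) before splicing them in, so arrival order never affects the result. `Insert` and `merge_concurrent` are hypothetical names.

```rust
/// One concurrent insertion: the inserting client's ID and the inserted text.
#[derive(Clone, Debug)]
pub struct Insert {
    pub client_id: u64,
    pub text: String,
}

/// Splice concurrent inserts that all target byte offset `offset` of `base`.
/// Sorting by client ID gives every replica the same order, whatever order
/// the inserts arrived in. `offset` must fall on a UTF-8 character boundary.
pub fn merge_concurrent(base: &str, offset: usize, mut inserts: Vec<Insert>) -> String {
    inserts.sort_by_key(|ins| ins.client_id);
    let mut out = String::from(&base[..offset]);
    for ins in &inserts {
        out.push_str(&ins.text);
    }
    out.push_str(&base[offset..]);
    out
}
```

With Alice as client 1 and Bob as client 2, both replicas produce "Hello World!"; swap the IDs and both produce "Hello! World".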

Why Yrs?

Yrs is the Rust port of Yjs, one of the most widely used and battle-tested CRDT implementations:

Advantages

  • Production-Ready: Used in Google Docs-like applications
  • Pure Rust: Memory-safe, high performance
  • Rich Data Types: Text, Map, Array, XML
  • Ecosystem Compatible: Works with Yjs clients (JavaScript)
  • Battle-Tested Protocol: Proven sync algorithm
  • Awareness API: Presence, cursors, selections
  • Time-Travel: Built-in versioning support

Performance

  • Sync speed: ~50 ms for typical documents (local network)
  • Memory efficiency: Compact binary encoding
  • Conflict resolution: Deterministic and local; replicas merge without a coordination round-trip
  • Scalability: Tested with 100+ concurrent editors

Architecture

Components

┌──────────────────────────────────────────────┐
│ Client Application                           │
│  - Yjs Document (Y.Doc)                      │
│  - Shared types (Y.Text, Y.Map, Y.Array)     │
│  - Awareness (presence, cursors)             │
└──────────────────────────────────────────────┘
               ↓ WebSocket (binary protocol)
┌──────────────────────────────────────────────┐
│ Cloudillo Server                             │
│  ┌────────────────────────────────────────┐  │
│  │ WebSocket Handler                      │  │
│  │  - Yrs sync protocol                   │  │
│  │  - Binary message parsing              │  │
│  │  - Authentication                      │  │
│  └────────────────────────────────────────┘  │
│                    ↓                         │
│  ┌────────────────────────────────────────┐  │
│  │ Database Manager                       │  │
│  │  - Y.Doc instances (in-memory)         │  │
│  │  - Session tracking                    │  │
│  │  - Snapshot management                 │  │
│  └────────────────────────────────────────┘  │
│                    ↓                         │
│  ┌────────────────────────────────────────┐  │
│  │ Yrs Document (yrs::Doc)                │  │
│  │  - CRDT state                          │  │
│  │  - Update log                          │  │
│  │  - Awareness state                     │  │
│  └────────────────────────────────────────┘  │
│                    ↓                         │
│  ┌─────────────────────────────────────────┐ │
│  │ Storage Layer                           │ │
│  │  - CrdtAdapter (snapshots, updates)     │ │
│  │  - Compressed binary format             │ │
│  └─────────────────────────────────────────┘ │
└──────────────────────────────────────────────┘

Database Instance

Each collaborative document is a DatabaseInstance:

pub struct DatabaseInstance {
    file_id: Box<str>,
    tn_id: TnId,
    doc: Arc<RwLock<yrs::Doc>>,                    // Yrs document
    sessions: RwLock<HashMap<SessionId, Session>>,  // Connected clients
    awareness: Arc<RwLock<yrs::sync::Awareness>>,  // Presence info
    last_accessed: AtomicTimestamp,
    auto_save_handle: Option<TaskHandle>,
}

Database Manager

Manages the lifecycle of database instances:

DatabaseManager Structure:

  • instances: RwLock-protected HashMap of file_id → DatabaseInstance
  • blob_adapter: Persistent storage for snapshots and incremental updates
  • meta_adapter: Metadata storage for database information
  • max_instances: Maximum concurrent documents in memory
  • eviction_policy: Strategy for removing inactive instances
  • snapshot_interval: Duration between automatic snapshots

Core Methods:

get_or_load(file_id, tn_id): Retrieve or load a database instance

  • Check if instance exists in memory cache
  • If cached: refresh its last-accessed timestamp and return it immediately
  • If not cached: load latest snapshot from blob storage, apply incremental updates, cache in memory
  • Return loaded instance
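A minimal sketch of the get_or_load caching pattern using only the standard library. `Instance` stands in for DatabaseInstance, and the `load_from_storage` closure is a hypothetical placeholder for "load latest snapshot, apply incremental updates"; neither is Cloudillo's actual API. The slow path re-checks the map because another thread may have loaded the same file between releasing the read lock and acquiring the write lock.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

/// Stand-in for DatabaseInstance (hypothetical; the real one wraps a
/// yrs::Doc, sessions and awareness).
#[derive(Debug)]
pub struct Instance {
    pub file_id: String,
    pub state: String,
}

/// Minimal manager: an in-memory cache in front of a storage loader.
pub struct Manager<F: Fn(&str) -> String> {
    instances: RwLock<HashMap<String, Arc<Instance>>>,
    /// Hypothetical loader: "latest snapshot + incremental updates".
    load_from_storage: F,
}

impl<F: Fn(&str) -> String> Manager<F> {
    pub fn new(load_from_storage: F) -> Self {
        Manager { instances: RwLock::new(HashMap::new()), load_from_storage }
    }

    pub fn get_or_load(&self, file_id: &str) -> Arc<Instance> {
        // Fast path: a shared read lock is enough for a cache hit.
        if let Some(inst) = self.instances.read().unwrap().get(file_id) {
            return Arc::clone(inst);
        }
        // Slow path: re-check under the write lock, since another thread
        // may have loaded the same file after the read lock was released.
        let mut map = self.instances.write().unwrap();
        if let Some(inst) = map.get(file_id) {
            return Arc::clone(inst);
        }
        let inst = Arc::new(Instance {
            file_id: file_id.to_string(),
            state: (self.load_from_storage)(file_id),
        });
        map.insert(file_id.to_string(), Arc::clone(&inst));
        inst
    }

    /// Drop the cached instance; it can be reloaded from storage later.
    pub fn evict(&self, file_id: &str) -> bool {
        self.instances.write().unwrap().remove(file_id).is_some()
    }
}
```

Holding the write lock while loading keeps the sketch simple but serializes all cache misses; a production server would more likely track in-flight loads per file so unrelated documents can load concurrently.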

create_database(tn_id, options): Create new collaborative document

  • Generate unique file_id
  • Initialize empty Y.Doc with Yrs
  • Store metadata in meta_adapter
  • Register instance in memory cache
  • Return new file_id

snapshot(file_id): Save full document state

  • Retrieve document from cache
  • Encode full state as compressed binary update
  • Store snapshot with versioning
  • Prune old snapshots based on retention policy
  • Return success

evict(file_id): Remove instance from memory

  • Get write lock on instances map
  • Remove instance from cache
  • Instance data persists in blob storage for later reload
  • Release memory

Yrs Data Types

Y.Text

For collaborative text editing:

import * as Y from 'yjs'

const doc = new Y.Doc()
const text = doc.getText('content')

// Insert text
text.insert(0, 'Hello World!')

// Format text
text.format(0, 5, { bold: true })  // Bold "Hello"

// Delete text
text.delete(6, 6)  // Remove "World!"

// Observe changes (only fires for edits made after registration)
text.observe(event => {
  // For Y.Text, event.delta lists the change as insert/delete/retain ops
  event.delta.forEach(change => {
    if (change.insert) {
      console.log('Inserted:', change.insert)
    }
    if (change.delete) {
      console.log('Deleted:', change.delete, 'characters')
    }
  })
})

Y.Map

For key-value data structures:

const map = doc.getMap('config')

// Set values
map.set('theme', 'dark')
map.set('fontSize', 14)
map.set('notifications', { email: true, push: false })

// Get values
const theme = map.get('theme')  // 'dark'

// Delete keys
map.delete('fontSize')

// Iterate
map.forEach((value, key) => {
  console.log(key, value)
})

// Observe changes (only fires for edits made after registration)
map.observe(event => {
  event.changes.keys.forEach((change, key) => {
    if (change.action === 'add') {
      console.log('Added:', key, map.get(key))
    } else if (change.action === 'update') {
      console.log('Updated:', key, map.get(key))
    } else if (change.action === 'delete') {
      console.log('Deleted:', key)
    }
  })
})

Y.Array

For ordered lists:

const array = doc.getArray('tasks')

// Push items
array.push([
  { title: 'Task 1', completed: false },
  { title: 'Task 2', completed: false }
])

// Insert at position
array.insert(0, [{ title: 'Urgent Task', completed: false }])

// Delete
array.delete(1, 1)  // Delete 1 item at index 1

// Iterate
array.forEach((item, index) => {
  console.log(index, item)
})

// Observe changes (only fires for edits made after registration)
array.observe(event => {
  event.changes.delta.forEach(change => {
    if (change.insert) {
      console.log('Inserted:', change.insert)
    }
    if (change.delete) {
      console.log('Deleted:', change.delete, 'items')
    }
  })
})

Y.XML

For structured document editing:

const xmlFragment = doc.getXmlFragment('document')

// Create element
const paragraph = new Y.XmlElement('p')
paragraph.setAttribute('class', 'intro')
paragraph.insert(0, [new Y.XmlText('Hello World!')])

// Add to document
xmlFragment.push([paragraph])

// Serialize to an XML/HTML string
// (in a browser, xmlFragment.toDOM() returns a DocumentFragment to append)
const html = xmlFragment.toString()

WebSocket Sync Protocol

Connection Flow

Client                          Server
  |                               |
  |--- GET /ws/db/:fileId ------->|
  |    (Authorization: Bearer...) |
  |                               |--- Validate token
  |                               |--- Load database instance
  |                               |--- Create session
  |<-- 101 Switching Protocols ---|
  |                               |
  |<====== WebSocket Open =======>|
  |                               |
  |<-- Sync Step 1 (State Vec) ---|
  |--- Sync Step 2 (State Vec) -->|
  |<-- Update (missing data) -----|
  |--- Update (local changes) --->|
  |                               |
  |<====== Synchronized =========>|
  |                               |
  |--- Update (user edits) ------>|
  |                               |--- Broadcast to sessions
  |<-- Update (remote edits) -----|
  |                               |
  |--- Awareness Update --------->|
  |<-- Awareness Update ----------|

Message Types

All messages use binary format (not JSON):

Sync Step 1 (Server → Client)

Server sends its state vector:

[message_type: u8, state_vector: Vec<u8>]

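A state vector maps each client ID to how far the sender has seen that client's edits (a logical clock). It compactly summarizes which updates the sender already has, so the peer can send only what is missing.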
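The state-vector exchange can be illustrated with a toy operation log. This sketch assumes, in the spirit of Yjs, that each client numbers its own operations with a contiguous clock starting at 0. A state vector then records, per client, how many of its operations the holder has, and the missing operations are exactly those whose clock is at or above the peer's entry. `Op`, `state_vector` and `missing_ops` are illustrative names; real Yrs updates are compact encoded structures, not per-op records.

```rust
use std::collections::HashMap;

/// Toy state vector: client ID -> number of that client's ops already seen.
pub type StateVector = HashMap<u64, u64>;

/// One operation; each client numbers its own ops 0, 1, 2, ... (its clock).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Op {
    pub client: u64,
    pub clock: u64,
    pub payload: String,
}

/// Summarize an operation log as a state vector.
pub fn state_vector(log: &[Op]) -> StateVector {
    let mut sv = StateVector::new();
    for op in log {
        let seen = sv.entry(op.client).or_insert(0);
        *seen = (*seen).max(op.clock + 1);
    }
    sv
}

/// Ops in `log` that a peer with state vector `remote` has not seen yet.
pub fn missing_ops(log: &[Op], remote: &StateVector) -> Vec<Op> {
    log.iter()
        .filter(|op| op.clock >= remote.get(&op.client).copied().unwrap_or(0))
        .cloned()
        .collect()
}
```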

Sync Step 2 (Client → Server)

Client sends its state vector:

[message_type: u8, state_vector: Vec<u8>]

Update (Bidirectional)

Document updates (changes):

[message_type: u8, update: Vec<u8>]

Updates are binary-encoded CRDT operations.

Awareness Update

Presence information (cursors, selections):

[message_type: u8, awareness_update: Vec<u8>]
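A sketch of the one-byte-tag framing shown above. The numeric tag values are hypothetical, because this document does not specify them. Note that the standard y-protocols encoding spoken by y-websocket clients is laid out differently: it writes an outer varint message type (sync or awareness), nests a sync sub-type (step 1, step 2, update) inside sync messages, and length-prefixes the payload.

```rust
/// Message kinds from the list above. The numeric tag values are
/// hypothetical; this document does not specify them.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum MsgType {
    SyncStep1 = 0,
    SyncStep2 = 1,
    Update = 2,
    AwarenessUpdate = 3,
}

#[derive(Debug, PartialEq, Eq)]
pub struct Message {
    pub kind: MsgType,
    pub payload: Vec<u8>,
}

/// Frame a payload as [message_type: u8, payload...].
pub fn encode(kind: MsgType, payload: &[u8]) -> Vec<u8> {
    let mut buf = Vec::with_capacity(1 + payload.len());
    buf.push(kind as u8);
    buf.extend_from_slice(payload);
    buf
}

/// Split a received frame into its type byte and payload.
pub fn decode(bytes: &[u8]) -> Result<Message, String> {
    let (&tag, payload) = bytes.split_first().ok_or("empty message")?;
    let kind = match tag {
        0 => MsgType::SyncStep1,
        1 => MsgType::SyncStep2,
        2 => MsgType::Update,
        3 => MsgType::AwarenessUpdate,
        other => return Err(format!("unknown message type {other}")),
    };
    Ok(Message { kind, payload: payload.to_vec() })
}
```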

WebSocket Connection Handler

Algorithm: Handle WebSocket Database Connection

Input: WebSocket, DatabaseInstance, Auth context
Output: Result<()>

1. Session Setup:
   - Generate unique session_id
   - Add session to db_instance.sessions map
   - Associate auth context with session

2. Send Sync Step 1:
   - Read document from db_instance
   - Encode the document's state vector as binary
   - Send binary message to client

3. Message Loop (while socket connected):
   - Receive binary message from client
   - Extract message type from first byte

   a. SYNC_STEP_2 (client state vector):
      - Decode state vector from bytes [1..]
      - Read document snapshot
      - Compute missing updates needed for client
      - Send update binary to client

   b. UPDATE (document changes):
      - Decode update from bytes [1..]
      - Acquire write lock on document
      - Apply update to Yrs document
      - Release lock
      - Broadcast update to all other active sessions

   c. AWARENESS_UPDATE (presence/cursor):
      - Decode awareness update from bytes [1..]
      - Acquire write lock on awareness state
      - Apply update to awareness
      - Release lock
      - Broadcast awareness update to all other sessions

   d. Other message types:
      - Ignore or log unknown types

4. Connection Close:
   - Exit message loop on close frame
   - Remove session from sessions map
   - Return success (cleanup implicit via Arc drop)

This pattern ensures:
- Efficient binary protocol (smaller than JSON)
- State vector sync for minimal data transfer
- Broadcasting to multiple concurrent editors
- Clean session cleanup on disconnect
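The dispatch-and-broadcast part of the loop can be sketched with standard-library channels: each session gets an `mpsc` channel whose receiver would feed that session's socket writer; updates are applied and then forwarded to every session except the sender, and awareness messages are only forwarded. The `Hub` type, the tag constants and the `Vec<Vec<u8>>` standing in for the Yrs document are all hypothetical; a real server would apply the update to the `yrs::Doc` under its write lock and would likely use async channels.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

pub type SessionId = u64;

// Hypothetical tag values (the document does not specify them).
pub const UPDATE: u8 = 2;
pub const AWARENESS_UPDATE: u8 = 3;

/// Tracks one outbound channel per connected session.
#[derive(Default)]
pub struct Hub {
    sessions: HashMap<SessionId, Sender<Vec<u8>>>,
    next_id: SessionId,
}

impl Hub {
    /// Register a session; the returned receiver feeds its socket writer.
    pub fn join(&mut self) -> (SessionId, Receiver<Vec<u8>>) {
        let (tx, rx) = channel();
        let id = self.next_id;
        self.next_id += 1;
        self.sessions.insert(id, tx);
        (id, rx)
    }

    /// Remove a session when its socket closes.
    pub fn leave(&mut self, id: SessionId) {
        self.sessions.remove(&id);
    }

    /// Forward `msg` to every session except `from`, dropping sessions
    /// whose receiving end has gone away.
    pub fn broadcast(&mut self, from: SessionId, msg: &[u8]) {
        self.sessions
            .retain(|&id, tx| id == from || tx.send(msg.to_vec()).is_ok());
    }
}

/// Dispatch one binary message by its first byte.
/// `doc` is a stand-in for the Yrs document: updates are appended to it.
pub fn handle_message(hub: &mut Hub, from: SessionId, msg: &[u8], doc: &mut Vec<Vec<u8>>) {
    match msg.first() {
        Some(&UPDATE) => {
            doc.push(msg[1..].to_vec()); // apply first, then broadcast
            hub.broadcast(from, msg);
        }
        Some(&AWARENESS_UPDATE) => hub.broadcast(from, msg),
        _ => {} // empty or unknown type: ignore (a server might log it)
    }
}
```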

Awareness (Presence)

What is Awareness?

Awareness tracks ephemeral state like:

  • User presence (online/offline)
  • Cursor positions
  • Text selections
  • Custom user state (name, color, etc.)

Setting Local State

import { WebsocketProvider } from 'y-websocket'

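// y-websocket connects to `${serverUrl}/${roomName}`, so the file ID is
// passed as the room name to reach /ws/db/:fileId
const provider = new WebsocketProvider(
  'wss://cl-o.example.com/ws/db',
  fileId,
  doc,
  { params: { token: accessToken } }
)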

// Set local user state
provider.awareness.setLocalStateField('user', {
  name: 'Alice',
  color: '#ff0000',
  avatar: 'https://...'
})

// Update cursor position
provider.awareness.setLocalStateField('cursor', {
  anchor: 42,
  head: 50
})

Observing Awareness

provider.awareness.on('change', ({ added, updated, removed }) => {
  // New users joined
  added.forEach(clientId => {
    const state = provider.awareness.getStates().get(clientId)
    console.log('User joined:', state.user.name)
  })

  // Existing users updated state
  updated.forEach(clientId => {
    const state = provider.awareness.getStates().get(clientId)
    console.log('User updated:', state.user.name)
  })

  // Users left
  removed.forEach(clientId => {
    console.log('User left:', clientId)
  })
})

Rendering Cursors

function renderCursors() {
  const states = provider.awareness.getStates()

  states.forEach((state, clientId) => {
    if (state.cursor && clientId !== provider.awareness.clientID) {
      const cursorEl = document.createElement('div')
      cursorEl.className = 'remote-cursor'
      cursorEl.style.backgroundColor = state.user.color
      cursorEl.style.left = `${getCursorPosition(state.cursor.anchor)}px`
      cursorEl.textContent = state.user.name

      document.body.appendChild(cursorEl)
    }
  })
}

Storage Strategy

Snapshots

Periodic full-state saves reduce sync time for new connections:

Snapshot Strategy Configuration:

  • update_threshold: Trigger snapshot after N updates (e.g., 1000)
  • time_threshold: Trigger snapshot after duration (e.g., every 5 minutes)
  • compression: zstd compression level (default: 3, zstd's own default, a good speed/ratio balance; zstd supports levels up to 22)
  • retention: Keep N historical snapshots (e.g., 5 recent snapshots)
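The two snapshot triggers can be expressed as a small predicate. This sketch assumes the thresholds are combined with OR, as the list suggests, and adds one assumption of its own: the time trigger is skipped when no updates arrived since the last snapshot. `SnapshotPolicy` and `should_snapshot` are hypothetical names.

```rust
use std::time::Duration;

/// Snapshot triggers from the configuration above.
pub struct SnapshotPolicy {
    pub update_threshold: u64,    // e.g. 1000 updates
    pub time_threshold: Duration, // e.g. 5 minutes
}

impl SnapshotPolicy {
    /// Snapshot after `update_threshold` updates, or once `time_threshold`
    /// has elapsed. Assumption: skip the time trigger if nothing changed.
    pub fn should_snapshot(&self, updates_since_last: u64, elapsed_since_last: Duration) -> bool {
        updates_since_last >= self.update_threshold
            || (updates_since_last > 0 && elapsed_since_last >= self.time_threshold)
    }
}
```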

Algorithm: Create Snapshot

Input: file_id, database_instance
Output: Result<()>

1. Get Write Lock:
   - Acquire exclusive lock on document
   - (Prevents new updates during snapshot)

2. Encode Full State:
   - Create default state vector (represents empty state)
   - Encode entire Y.Doc as binary update
   - This captures all document content

3. Compress (CPU-intensive work):
   - Offload to worker pool with Priority::Low
   - Use zstd compression level 3
   - Results in binary blob

4. Store in Persistent Storage:
   - Generate version string (e.g., "v001", "v002")
   - Store compressed snapshot in BlobAdapter:
     * key format: "{file_id}~snapshot~{version}"
     * value: compressed binary
   - This persists document state across restarts

5. Manage Retention:
   - List all existing snapshots for this file_id
   - If count > retention_limit:
     * Delete oldest snapshots
     * Keep most recent N snapshots

6. Release Write Lock
7. Return success

Benefits:
- New connections load snapshot instead of replaying all updates
- Significantly faster initial sync for large documents
- Binary format is compact and efficient
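Steps 4 and 5 can be sketched as key construction plus a retention filter. The key follows the documented "{file_id}~snapshot~{version}" pattern with a zero-padded "vNNN" version as in the "v001" example; the helper names are hypothetical. Versions are parsed and compared numerically rather than as strings, because three-digit padding stops sorting correctly once a version passes 999.

```rust
/// Build a key in the "{file_id}~snapshot~{version}" format, using a
/// zero-padded "vNNN" version as in the "v001" example.
pub fn snapshot_key(file_id: &str, version: u32) -> String {
    format!("{file_id}~snapshot~v{version:03}")
}

/// Parse the numeric version from a key such as "doc1~snapshot~v002".
fn parse_version(key: &str) -> Option<u32> {
    key.rsplit_once("~snapshot~v")?.1.parse().ok()
}

/// Return the keys to delete so that only the `retention` newest remain.
/// Versions are compared numerically, so "v1000" sorts after "v999".
pub fn keys_to_prune(keys: &[String], retention: usize) -> Vec<String> {
    let mut versioned: Vec<(u32, &String)> = keys
        .iter()
        .filter_map(|k| parse_version(k).map(|v| (v, k)))
        .collect();
    versioned.sort_by_key(|&(v, _)| v); // oldest first
    let excess = versioned.len().saturating_sub(retention);
    versioned[..excess].iter().map(|&(_, k)| k.clone()).collect()
}
```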

Loading from Snapshot

Algorithm: Load Database from Storage

Input: file_id, tn_id (tenant)
Output: Result<Arc<DatabaseInstance>>

1. Retrieve Latest Snapshot:
   - Query BlobAdapter for "{file_id}~snapshot~latest"
   - If found: proceed to step 2
   - If not found: create empty document (new database)

2. Decompress Snapshot:
   - Use zstd decompression on snapshot bytes
   - Produces binary Yrs update data

3. Initialize Yrs Document:
   - Create new Y.Doc instance
   - Decode the decompressed bytes as a Yrs update
   - Apply update to empty document
   - Document now contains snapshot state

4. Load Incremental Updates:
   - Query all updates created since snapshot timestamp
   - Sorted in chronological order
   - For each update:
     * Decode binary update
     * Apply to document (applies changes on top of snapshot)
   - Document now contains latest state

5. Create DatabaseInstance:
   - Wrap Y.Doc in Arc<RwLock> (shared, thread-safe access)
   - Initialize empty sessions HashMap
   - Create Awareness for presence tracking
   - Record current timestamp (for eviction tracking)
   - Set auto_save_handle (optional optimization)

6. Start Auto-Save Task:
   - Schedule background task for periodic snapshots
   - Uses snapshot_interval from DatabaseManager config
   - Task runs in background without blocking

7. Register in Memory Cache:
   - Add instance to DatabaseManager.instances map
   - file_id → Arc<DatabaseInstance>

8. Return Arc-wrapped instance
   - Callers can clone Arc for shared access

Optimization:
- Snapshot avoids replaying thousands of individual updates
- Typically reduces load time from seconds → milliseconds for large docs

Client Integration

Basic Setup

import * as Y from 'yjs'
import { WebsocketProvider } from 'y-websocket'

// Create document
const doc = new Y.Doc()

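// Connect to Cloudillo (y-websocket appends the room name to the server URL,
// so this opens wss://cl-o.example.com/ws/db/<fileId>?token=...)
const provider = new WebsocketProvider(
  'wss://cl-o.example.com/ws/db',
  fileId,  // Room name: becomes the :fileId path segment
  doc,
  {
    params: {
      token: accessToken  // Auth token as query param
    }
  }
)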

// Get shared types
const text = doc.getText('content')
const map = doc.getMap('metadata')

// Make changes
text.insert(0, 'Hello World!')
map.set('title', 'My Document')

// Changes automatically sync!

Editor Integration: ProseMirror

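import { ySyncPlugin, yCursorPlugin, yUndoPlugin, undo, redo } from 'y-prosemirror'
import { EditorState } from 'prosemirror-state'
import { EditorView } from 'prosemirror-view'
import { keymap } from 'prosemirror-keymap'
import { schema } from 'prosemirror-schema-basic'

// y-prosemirror binds ProseMirror to a Y.XmlFragment (not a Y.Text)
const yXmlFragment = doc.getXmlFragment('prosemirror')

const state = EditorState.create({
  schema,
  plugins: [
    ySyncPlugin(yXmlFragment),
    yCursorPlugin(provider.awareness),
    yUndoPlugin(),
    // Route undo/redo through the Yjs undo manager
    keymap({ 'Mod-z': undo, 'Mod-y': redo, 'Mod-Shift-z': redo })
  ]
})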

const view = new EditorView(document.querySelector('#editor'), {
  state
})

Editor Integration: Monaco (VS Code)

import * as monaco from 'monaco-editor'
import { MonacoBinding } from 'y-monaco'

const ytext = doc.getText('monaco')

const editor = monaco.editor.create(document.getElementById('monaco'), {
  value: '',
  language: 'javascript'
})

const binding = new MonacoBinding(
  ytext,
  editor.getModel(),
  new Set([editor]),
  provider.awareness
)

Editor Integration: Quill

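import Quill from 'quill'
import QuillCursors from 'quill-cursors'
import { QuillBinding } from 'y-quill'

// Register the cursor module so remote cursors from awareness are rendered
Quill.register('modules/cursors', QuillCursors)

const ytext = doc.getText('quill')

const quill = new Quill('#editor', {
  modules: {
    cursors: true,
    history: { userOnly: true }  // Undo only the local user's edits
  },
  theme: 'snow'
})

const binding = new QuillBinding(ytext, quill, provider.awareness)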

Use Cases

Collaborative Text Editor

const doc = new Y.Doc()
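const provider = new WebsocketProvider(url, fileId, doc)
const fragment = doc.getXmlFragment('content')

// Bind to editor (ProseMirrorEditor is a placeholder for your own wrapper,
// e.g. one built on y-prosemirror)
const editor = new ProseMirrorEditor(fragment, provider.awareness)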

Shared Whiteboard

const doc = new Y.Doc()
const provider = new WebsocketProvider(url, fileId, doc)
const shapes = doc.getArray('shapes')

// Add shape
shapes.push([{
  type: 'rectangle',
  x: 100,
  y: 100,
  width: 200,
  height: 150,
  color: '#ff0000'
}])

// Observe changes
shapes.observe(event => {
  redrawCanvas(shapes.toArray())
})

Collaborative Form

const doc = new Y.Doc()
const provider = new WebsocketProvider(url, fileId, doc)
const form = doc.getMap('form')

// Update fields
form.set('name', 'Alice')
form.set('email', 'alice@example.com')
form.set('preferences', { newsletter: true, notifications: false })

// Observe changes
form.observe(event => {
  event.changes.keys.forEach((change, key) => {
    updateFormField(key, form.get(key))
  })
})

Performance Optimization

Connection Pooling

Reuse WebSocket connections:

class CloudilloProvider {
  static connections = new Map()  // fileId → WebsocketProvider

  // The cached provider stays bound to the doc passed on the first call
  static getProvider(fileId, doc) {
    if (!this.connections.has(fileId)) {
      const provider = new WebsocketProvider(url, fileId, doc)
      this.connections.set(fileId, provider)
    }
    return this.connections.get(fileId)
  }
}

Batching Updates

Group rapid changes:

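let pending = []  // queued change functions
let timeout = null

function batchUpdate(change) {
  pending.push(change)

  clearTimeout(timeout)
  timeout = setTimeout(() => {
    const batch = pending
    pending = []
    // Apply all queued changes in one transaction, so observers fire once
    // and a single update is sent to the server
    doc.transact(() => {
      batch.forEach(fn => fn())
    })
  }, 50)  // Batch for 50ms
}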

Memory Management

Manage in-memory database instances efficiently:

EvictionPolicy Configuration:

  • max_instances: Maximum number of documents in memory (e.g., 100)
  • idle_timeout: Evict documents unused for duration (e.g., 30 minutes)
  • lru_eviction: Use Least Recently Used strategy when at max capacity
  • pinned: HashSet of file_ids never evicted (for critical documents)

Eviction Algorithm:

  1. Track last_accessed timestamp for each instance
  2. On timer (every 1 minute):
    • Mark all instances exceeding idle_timeout for eviction
    • If the instance count would still exceed max_instances:
      • Also mark the least recently accessed instances until it no longer does
    • Respect pinned set (never evict these)
  3. For each instance marked for eviction:
    • Trigger snapshot (save state to persistent storage)
    • Remove from instances map
    • Document can be reloaded later from snapshot
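A sketch of the selection part of the eviction algorithm, given each instance's last-access time. It marks unpinned instances that exceed idle_timeout, then keeps marking the least recently used unpinned instances while the total would exceed max_instances; snapshotting and removal (step 3) are left to the caller. `EvictionPolicy` mirrors the configuration above and `select_for_eviction` is a hypothetical name. Pinned instances still count towards max_instances here, a choice the text does not settle.

```rust
use std::collections::{HashMap, HashSet};
use std::time::{Duration, Instant};

/// Eviction settings, mirroring the configuration listed above.
pub struct EvictionPolicy {
    pub max_instances: usize,
    pub idle_timeout: Duration,
    pub pinned: HashSet<String>,
}

/// Return the file_ids to evict, least recently accessed first.
pub fn select_for_eviction(
    policy: &EvictionPolicy,
    last_accessed: &HashMap<String, Instant>,
    now: Instant,
) -> Vec<String> {
    // Unpinned instances, oldest access first (LRU order).
    let mut candidates: Vec<(&String, Instant)> = last_accessed
        .iter()
        .filter(|(id, _)| !policy.pinned.contains(*id))
        .map(|(id, &t)| (id, t))
        .collect();
    candidates.sort_by_key(|&(_, t)| t);

    let mut remaining = last_accessed.len();
    let mut evict = Vec::new();
    for (id, t) in candidates {
        let idle = now.saturating_duration_since(t) >= policy.idle_timeout;
        let over_capacity = remaining > policy.max_instances;
        if idle || over_capacity {
            evict.push(id.clone());
            remaining -= 1;
        }
    }
    evict
}
```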

Security Considerations

Authentication

Algorithm: Authenticate WebSocket Connection

Input: HTTP headers from WebSocket upgrade request
Output: Result<Auth>

1. Extract Authorization Header:
   - Look for "authorization" header
   - If missing: Return Unauthorized error

2. Parse Bearer Token:
   - Convert header value to string
   - Check for "Bearer " prefix
   - Extract token (everything after "Bearer ")
   - If format invalid: Return InvalidToken error

3. Validate Token:
   - Call token validation function
   - Verifies signature using profile key
   - Checks expiration
   - Extracts claims (tn_id, id_tag, scope)

4. Return Auth context
   - Contains user identity and permissions
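Steps 1 and 2 (extracting the bearer token) can be sketched as below; step 3, signature and expiry validation, is out of scope. The scheme name is compared case-insensitively because HTTP authentication schemes are case-insensitive. `AuthError` and `bearer_token` are hypothetical names. Note also that the browser WebSocket API cannot set an Authorization header, which is why the client examples in this document pass the token as a `token` query parameter; a server supporting both would need to accept either source.

```rust
#[derive(Debug, PartialEq, Eq)]
pub enum AuthError {
    Unauthorized,
    InvalidToken,
}

/// Extract the token from an `Authorization: Bearer <token>` header value.
/// `None` means the header was absent.
pub fn bearer_token(header: Option<&str>) -> Result<&str, AuthError> {
    let value = header.ok_or(AuthError::Unauthorized)?;
    let (scheme, token) = value.split_once(' ').ok_or(AuthError::InvalidToken)?;
    let token = token.trim();
    // Auth scheme names are case-insensitive in HTTP.
    if !scheme.eq_ignore_ascii_case("Bearer") || token.is_empty() {
        return Err(AuthError::InvalidToken);
    }
    Ok(token)
}
```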

Permission Enforcement

Algorithm: Check Database Permission

Input: Auth context, DatabaseMetadata, Action (read/write)
Output: Result<()>

1. Check Ownership:
   - If auth.id_tag == db_meta.owner: Return Ok
   - Owner has all permissions

2. Check Action-Specific Permissions:
   - For Read action:
     * If auth.id_tag in db_meta.permissions.readers: Return Ok
   - For Write action:
     * If auth.id_tag in db_meta.permissions.writers: Return Ok

3. Default Deny:
   - If no permission found: Return PermissionDenied
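The three steps translate directly into a function; the struct and enum names mirror the text but are otherwise hypothetical. Taken literally, the algorithm does not let writers read unless they are also listed as readers, and the sketch keeps that behavior rather than guessing otherwise.

```rust
use std::collections::HashSet;

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Action {
    Read,
    Write,
}

pub struct Permissions {
    pub readers: HashSet<String>,
    pub writers: HashSet<String>,
}

pub struct DatabaseMeta {
    pub owner: String,
    pub permissions: Permissions,
}

#[derive(Debug, PartialEq, Eq)]
pub struct PermissionDenied;

pub fn check_permission(id_tag: &str, meta: &DatabaseMeta, action: Action) -> Result<(), PermissionDenied> {
    // 1. The owner has all permissions.
    if id_tag == meta.owner {
        return Ok(());
    }
    // 2. Action-specific lists.
    let allowed = match action {
        Action::Read => meta.permissions.readers.contains(id_tag),
        Action::Write => meta.permissions.writers.contains(id_tag),
    };
    // 3. Default deny.
    if allowed { Ok(()) } else { Err(PermissionDenied) }
}
```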

Rate Limiting

Server-Side Rate Limits:

  • MAX_UPDATES_PER_SECOND: 100 updates per user per second

    • Prevents overwhelming with document changes
    • Typical usage: 5-20 updates/second for collaborative editing
  • MAX_AWARENESS_UPDATES_PER_SECOND: 10 awareness updates per user per second

    • Controls cursor/presence update frequency
    • Prevents spam of position updates

Implementation:

  • Per-user sliding window counter
  • Track requests in last 1 second
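  • When exceeded: Return HTTP 429 with a Retry-After header (this applies to HTTP requests such as the WebSocket upgrade; on an already-open socket HTTP status codes are no longer available, so over-limit messages must instead be dropped, answered with an error message, or the connection closed)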
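A minimal per-user sliding-window limiter matching this description: it keeps the timestamps of each user's accepted requests within the window and rejects a request once the count reaches the limit (e.g. 100 for updates, 10 for awareness). Not recording rejected requests is an assumption, as is passing timestamps in explicitly to keep the sketch deterministic. `SlidingWindowLimiter` is a hypothetical name; a real server would also prune entries for idle users.

```rust
use std::collections::{HashMap, VecDeque};
use std::time::{Duration, Instant};

/// Per-user sliding-window rate limiter.
pub struct SlidingWindowLimiter {
    limit: usize,
    window: Duration,
    accepted: HashMap<String, VecDeque<Instant>>,
}

impl SlidingWindowLimiter {
    pub fn new(limit: usize, window: Duration) -> Self {
        SlidingWindowLimiter { limit, window, accepted: HashMap::new() }
    }

    /// Returns true and records the request if `user` is under the limit
    /// for the window ending at `now`; returns false otherwise.
    pub fn allow(&mut self, user: &str, now: Instant) -> bool {
        let queue = self.accepted.entry(user.to_string()).or_default();
        // Drop timestamps that have slid out of the window.
        while let Some(&oldest) = queue.front() {
            if now.saturating_duration_since(oldest) >= self.window {
                queue.pop_front();
            } else {
                break;
            }
        }
        if queue.len() < self.limit {
            queue.push_back(now);
            true
        } else {
            false // rejected requests are not recorded
        }
    }
}
```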

See Also