WebSocket API

Overview

Cloudillo provides three WebSocket endpoints for real-time features: Bus (pub/sub messaging), RTDB (real-time database), and CRDT (collaborative documents).

Endpoint Purpose
/ws/bus Event bus for notifications and batch updates
/ws/rtdb/{file_id} Real-time database sync
/ws/crdt/{doc_id} CRDT collaboration (Yjs protocol)

Message Bus (/ws/bus)

The message bus provides pub/sub messaging, presence tracking, and notifications.

Connection

import * as cloudillo from '@cloudillo/core'

await cloudillo.init('my-app')

const bus = cloudillo.openMessageBus({
  channels: ['notifications', 'presence']
})

Protocol

Client → Server:

// Subscribe to channel
{
  type: 'subscribe',
  channel: 'notifications'
}

// Unsubscribe
{
  type: 'unsubscribe',
  channel: 'notifications'
}

// Publish message
{
  type: 'publish',
  channel: 'notifications',
  data: {
    event: 'new-post',
    actionId: 'act_123'
  }
}

// Set presence
{
  type: 'presence',
  status: 'online',
  data: {
    currentPage: '/posts'
  }
}

Server → Client:

// Message received
{
  type: 'message',
  channel: 'notifications',
  data: {
    event: 'new-post',
    actionId: 'a1~abc123'
  },
  from: 'alice@example.com',
  timestamp: '2025-01-01T12:00:00Z'
}

// Presence update
{
  type: 'presence',
  userId: 'bob@example.com',
  status: 'online',
  data: {...}
}

// Subscription confirmed
{
  type: 'subscribed',
  channel: 'notifications'
}

Usage Example

const bus = cloudillo.openMessageBus()

// Subscribe to notifications
bus.subscribe('notifications', (message) => {
  console.log('Notification:', message)
  showToast(message.data)
})

// Publish typing indicator
bus.publish('typing', {
  conversationId: 'conv_123',
  typing: true
})

// Set presence
bus.setPresence('online', {
  currentPage: window.location.pathname
})

// Listen for presence changes
bus.on('presence', (update) => {
  console.log(`${update.userId} is ${update.status}`)
})

Real-Time Database (/ws/rtdb/:fileId)

Real-time synchronization of structured data. See RTDB for full documentation.

Connection

import { RtdbClient } from '@cloudillo/rtdb'
import { getRtdbUrl } from '@cloudillo/core'

const rtdb = new RtdbClient({
  dbId: 'my-database-id',
  auth: {
    getToken: () => bus.accessToken
  },
  serverUrl: getRtdbUrl(bus.idTag!, 'my-database-id', bus.accessToken!)
})

await rtdb.connect()

Protocol

Client → Server:

// Query documents
{
  type: 'query',
  id: 1,
  path: 'todos',
  filter: { equals: { field: 'completed', value: false } },
  sort: { field: 'createdAt', direction: 'desc' },
  limit: 20
}

// Subscribe to path
{
  type: 'subscribe',
  id: 2,
  path: 'todos',
  filter: { equals: { field: 'completed', value: false } }
}

// Get single document
{
  type: 'get',
  id: 3,
  path: 'todos/todo_123'
}

// Transaction (wraps create/update/delete)
{
  type: 'transaction',
  id: 4,
  operations: [
    { type: 'create', path: 'todos', data: { title: 'Learn Cloudillo', completed: false } },
    { type: 'update', path: 'todos/todo_123', data: { completed: true } },
    { type: 'delete', path: 'todos/todo_456' }
  ]
}

// Lock / unlock document
{ type: 'lock', id: 5, path: 'todos/todo_123', mode: 'hard' }
{ type: 'unlock', id: 6, path: 'todos/todo_123' }

// Ping
{ type: 'ping', id: 7 }

Server → Client:

// Query result
{
  type: 'queryResult',
  id: 1,
  data: [
    { _id: 'todo_123', title: 'Learn Cloudillo', completed: false },
    { _id: 'todo_456', title: 'Build app', completed: false }
  ],
  total: 2
}

// Get result
{
  type: 'getResult',
  id: 3,
  data: { _id: 'todo_123', title: 'Learn Cloudillo', completed: false }
}

// Subscribe result (initial data)
{
  type: 'subscribeResult',
  id: 2,
  subscriptionId: 'sub_abc123',
  data: [
    { _id: 'todo_123', title: 'Learn Cloudillo', completed: false }
  ]
}

// Change event (real-time update)
{
  type: 'change',
  subscriptionId: 'sub_abc123',
  event: {
    action: 'create',  // create | update | delete | lock | unlock | ready
    path: 'todos',
    data: { _id: 'todo_789', title: 'New todo', completed: false }
  }
}

// Transaction result
{
  type: 'transactionResult',
  id: 4,
  results: [
    { id: 'todo_new_001' },
    { id: 'todos/todo_123' },
    { id: 'todos/todo_456' }
  ]
}

// Lock result
{ type: 'lockResult', id: 5, locked: true }

// Error
{
  type: 'error',
  id: 4,
  code: 'permission_denied',
  message: 'Insufficient permissions'
}

// Pong
{ type: 'pong', id: 7 }

Collaborative Documents (/ws/crdt/:docId)

CRDT synchronization using Yjs protocol. See CRDT for full documentation.

Connection

import * as Y from 'yjs'
import * as cloudillo from '@cloudillo/core'

const yDoc = new Y.Doc()
const { provider } = await cloudillo.openYDoc(yDoc, 'my-document')

Protocol

The CRDT endpoint uses the y-websocket protocol:

Binary messages:

  • Sync Step 1: Client sends document state
  • Sync Step 2: Server responds with missing updates
  • Updates: Incremental document changes
  • Awareness: Cursor positions, selections, user info

Awareness format:

{
  user: {
    name: 'Alice Johnson',
    idTag: 'alice@example.com',
    color: '#ff6b6b'
  },
  cursor: {
    line: 10,
    column: 5
  },
  selection: {
    start: { line: 10, column: 5 },
    end: { line: 10, column: 10 }
  }
}

Usage Example

const yDoc = new Y.Doc()
const { provider } = await cloudillo.openYDoc(yDoc, 'doc_123')

// Use shared types
const yText = yDoc.getText('content')
yText.insert(0, 'Hello, collaborative world!')

// Listen for remote changes
yText.observe((event) => {
  console.log('Text changed by:', event.transaction.origin)
})

// Awareness (see other users)
provider.awareness.on('change', () => {
  const states = provider.awareness.getStates()
  states.forEach((state, clientId) => {
    console.log(`User ${state.user.name} at cursor ${state.cursor}`)
  })
})

// Set own awareness
provider.awareness.setLocalState({
  user: {
    name: cloudillo.name,
    idTag: cloudillo.idTag,
    color: '#' + Math.random().toString(16).slice(2, 8)
  },
  cursor: { line: 5, column: 10 }
})

Authentication

All WebSocket connections require authentication via query parameter:

wss://server.com/ws/bus?token=eyJhbGc...
wss://server.com/ws/rtdb/file_123?token=eyJhbGc...
wss://server.com/ws/crdt/doc_123?token=eyJhbGc...

The client libraries handle this automatically.

Reconnection

All WebSocket connections implement automatic reconnection with exponential backoff:

  • Initial retry: 1 second
  • Max retry delay: 30 seconds
  • Exponential factor: 1.5

Handling reconnection:

provider.on('status', ({ status }) => {
  if (status === 'connected') {
    console.log('Connected to server')
  } else if (status === 'disconnected') {
    console.log('Disconnected, will retry...')
  }
})

Best Practices

1. Clean Up Connections

// React example
useEffect(() => {
  const bus = cloudillo.openMessageBus()

  bus.subscribe('notifications', handleNotification)

  return () => {
    bus.close() // Clean up on unmount
  }
}, [])

2. Handle Connection State

const [connected, setConnected] = useState(false)

provider.on('status', ({ status }) => {
  setConnected(status === 'connected')
})

// Show offline indicator
{!connected && <div className="offline-banner">Reconnecting...</div>}

3. Batch Updates

// ❌ Don't send updates individually
for (let i = 0; i < 100; i++) {
  yText.insert(i, 'x')
}

// ✅ Batch in a transaction
yDoc.transact(() => {
  for (let i = 0; i < 100; i++) {
    yText.insert(i, 'x')
  }
})

See Also

  • RTDB - Real-time database documentation
  • CRDT - Collaborative editing documentation
  • Authentication - WebSocket authentication