WebSocket Bus

Cloudillo’s WebSocket Bus provides real-time notifications and presence tracking for connected clients. It is separate from the RTDB and CRDT WebSocket protocols and serves as a general-purpose notification system.

WebSocket Endpoints Overview

Cloudillo provides three WebSocket endpoints for different real-time use cases:

Endpoint            | Purpose                                    | Protocol             | Documentation
/ws/bus             | Notifications, presence, typing indicators | JSON messages        | This page
/ws/rtdb/{file_id}  | Real-time database subscriptions           | Binary protocol      | RTDB Protocol
/ws/crdt/{doc_id}   | Collaborative document editing (Yjs)       | Binary sync protocol | CRDT Protocol

When to Use Each Endpoint

  • Bus (/ws/bus): General notifications, presence, typing—use when you need to know about events happening across the platform (new posts, messages, connection requests).
  • RTDB (/ws/rtdb/{file_id}): Real-time app state—use when your application needs live-updating data (todo lists, dashboards, game state).
  • CRDT (/ws/crdt/{doc_id}): Collaborative editing—use when multiple users edit the same document simultaneously (text documents, whiteboards, spreadsheets).

Overview

The WebSocket Bus enables:

  • Real-time notifications - Action events, messages, updates
  • Presence tracking - Online/offline status of users
  • Typing indicators - Show when users are typing
  • Action broadcasts - New posts, comments, reactions
  • Live updates - Content changes without polling

WebSocket Endpoint

Connection

URL: wss://cl-o.{domain}/ws/bus

Authentication: Required via query parameter or header

// Connect with an access token in the query string
const token = 'your_access_token';
const ws = new WebSocket(`wss://cl-o.example.com/ws/bus?token=${encodeURIComponent(token)}`);

// Alternatively, clients that can set request headers (typically non-browser
// clients) may send the token in an Authorization header instead.
// The browser WebSocket constructor cannot set custom headers, so browser
// code must use the query parameter.

Connection Flow

Client                          Server
  |                               |
  |--- GET /ws/bus?token=... ---->|
  |                               |--- Validate token
  |                               |--- Create session
  |                               |
  |<-- 101 Switching Protocols ---|
  |                               |
  |<===== WebSocket Open ========>|
  |                               |
  |<-- welcome message ----------|
  |                               |
  |--- subscribe message -------->|
  |                               |
  |<===== Event Stream ==========>|

Message Protocol

All messages use JSON format (not binary like RTDB/CRDT).

Message Structure

{
  "type": "message_type",
  "data": { ... },
  "timestamp": 1738483200
}
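
A client will usually want to check this envelope before dispatching on it. The sketch below is one way to do that. `parseBusMessage` is a hypothetical helper, not part of any Cloudillo SDK, and it assumes `timestamp` is Unix time in seconds, as the example values on this page suggest.

```javascript
// Hypothetical helper: parse a raw WebSocket frame into a bus message.
// Returns null instead of throwing when the frame is not a valid envelope.
function parseBusMessage(raw) {
  let msg;
  try {
    msg = JSON.parse(raw);
  } catch {
    return null; // not JSON at all
  }
  if (msg === null || typeof msg !== 'object' || typeof msg.type !== 'string') {
    return null; // every bus message carries a string "type"
  }
  // Assumption: timestamps are Unix seconds; expose them as a Date when present.
  const receivedAt =
    typeof msg.timestamp === 'number' ? new Date(msg.timestamp * 1000) : null;
  return { ...msg, receivedAt };
}
```

Returning `null` rather than throwing keeps a single malformed frame from breaking the `onmessage` handler.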

Client → Server Messages

Subscribe to Events

Subscribe to specific event types:

{
  "type": "subscribe",
  "events": ["action", "presence", "typing"]
}

Event Types:

  • action - New actions (posts, comments, reactions)
  • presence - User online/offline status
  • typing - Typing indicators
  • notification - General notifications
  • message - Direct messages

Unsubscribe from Events

{
  "type": "unsubscribe",
  "events": ["typing"]
}
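
Because subscribe and unsubscribe messages share one shape, a small builder can validate event names before anything is sent. This is a sketch only: `buildSubscription` and `BUS_EVENT_TYPES` are hypothetical names, and the list of valid events is copied from this page.

```javascript
// Event types listed on this page.
const BUS_EVENT_TYPES = ['action', 'presence', 'typing', 'notification', 'message'];

// Hypothetical helper: build a subscribe or unsubscribe message,
// rejecting unknown event names before they reach the server.
function buildSubscription(kind, events) {
  if (kind !== 'subscribe' && kind !== 'unsubscribe') {
    throw new Error(`Unknown subscription kind: ${kind}`);
  }
  const unknown = events.filter((e) => !BUS_EVENT_TYPES.includes(e));
  if (unknown.length > 0) {
    throw new Error(`Unknown event types: ${unknown.join(', ')}`);
  }
  // Deduplicate while preserving the caller's order.
  return { type: kind, events: [...new Set(events)] };
}
```

The result can be passed straight to `JSON.stringify` and sent over the socket.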

Send Presence Update

Update your online status:

{
  "type": "presence",
  "status": "online",
  "activity": "browsing"
}

Status values:

  • online - Active and available
  • away - Inactive but connected
  • busy - Do not disturb
  • offline - Explicitly offline

Send Typing Indicator

Notify others you’re typing:

{
  "type": "typing",
  "recipient": "bob.example.com",
  "conversation_id": "conv_123"
}

Auto-timeout: Typing indicators automatically expire after 5 seconds.

Heartbeat/Ping

Keep connection alive:

{
  "type": "ping"
}

Server responds with:

{
  "type": "pong",
  "timestamp": 1738483200
}
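
A ping is only useful if the client notices when the pong never arrives. The following is a minimal watchdog sketch. `HeartbeatMonitor` is a hypothetical class, and the 10-second grace period is an assumed value, not one documented by Cloudillo.

```javascript
// Hypothetical watchdog: tracks pings and pongs and reports a stale connection.
// `now` is injectable so the logic can be exercised without real timers.
class HeartbeatMonitor {
  constructor({ timeoutMs = 10000, now = () => Date.now() } = {}) {
    this.timeoutMs = timeoutMs; // assumed grace period, not a documented value
    this.now = now;
    this.pendingSince = null; // time of the oldest unanswered ping
  }

  // Call right after sending { type: 'ping' }.
  pingSent() {
    if (this.pendingSince === null) this.pendingSince = this.now();
  }

  // Call when a { type: 'pong' } message arrives.
  pongReceived() {
    this.pendingSince = null;
  }

  // True when a ping has gone unanswered for longer than timeoutMs.
  isStale() {
    return this.pendingSince !== null && this.now() - this.pendingSince > this.timeoutMs;
  }
}
```

A client could check `isStale()` before each new ping and close the socket when it returns true, letting its reconnect logic take over.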

Server → Client Messages

Welcome Message

Sent immediately after connection:

{
  "type": "welcome",
  "session_id": "sess_abc123",
  "user": "alice.example.com",
  "timestamp": 1738483200
}

Action Notification

Notify about new actions:

{
  "type": "action",
  "action_type": "POST",
  "action_id": "a1~xyz789...",
  "issuer": "bob.example.com",
  "content": "Check out this cool feature!",
  "timestamp": 1738483200
}

Action types: POST, REPOST, CMNT, REACT, MSG, CONN, FLLW, etc.

Presence Update

Notify about user status changes:

{
  "type": "presence",
  "user": "bob.example.com",
  "status": "online",
  "activity": "browsing",
  "timestamp": 1738483200
}

Sent when:

  • User connects/disconnects
  • User explicitly updates status
  • User goes idle (after 5 minutes of inactivity)

Typing Indicator

Notify that someone is typing:

{
  "type": "typing",
  "user": "bob.example.com",
  "conversation_id": "conv_123",
  "timestamp": 1738483200
}

Auto-clear: Client should clear typing indicator after 5 seconds if no update received.
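
One way to implement the auto-clear rule is to record when each typing event arrived and treat entries older than 5 seconds as expired. The sketch below assumes a hypothetical `TypingTracker` class. It keys entries by conversation and user so the same user can be typing in two conversations at once.

```javascript
const TYPING_TTL_MS = 5000; // expiry window stated on this page

// Hypothetical tracker for incoming typing messages.
class TypingTracker {
  constructor(now = () => Date.now()) {
    this.now = now;
    this.entries = new Map(); // "conversation\nuser" -> { conversationId, user, seenAt }
  }

  // Record a typing message received from the server.
  record({ user, conversation_id }) {
    this.entries.set(`${conversation_id}\n${user}`, {
      conversationId: conversation_id,
      user,
      seenAt: this.now(),
    });
  }

  // Return users currently typing in a conversation, dropping expired entries.
  activeIn(conversationId) {
    const active = [];
    for (const [key, entry] of this.entries) {
      if (this.now() - entry.seenAt >= TYPING_TTL_MS) {
        this.entries.delete(key); // deleting during Map iteration is safe
        continue;
      }
      if (entry.conversationId === conversationId) active.push(entry.user);
    }
    return active;
  }
}
```

The UI can call `activeIn()` on a short interval, or whenever it re-renders, instead of managing one timer per typing event.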

Direct Message Notification

Notify about new direct messages:

{
  "type": "message",
  "message_id": "a1~msg789...",
  "sender": "bob.example.com",
  "preview": "Hey, are you available for a call?",
  "timestamp": 1738483200
}

Preview: First ~100 characters of message content.
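
How the server builds the preview is not specified beyond "first ~100 characters". As an illustration only, a preview builder might look like the sketch below. `makePreview` is a hypothetical function, and collapsing whitespace and appending an ellipsis are assumptions.

```javascript
// Hypothetical preview builder: collapse whitespace and cut at ~100 characters.
function makePreview(content, maxLength = 100) {
  const text = content.replace(/\s+/g, ' ').trim();
  // Array.from splits by code point, so emoji are not cut in half.
  const chars = Array.from(text);
  if (chars.length <= maxLength) return text;
  return chars.slice(0, maxLength).join('') + '…';
}
```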

General Notification

System notifications:

{
  "type": "notification",
  "notification_type": "connection_request",
  "from": "charlie.example.com",
  "message": "Charlie wants to connect with you",
  "action_url": "/connections/pending",
  "timestamp": 1738483200
}

Error Message

Error responses:

{
  "type": "error",
  "code": "E-WS-AUTH",
  "message": "Invalid or expired token",
  "timestamp": 1738483200
}

Common error codes:

  • E-WS-AUTH - Authentication failed
  • E-WS-INVALID - Invalid message format
  • E-WS-RATE - Rate limit exceeded
  • E-WS-PERMISSION - Permission denied
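
Clients typically react differently to each code. The mapping below is a sketch: the codes come from the list above, but the reactions and the name `reactionFor` are assumptions about reasonable client behavior, not documented requirements.

```javascript
// Hypothetical mapping from an error message to a client-side reaction.
function reactionFor(errorMessage) {
  switch (errorMessage.code) {
    case 'E-WS-AUTH':
      return 'refresh-token'; // obtain a new token, then reconnect
    case 'E-WS-RATE':
      return 'slow-down'; // pause or throttle outgoing messages
    case 'E-WS-INVALID':
      return 'report-bug'; // the client sent a malformed message
    case 'E-WS-PERMISSION':
      return 'ignore'; // the request was not allowed; do not retry
    default:
      return 'log'; // unknown code: log it and continue
  }
}
```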

Client Implementation

JavaScript/Browser

class CloudilloBus {
  constructor(domain, token) {
    this.domain = domain;
    this.token = token;
    this.ws = null;
    this.handlers = new Map();
  }

  connect() {
    this.manualClose = false;
    const url = `wss://cl-o.${this.domain}/ws/bus?token=${encodeURIComponent(this.token)}`;
    this.ws = new WebSocket(url);

    this.ws.onopen = () => {
      console.log('Connected to WebSocket Bus');

      // Subscribe to events
      this.send({
        type: 'subscribe',
        events: ['action', 'presence', 'typing', 'message']
      });

      // Send ping every 30 seconds while this socket is open
      this.pingTimer = setInterval(() => this.send({ type: 'ping' }), 30000);
    };

    this.ws.onmessage = (event) => {
      const message = JSON.parse(event.data);
      this.handleMessage(message);
    };

    this.ws.onerror = (error) => {
      console.error('WebSocket error:', error);
    };

    this.ws.onclose = () => {
      console.log('WebSocket closed');
      // Stop the ping timer so repeated reconnects do not accumulate timers
      clearInterval(this.pingTimer);
      // Reconnect after 5 seconds unless disconnect() was called
      if (!this.manualClose) {
        setTimeout(() => this.connect(), 5000);
      }
    };
  }

  send(message) {
    // Silently drop messages when there is no open socket
    if (this.ws && this.ws.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify(message));
    }
  }

  on(eventType, handler) {
    if (!this.handlers.has(eventType)) {
      this.handlers.set(eventType, []);
    }
    this.handlers.get(eventType).push(handler);
  }

  handleMessage(message) {
    const handlers = this.handlers.get(message.type) || [];
    handlers.forEach(handler => handler(message.data || message));
  }

  updatePresence(status, activity) {
    this.send({
      type: 'presence',
      status,
      activity
    });
  }

  sendTyping(recipient, conversationId) {
    this.send({
      type: 'typing',
      recipient,
      conversation_id: conversationId
    });
  }

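  disconnect() {
    // Mark the close as intentional so the close handler can skip reconnecting
    this.manualClose = true;
    clearInterval(this.pingTimer);
    if (this.ws) {
      this.ws.close();
    }
  }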
}

// Usage
const bus = new CloudilloBus('example.com', accessToken);

bus.on('action', (data) => {
  console.log('New action:', data);
  // Update UI with new post/comment/etc
});

bus.on('presence', (data) => {
  console.log('Presence update:', data);
  // Update online status indicators
});

bus.on('typing', (data) => {
  console.log('User typing:', data);
  // Show typing indicator
});

bus.on('message', (data) => {
  console.log('New message:', data);
  // Show notification, update message list
});

bus.connect();

React Hook

import { useEffect, useState, useRef } from 'react';

export function useCloudilloBus(domain, token) {
  const [connected, setConnected] = useState(false);
  const [events, setEvents] = useState([]);
  const busRef = useRef(null);

  useEffect(() => {
    if (!token) return;

    const bus = new CloudilloBus(domain, token);
    busRef.current = bus;

    bus.on('welcome', () => setConnected(true));
    bus.on('action', (data) => {
      setEvents(prev => [...prev, { type: 'action', data }]);
    });
    bus.on('message', (data) => {
      setEvents(prev => [...prev, { type: 'message', data }]);
    });

    bus.connect();

    return () => {
      bus.disconnect();
      setConnected(false);
    };
  }, [domain, token]);

  return {
    connected,
    events,
    bus: busRef.current
  };
}

// Usage in component
function MyComponent({ accessToken }) {
  const { connected, events, bus } = useCloudilloBus('example.com', accessToken);

  const handleTyping = () => {
    bus?.sendTyping('bob.example.com', 'conv_123');
  };

  return (
    <div>
      <p>Status: {connected ? 'Connected' : 'Disconnected'}</p>
      <input placeholder="Type a message" onChange={handleTyping} />
      <ul>
        {events.map((event, i) => (
          <li key={i}>{event.type}: {JSON.stringify(event.data)}</li>
        ))}
      </ul>
    </div>
  );
}

Server Implementation

Connection Handler

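use std::sync::Arc;

use axum::{
  extract::{
    ws::{Message, WebSocket, WebSocketUpgrade},
    Query, State,
  },
  http::StatusCode,
  response::{IntoResponse, Response},
};
use chrono::Utc;
use serde::{Deserialize, Serialize};
use serde_json::json;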

#[derive(Deserialize)]
struct BusQuery {
  token: String,
}

pub async fn ws_bus_handler(
  ws: WebSocketUpgrade,
  Query(query): Query<BusQuery>,
  State(app): State<Arc<App>>,
) -> Response {
  // Validate access token
  let auth = match validate_access_token(&app, &query.token).await {
    Ok(auth) => auth,
    Err(_) => return (StatusCode::UNAUTHORIZED).into_response(),
  };

  // Upgrade to WebSocket
  ws.on_upgrade(move |socket| handle_bus_socket(socket, auth, app))
}

async fn handle_bus_socket(
  mut socket: WebSocket,
  auth: Auth,
  app: Arc<App>,
) {
  // Generate session ID
  let session_id = generate_session_id();

  // Send welcome message
  let welcome = json!({
    "type": "welcome",
    "session_id": session_id,
    "user": auth.id_tag,
    "timestamp": Utc::now().timestamp(),
  });

  if socket.send(Message::Text(welcome.to_string())).await.is_err() {
    return;
  }

  // Register session in bus
  app.bus.add_session(session_id.clone(), auth.id_tag.clone()).await;

  // Handle incoming messages
  while let Some(msg) = socket.recv().await {
    match msg {
      Ok(Message::Text(text)) => {
        if let Ok(message) = serde_json::from_str::<BusMessage>(&text) {
          handle_bus_message(message, &auth, &app, &mut socket).await;
        }
      }
      Ok(Message::Close(_)) => break,
      _ => {}
    }
  }

  // Cleanup session
  app.bus.remove_session(&session_id).await;
}

Message Handling

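#[derive(Deserialize)]
// rename_all maps variants to the lowercase "type" values used on the wire
// (Subscribe -> "subscribe", Ping -> "ping")
#[serde(tag = "type", rename_all = "snake_case")]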
enum BusMessage {
  Subscribe { events: Vec<String> },
  Unsubscribe { events: Vec<String> },
  Presence { status: String, activity: Option<String> },
  Typing { recipient: String, conversation_id: String },
  Ping,
}

async fn handle_bus_message(
  message: BusMessage,
  auth: &Auth,
  app: &App,
  socket: &mut WebSocket,
) {
  match message {
    BusMessage::Subscribe { events } => {
      // Subscribe to events
      app.bus.subscribe(&auth.id_tag, events).await;
    }
    BusMessage::Presence { status, activity } => {
      // Broadcast presence update
      let update = json!({
        "type": "presence",
        "user": auth.id_tag,
        "status": status,
        "activity": activity,
        "timestamp": Utc::now().timestamp(),
      });

      app.bus.broadcast(&auth.id_tag, update).await;
    }
    BusMessage::Typing { recipient, conversation_id } => {
      // Send typing indicator to recipient
      let typing = json!({
        "type": "typing",
        "user": auth.id_tag,
        "conversation_id": conversation_id,
        "timestamp": Utc::now().timestamp(),
      });

      app.bus.send_to(&recipient, typing).await;
    }
    BusMessage::Ping => {
      // Respond with pong
      let pong = json!({
        "type": "pong",
        "timestamp": Utc::now().timestamp(),
      });

      socket.send(Message::Text(pong.to_string())).await.ok();
    }
    _ => {}
  }
}

Use Cases

Real-time Feed Updates

Show new posts as they’re created:

bus.on('action', (data) => {
  if (data.action_type === 'POST' && data.issuer !== currentUser) {
    showNotification(`New post from ${data.issuer}`);
    prependToFeed(data);
  }
});

Online Status Indicators

Track who’s online:

const onlineUsers = new Set();

bus.on('presence', (data) => {
  // away and busy users are still connected; only offline removes them
  if (data.status === 'offline') {
    onlineUsers.delete(data.user);
  } else {
    onlineUsers.add(data.user);
  }

  updateOnlineIndicators();
});

Typing Indicators in Chat

Show when someone is typing:

const typingUsers = new Map();

bus.on('typing', (data) => {
  typingUsers.set(data.user, Date.now());
  showTypingIndicator(data.user, data.conversation_id);

  // Clear after 5 seconds
  setTimeout(() => {
    if (Date.now() - typingUsers.get(data.user) >= 5000) {
      hideTypingIndicator(data.user);
      typingUsers.delete(data.user);
    }
  }, 5000);
});

// Send typing indicator when user types
messageInput.addEventListener('input', () => {
  bus.sendTyping(recipientId, conversationId);
});
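
Sending a typing message on every `input` event can produce many messages per second, which counts against the per-connection rate limit. A simple throttle avoids that. The sketch below uses a hypothetical `throttle` helper, and the 3-second interval is an assumption chosen to stay under the 5-second expiry.

```javascript
// Hypothetical throttle: run `fn` at most once per `intervalMs`, dropping extra calls.
// `now` is injectable so the behavior can be tested without waiting.
function throttle(fn, intervalMs, now = () => Date.now()) {
  let lastCall = -Infinity;
  return (...args) => {
    const t = now();
    if (t - lastCall < intervalMs) return false; // dropped
    lastCall = t;
    fn(...args);
    return true; // forwarded to fn
  };
}

// Usage sketch (bus, recipientId, conversationId and messageInput as in the example above):
// const sendTypingThrottled = throttle(() => bus.sendTyping(recipientId, conversationId), 3000);
// messageInput.addEventListener('input', sendTypingThrottled);
```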

Message Notifications

Instant message delivery:

bus.on('message', (data) => {
  playNotificationSound();
  showNotification(`New message from ${data.sender}`, data.preview);

  if (currentConversation === data.sender) {
    fetchAndDisplayMessage(data.message_id);
  } else {
    incrementUnreadCount(data.sender);
  }
});

Real-time Action Forwarding

The WebSocket Bus integrates with the action processing system to provide real-time updates when actions are created or received.

Outbound Forwarding

When a user creates an action locally, it’s forwarded to their connected WebSocket clients:

Outbound Action Forwarding:

1. User creates action via POST /api/actions
2. Action is processed and stored
3. ActionCreatorTask completes
4. Forward to WebSocket Bus:
   a. Find all WebSocket sessions for user's tenant
   b. Send action notification to each session
5. Client receives real-time update

Audience Targeting:
- POST: Broadcast to creator's sessions
- MSG: Forward to all conversation participant sessions
- CMNT/REACT: Forward to parent action owner's sessions
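
The targeting rules above can be sketched as a single function. This is illustrative only: `resolveAudience` is a hypothetical name, and the `participants` and `parentOwner` fields are assumed for the example, not taken from Cloudillo's action schema.

```javascript
// Hypothetical audience resolver following the targeting rules above.
// Returns the list of users whose sessions should receive the action.
function resolveAudience(action) {
  switch (action.action_type) {
    case 'POST':
      return [action.issuer]; // the creator's own sessions
    case 'MSG':
      return [...new Set(action.participants ?? [])]; // all conversation participants
    case 'CMNT':
    case 'REACT':
      return action.parentOwner ? [action.parentOwner] : []; // owner of the parent action
    default:
      return []; // other types are not covered by the rules above
  }
}
```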

Outbound Message Format:

{
  "type": "action",
  "action_type": "POST",
  "action_id": "a1~xyz789...",
  "issuer": "alice.example.com",
  "content": "New post content",
  "timestamp": 1738483200,
  "source": "local"
}

Inbound Forwarding

When a federated action is received from a remote instance:

Inbound Action Forwarding:

1. Remote action arrives at POST /api/inbox
2. ActionVerifierTask verifies and stores action
3. After successful processing:
   a. Identify target tenant from action audience
   b. Find all WebSocket sessions for that tenant
   c. Broadcast action to all sessions
4. All connected clients see the new action

Broadcast Scope:
- All clients for the target tenant receive notification
- Client-side filtering determines what to display
- Enables real-time feed updates without polling

Inbound Message Format:

{
  "type": "action",
  "action_type": "POST",
  "action_id": "a1~abc123...",
  "issuer": "bob.remote.com",
  "content": "Federated post",
  "timestamp": 1738483200,
  "source": "federated"
}
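
Because inbound actions are broadcast to every session of the tenant, each client decides what to show. A client-side filter might look like the sketch below. `createActionFilter` and the muted-issuer list are hypothetical, and the rule that skips the user's own local actions assumes the UI already displays them optimistically.

```javascript
// Hypothetical client-side filter for broadcast actions.
function createActionFilter({ currentUser, mutedIssuers = [] }) {
  const seen = new Set(); // action IDs already displayed
  const muted = new Set(mutedIssuers);
  return function shouldDisplay(action) {
    if (seen.has(action.action_id)) return false; // duplicate delivery
    if (muted.has(action.issuer)) return false; // issuer hidden by the user
    // Assumption: the user's own local actions are already shown in the UI.
    if (action.source === 'local' && action.issuer === currentUser) return false;
    seen.add(action.action_id);
    return true;
  };
}
```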

Forwarding Flow Diagram

Local Action Creation:
┌─────────┐    ┌──────────┐    ┌─────────────┐    ┌──────────────┐
│ Client  │───►│ API      │───►│ Task        │───►│ WebSocket    │
│ (POST)  │    │ Handler  │    │ Scheduler   │    │ Bus          │
└─────────┘    └──────────┘    └─────────────┘    └──────┬───────┘
                                                         │
                                    ┌────────────────────┼────────────────────┐
                                    ▼                    ▼                    ▼
                              ┌──────────┐        ┌──────────┐        ┌──────────┐
                              │ Session 1│        │ Session 2│        │ Session 3│
                              │ (Tab 1)  │        │ (Tab 2)  │        │ (Mobile) │
                              └──────────┘        └──────────┘        └──────────┘

Federated Action Reception:
┌─────────────┐    ┌──────────┐    ┌─────────────┐    ┌──────────────┐
│ Remote      │───►│ /api/    │───►│ Verifier    │───►│ WebSocket    │
│ Instance    │    │ inbox    │    │ Task        │    │ Bus          │
└─────────────┘    └──────────┘    └─────────────┘    └──────┬───────┘
                                                              │
                                                    Broadcast to all
                                                    tenant sessions

Push Notification Integration

For users who are offline (no active WebSocket connection), the system can send push notifications for important actions.

Push Notification Decision Tree

Push Notification Decision:

1. Action is processed successfully
   │
2. Check if forwarded to WebSocket
   ├─ Yes → User is online, skip push notification
   └─ No  → User is offline, continue...
         │
3. Check action type eligibility
   ├─ MSG (Message) → Eligible
   ├─ CONN (Connection request) → Eligible
   ├─ FSHR (File share) → Eligible
   ├─ CMNT (Comment on user's content) → Eligible
   └─ Other → Not eligible, skip
         │
4. Check user notification preferences
   ├─ Notifications disabled → Skip
   └─ Notifications enabled → Continue...
         │
5. Check notification throttling
   ├─ Too many recent notifications → Skip (prevent spam)
   └─ Within limits → Continue...
         │
6. Schedule push notification task

Push Notification Types

Action Type | Push Notification  | Content
MSG         | Direct message     | “{sender} sent you a message”
CONN        | Connection request | “{sender} wants to connect”
FSHR        | File share         | “{sender} shared a file with you”
CMNT        | Comment            | “{sender} commented on your post”
REACT       | Reaction           | Grouped, not individual pushes
FLLW        | Follow             | “{sender} started following you”

Push Task Scheduling

schedule_push_notification(action, recipient):
    # Check eligibility
    if NOT is_push_eligible(action.type):
        return

    # Check if user is online
    if websocket_bus.has_active_session(recipient):
        return  # User will see via WebSocket

    # Check user preferences
    prefs = load_notification_preferences(recipient)
    if NOT prefs.push_enabled:
        return

    if NOT prefs.allows_type(action.type):
        return

    # Check throttling
    recent_count = count_recent_pushes(recipient, window: 1 hour)
    if recent_count >= MAX_PUSHES_PER_HOUR:
        return

    # Schedule the push
    task = PushNotificationTask {
        recipient: recipient,
        action_type: action.type,
        sender: action.issuer,
        preview: generate_preview(action),
        action_url: generate_deep_link(action),
    }

    scheduler.task(task).schedule()
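
The same decision can be expressed as a pure function, which makes each branch easy to test. This sketch follows the decision tree on this page. `decidePush`, the shape of `prefs`, and the limit of 10 pushes per hour are all assumptions, since the page does not state the actual limit or preference format.

```javascript
const PUSH_ELIGIBLE = new Set(['MSG', 'CONN', 'FSHR', 'CMNT']); // from the decision tree
const MAX_PUSHES_PER_HOUR = 10; // assumed value; the page does not state the limit

// Returns { send, reason }. All inputs are plain data, so loading preferences
// and counting recent pushes is left to the caller.
function decidePush({ actionType, isOnline, prefs, recentPushCount }) {
  if (!PUSH_ELIGIBLE.has(actionType)) return { send: false, reason: 'type-not-eligible' };
  if (isOnline) return { send: false, reason: 'delivered-via-websocket' };
  if (!prefs.pushEnabled) return { send: false, reason: 'push-disabled' };
  if (prefs.disabledTypes.includes(actionType)) return { send: false, reason: 'type-disabled' };
  if (recentPushCount >= MAX_PUSHES_PER_HOUR) return { send: false, reason: 'throttled' };
  return { send: true, reason: 'ok' };
}
```

Returning a reason alongside the boolean makes it easier to log why a notification was skipped.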

Client Registration

Clients register for push notifications during WebSocket connection:

// During WebSocket setup
bus.send({
  type: 'register_push',
  token: pushNotificationToken,  // From FCM/APNs
  platform: 'android'  // or 'ios', 'web'
});

// Server stores token for offline delivery

Performance Considerations

Connection Limits

Per server instance:

  • Default: 10,000 concurrent connections
  • Configurable via MAX_WS_CONNECTIONS

Per user:

  • Default: 5 concurrent connections (multiple tabs/devices)
  • Configurable via MAX_WS_CONNECTIONS_PER_USER

Message Rate Limiting

Client → Server:

  • 100 messages per minute per connection
  • Bursts of 20 messages allowed
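
A sustained rate combined with a burst allowance is commonly implemented as a token bucket. The page does not say which algorithm the server uses, so the sketch below is only one possible model of these limits: a bucket of 20 tokens refilled at 100 tokens per minute. `TokenBucket` is a hypothetical class.

```javascript
// Token bucket sketch: capacity 20 (burst), refilled at 100 tokens per minute.
class TokenBucket {
  constructor({ capacity = 20, refillPerMinute = 100, now = () => Date.now() } = {}) {
    this.capacity = capacity;
    this.refillPerMs = refillPerMinute / 60000;
    this.now = now;
    this.tokens = capacity; // start full so an initial burst is allowed
    this.lastRefill = now();
  }

  // Returns true if a message may be processed, false if it should be rejected.
  tryConsume() {
    const t = this.now();
    // Add the tokens earned since the last call, never exceeding capacity.
    this.tokens = Math.min(this.capacity, this.tokens + (t - this.lastRefill) * this.refillPerMs);
    this.lastRefill = t;
    if (this.tokens < 1) return false;
    this.tokens -= 1;
    return true;
  }
}
```

The same class can be used on the client to stay under the limit rather than waiting for an E-WS-RATE error.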

Server → Client:

  • No hard limit (the server is trusted)
  • Throttling applied for presence updates (max 1 per second per user)

Reconnection Strategy

Client should implement:

  1. Exponential backoff (start at 1s, max 60s)
  2. Random jitter to prevent thundering herd
  3. Token refresh if expired
  4. Event replay from last known timestamp
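
The first two points can be sketched as a delay calculation. The function below doubles the delay on each attempt, caps it at 60 seconds, and randomizes the upper half of the interval so that clients do not reconnect in lockstep. `reconnectDelayMs` is a hypothetical helper. Token refresh and event replay are not covered here.

```javascript
// Exponential backoff with jitter: base 1 s, capped at 60 s.
// The result lies between half of the capped delay and the full capped delay.
function reconnectDelayMs(attempt, { baseMs = 1000, maxMs = 60000, random = Math.random } = {}) {
  const capped = Math.min(maxMs, baseMs * 2 ** attempt);
  return Math.round(capped / 2 + random() * (capped / 2));
}

// Usage sketch: reset `attempt` to 0 after a successful open.
// let attempt = 0;
// ws.onclose = () => setTimeout(connect, reconnectDelayMs(attempt++));
```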

Security Considerations

Authentication

  • Token required for all connections
  • Token validated on initial connection
  • An invalid token causes the connection to be rejected immediately
  • Token expiration checked periodically

Authorization

  • Users only receive events they have permission to see
  • Presence updates only for connections/followers
  • Message notifications only for actual recipients
  • Action notifications respect ABAC visibility

Rate Limiting

  • Per-connection message rate limiting
  • Per-user connection limits
  • Typing indicator throttling
  • Presence update throttling

See Also