WebSocket Bus

Cloudillo’s WebSocket Bus provides real-time notifications and presence tracking for connected clients. It is separate from the RTDB and CRDT WebSocket protocols and serves as a general-purpose notification system.

Overview

The WebSocket Bus enables:

  • Real-time notifications - Action events, messages, updates
  • Presence tracking - Online/offline status of users
  • Typing indicators - Show when users are typing
  • Action broadcasts - New posts, comments, reactions
  • Live updates - Content changes without polling

WebSocket Endpoint

Connection

URL: wss://cl-o.{domain}/ws/bus

Authentication: Required via query parameter or header

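// Connect with access token (URL-encoded so special characters survive)
const token = 'your_access_token';
const ws = new WebSocket(`wss://cl-o.example.com/ws/bus?token=${encodeURIComponent(token)}`);

// Or via Authorization header (if supported by client).
// The browser WebSocket API cannot set custom headers, so this option
// applies only to non-browser clients whose library accepts headers:
// const ws = new WebSocket('wss://cl-o.example.com/ws/bus', /* client-specific header options */);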

Connection Flow

Client                          Server
  |                               |
  |--- GET /ws/bus?token=... ---->|
  |                               |--- Validate token
  |                               |--- Create session
  |                               |
  |<-- 101 Switching Protocols ---|
  |                               |
  |<===== WebSocket Open ========>|
  |                               |
  |<-- welcome message -----------|
  |                               |
  |--- subscribe message -------->|
  |                               |
  |<===== Event Stream ==========>|

Message Protocol

All messages use JSON format (not binary like RTDB/CRDT).

Message Structure

{
  "type": "message_type",
  "data": { ... },
  "timestamp": 1738483200
}
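
As a minimal sketch of consuming this envelope on the client, the hypothetical `parseBusMessage` helper below parses a raw frame, checks that `type` is a string, and converts `timestamp` to a `Date`. It assumes `timestamp` is Unix time in seconds, which the example values suggest but the text does not state.

```javascript
// Hypothetical helper: parse and sanity-check one incoming bus frame.
// Assumption: `timestamp` is Unix time in seconds.
function parseBusMessage(raw) {
  let message;
  try {
    message = JSON.parse(raw);
  } catch {
    return null; // not valid JSON
  }
  if (message === null || typeof message !== 'object' || typeof message.type !== 'string') {
    return null; // missing or malformed `type`
  }
  const receivedAt =
    typeof message.timestamp === 'number' ? new Date(message.timestamp * 1000) : null;
  return { ...message, receivedAt };
}
```

Returning `null` for malformed frames lets the caller decide whether to log, ignore, or close the connection.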

Client → Server Messages

Subscribe to Events

Subscribe to specific event types:

{
  "type": "subscribe",
  "events": ["action", "presence", "typing"]
}

Event Types:

  • action - New actions (posts, comments, reactions)
  • presence - User online/offline status
  • typing - Typing indicators
  • notification - General notifications
  • message - Direct messages

Unsubscribe from Events

{
  "type": "unsubscribe",
  "events": ["typing"]
}
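
A client that reconnects will likely need to restore its subscriptions. The sketch below is one way to track them; `SubscriptionSet` and its methods are hypothetical names, and it assumes (the text does not say) that subscriptions are not preserved across connections.

```javascript
// Hypothetical helper: remembers which event types the client wants, so the
// same `subscribe` message can be re-sent after a reconnect.
class SubscriptionSet {
  constructor() {
    this.events = new Set();
  }

  // Returns the message to send, or null if nothing new was requested.
  subscribe(events) {
    const added = [...new Set(events)].filter((e) => !this.events.has(e));
    added.forEach((e) => this.events.add(e));
    return added.length ? { type: 'subscribe', events: added } : null;
  }

  // Returns the message to send, or null if none of the events were active.
  unsubscribe(events) {
    const removed = [...new Set(events)].filter((e) => this.events.has(e));
    removed.forEach((e) => this.events.delete(e));
    return removed.length ? { type: 'unsubscribe', events: removed } : null;
  }

  // Full subscribe message for use right after (re)connecting.
  resubscribeMessage() {
    return this.events.size ? { type: 'subscribe', events: [...this.events] } : null;
  }
}
```

Each method returns a ready-to-serialize message, so the caller only sends something when the subscription state actually changes.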

Send Presence Update

Update your online status:

{
  "type": "presence",
  "status": "online",
  "activity": "browsing"
}

Status values:

  • online - Active and available
  • away - Inactive but connected
  • busy - Do not disturb
  • offline - Explicitly offline

Send Typing Indicator

Notify others you’re typing:

{
  "type": "typing",
  "recipient": "bob.example.com",
  "conversation_id": "conv_123"
}

Auto-timeout: Typing indicators automatically expire after 5 seconds.
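
Because indicators expire on their own, a client does not need to send one per keystroke. The following sketch throttles sends per conversation; `createTypingThrottle` is a hypothetical helper, and the 3-second interval is an assumption chosen to stay inside the 5-second expiry.

```javascript
// Hypothetical helper: decides whether a new `typing` message is worth sending.
// Assumption: re-sending every 3 s keeps the indicator alive within the
// 5-second expiry without sending one message per keystroke.
function createTypingThrottle(minIntervalMs = 3000, now = () => Date.now()) {
  const lastSent = new Map(); // conversation_id -> time of last send (ms)
  return function shouldSend(conversationId) {
    const t = now();
    const last = lastSent.get(conversationId);
    if (last !== undefined && t - last < minIntervalMs) {
      return false; // sent recently enough, skip this one
    }
    lastSent.set(conversationId, t);
    return true;
  };
}
```

An input handler would call the returned function first and send the `typing` message only when it returns `true`.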

Heartbeat/Ping

Keep connection alive:

{
  "type": "ping"
}

Server responds with:

{
  "type": "pong",
  "timestamp": 1738483200
}
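
Pong replies can also be used to detect a connection that has silently died. This is a sketch only: `createHeartbeatMonitor` is a hypothetical helper, and the thresholds (a 30-second ping interval and two missed pongs) are assumptions rather than documented protocol values.

```javascript
// Hypothetical helper: tracks pong replies to detect a dead connection.
// Assumptions: pings go out every 30 s and two missed pongs mean "stale".
function createHeartbeatMonitor(pingIntervalMs = 30000, maxMissed = 2, now = () => Date.now()) {
  let lastPongAt = now();
  return {
    // Call when a { "type": "pong" } message arrives.
    onPong() {
      lastPongAt = now();
    },
    // True once no pong has arrived for more than `maxMissed` ping intervals.
    isStale() {
      return now() - lastPongAt > pingIntervalMs * maxMissed;
    },
  };
}
```

A client could check `isStale()` before each ping and close the socket when it returns `true`, which hands control to its normal reconnect logic.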

Server → Client Messages

Welcome Message

Sent immediately after connection:

{
  "type": "welcome",
  "session_id": "sess_abc123",
  "user": "alice.example.com",
  "timestamp": 1738483200
}

Action Notification

Notify about new actions:

{
  "type": "action",
  "action_type": "POST",
  "action_id": "a1~xyz789...",
  "issuer": "bob.example.com",
  "content": "Check out this cool feature!",
  "timestamp": 1738483200
}

Action types: POST, REPOST, CMNT, REACT, MSG, CONN, FLLW, etc.

Presence Update

Notify about user status changes:

{
  "type": "presence",
  "user": "bob.example.com",
  "status": "online",
  "activity": "browsing",
  "timestamp": 1738483200
}

Sent when:

  • User connects/disconnects
  • User explicitly updates status
  • User goes idle (after 5 minutes of inactivity)

Typing Indicator

Notify that someone is typing:

{
  "type": "typing",
  "user": "bob.example.com",
  "conversation_id": "conv_123",
  "timestamp": 1738483200
}

Auto-clear: Clients should clear the typing indicator after 5 seconds if no update is received.

Direct Message Notification

Notify about new direct messages:

{
  "type": "message",
  "message_id": "a1~msg789...",
  "sender": "bob.example.com",
  "preview": "Hey, are you available for a call?",
  "timestamp": 1738483200
}

Preview: First ~100 characters of message content.
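
For illustration, a preview of this kind could be produced as follows. `makePreview` is a hypothetical helper, not the server's implementation; counting Unicode code points and appending an ellipsis on truncation are both assumptions, since the text only says "first ~100 characters".

```javascript
// Hypothetical helper: builds a notification preview from message content.
// Assumptions: the limit counts Unicode code points, and an ellipsis marks
// truncated text.
function makePreview(content, limit = 100) {
  const chars = Array.from(content); // iterate by code point, not UTF-16 unit
  if (chars.length <= limit) {
    return content;
  }
  return chars.slice(0, limit).join('') + '…';
}
```

Splitting by code point avoids cutting an emoji or other astral character in half at the boundary.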

General Notification

System notifications:

{
  "type": "notification",
  "notification_type": "connection_request",
  "from": "charlie.example.com",
  "message": "Charlie wants to connect with you",
  "action_url": "/connections/pending",
  "timestamp": 1738483200
}

Error Message

Error responses:

{
  "type": "error",
  "code": "E-WS-AUTH",
  "message": "Invalid or expired token",
  "timestamp": 1738483200
}

Common error codes:

  • E-WS-AUTH - Authentication failed
  • E-WS-INVALID - Invalid message format
  • E-WS-RATE - Rate limit exceeded
  • E-WS-PERMISSION - Permission denied
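
One way a client might branch on these codes is sketched below. Only the codes come from the list above; the reaction names and the `reactionFor` function are hypothetical.

```javascript
// Hypothetical mapping from error code to a client-side reaction.
// The reactions are assumptions; only the codes come from the protocol.
const ERROR_REACTIONS = new Map([
  ['E-WS-AUTH', 'refresh-token'],     // obtain a new token, then reconnect
  ['E-WS-INVALID', 'log-and-drop'],   // client bug: log the offending message
  ['E-WS-RATE', 'slow-down'],         // pause or throttle outgoing messages
  ['E-WS-PERMISSION', 'notify-user'], // surface the denial in the UI
]);

function reactionFor(errorMessage) {
  // Unknown codes fall back to logging so that new codes do not break clients.
  return ERROR_REACTIONS.get(errorMessage.code) || 'log-and-drop';
}
```

A handler registered for `error` messages can then switch on the returned reaction instead of on raw code strings.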

Client Implementation

JavaScript/Browser

class CloudilloBus {
  constructor(domain, token) {
    this.domain = domain;
    this.token = token;
    this.ws = null;
    this.handlers = new Map();
    this.pingTimer = null;
    this.reconnectTimer = null;
    this.shouldReconnect = true;
  }

  connect() {
    this.shouldReconnect = true;
    this.ws = new WebSocket(
      `wss://cl-o.${this.domain}/ws/bus?token=${encodeURIComponent(this.token)}`
    );

    this.ws.onopen = () => {
      console.log('Connected to WebSocket Bus');

      // Subscribe to events
      this.send({
        type: 'subscribe',
        events: ['action', 'presence', 'typing', 'message']
      });

      // Send ping every 30 seconds while this connection is open
      this.pingTimer = setInterval(() => this.send({ type: 'ping' }), 30000);
    };

    this.ws.onmessage = (event) => {
      const message = JSON.parse(event.data);
      this.handleMessage(message);
    };

    this.ws.onerror = (error) => {
      console.error('WebSocket error:', error);
    };

    this.ws.onclose = () => {
      console.log('WebSocket closed');
      // Stop the ping timer that belonged to this connection
      clearInterval(this.pingTimer);
      // Reconnect after 5 seconds, unless disconnect() was called
      if (this.shouldReconnect) {
        this.reconnectTimer = setTimeout(() => this.connect(), 5000);
      }
    };
  }

  send(message) {
    if (this.ws && this.ws.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify(message));
    }
  }

  on(eventType, handler) {
    if (!this.handlers.has(eventType)) {
      this.handlers.set(eventType, []);
    }
    this.handlers.get(eventType).push(handler);
  }

  handleMessage(message) {
    const handlers = this.handlers.get(message.type) || [];
    handlers.forEach(handler => handler(message.data || message));
  }

  updatePresence(status, activity) {
    this.send({
      type: 'presence',
      status,
      activity
    });
  }

  sendTyping(recipient, conversationId) {
    this.send({
      type: 'typing',
      recipient,
      conversation_id: conversationId
    });
  }

  disconnect() {
    // Prevent onclose from scheduling a reconnect, and cancel a pending one
    this.shouldReconnect = false;
    clearTimeout(this.reconnectTimer);
    if (this.ws) {
      this.ws.close();
    }
  }
}

// Usage
const bus = new CloudilloBus('example.com', accessToken);

bus.on('action', (data) => {
  console.log('New action:', data);
  // Update UI with new post/comment/etc
});

bus.on('presence', (data) => {
  console.log('Presence update:', data);
  // Update online status indicators
});

bus.on('typing', (data) => {
  console.log('User typing:', data);
  // Show typing indicator
});

bus.on('message', (data) => {
  console.log('New message:', data);
  // Show notification, update message list
});

bus.connect();

React Hook

import { useEffect, useState, useRef } from 'react';

export function useCloudilloBus(domain, token) {
  const [connected, setConnected] = useState(false);
  const [events, setEvents] = useState([]);
  const busRef = useRef(null);

  useEffect(() => {
    if (!token) return;

    const bus = new CloudilloBus(domain, token);
    busRef.current = bus;

    bus.on('welcome', () => setConnected(true));
    bus.on('action', (data) => {
      setEvents(prev => [...prev, { type: 'action', data }]);
    });
    bus.on('message', (data) => {
      setEvents(prev => [...prev, { type: 'message', data }]);
    });

    bus.connect();

    return () => {
      bus.disconnect();
      setConnected(false);
    };
  }, [domain, token]);

  return {
    connected,
    events,
    bus: busRef.current
  };
}

// Usage in component
function MyComponent() {
  const { connected, events, bus } = useCloudilloBus('example.com', accessToken);

  const handleTyping = () => {
    bus?.sendTyping('bob.example.com', 'conv_123');
  };

  return (
    <div>
      <p>Status: {connected ? 'Connected' : 'Disconnected'}</p>
      <input placeholder="Type a message" onChange={handleTyping} />
      <ul>
        {events.map((event, i) => (
          <li key={i}>{event.type}: {JSON.stringify(event.data)}</li>
        ))}
      </ul>
    </div>
  );
}

Server Implementation

Connection Handler

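use std::sync::Arc;

use axum::{
  extract::{
    ws::{Message, WebSocket},
    Query, State, WebSocketUpgrade,
  },
  http::StatusCode,
  response::{IntoResponse, Response},
};
use chrono::Utc;
use serde::{Deserialize, Serialize};
use serde_json::json;

// App, Auth, validate_access_token and generate_session_id are
// application-defined and not shown here.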

#[derive(Deserialize)]
struct BusQuery {
  token: String,
}

pub async fn ws_bus_handler(
  ws: WebSocketUpgrade,
  Query(query): Query<BusQuery>,
  State(app): State<Arc<App>>,
) -> Response {
  // Validate access token
  let auth = match validate_access_token(&app, &query.token).await {
    Ok(auth) => auth,
    Err(_) => return (StatusCode::UNAUTHORIZED).into_response(),
  };

  // Upgrade to WebSocket
  ws.on_upgrade(move |socket| handle_bus_socket(socket, auth, app))
}

async fn handle_bus_socket(
  mut socket: WebSocket,
  auth: Auth,
  app: Arc<App>,
) {
  // Generate session ID
  let session_id = generate_session_id();

  // Send welcome message
  let welcome = json!({
    "type": "welcome",
    "session_id": session_id,
    "user": auth.id_tag,
    "timestamp": Utc::now().timestamp(),
  });

  if socket.send(Message::Text(welcome.to_string())).await.is_err() {
    return;
  }

  // Register session in bus
  app.bus.add_session(session_id.clone(), auth.id_tag.clone()).await;

  // Handle incoming messages
  while let Some(msg) = socket.recv().await {
    match msg {
      Ok(Message::Text(text)) => {
        if let Ok(message) = serde_json::from_str::<BusMessage>(&text) {
          handle_bus_message(message, &auth, &app, &mut socket).await;
        }
      }
      Ok(Message::Close(_)) => break,
      _ => {}
    }
  }

  // Cleanup session
  app.bus.remove_session(&session_id).await;
}

Message Handling

#[derive(Deserialize)]
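// Wire values are lowercase ("subscribe", "ping", ...), so rename the variants
#[serde(tag = "type", rename_all = "lowercase")]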
enum BusMessage {
  Subscribe { events: Vec<String> },
  Unsubscribe { events: Vec<String> },
  Presence { status: String, activity: Option<String> },
  Typing { recipient: String, conversation_id: String },
  Ping,
}

async fn handle_bus_message(
  message: BusMessage,
  auth: &Auth,
  app: &App,
  socket: &mut WebSocket,
) {
  match message {
    BusMessage::Subscribe { events } => {
      // Subscribe to events
      app.bus.subscribe(&auth.id_tag, events).await;
    }
    BusMessage::Presence { status, activity } => {
      // Broadcast presence update
      let update = json!({
        "type": "presence",
        "user": auth.id_tag,
        "status": status,
        "activity": activity,
        "timestamp": Utc::now().timestamp(),
      });

      app.bus.broadcast(&auth.id_tag, update).await;
    }
    BusMessage::Typing { recipient, conversation_id } => {
      // Send typing indicator to recipient
      let typing = json!({
        "type": "typing",
        "user": auth.id_tag,
        "conversation_id": conversation_id,
        "timestamp": Utc::now().timestamp(),
      });

      app.bus.send_to(&recipient, typing).await;
    }
    BusMessage::Ping => {
      // Respond with pong
      let pong = json!({
        "type": "pong",
        "timestamp": Utc::now().timestamp(),
      });

      socket.send(Message::Text(pong.to_string())).await.ok();
    }
    BusMessage::Unsubscribe { .. } => {
      // Unsubscribe handling is not shown in this example
    }
  }
}

Use Cases

Real-time Feed Updates

Show new posts as they’re created:

bus.on('action', (data) => {
  if (data.action_type === 'POST' && data.issuer !== currentUser) {
    showNotification(`New post from ${data.issuer}`);
    prependToFeed(data);
  }
});

Online Status Indicators

Track who’s online:

const onlineUsers = new Set();

bus.on('presence', (data) => {
  // Any status other than 'offline' (online, away, busy) means the user is still connected
  if (data.status === 'offline') {
    onlineUsers.delete(data.user);
  } else {
    onlineUsers.add(data.user);
  }

  updateOnlineIndicators();
});

Typing Indicators in Chat

Show when someone is typing:

const typingTimers = new Map();

bus.on('typing', (data) => {
  showTypingIndicator(data.user, data.conversation_id);

  // Restart the 5-second expiry each time a new indicator arrives
  clearTimeout(typingTimers.get(data.user));
  typingTimers.set(data.user, setTimeout(() => {
    hideTypingIndicator(data.user);
    typingTimers.delete(data.user);
  }, 5000));
});

// Send typing indicator when user types
messageInput.addEventListener('input', () => {
  bus.sendTyping(recipientId, conversationId);
});

Message Notifications

Instant message delivery:

bus.on('message', (data) => {
  playNotificationSound();
  showNotification(`New message from ${data.sender}`, data.preview);

  if (currentConversation === data.sender) {
    fetchAndDisplayMessage(data.message_id);
  } else {
    incrementUnreadCount(data.sender);
  }
});

Performance Considerations

Connection Limits

Per server instance:

  • Default: 10,000 concurrent connections
  • Configurable via MAX_WS_CONNECTIONS

Per user:

  • Default: 5 concurrent connections (multiple tabs/devices)
  • Configurable via MAX_WS_CONNECTIONS_PER_USER

Message Rate Limiting

Client → Server:

  • 100 messages per minute per connection
  • Bursts of 20 messages allowed
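
A client can stay under these limits by throttling itself before sending. The sketch below uses a token bucket with a capacity of 20 and a refill rate of 100 per minute; `createRateLimiter` is a hypothetical helper, and whether the server enforces its limit with the same algorithm is an assumption.

```javascript
// Hypothetical client-side token bucket matching the limits above:
// capacity 20 (burst), refilled at 100 tokens per 60 s.
// Assumption: the server enforces its limit in a comparable way.
function createRateLimiter({ perMinute = 100, burst = 20, now = () => Date.now() } = {}) {
  let tokens = burst;
  let lastRefill = now();
  return function tryAcquire() {
    const t = now();
    // Add tokens for the time elapsed since the last call, capped at `burst`.
    tokens = Math.min(burst, tokens + ((t - lastRefill) / 60000) * perMinute);
    lastRefill = t;
    if (tokens < 1) {
      return false; // over the limit: caller should drop or queue the message
    }
    tokens -= 1;
    return true;
  };
}
```

Calling `tryAcquire()` before each send and skipping the send on `false` keeps low-value traffic such as typing indicators from triggering `E-WS-RATE`.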

Server → Client:

  • No hard limit (the server is trusted)
  • Throttling applied for presence updates (max 1 per second per user)

Reconnection Strategy

Clients should implement:

  1. Exponential backoff (start at 1s, max 60s)
  2. Random jitter to prevent thundering herd
  3. Token refresh if expired
  4. Event replay from last known timestamp
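
Steps 1 and 2 can be sketched as a single delay function. `reconnectDelayMs` is a hypothetical helper; it uses the "equal jitter" variant (half fixed, half random), which is one of several reasonable choices and not something the text prescribes.

```javascript
// Hypothetical helper: delay before reconnect attempt number `attempt` (0-based).
// Exponential backoff from 1 s, capped at 60 s, with "equal jitter": half of the
// capped delay is fixed and the other half is randomized.
function reconnectDelayMs(attempt, { baseMs = 1000, maxMs = 60000, random = Math.random } = {}) {
  const capped = Math.min(maxMs, baseMs * 2 ** attempt);
  return Math.floor(capped / 2 + random() * (capped / 2));
}
```

The attempt counter should be reset to zero after a successful connection so that a later outage starts again from the short delay.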

Security Considerations

Authentication

  • Token required for all connections
  • Token validated on initial connection
  • An invalid token results in an immediate disconnect
  • Token expiration checked periodically

Authorization

  • Users only receive events they have permission to see
  • Presence updates only for connections/followers
  • Message notifications only for actual recipients
  • Action notifications respect ABAC visibility

Rate Limiting

  • Per-connection message rate limiting
  • Per-user connection limits
  • Typing indicator throttling
  • Presence update throttling
