WebSocket Bus
Cloudillo’s WebSocket Bus provides real-time notifications and presence tracking for connected clients. This is separate from the RTDB and CRDT WebSocket protocols, serving as a general-purpose notification system.
WebSocket Endpoints Overview
Cloudillo provides three WebSocket endpoints for different real-time use cases:
| Endpoint | Purpose | Protocol | Documentation |
|---|---|---|---|
| /ws/bus | Notifications, presence, typing indicators | JSON messages | This page |
| /ws/rtdb/{file_id} | Real-time database subscriptions | Binary protocol | RTDB Protocol |
| /ws/crdt/{doc_id} | Collaborative document editing (Yjs) | Binary sync protocol | CRDT Protocol |
When to Use Each Endpoint
- Bus (/ws/bus): General notifications, presence, typing. Use when you need to know about events happening across the platform (new posts, messages, connection requests).
- RTDB (/ws/rtdb/{file_id}): Real-time app state. Use when your application needs live-updating data (todo lists, dashboards, game state).
- CRDT (/ws/crdt/{doc_id}): Collaborative editing. Use when multiple users edit the same document simultaneously (text documents, whiteboards, spreadsheets).
Overview
The WebSocket Bus enables:
- ✅ Real-time notifications - Action events, messages, updates
- ✅ Presence tracking - Online/offline status of users
- ✅ Typing indicators - Show when users are typing
- ✅ Action broadcasts - New posts, comments, reactions
- ✅ Live updates - Content changes without polling
WebSocket Endpoint
Connection
URL: wss://cl-o.{domain}/ws/bus
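Authentication: Required, via the token query parameter or an Authorization header
// Connect with the access token as a query parameter
const token = 'your_access_token';
const ws = new WebSocket(`wss://cl-o.example.com/ws/bus?token=${encodeURIComponent(token)}`);
// Or via Authorization header, if the client library supports it.
// The browser WebSocket API cannot set custom request headers, so browsers
// must use the query parameter form above.
// const ws = new WebSocket('wss://cl-o.example.com/ws/bus');
// Set Authorization header before connecting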
Connection Flow
Client                          Server
  |                               |
  |--- GET /ws/bus?token=... ---->|
  |                               |--- Validate token
  |                               |--- Create session
  |                               |
  |<-- 101 Switching Protocols ---|
  |                               |
  |<===== WebSocket Open ========>|
  |                               |
  |<-- welcome message -----------|
  |                               |
  |--- subscribe message -------->|
  |                               |
  |<===== Event Stream ==========>|
Message Protocol
All messages use JSON format (not binary like RTDB/CRDT).
Message Structure
{
  "type": "message_type",
  "data": { ... },
  "timestamp": 1738483200
}
Client → Server Messages
Subscribe to Events
Subscribe to specific event types:
{
  "type": "subscribe",
  "events": ["action", "presence", "typing"]
}
Event Types:
- action - New actions (posts, comments, reactions)
- presence - User online/offline status
- typing - Typing indicators
- notification - General notifications
- message - Direct messages
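A client that changes its subscriptions over time may want to send only the event types that actually change. The following is a minimal sketch of that idea, not part of Cloudillo: `SubscriptionTracker` and its `send` callback are hypothetical names, and the set of valid event types is assumed to be exactly the five listed above. It also emits the unsubscribe message described in the next section.

```javascript
// Event types from the list above; assumed here to be the complete set.
const EVENT_TYPES = new Set(['action', 'presence', 'typing', 'notification', 'message']);

// Hypothetical helper (not a Cloudillo API): tracks active subscriptions and
// sends only the event types that actually change.
class SubscriptionTracker {
  constructor(send) {
    this.send = send;        // callback that delivers a message object to the socket
    this.active = new Set(); // event types currently subscribed
  }

  subscribe(events) {
    const unknown = events.filter((e) => !EVENT_TYPES.has(e));
    if (unknown.length > 0) {
      throw new Error(`Unknown event types: ${unknown.join(', ')}`);
    }
    // Drop duplicates and anything already subscribed
    const added = [...new Set(events)].filter((e) => !this.active.has(e));
    if (added.length === 0) return;
    added.forEach((e) => this.active.add(e));
    this.send({ type: 'subscribe', events: added });
  }

  unsubscribe(events) {
    // Only event types that are currently active need an unsubscribe message
    const removed = [...new Set(events)].filter((e) => this.active.has(e));
    if (removed.length === 0) return;
    removed.forEach((e) => this.active.delete(e));
    this.send({ type: 'unsubscribe', events: removed });
  }
}
```

With a bus client, the callback could simply be `(msg) => bus.send(msg)`.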
Unsubscribe from Events
{
  "type": "unsubscribe",
  "events": ["typing"]
}
Send Presence Update
Update your online status:
{
  "type": "presence",
  "status": "online",
  "activity": "browsing"
}
Status values:
- online - Active and available
- away - Inactive but connected
- busy - Do not disturb
- offline - Explicitly offline
Send Typing Indicator
Notify others you’re typing:
{
  "type": "typing",
  "recipient": "bob.example.com",
  "conversation_id": "conv_123"
}
Auto-timeout: Typing indicators automatically expire after 5 seconds.
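Because an indicator stays alive for 5 seconds, a client does not need to send a typing message on every keystroke. The sketch below limits sends to one per interval per conversation. `createTypingNotifier` is a hypothetical helper, and the 3 second interval is an assumed value chosen only to stay below the 5 second expiry.

```javascript
// Hypothetical helper (not a Cloudillo API): limits how often a typing
// message is sent for each conversation.
// intervalMs defaults to 3000, an assumed value below the 5 s expiry.
function createTypingNotifier(send, { intervalMs = 3000, now = Date.now } = {}) {
  const lastSent = new Map(); // conversation_id -> time of the last typing message (ms)

  return function notifyTyping(recipient, conversationId) {
    const t = now();
    const last = lastSent.get(conversationId);
    if (last !== undefined && t - last < intervalMs) {
      return false; // the previous indicator is still fresh, skip this one
    }
    lastSent.set(conversationId, t);
    send({ type: 'typing', recipient, conversation_id: conversationId });
    return true;
  };
}
```

The returned function can be called from an input handler on every keystroke; it sends a message only when the previous one is older than the interval.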
Heartbeat/Ping
Keep connection alive:
{
  "type": "ping"
}
Server responds with:
{
  "type": "pong",
  "timestamp": 1738483200
}
Server → Client Messages
Welcome Message
Sent immediately after connection:
{
  "type": "welcome",
  "session_id": "sess_abc123",
  "user": "alice.example.com",
  "timestamp": 1738483200
}
Action Notification
Notify about new actions:
{
  "type": "action",
  "action_type": "POST",
  "action_id": "a1~xyz789...",
  "issuer": "bob.example.com",
  "content": "Check out this cool feature!",
  "timestamp": 1738483200
}
Action types: POST, REPOST, CMNT, REACT, MSG, CONN, FLLW, etc.
Presence Update
Notify about user status changes:
{
  "type": "presence",
  "user": "bob.example.com",
  "status": "online",
  "activity": "browsing",
  "timestamp": 1738483200
}
Sent when:
- User connects/disconnects
- User explicitly updates status
- User goes idle (after 5 minutes of inactivity)
Typing Indicator
Notify that someone is typing:
{
  "type": "typing",
  "user": "bob.example.com",
  "conversation_id": "conv_123",
  "timestamp": 1738483200
}
Auto-clear: Client should clear typing indicator after 5 seconds if no update received.
Direct Message Notification
Notify about new direct messages:
{
  "type": "message",
  "message_id": "a1~msg789...",
  "sender": "bob.example.com",
  "preview": "Hey, are you available for a call?",
  "timestamp": 1738483200
}
Preview: First ~100 characters of message content.
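How the server builds the preview is not specified here. As an illustration only, a truncation to roughly 100 characters could look like the sketch below. `makePreview` is a hypothetical helper, and the trailing ellipsis is an assumption rather than documented behaviour.

```javascript
// Hypothetical helper: shortens message content to at most maxChars characters.
// Appending an ellipsis to truncated text is an assumption, not documented behaviour.
function makePreview(content, maxChars = 100) {
  // Array.from iterates by code point, so surrogate pairs (e.g. emoji) are not split
  const chars = Array.from(content.trim());
  if (chars.length <= maxChars) return chars.join('');
  return chars.slice(0, maxChars).join('').trimEnd() + '…';
}
```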
General Notification
System notifications:
{
  "type": "notification",
  "notification_type": "connection_request",
  "from": "charlie.example.com",
  "message": "Charlie wants to connect with you",
  "action_url": "/connections/pending",
  "timestamp": 1738483200
}
Error Message
Error responses:
{
  "type": "error",
  "code": "E-WS-AUTH",
  "message": "Invalid or expired token",
  "timestamp": 1738483200
}
Common error codes:
- E-WS-AUTH - Authentication failed
- E-WS-INVALID - Invalid message format
- E-WS-RATE - Rate limit exceeded
- E-WS-PERMISSION - Permission denied
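A client usually wants to react differently to each error code. The sketch below maps the codes listed above to a suggested reaction. The codes come from this page, but the reactions (`refresh_token`, `slow_down`, `log`) and the `classifyError` helper are illustrative assumptions, not behaviour defined by Cloudillo.

```javascript
// Codes are taken from the list above; the reactions are assumed for illustration.
const ERROR_POLICIES = new Map([
  ['E-WS-AUTH', { action: 'refresh_token', retry: true }],  // obtain a new token, then reconnect
  ['E-WS-RATE', { action: 'slow_down', retry: true }],      // pause sending before retrying
  ['E-WS-INVALID', { action: 'log', retry: false }],        // client bug: resending will not help
  ['E-WS-PERMISSION', { action: 'log', retry: false }],     // not allowed: resending will not help
]);

// Hypothetical helper: returns the policy for an error message,
// or null if the message is not an error.
function classifyError(message) {
  if (!message || message.type !== 'error') return null;
  const policy = ERROR_POLICIES.get(message.code) ?? { action: 'log', retry: false };
  return { code: message.code, ...policy };
}
```

A handler registered for the `error` message type could call `classifyError` and branch on the returned `action`.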
Client Implementation
JavaScript/Browser
class CloudilloBus {
  constructor(domain, token) {
    this.domain = domain;
    this.token = token;
    this.ws = null;
    this.handlers = new Map();
    this.pingTimer = null;
    this.reconnectTimer = null;
    this.shouldReconnect = true;
  }

  connect() {
    this.shouldReconnect = true;
    const token = encodeURIComponent(this.token);
    this.ws = new WebSocket(`wss://cl-o.${this.domain}/ws/bus?token=${token}`);

    this.ws.onopen = () => {
      console.log('Connected to WebSocket Bus');
      // Subscribe to events
      this.send({
        type: 'subscribe',
        events: ['action', 'presence', 'typing', 'message']
      });
      // Send ping every 30 seconds while this connection is open
      this.pingTimer = setInterval(() => this.send({ type: 'ping' }), 30000);
    };

    this.ws.onmessage = (event) => {
      let message;
      try {
        message = JSON.parse(event.data);
      } catch (err) {
        console.error('Invalid JSON from server:', err);
        return;
      }
      this.handleMessage(message);
    };

    this.ws.onerror = (error) => {
      console.error('WebSocket error:', error);
    };

    this.ws.onclose = () => {
      console.log('WebSocket closed');
      // Stop the ping timer that belonged to this connection
      clearInterval(this.pingTimer);
      // Reconnect after 5 seconds, unless disconnect() was called
      if (this.shouldReconnect) {
        this.reconnectTimer = setTimeout(() => this.connect(), 5000);
      }
    };
  }

  send(message) {
    if (this.ws && this.ws.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify(message));
    }
  }

  on(eventType, handler) {
    if (!this.handlers.has(eventType)) {
      this.handlers.set(eventType, []);
    }
    this.handlers.get(eventType).push(handler);
  }

  handleMessage(message) {
    const handlers = this.handlers.get(message.type) || [];
    handlers.forEach(handler => handler(message.data || message));
  }

  updatePresence(status, activity) {
    this.send({
      type: 'presence',
      status,
      activity
    });
  }

  sendTyping(recipient, conversationId) {
    this.send({
      type: 'typing',
      recipient,
      conversation_id: conversationId
    });
  }

  disconnect() {
    // Prevent the onclose handler from scheduling a reconnect
    this.shouldReconnect = false;
    clearTimeout(this.reconnectTimer);
    if (this.ws) {
      this.ws.close();
    }
  }
}
// Usage
const bus = new CloudilloBus('example.com', accessToken);

bus.on('action', (data) => {
  console.log('New action:', data);
  // Update UI with new post/comment/etc
});

bus.on('presence', (data) => {
  console.log('Presence update:', data);
  // Update online status indicators
});

bus.on('typing', (data) => {
  console.log('User typing:', data);
  // Show typing indicator
});

bus.on('message', (data) => {
  console.log('New message:', data);
  // Show notification, update message list
});

bus.connect();
React Hook
import { useEffect, useState } from 'react';

export function useCloudilloBus(domain, token) {
  const [connected, setConnected] = useState(false);
  const [events, setEvents] = useState([]);
  // Keep the bus in state rather than a ref, so that components
  // re-render when the instance is created or torn down
  const [bus, setBus] = useState(null);

  useEffect(() => {
    if (!token) return;

    const instance = new CloudilloBus(domain, token);
    instance.on('welcome', () => setConnected(true));
    instance.on('action', (data) => {
      setEvents(prev => [...prev, { type: 'action', data }]);
    });
    instance.on('message', (data) => {
      setEvents(prev => [...prev, { type: 'message', data }]);
    });
    instance.connect();
    setBus(instance);

    return () => {
      instance.disconnect();
      setBus(null);
      setConnected(false);
    };
  }, [domain, token]);

  return {
    connected,
    events,
    bus
  };
}
// Usage in component
function MyComponent({ accessToken }) {
  const { connected, events, bus } = useCloudilloBus('example.com', accessToken);

  const handleTyping = () => {
    bus?.sendTyping('bob.example.com', 'conv_123');
  };

  return (
    <div>
      <p>Status: {connected ? 'Connected' : 'Disconnected'}</p>
      <input placeholder="Type a message" onChange={handleTyping} />
      <ul>
        {events.map((event, i) => (
          <li key={i}>{event.type}: {JSON.stringify(event.data)}</li>
        ))}
      </ul>
    </div>
  );
}
Server Implementation
Connection Handler
use std::sync::Arc;

use axum::{
    extract::{
        ws::{Message, WebSocket, WebSocketUpgrade},
        Query, State,
    },
    http::StatusCode,
    response::{IntoResponse, Response},
};
use chrono::Utc;
use serde::Deserialize;
use serde_json::json;

// App, Auth, validate_access_token and generate_session_id are application
// items that are not shown on this page.

#[derive(Deserialize)]
struct BusQuery {
    token: String,
}

pub async fn ws_bus_handler(
    ws: WebSocketUpgrade,
    Query(query): Query<BusQuery>,
    State(app): State<Arc<App>>,
) -> Response {
    // Validate access token before upgrading
    let auth = match validate_access_token(&app, &query.token).await {
        Ok(auth) => auth,
        Err(_) => return StatusCode::UNAUTHORIZED.into_response(),
    };
    // Upgrade to WebSocket
    ws.on_upgrade(move |socket| handle_bus_socket(socket, auth, app))
}

async fn handle_bus_socket(
    mut socket: WebSocket,
    auth: Auth,
    app: Arc<App>,
) {
    // Generate session ID
    let session_id = generate_session_id();
    // Send welcome message
    let welcome = json!({
        "type": "welcome",
        "session_id": session_id,
        "user": auth.id_tag,
        "timestamp": Utc::now().timestamp(),
    });
    // .into() converts the String into whatever payload type Message::Text expects
    if socket.send(Message::Text(welcome.to_string().into())).await.is_err() {
        return;
    }
    // Register session in bus
    app.bus.add_session(session_id.clone(), auth.id_tag.clone()).await;
    // Handle incoming messages
    while let Some(msg) = socket.recv().await {
        match msg {
            Ok(Message::Text(text)) => {
                // Messages that fail to parse are ignored
                if let Ok(message) = serde_json::from_str::<BusMessage>(&text) {
                    handle_bus_message(message, &auth, &app, &mut socket).await;
                }
            }
            Ok(Message::Close(_)) => break,
            // A receive error means the connection is no longer usable
            Err(_) => break,
            _ => {}
        }
    }
    // Cleanup session
    app.bus.remove_session(&session_id).await;
}
Message Handling
// Uses the same imports as the connection handler.
// rename_all maps the variants to the lowercase "type" values used on the wire
// ("subscribe", "unsubscribe", "presence", "typing", "ping").
#[derive(Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
enum BusMessage {
    Subscribe { events: Vec<String> },
    Unsubscribe { events: Vec<String> },
    Presence { status: String, activity: Option<String> },
    Typing { recipient: String, conversation_id: String },
    Ping,
}

async fn handle_bus_message(
    message: BusMessage,
    auth: &Auth,
    app: &App,
    socket: &mut WebSocket,
) {
    match message {
        BusMessage::Subscribe { events } => {
            // Subscribe to events
            app.bus.subscribe(&auth.id_tag, events).await;
        }
        BusMessage::Presence { status, activity } => {
            // Broadcast presence update
            let update = json!({
                "type": "presence",
                "user": auth.id_tag,
                "status": status,
                "activity": activity,
                "timestamp": Utc::now().timestamp(),
            });
            app.bus.broadcast(&auth.id_tag, update).await;
        }
        BusMessage::Typing { recipient, conversation_id } => {
            // Send typing indicator to recipient
            let typing = json!({
                "type": "typing",
                "user": auth.id_tag,
                "conversation_id": conversation_id,
                "timestamp": Utc::now().timestamp(),
            });
            app.bus.send_to(&recipient, typing).await;
        }
        BusMessage::Ping => {
            // Respond with pong
            let pong = json!({
                "type": "pong",
                "timestamp": Utc::now().timestamp(),
            });
            socket.send(Message::Text(pong.to_string().into())).await.ok();
        }
        // Unsubscribe handling is not shown in this example
        _ => {}
    }
}
Use Cases
Real-time Feed Updates
Show new posts as they’re created:
bus.on('action', (data) => {
  if (data.action_type === 'POST' && data.issuer !== currentUser) {
    showNotification(`New post from ${data.issuer}`);
    prependToFeed(data);
  }
});
Online Status Indicators
Track who’s online:
const onlineUsers = new Set();
bus.on('presence', (data) => {
  // away and busy users are still connected; only offline removes them
  if (data.status === 'offline') {
    onlineUsers.delete(data.user);
  } else {
    onlineUsers.add(data.user);
  }
  updateOnlineIndicators();
});
Typing Indicators in Chat
Show when someone is typing:
const typingUsers = new Map();
bus.on('typing', (data) => {
  typingUsers.set(data.user, Date.now());
  showTypingIndicator(data.user, data.conversation_id);
  // Clear after 5 seconds
  setTimeout(() => {
    if (Date.now() - typingUsers.get(data.user) >= 5000) {
      hideTypingIndicator(data.user);
      typingUsers.delete(data.user);
    }
  }, 5000);
});

// Send typing indicator when user types
messageInput.addEventListener('input', () => {
  bus.sendTyping(recipientId, conversationId);
});
Message Notifications
Instant message delivery:
bus.on('message', (data) => {
  playNotificationSound();
  showNotification(`New message from ${data.sender}`, data.preview);
  if (currentConversation === data.sender) {
    fetchAndDisplayMessage(data.message_id);
  } else {
    incrementUnreadCount(data.sender);
  }
});
Real-time Action Forwarding
The WebSocket Bus integrates with the action processing system to provide real-time updates when actions are created or received.
Outbound Forwarding
When a user creates an action locally, it’s forwarded to their connected WebSocket clients:
Outbound Action Forwarding:
1. User creates action via POST /api/actions
2. Action is processed and stored
3. ActionCreatorTask completes
4. Forward to WebSocket Bus:
   a. Find all WebSocket sessions for user's tenant
   b. Send action notification to each session
5. Client receives real-time update
Audience Targeting:
- POST: Broadcast to creator's sessions
- MSG: Forward to all conversation participant sessions
- CMNT/REACT: Forward to parent action owner's sessions
Outbound Message Format:
{
  "type": "action",
  "action_type": "POST",
  "action_id": "a1~xyz789...",
  "issuer": "alice.example.com",
  "content": "New post content",
  "timestamp": 1738483200,
  "source": "local"
}
Inbound Forwarding
When a federated action is received from a remote instance:
Inbound Action Forwarding:
1. Remote action arrives at POST /api/inbox
2. ActionVerifierTask verifies and stores action
3. After successful processing:
   a. Identify target tenant from action audience
   b. Find all WebSocket sessions for that tenant
   c. Broadcast action to all sessions
4. All connected clients see the new action
Broadcast Scope:
- All clients for the target tenant receive notification
- Client-side filtering determines what to display
- Enables real-time feed updates without polling
Inbound Message Format:
{
  "type": "action",
  "action_type": "POST",
  "action_id": "a1~abc123...",
  "issuer": "bob.remote.com",
  "content": "Federated post",
  "timestamp": 1738483200,
  "source": "federated"
}
Forwarding Flow Diagram
Local Action Creation:
┌─────────┐    ┌──────────┐    ┌─────────────┐    ┌──────────────┐
│ Client  │───►│   API    │───►│    Task     │───►│  WebSocket   │
│ (POST)  │    │ Handler  │    │  Scheduler  │    │     Bus      │
└─────────┘    └──────────┘    └─────────────┘    └──────┬───────┘
                                                         │
                                    ┌────────────────────┼────────────────────┐
                                    ▼                    ▼                    ▼
                               ┌──────────┐         ┌──────────┐         ┌──────────┐
                               │ Session 1│         │ Session 2│         │ Session 3│
                               │ (Tab 1)  │         │ (Tab 2)  │         │ (Mobile) │
                               └──────────┘         └──────────┘         └──────────┘
Federated Action Reception:
┌─────────────┐    ┌──────────┐    ┌─────────────┐    ┌──────────────┐
│   Remote    │───►│  /api/   │───►│  Verifier   │───►│  WebSocket   │
│  Instance   │    │  inbox   │    │    Task     │    │     Bus      │
└─────────────┘    └──────────┘    └─────────────┘    └──────┬───────┘
                                                             │
                                                     Broadcast to all
                                                      tenant sessions
Push Notification Integration
For users who are offline (no active WebSocket connection), the system can send push notifications for important actions.
Push Notification Decision Tree
Push Notification Decision:
1. Action is processed successfully
   │
2. Check if forwarded to WebSocket
   ├─ Yes → User is online, skip push notification
   └─ No → User is offline, continue...
   │
3. Check action type eligibility
   ├─ MSG (Message) → Eligible
   ├─ CONN (Connection request) → Eligible
   ├─ FSHR (File share) → Eligible
   ├─ CMNT (Comment on user's content) → Eligible
   └─ Other → Not eligible, skip
   │
4. Check user notification preferences
   ├─ Notifications disabled → Skip
   └─ Notifications enabled → Continue...
   │
5. Check notification throttling
   ├─ Too many recent notifications → Skip (prevent spam)
   └─ Within limits → Continue...
   │
6. Schedule push notification task
Push Notification Types
| Action Type | Push Notification | Content |
|---|---|---|
| MSG | Direct message | “{sender} sent you a message” |
| CONN | Connection request | “{sender} wants to connect” |
| FSHR | File share | “{sender} shared a file with you” |
| CMNT | Comment | “{sender} commented on your post” |
| REACT | Reaction | Grouped, not individual pushes |
| FLLW | Follow | “{sender} started following you” |
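The table above can be turned into a small lookup on the sending side. The sketch below copies the texts from the table; `pushText` is a hypothetical helper, and returning `null` for REACT (grouped) and for unknown types is an assumption about how a caller would want to treat them.

```javascript
// Templates copied from the table above. REACT is absent because
// reactions are grouped rather than pushed individually.
const PUSH_TEMPLATES = new Map([
  ['MSG', '{sender} sent you a message'],
  ['CONN', '{sender} wants to connect'],
  ['FSHR', '{sender} shared a file with you'],
  ['CMNT', '{sender} commented on your post'],
  ['FLLW', '{sender} started following you'],
]);

// Hypothetical helper: returns the push text for an action type,
// or null when no individual push text is defined for it.
function pushText(actionType, sender) {
  const template = PUSH_TEMPLATES.get(actionType);
  if (template === undefined) return null;
  // A replacer function avoids special handling of "$" sequences in sender
  return template.replace('{sender}', () => sender);
}
```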
Push Task Scheduling
schedule_push_notification(action, recipient):
    # Check eligibility
    if NOT is_push_eligible(action.type):
        return
    # Check if user is online
    if websocket_bus.has_active_session(recipient):
        return  # User will see via WebSocket
    # Check user preferences
    prefs = load_notification_preferences(recipient)
    if NOT prefs.push_enabled:
        return
    if NOT prefs.allows_type(action.type):
        return
    # Check throttling (skip once the hourly limit has been reached)
    recent_count = count_recent_pushes(recipient, window: 1 hour)
    if recent_count >= MAX_PUSHES_PER_HOUR:
        return
    # Schedule the push
    task = PushNotificationTask {
        recipient: recipient,
        action_type: action.type,
        sender: action.issuer,
        preview: generate_preview(action),
        action_url: generate_deep_link(action),
    }
    scheduler.task(task).schedule()
Client Registration
Clients register for push notifications during WebSocket connection:
// During WebSocket setup
bus.send({
  type: 'register_push',
  token: pushNotificationToken, // From FCM/APNs
  platform: 'android' // or 'ios', 'web'
});
// Server stores token for offline delivery
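How the server stores registered tokens is not described here. Purely as an illustration of the bookkeeping involved, the sketch below keeps tokens in memory per user. `PushTokenRegistry` is a hypothetical class, the platform values are taken from the comment in the example above, and a real server would persist this data.

```javascript
// Platform values from the registration example above.
const PLATFORMS = new Set(['android', 'ios', 'web']);

// Hypothetical in-memory registry (not a Cloudillo API); a real server would persist tokens.
class PushTokenRegistry {
  constructor() {
    this.byUser = new Map(); // user id -> Map(token -> platform)
  }

  // Stores the token from a register_push message for the given user.
  register(user, { token, platform }) {
    if (typeof token !== 'string' || token.length === 0) {
      throw new Error('token is required');
    }
    if (!PLATFORMS.has(platform)) {
      throw new Error(`unsupported platform: ${platform}`);
    }
    if (!this.byUser.has(user)) this.byUser.set(user, new Map());
    // Registering the same token twice simply overwrites the entry
    this.byUser.get(user).set(token, platform);
  }

  // Returns all registered tokens for a user, for offline delivery.
  tokensFor(user) {
    const tokens = this.byUser.get(user) ?? new Map();
    return [...tokens.entries()].map(([token, platform]) => ({ token, platform }));
  }
}
```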
Performance Considerations
Connection Limits
Per server instance:
- Default: 10,000 concurrent connections
- Configurable via MAX_WS_CONNECTIONS
Per user:
- Default: 5 concurrent connections (multiple tabs/devices)
- Configurable via MAX_WS_CONNECTIONS_PER_USER
Message Rate Limiting
Client → Server:
- 100 messages per minute per connection
- Bursts of 20 messages allowed
Server → Client:
- No hard limit (trust server)
- Throttling applied for presence updates (max 1 per second per user)
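A sustained rate with an allowed burst, as in the client → server limits above, is commonly modelled with a token bucket. Whether Cloudillo uses this algorithm is not stated, so the sketch below is an assumption: a bucket holding up to 20 tokens that refills at 100 tokens per minute. `TokenBucket` is a hypothetical class; a client can use the same logic to stay under the limit.

```javascript
// Hypothetical token bucket (the algorithm is assumed, not confirmed by Cloudillo).
// Defaults mirror the limits above: 100 messages per minute, bursts of 20.
class TokenBucket {
  constructor({ ratePerMinute = 100, burst = 20, now = Date.now } = {}) {
    this.refillPerMs = ratePerMinute / 60000; // tokens added per millisecond
    this.capacity = burst;                    // maximum tokens held at once
    this.tokens = burst;                      // start with a full bucket
    this.now = now;
    this.last = now();
  }

  // Returns true and consumes one token if a message may be sent now.
  tryRemove() {
    const t = this.now();
    // Refill for the time elapsed since the last call, capped at capacity
    this.tokens = Math.min(this.capacity, this.tokens + (t - this.last) * this.refillPerMs);
    this.last = t;
    if (this.tokens < 1) return false;
    this.tokens -= 1;
    return true;
  }
}
```

A sender would call `tryRemove()` before each message and queue or drop the message when it returns `false`.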
Reconnection Strategy
Client should implement:
- Exponential backoff (start at 1s, max 60s)
- Random jitter to prevent thundering herd
- Token refresh if expired
- Event replay from last known timestamp
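The first two points can be combined in one delay function. The sketch below doubles the delay ceiling per attempt from 1 second up to 60 seconds and then picks a random value between half the ceiling and the full ceiling. `backoffDelay` is a hypothetical helper, and this particular jitter scheme is one common choice rather than something Cloudillo prescribes.

```javascript
// Hypothetical helper: delay in milliseconds before reconnect attempt number
// `attempt` (0 for the first retry). The jitter scheme is an assumed choice.
function backoffDelay(attempt, { baseMs = 1000, maxMs = 60000, random = Math.random } = {}) {
  // Exponential growth: 1 s, 2 s, 4 s, ... capped at maxMs
  const ceiling = Math.min(maxMs, baseMs * 2 ** attempt);
  // Jitter: spread clients uniformly over the upper half of the window
  return Math.floor(ceiling * (0.5 + random() * 0.5));
}
```

A client would increment `attempt` on every failed connection, pass `backoffDelay(attempt)` to `setTimeout`, and reset `attempt` to 0 once a welcome message arrives.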
Security Considerations
Authentication
- Token required for all connections
- Token validated on initial connection
- Invalid token = immediate disconnect
- Token expiration checked periodically
Authorization
- Users only receive events they have permission to see
- Presence updates only for connections/followers
- Message notifications only for actual recipients
- Action notifications respect ABAC visibility
Rate Limiting
- Per-connection message rate limiting
- Per-user connection limits
- Typing indicator throttling
- Presence update throttling
See Also
- RTDB WebSocket Protocol - Real-time database subscriptions
- CRDT WebSocket Protocol - Collaborative editing
- System Architecture - Overall architecture
- Actions - Action token types that trigger notifications