WebSocket Bus
Cloudillo’s WebSocket Bus provides real-time notifications and presence tracking for connected clients. This is separate from the RTDB and CRDT WebSocket protocols, serving as a general-purpose notification system.
Overview
The WebSocket Bus enables:
- ✅ Real-time notifications - Action events, messages, updates
- ✅ Presence tracking - Online/offline status of users
- ✅ Typing indicators - Show when users are typing
- ✅ Action broadcasts - New posts, comments, reactions
- ✅ Live updates - Content changes without polling
WebSocket Endpoint
Connection
URL: wss://cl-o.{domain}/ws/bus
Authentication: Required via query parameter or header
// Connect with access token
const token = 'your_access_token';
const ws = new WebSocket(`wss://cl-o.example.com/ws/bus?token=${token}`);
// Or via Authorization header (if supported by client)
const ws = new WebSocket('wss://cl-o.example.com/ws/bus');
// Set Authorization header before connecting
Connection Flow
Client Server
| |
|--- GET /ws/bus?token=... ---->|
| |--- Validate token
| |--- Create session
| |
|<-- 101 Switching Protocols ---|
| |
|<===== WebSocket Open ========>|
| |
|<-- welcome message ----------|
| |
|--- subscribe message -------->|
| |
|<===== Event Stream ==========>|Message Protocol
All messages use JSON format (not binary like RTDB/CRDT).
Message Structure
{
"type": "message_type",
"data": { ... },
"timestamp": 1738483200
}Client → Server Messages
Subscribe to Events
Subscribe to specific event types:
{
"type": "subscribe",
"events": ["action", "presence", "typing"]
}Event Types:
action- New actions (posts, comments, reactions)presence- User online/offline statustyping- Typing indicatorsnotification- General notificationsmessage- Direct messages
Unsubscribe from Events
{
"type": "unsubscribe",
"events": ["typing"]
}Send Presence Update
Update your online status:
{
"type": "presence",
"status": "online",
"activity": "browsing"
}Status values:
online- Active and availableaway- Inactive but connectedbusy- Do not disturboffline- Explicitly offline
Send Typing Indicator
Notify others you’re typing:
{
"type": "typing",
"recipient": "bob.example.com",
"conversation_id": "conv_123"
}Auto-timeout: Typing indicators automatically expire after 5 seconds.
Heartbeat/Ping
Keep connection alive:
{
"type": "ping"
}Server responds with:
{
"type": "pong",
"timestamp": 1738483200
}Server → Client Messages
Welcome Message
Sent immediately after connection:
{
"type": "welcome",
"session_id": "sess_abc123",
"user": "alice.example.com",
"timestamp": 1738483200
}Action Notification
Notify about new actions:
{
"type": "action",
"action_type": "POST",
"action_id": "a1~xyz789...",
"issuer": "bob.example.com",
"content": "Check out this cool feature!",
"timestamp": 1738483200
}Action types: POST, REPOST, CMNT, REACT, MSG, CONN, FLLW, etc.
Presence Update
Notify about user status changes:
{
"type": "presence",
"user": "bob.example.com",
"status": "online",
"activity": "browsing",
"timestamp": 1738483200
}Sent when:
- User connects/disconnects
- User explicitly updates status
- User goes idle (after 5 minutes of inactivity)
Typing Indicator
Notify that someone is typing:
{
"type": "typing",
"user": "bob.example.com",
"conversation_id": "conv_123",
"timestamp": 1738483200
}Auto-clear: Client should clear typing indicator after 5 seconds if no update received.
Direct Message Notification
Notify about new direct messages:
{
"type": "message",
"message_id": "a1~msg789...",
"sender": "bob.example.com",
"preview": "Hey, are you available for a call?",
"timestamp": 1738483200
}Preview: First ~100 characters of message content.
General Notification
System notifications:
{
"type": "notification",
"notification_type": "connection_request",
"from": "charlie.example.com",
"message": "Charlie wants to connect with you",
"action_url": "/connections/pending",
"timestamp": 1738483200
}Error Message
Error responses:
{
"type": "error",
"code": "E-WS-AUTH",
"message": "Invalid or expired token",
"timestamp": 1738483200
}Common error codes:
E-WS-AUTH- Authentication failedE-WS-INVALID- Invalid message formatE-WS-RATE- Rate limit exceededE-WS-PERMISSION- Permission denied
Client Implementation
JavaScript/Browser
class CloudilloBus {
constructor(domain, token) {
this.domain = domain;
this.token = token;
this.ws = null;
this.handlers = new Map();
}
connect() {
this.ws = new WebSocket(`wss://cl-o.${this.domain}/ws/bus?token=${this.token}`);
this.ws.onopen = () => {
console.log('Connected to WebSocket Bus');
// Subscribe to events
this.send({
type: 'subscribe',
events: ['action', 'presence', 'typing', 'message']
});
};
this.ws.onmessage = (event) => {
const message = JSON.parse(event.data);
this.handleMessage(message);
};
this.ws.onerror = (error) => {
console.error('WebSocket error:', error);
};
this.ws.onclose = () => {
console.log('WebSocket closed');
// Reconnect after 5 seconds
setTimeout(() => this.connect(), 5000);
};
// Send ping every 30 seconds
setInterval(() => {
if (this.ws.readyState === WebSocket.OPEN) {
this.send({ type: 'ping' });
}
}, 30000);
}
send(message) {
if (this.ws.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(message));
}
}
on(eventType, handler) {
if (!this.handlers.has(eventType)) {
this.handlers.set(eventType, []);
}
this.handlers.get(eventType).push(handler);
}
handleMessage(message) {
const handlers = this.handlers.get(message.type) || [];
handlers.forEach(handler => handler(message.data || message));
}
updatePresence(status, activity) {
this.send({
type: 'presence',
status,
activity
});
}
sendTyping(recipient, conversationId) {
this.send({
type: 'typing',
recipient,
conversation_id: conversationId
});
}
disconnect() {
if (this.ws) {
this.ws.close();
}
}
}
// Usage
const bus = new CloudilloBus('example.com', accessToken);
bus.on('action', (data) => {
console.log('New action:', data);
// Update UI with new post/comment/etc
});
bus.on('presence', (data) => {
console.log('Presence update:', data);
// Update online status indicators
});
bus.on('typing', (data) => {
console.log('User typing:', data);
// Show typing indicator
});
bus.on('message', (data) => {
console.log('New message:', data);
// Show notification, update message list
});
bus.connect();React Hook
import { useEffect, useState, useRef } from 'react';
export function useCloudilloBus(domain, token) {
const [connected, setConnected] = useState(false);
const [events, setEvents] = useState([]);
const busRef = useRef(null);
useEffect(() => {
if (!token) return;
const bus = new CloudilloBus(domain, token);
busRef.current = bus;
bus.on('welcome', () => setConnected(true));
bus.on('action', (data) => {
setEvents(prev => [...prev, { type: 'action', data }]);
});
bus.on('message', (data) => {
setEvents(prev => [...prev, { type: 'message', data }]);
});
bus.connect();
return () => {
bus.disconnect();
setConnected(false);
};
}, [domain, token]);
return {
connected,
events,
bus: busRef.current
};
}
// Usage in component
function MyComponent() {
const { connected, events, bus } = useCloudilloBus('example.com', accessToken);
const handleTyping = () => {
bus?.sendTyping('bob.example.com', 'conv_123');
};
return (
<div>
<p>Status: {connected ? 'Connected' : 'Disconnected'}</p>
<ul>
{events.map((event, i) => (
<li key={i}>{event.type}: {JSON.stringify(event.data)}</li>
))}
</ul>
</div>
);
}Server Implementation
Connection Handler
use axum::{
extract::{Query, State, WebSocketUpgrade},
response::Response,
};
use serde::{Deserialize, Serialize};
#[derive(Deserialize)]
struct BusQuery {
token: String,
}
pub async fn ws_bus_handler(
ws: WebSocketUpgrade,
Query(query): Query<BusQuery>,
State(app): State<Arc<App>>,
) -> Response {
// Validate access token
let auth = match validate_access_token(&app, &query.token).await {
Ok(auth) => auth,
Err(_) => return (StatusCode::UNAUTHORIZED).into_response(),
};
// Upgrade to WebSocket
ws.on_upgrade(move |socket| handle_bus_socket(socket, auth, app))
}
async fn handle_bus_socket(
mut socket: WebSocket,
auth: Auth,
app: Arc<App>,
) {
// Generate session ID
let session_id = generate_session_id();
// Send welcome message
let welcome = json!({
"type": "welcome",
"session_id": session_id,
"user": auth.id_tag,
"timestamp": Utc::now().timestamp(),
});
if socket.send(Message::Text(welcome.to_string())).await.is_err() {
return;
}
// Register session in bus
app.bus.add_session(session_id.clone(), auth.id_tag.clone()).await;
// Handle incoming messages
while let Some(msg) = socket.recv().await {
match msg {
Ok(Message::Text(text)) => {
if let Ok(message) = serde_json::from_str::<BusMessage>(&text) {
handle_bus_message(message, &auth, &app, &mut socket).await;
}
}
Ok(Message::Close(_)) => break,
_ => {}
}
}
// Cleanup session
app.bus.remove_session(&session_id).await;
}Message Handling
#[derive(Deserialize)]
#[serde(tag = "type")]
enum BusMessage {
Subscribe { events: Vec<String> },
Unsubscribe { events: Vec<String> },
Presence { status: String, activity: Option<String> },
Typing { recipient: String, conversation_id: String },
Ping,
}
async fn handle_bus_message(
message: BusMessage,
auth: &Auth,
app: &App,
socket: &mut WebSocket,
) {
match message {
BusMessage::Subscribe { events } => {
// Subscribe to events
app.bus.subscribe(&auth.id_tag, events).await;
}
BusMessage::Presence { status, activity } => {
// Broadcast presence update
let update = json!({
"type": "presence",
"user": auth.id_tag,
"status": status,
"activity": activity,
"timestamp": Utc::now().timestamp(),
});
app.bus.broadcast(&auth.id_tag, update).await;
}
BusMessage::Typing { recipient, conversation_id } => {
// Send typing indicator to recipient
let typing = json!({
"type": "typing",
"user": auth.id_tag,
"conversation_id": conversation_id,
"timestamp": Utc::now().timestamp(),
});
app.bus.send_to(&recipient, typing).await;
}
BusMessage::Ping => {
// Respond with pong
let pong = json!({
"type": "pong",
"timestamp": Utc::now().timestamp(),
});
socket.send(Message::Text(pong.to_string())).await.ok();
}
_ => {}
}
}Use Cases
Real-time Feed Updates
Show new posts as they’re created:
bus.on('action', (data) => {
if (data.action_type === 'POST' && data.issuer !== currentUser) {
showNotification(`New post from ${data.issuer}`);
prependToFeed(data);
}
});Online Status Indicators
Track who’s online:
const onlineUsers = new Set();
bus.on('presence', (data) => {
if (data.status === 'online') {
onlineUsers.add(data.user);
} else {
onlineUsers.delete(data.user);
}
updateOnlineIndicators();
});Typing Indicators in Chat
Show when someone is typing:
const typingUsers = new Map();
bus.on('typing', (data) => {
typingUsers.set(data.user, Date.now());
showTypingIndicator(data.user, data.conversation_id);
// Clear after 5 seconds
setTimeout(() => {
if (Date.now() - typingUsers.get(data.user) >= 5000) {
hideTypingIndicator(data.user);
typingUsers.delete(data.user);
}
}, 5000);
});
// Send typing indicator when user types
messageInput.addEventListener('input', () => {
bus.sendTyping(recipientId, conversationId);
});Message Notifications
Instant message delivery:
bus.on('message', (data) => {
playNotificationSound();
showNotification(`New message from ${data.sender}`, data.preview);
if (currentConversation === data.sender) {
fetchAndDisplayMessage(data.message_id);
} else {
incrementUnreadCount(data.sender);
}
});Performance Considerations
Connection Limits
Per server instance:
- Default: 10,000 concurrent connections
- Configurable via
MAX_WS_CONNECTIONS
Per user:
- Default: 5 concurrent connections (multiple tabs/devices)
- Configurable via
MAX_WS_CONNECTIONS_PER_USER
Message Rate Limiting
Client → Server:
- 100 messages per minute per connection
- Bursts of 20 messages allowed
Server → Client:
- No hard limit (trust server)
- Throttling applied for presence updates (max 1 per second per user)
Reconnection Strategy
Client should implement:
- Exponential backoff (start at 1s, max 60s)
- Random jitter to prevent thundering herd
- Token refresh if expired
- Event replay from last known timestamp
Security Considerations
Authentication
- Token required for all connections
- Token validated on initial connection
- Invalid token = immediate disconnect
- Token expiration checked periodically
Authorization
- Users only receive events they have permission to see
- Presence updates only for connections/followers
- Message notifications only for actual recipients
- Action notifications respect ABAC visibility
Rate Limiting
- Per-connection message rate limiting
- Per-user connection limits
- Typing indicator throttling
- Presence update throttling
See Also
- RTDB WebSocket Protocol - Real-time database subscriptions
- CRDT WebSocket Protocol - Collaborative editing
- System Architecture - Overall architecture
- [Actions](/architecture/actions-federation/actions - Action token types that trigger notifications