Overview
AFFiNE uses a WebSocket-based sync protocol built on Y.js (Yjs) CRDTs to enable real-time collaborative editing. The protocol handles document updates, awareness (cursor positions, selections), and conflict-free merging.
Connection
Establishing WebSocket Connection
import { io } from 'socket.io-client';
// Open the sync connection: WebSocket transport only, credentials passed via the `auth` option.
const socket = io('wss://app.affine.pro', {
  transports: ['websocket'],
  auth: { token: 'YOUR_AUTH_TOKEN' }
});
Client Version Requirements
Clients must be running version ≥0.25.0 to connect. Older versions will be rejected immediately.
// Example client version string; it satisfies the >=0.25.0 minimum stated above.
const CLIENT_VERSION = '0.26.0';
Sync Protocol Versions
Protocol 0.26 (Current)
Supports batch updates with compression:
- Multiple updates sent as array
- Automatic Y.js update merging on server
- Compressed payloads for efficiency
Protocol 0.25 (Legacy)
Single update per message:
- Individual updates only
- No compression
- Backward compatibility maintained
The server automatically detects the client version and places the client in the matching protocol room.
Space Types
/** Kinds of space a client can join; the string values are what goes on the wire as `spaceType`. */
enum SpaceType {
  /** Shared collaborative workspaces. */
  Workspace = 'workspace',
  /** Private user data. */
  Userspace = 'userspace',
}
Core Events
Join Space
Join a workspace or userspace to begin syncing:
// Join request for a workspace; the server answers through the acknowledgement callback.
const joinRequest = {
  spaceType: 'workspace',
  spaceId: 'workspace-123',
  clientVersion: '0.26.0'
};
socket.emit('space:join', joinRequest, ({ clientId, success }) => {
  console.log('Client ID:', clientId);
  console.log('Success:', success);
});
spaceType
'workspace' | 'userspace'
required
Type of space to join
spaceId — Workspace or userspace identifier
clientVersion — Semantic version of the client (e.g., '0.26.0')
Leave Space
// Leave the previously joined space; no acknowledgement callback is used here.
const leaveRequest = { spaceType: 'workspace', spaceId: 'workspace-123' };
socket.emit('space:leave', leaveRequest);
Document Synchronization
Load Document
Fetch document state with optional state vector for differential sync:
import * as Y from 'yjs';
// Get the current local state vector directly from the document.
// (No need to encode the whole document as an update first.)
const stateVector = Y.encodeStateVector(ydoc);
socket.emit('space:load-doc', {
  spaceType: 'workspace',
  spaceId: 'workspace-123',
  docId: 'doc-456',
  stateVector: Buffer.from(stateVector).toString('base64')
}, (response) => {
  // Apply missing updates. Tag them with the 'remote' origin so an 'update'
  // listener that skips origin === 'remote' does not push them back to the server.
  const missing = Buffer.from(response.missing, 'base64');
  Y.applyUpdate(ydoc, missing, 'remote');
});
missing — Base64-encoded Y.js update containing the data not covered by the client's state vector
Base64-encoded server’s state vector
Unix timestamp (ms) of last update
Push Document Update
Send local changes to server:
// Forward every locally produced Y.js update to the server.
// `origin` is whatever value was passed to Y.applyUpdate / transact, so it is typed
// as `unknown` and only compared, never dereferenced.
ydoc.on('update', (update: Uint8Array, origin: unknown) => {
  if (origin === 'remote') return; // Don't echo remote updates
  socket.emit('space:push-doc-update', {
    spaceType: 'workspace',
    spaceId: 'workspace-123',
    docId: 'doc-456',
    update: Buffer.from(update).toString('base64')
  }, (response) => {
    console.log('Update accepted:', response.accepted);
    console.log('Server timestamp:', response.timestamp);
  });
});
update — Base64-encoded Y.js update binary
accepted — true if the update was accepted
timestamp — Server timestamp when the update was processed
Broadcast Doc Updates (Protocol 0.26)
Receive batched updates from other clients:
// Apply batched updates from other clients to the local document.
socket.on('space:broadcast-doc-updates', (message) => {
  const { docId, updates, compressed } = message;
  // Only apply updates that belong to the document held in `ydoc`;
  // updates for other documents must not be merged into it.
  if (docId !== 'doc-456') return;
  for (const updateBase64 of updates) {
    const update = Buffer.from(updateBase64, 'base64');
    // 'remote' origin keeps the local 'update' listener from echoing these back
    Y.applyUpdate(ydoc, update, 'remote');
  }
  if (compressed) {
    console.log('Received compressed update');
  }
});
spaceType — 'workspace' or 'userspace'
updates — Array of base64-encoded Y.js updates (may be merged if compressed)
editor — User ID of the editor (optional)
compressed — true if multiple updates were merged using mergeUpdatesInApplyWay
Broadcast Doc Update (Protocol 0.25)
Legacy single-update broadcast:
// Legacy (protocol 0.25) handler: one update per message.
socket.on('space:broadcast-doc-update', (message) => {
  const { docId, update } = message;
  // Ignore updates for documents other than the one held in `ydoc`
  if (docId !== 'doc-456') return;
  const updateBinary = Buffer.from(update, 'base64');
  // 'remote' origin keeps the local 'update' listener from echoing this back
  Y.applyUpdate(ydoc, updateBinary, 'remote');
});
Delete Document
// Ask the server to delete one document in the given space.
const deleteRequest = {
  spaceType: 'workspace',
  spaceId: 'workspace-123',
  docId: 'doc-456'
};
socket.emit('space:delete-doc', deleteRequest);
Load Document Timestamps
Get timestamps for all documents (useful for detecting changes):
// Lower bound for the query: only docs updated in the last hour
const oneHourAgo = Date.now() - 60 * 60 * 1000;
socket.emit('space:load-doc-timestamps', {
  spaceType: 'workspace',
  spaceId: 'workspace-123',
  timestamp: oneHourAgo
}, ({ data }) => {
  // data = { 'doc-1': 1234567890, 'doc-2': 1234567891 }
  Object.entries(data).forEach(([docId, timestamp]) => {
    console.log(`${docId}: ${new Date(timestamp)}`);
  });
});
Awareness (Presence)
Awareness provides real-time presence information like cursors, selections, and user status.
Join Awareness
// Subscribe to presence information for a single document.
const joinAwarenessRequest = {
  spaceType: 'workspace',
  spaceId: 'workspace-123',
  docId: 'doc-456',
  clientVersion: '0.26.0'
};
socket.emit('space:join-awareness', joinAwarenessRequest);
Leave Awareness
// Stop receiving presence information for the document.
const leaveAwarenessRequest = {
  spaceType: 'workspace',
  spaceId: 'workspace-123',
  docId: 'doc-456'
};
socket.emit('space:leave-awareness', leaveAwarenessRequest);
Update Awareness
Broadcast your cursor position, selection, or custom state:
// encodeAwarenessUpdate is used below, so it must be imported alongside Awareness.
import { Awareness, encodeAwarenessUpdate } from 'y-protocols/awareness';
const awareness = new Awareness(ydoc);
// Publish the local user's presence: identity, cursor position and selection range.
awareness.setLocalState({
  user: {
    name: 'Alice',
    color: '#ff0000'
  },
  cursor: {
    x: 100,
    y: 200
  },
  selection: {
    anchor: 42,
    head: 58
  }
});
// Send awareness changes to the server as a base64-encoded awareness update.
awareness.on('update', (
  { added, updated, removed }: { added: number[]; updated: number[]; removed: number[] },
  origin: unknown
) => {
  // States applied with the 'remote' origin came from the server; don't send them back.
  if (origin === 'remote') return;
  const changedClients = [...added, ...updated, ...removed];
  const update = encodeAwarenessUpdate(awareness, changedClients);
  socket.emit('space:update-awareness', {
    spaceType: 'workspace',
    spaceId: 'workspace-123',
    docId: 'doc-456',
    awarenessUpdate: Buffer.from(update).toString('base64')
  });
});
Receive Awareness Updates
// Merge presence updates from other clients into the local Awareness instance.
socket.on('space:broadcast-awareness-update', (message) => {
  const { docId, awarenessUpdate } = message;
  // Only apply presence that belongs to the document this Awareness instance tracks
  if (docId !== 'doc-456') return;
  const update = Buffer.from(awarenessUpdate, 'base64');
  applyAwarenessUpdate(awareness, update, 'remote');
});
Load All Awareness States
Request current awareness from all connected clients:
// Ask every connected client to report its current awareness state.
const loadAwarenessesRequest = {
  spaceType: 'workspace',
  spaceId: 'workspace-123',
  docId: 'doc-456'
};
socket.emit('space:load-awarenesses', loadAwarenessesRequest);

// Other clients will receive 'space:collect-awareness' event
socket.on('space:collect-awareness', (request) => {
  // Respond by broadcasting your current awareness state
  const ownState = encodeAwarenessUpdate(awareness, [awareness.clientID]);
  const encodedState = Buffer.from(ownState).toString('base64');
  socket.emit('space:update-awareness', {
    spaceType: 'workspace',
    spaceId: 'workspace-123',
    docId: request.docId,
    awarenessUpdate: encodedState
  });
});
Sync Architecture
Sync Adapters
WorkspaceSyncAdapter: Handles shared workspace documents with access control
/** Sync adapter for shared workspace documents; access is checked through the access controller. */
class WorkspaceSyncAdapter {
  /** Resolves when `userId` may perform `action` on the workspace; the assertion rejects otherwise. */
  async assertAccessible(spaceId, userId, action) {
    const workspaceAccess = accessController.user(userId).workspace(spaceId);
    await workspaceAccess.assert(action);
  }

  /** Stores `updates` for a document, refusing with DocUpdateBlocked when its meta is flagged as blocked. */
  async push(spaceId, docId, updates, editorId) {
    // Check if doc is blocked
    const docMeta = await models.doc.getMeta(spaceId, docId);
    const isBlocked = Boolean(docMeta?.blocked);
    if (isBlocked) {
      throw new DocUpdateBlocked({ spaceId, docId });
    }
    const stored = await storage.pushDocUpdates(spaceId, docId, updates, editorId);
    return stored;
  }
}
UserspaceSyncAdapter: Handles private user documents
/** Sync adapter for private user documents: a userspace is accessible only to the user whose id equals the space id. */
class UserspaceSyncAdapter {
  /** Throws SpaceAccessDenied unless `spaceId` equals `userId`; `action` is not consulted. */
  async assertAccessible(spaceId, userId, action) {
    const ownsSpace = spaceId === userId;
    if (ownsSpace) {
      return;
    }
    throw new SpaceAccessDenied({ spaceId });
  }
}
Sync Rooms
Clients are organized into Socket.IO rooms:
{spaceId}:sync - General space membership
{spaceId}:sync-025 - Protocol 0.25 clients
{spaceId}:sync-026 - Protocol 0.26 clients
{spaceId}:{docId}:awareness - Document-specific presence
Update Compression
The server automatically compresses multiple updates before broadcast:
// Several small client updates waiting to be broadcast
const pendingUpdates = [
  update1, // 100 bytes
  update2, // 50 bytes
  update3 // 75 bytes
];
// The server merges them using the native binding (~180 bytes after merging)
const mergedUpdate = mergeUpdatesInApplyWay(pendingUpdates);
const mergedBase64 = Buffer.from(mergedUpdate).toString('base64');
// One merged update goes out, flagged as compressed
socket.emit('space:broadcast-doc-updates', {
  updates: [mergedBase64],
  compressed: true
});
Update compression reduces bandwidth usage by up to 60% in high-collaboration scenarios.
Error Handling
Common Errors
// Report sync errors by error code. Unknown codes are logged too, so they are not dropped silently.
socket.on('error', (error) => {
  switch (error.code) {
    case 'NOT_IN_SPACE':
      console.error('Must join space before accessing docs');
      break;
    case 'DOC_NOT_FOUND':
      console.error('Document does not exist');
      break;
    case 'DOC_UPDATE_BLOCKED':
      console.error('Document is locked or blocked');
      break;
    case 'SPACE_ACCESS_DENIED':
      console.error('Insufficient permissions');
      break;
    default:
      // Any code not listed above still gets surfaced
      console.error('Unexpected sync error:', error);
      break;
  }
});
Disconnection Handling
// Reconnect manually only when the server closed the connection.
socket.on('disconnect', (reason) => {
  if (reason === 'io server disconnect') {
    // Server terminated connection
    socket.connect();
  }
  // Otherwise, client will auto-reconnect
});
// Since Socket.IO v3 the 'reconnect' event is emitted by the Manager (socket.io),
// not by the Socket, so the listener must be attached there.
socket.io.on('reconnect', (attemptNumber) => {
  console.log(`Reconnected after ${attemptNumber} attempts`);
  // Rejoin spaces
  socket.emit('space:join', {
    spaceType: 'workspace',
    spaceId: 'workspace-123',
    clientVersion: '0.26.0'
  }, () => {
    // Resync documents only after the join was acknowledged, sending the
    // current state vector so the server returns just what is missing.
    socket.emit('space:load-doc', {
      spaceType: 'workspace',
      spaceId: 'workspace-123',
      docId: 'doc-456',
      stateVector: Buffer.from(Y.encodeStateVector(ydoc)).toString('base64')
    }, (response) => {
      Y.applyUpdate(ydoc, Buffer.from(response.missing, 'base64'), 'remote');
    });
  });
});
Metrics and Monitoring
The sync gateway tracks key metrics:
// Gauge: number of currently active connections
metrics.socketio.gauge('connections').record(connectionCount);
// Counter: broadcast updates, labelled by delivery mode
const broadcastMode = compressed ? 'compressed' : 'batch';
metrics.socketio
  .counter('doc_updates_broadcast')
  .add(updateCount, { mode: broadcastMode });
// Counter: one tick per compression
metrics.socketio.counter('doc_updates_compressed').add(1);
Active User Tracking
The server tracks active users per minute:
// Server-side: record the unique active user count against the current time
const recordedAt = new Date();
await models.workspaceAnalytics.upsertSyncActiveUsersMinute(recordedAt, uniqueActiveUsers);
Best Practices
- Always send a state vector with load-doc to minimize data transfer
- Debounce awareness updates to avoid flooding the network
- Handle reconnection gracefully by rejoining and resyncing
- Use document timestamps to detect out-of-sync state
- Implement exponential backoff for retry logic
- Clean up awareness on disconnect to remove stale presence
- Use protocol 0.26 for better performance with compression
Example: Full Sync Implementation
import * as Y from 'yjs';
import { Awareness, encodeAwarenessUpdate, applyAwarenessUpdate } from 'y-protocols/awareness';
import { io } from 'socket.io-client';
/**
 * Minimal sync client for one document in one workspace.
 *
 * On construction it connects, joins the workspace, loads the document and then
 * keeps the local Y.Doc and Awareness in sync with the server. Call `destroy()`
 * to tear everything down.
 */
class AFFiNESync {
  // The `Socket` type is not imported in this example, so derive it from `io`.
  private readonly socket: ReturnType<typeof io>;
  private readonly ydoc: Y.Doc;
  private readonly awareness: Awareness;

  /**
   * @param workspaceId - Workspace (space) to join.
   * @param docId - Document inside that workspace to synchronize.
   */
  constructor(workspaceId: string, docId: string) {
    this.ydoc = new Y.Doc();
    this.awareness = new Awareness(this.ydoc);
    this.socket = io('wss://app.affine.pro', {
      auth: { token: 'YOUR_TOKEN' }
    });
    // setupSync is async; report a failure instead of leaving an unhandled rejection.
    this.setupSync(workspaceId, docId).catch((error: unknown) => {
      console.error('Failed to set up sync:', error);
    });
  }

  /** Emits `event` with `payload` and resolves with the server's acknowledgement. */
  private request<T = unknown>(event: string, payload: Record<string, unknown>): Promise<T> {
    return new Promise<T>((resolve) => {
      this.socket.emit(event, payload, (response: T) => resolve(response));
    });
  }

  /** Joins the space, wires the document and awareness listeners, and loads the initial state. */
  private async setupSync(workspaceId: string, docId: string): Promise<void> {
    const space = { spaceType: 'workspace', spaceId: workspaceId };

    // Join space
    await this.request('space:join', { ...space, clientVersion: '0.26.0' });

    // Listen for remote updates BEFORE loading, so nothing broadcast while the
    // initial load is in flight is missed.
    this.socket.on('space:broadcast-doc-updates', (message: { docId: string; updates: string[] }) => {
      if (message.docId !== docId) return;
      for (const updateBase64 of message.updates) {
        Y.applyUpdate(this.ydoc, Buffer.from(updateBase64, 'base64'), 'remote');
      }
    });

    // Push local changes; anything applied with the 'remote' origin is skipped.
    this.ydoc.on('update', (update: Uint8Array, origin: unknown) => {
      if (origin === 'remote') return;
      this.socket.emit('space:push-doc-update', {
        ...space,
        docId,
        update: Buffer.from(update).toString('base64')
      });
    });

    // Load initial state (differential: only what the local state vector lacks)
    const stateVector = Y.encodeStateVector(this.ydoc);
    const { missing } = await this.request<{ missing: string }>('space:load-doc', {
      ...space,
      docId,
      stateVector: Buffer.from(stateVector).toString('base64')
    });
    Y.applyUpdate(this.ydoc, Buffer.from(missing, 'base64'), 'remote');

    // Setup awareness: start receiving before joining, start sending after.
    this.socket.on('space:broadcast-awareness-update', (message: { docId: string; awarenessUpdate: string }) => {
      if (message.docId !== docId) return;
      const update = Buffer.from(message.awarenessUpdate, 'base64');
      applyAwarenessUpdate(this.awareness, update, 'remote');
    });
    await this.request('space:join-awareness', {
      ...space,
      docId,
      clientVersion: '0.26.0'
    });
    this.awareness.on('update', (
      { added, updated, removed }: { added: number[]; updated: number[]; removed: number[] },
      origin: unknown
    ) => {
      // States received from the server must not be sent back to it
      if (origin === 'remote') return;
      const changedClients = [...added, ...updated, ...removed];
      const update = encodeAwarenessUpdate(this.awareness, changedClients);
      this.socket.emit('space:update-awareness', {
        ...space,
        docId,
        awarenessUpdate: Buffer.from(update).toString('base64')
      });
    });
  }

  /** Releases awareness, the socket and the document. */
  destroy() {
    // Destroy awareness first: clearing the local state fires a final 'update'
    // that removes this client's presence, and it must go out while the socket
    // is still connected.
    this.awareness.destroy();
    this.socket.disconnect();
    this.ydoc.destroy();
  }
}