
Overview

AFFiNE uses a WebSocket-based sync protocol built on Y.js (Yjs) CRDTs to enable real-time collaborative editing. The protocol handles document updates, awareness (cursor positions, selections), and conflict-free merging.

Connection

Establishing WebSocket Connection

import { io } from 'socket.io-client';

const socket = io('wss://app.affine.pro', {
  auth: {
    token: 'YOUR_AUTH_TOKEN'
  },
  transports: ['websocket']
});

Client Version Requirements

Clients must be running version ≥0.25.0 to connect. Older versions will be rejected immediately.
const CLIENT_VERSION = '0.26.0';
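
The version gate can be pictured as a numeric comparison of the dotted components. The following is a minimal sketch, assuming plain `major.minor.patch` strings. `parseVersion` and `isSupportedClient` are hypothetical helpers, not part of the AFFiNE API, and the server's actual check may differ (for example in how it treats pre-release tags).

```typescript
// Hypothetical helpers for illustration; not part of the AFFiNE API.
const MIN_CLIENT_VERSION = '0.25.0';

function parseVersion(version: string): number[] {
  // Drop any pre-release/build suffix such as "-beta.1", then split on dots.
  const core = version.split(/[-+]/)[0] ?? '';
  return core.split('.').map((part) => Number.parseInt(part, 10) || 0);
}

function isSupportedClient(version: string, minimum: string = MIN_CLIENT_VERSION): boolean {
  const a = parseVersion(version);
  const b = parseVersion(minimum);
  // Compare major, minor, patch numerically so that 0.9.0 sorts below 0.25.0.
  for (let i = 0; i < 3; i++) {
    const diff = (a[i] ?? 0) - (b[i] ?? 0);
    if (diff !== 0) return diff > 0;
  }
  return true; // equal versions are supported
}
```

Checking locally before connecting lets a client show an upgrade prompt instead of a generic connection failure.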

Sync Protocol Versions

Protocol 0.26 (Current)

Supports batch updates with compression:
  • Multiple updates sent as array
  • Automatic Y.js update merging on server
  • Compressed payloads for efficiency

Protocol 0.25 (Legacy)

Single update per message:
  • Individual updates only
  • No compression
  • Backward compatibility maintained
The server automatically detects client version and uses the appropriate protocol room.

Space Types

enum SpaceType {
  Workspace = 'workspace', // Shared collaborative workspaces
  Userspace = 'userspace'  // Private user data
}

Core Events

Join Space

Join a workspace or userspace to begin syncing:
socket.emit('space:join', {
  spaceType: 'workspace',
  spaceId: 'workspace-123',
  clientVersion: '0.26.0'
}, (response) => {
  console.log('Client ID:', response.clientId);
  console.log('Success:', response.success);
});
Parameters:
  • spaceType ('workspace' | 'userspace', required): Type of space to join
  • spaceId (string, required): Workspace or userspace identifier
  • clientVersion (string, required): Semantic version of the client (e.g., '0.26.0')
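
Because `space:join` reports its result through an acknowledgement callback, it can be convenient to wrap the call in a promise that also gives up after a timeout. The sketch below assumes the only thing needed from the socket is `emit(event, payload, ack)`. `AckEmitter` and `requestWithTimeout` are hypothetical names, and the 5 second default is an arbitrary choice.

```typescript
// Minimal shape of the socket used here (hypothetical interface).
// A socket.io-client Socket should be usable wherever this is expected.
interface AckEmitter {
  emit(event: string, payload: unknown, ack: (response: any) => void): unknown;
}

function requestWithTimeout<T>(
  socket: AckEmitter,
  event: string,
  payload: unknown,
  timeoutMs = 5000 // assumed default; tune for your deployment
): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    // Reject if the server never acknowledges the event.
    const timer = setTimeout(
      () => reject(new Error(`${event} timed out after ${timeoutMs} ms`)),
      timeoutMs
    );
    socket.emit(event, payload, (response: T) => {
      clearTimeout(timer);
      resolve(response);
    });
  });
}
```

With this helper, joining a space could read `await requestWithTimeout(socket, 'space:join', { spaceType, spaceId, clientVersion })`.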

Leave Space

socket.emit('space:leave', {
  spaceType: 'workspace',
  spaceId: 'workspace-123'
});

Document Synchronization

Load Document

Fetch document state with optional state vector for differential sync:
import * as Y from 'yjs';

// Encode the local state vector so the server returns only what is missing
const stateVector = Y.encodeStateVector(ydoc);

socket.emit('space:load-doc', {
  spaceType: 'workspace',
  spaceId: 'workspace-123',
  docId: 'doc-456',
  stateVector: Buffer.from(stateVector).toString('base64')
}, (response) => {
  // Apply missing updates, tagged 'remote' so the push handler does not echo them back
  const missing = Buffer.from(response.missing, 'base64');
  Y.applyUpdate(ydoc, missing, 'remote');
});
Response fields:
  • missing (string): Base64-encoded Y.js update containing data not in the client's state vector
  • state (string): Base64-encoded state vector of the server
  • timestamp (number): Unix timestamp (ms) of the last update
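
The examples on this page use Node's `Buffer` for the base64 round trip, which is not available in browsers without a polyfill. A minimal sketch of equivalent helpers built on the global `btoa`/`atob` functions follows. `toBase64` and `fromBase64` are hypothetical names, not AFFiNE utilities.

```typescript
// Hypothetical helpers; they replace Buffer.from(...).toString('base64') and its inverse.
function toBase64(bytes: Uint8Array): string {
  let binary = '';
  // Build a "binary string" with one character per byte, as btoa expects.
  bytes.forEach((byte) => {
    binary += String.fromCharCode(byte);
  });
  return btoa(binary);
}

function fromBase64(encoded: string): Uint8Array {
  const binary = atob(encoded);
  const bytes = new Uint8Array(binary.length);
  for (let i = 0; i < binary.length; i++) {
    bytes[i] = binary.charCodeAt(i);
  }
  return bytes;
}
```

Appending one character per byte is simple but slow for very large updates; chunking the input is a common refinement.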

Push Document Update

Send local changes to server:
ydoc.on('update', (update: Uint8Array, origin: any) => {
  if (origin === 'remote') return; // Don't echo remote updates
  
  socket.emit('space:push-doc-update', {
    spaceType: 'workspace',
    spaceId: 'workspace-123',
    docId: 'doc-456',
    update: Buffer.from(update).toString('base64')
  }, (response) => {
    console.log('Update accepted:', response.accepted);
    console.log('Server timestamp:', response.timestamp);
  });
});
Parameters:
  • update (string, required): Base64-encoded Y.js update binary

Response fields:
  • accepted (boolean): true when the server accepted the update
  • timestamp (number): Server timestamp when the update was processed

Broadcast Doc Updates (Protocol 0.26)

Receive batched updates from other clients:
socket.on('space:broadcast-doc-updates', (message) => {
  const { spaceId, docId, updates, timestamp, editor, compressed } = message;
  
  for (const updateBase64 of updates) {
    const update = Buffer.from(updateBase64, 'base64');
    Y.applyUpdate(ydoc, update, 'remote');
  }
  
  if (compressed) {
    console.log('Received compressed update');
  }
});
Message fields:
  • spaceType (string): 'workspace' or 'userspace'
  • spaceId (string): Space identifier
  • docId (string): Document identifier
  • updates (string[]): Array of base64-encoded Y.js updates (may be merged if compressed)
  • timestamp (number): Server timestamp
  • editor (string, optional): User ID of the editor
  • compressed (boolean): true if multiple updates were merged using mergeUpdatesInApplyWay
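
The fields above can be written down as a TypeScript type, with a runtime guard to reject malformed messages before they reach `Y.applyUpdate`. This is a sketch derived only from the field list. `BroadcastDocUpdatesMessage` and `isBroadcastDocUpdates` are hypothetical names, and treating `compressed` as optional is an assumption.

```typescript
// Hypothetical type mirroring the documented message fields.
interface BroadcastDocUpdatesMessage {
  spaceType: 'workspace' | 'userspace';
  spaceId: string;
  docId: string;
  updates: string[];
  timestamp: number;
  editor?: string;
  compressed?: boolean; // assumption: may be absent on uncompressed batches
}

function isBroadcastDocUpdates(value: unknown): value is BroadcastDocUpdatesMessage {
  if (typeof value !== 'object' || value === null) return false;
  const m = value as Record<string, unknown>;
  return (
    (m.spaceType === 'workspace' || m.spaceType === 'userspace') &&
    typeof m.spaceId === 'string' &&
    typeof m.docId === 'string' &&
    Array.isArray(m.updates) &&
    m.updates.every((u) => typeof u === 'string') &&
    typeof m.timestamp === 'number'
  );
}
```

A handler could start with `if (!isBroadcastDocUpdates(message)) return;` and then rely on the narrowed type.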

Broadcast Doc Update (Protocol 0.25)

Legacy single-update broadcast:
socket.on('space:broadcast-doc-update', (message) => {
  const { docId, update, timestamp, editor } = message;
  const updateBinary = Buffer.from(update, 'base64');
  Y.applyUpdate(ydoc, updateBinary, 'remote');
});

Delete Document

socket.emit('space:delete-doc', {
  spaceType: 'workspace',
  spaceId: 'workspace-123',
  docId: 'doc-456'
});

Load Document Timestamps

Get timestamps for all documents (useful for detecting changes):
socket.emit('space:load-doc-timestamps', {
  spaceType: 'workspace',
  spaceId: 'workspace-123',
  timestamp: Date.now() - 3600000 // Only docs updated in last hour
}, (response) => {
  // response.data = { 'doc-1': 1234567890, 'doc-2': 1234567891 }
  for (const [docId, timestamp] of Object.entries(response.data)) {
    console.log(`${docId}: ${new Date(timestamp)}`);
  }
});
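
To turn the timestamp map into a list of documents that need a reload, compare it with the timestamps the client recorded at its last sync. A minimal sketch, assuming the client keeps its own `docId → timestamp` map; `findStaleDocs` is a hypothetical helper.

```typescript
type DocTimestamps = Record<string, number>;

// Returns the IDs of documents whose server timestamp is newer than the local one,
// or that are not known locally at all.
function findStaleDocs(local: DocTimestamps, remote: DocTimestamps): string[] {
  return Object.entries(remote)
    .filter(([docId, remoteTs]) => {
      const localTs = local[docId];
      return localTs === undefined || remoteTs > localTs;
    })
    .map(([docId]) => docId);
}
```

Each returned ID can then be passed to `space:load-doc` together with the local state vector.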

Awareness (Presence)

Awareness provides real-time presence information like cursors, selections, and user status.

Join Awareness

socket.emit('space:join-awareness', {
  spaceType: 'workspace',
  spaceId: 'workspace-123',
  docId: 'doc-456',
  clientVersion: '0.26.0'
});

Leave Awareness

socket.emit('space:leave-awareness', {
  spaceType: 'workspace',
  spaceId: 'workspace-123',
  docId: 'doc-456'
});

Update Awareness

Broadcast your cursor position, selection, or custom state:
import { Awareness, encodeAwarenessUpdate } from 'y-protocols/awareness';

const awareness = new Awareness(ydoc);

awareness.setLocalState({
  user: {
    name: 'Alice',
    color: '#ff0000'
  },
  cursor: {
    x: 100,
    y: 200
  },
  selection: {
    anchor: 42,
    head: 58
  }
});

awareness.on('update', ({ added, updated, removed }, origin) => {
  if (origin === 'remote') return; // Don't echo awareness received from the server
  const changedClients = [...added, ...updated, ...removed];
  const update = encodeAwarenessUpdate(awareness, changedClients);
  
  socket.emit('space:update-awareness', {
    spaceType: 'workspace',
    spaceId: 'workspace-123',
    docId: 'doc-456',
    awarenessUpdate: Buffer.from(update).toString('base64')
  });
});
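
Cursor movement can fire the awareness handler many times per second. One way to keep traffic down is to collect the changed client IDs and send them as a single message per interval, a close cousin of debouncing. The sketch below is illustrative only. `AwarenessBatcher` is a hypothetical class, the 100 ms interval is an assumption, and the `send` callback is where `encodeAwarenessUpdate` and `socket.emit` would go.

```typescript
// Hypothetical helper: coalesces changed client IDs into one send per interval.
class AwarenessBatcher {
  private readonly pending = new Set<number>();
  private timer: ReturnType<typeof setTimeout> | null = null;
  private readonly send: (clientIds: number[]) => void;
  private readonly delayMs: number;

  constructor(send: (clientIds: number[]) => void, delayMs = 100) {
    this.send = send;
    this.delayMs = delayMs;
  }

  // Record changed clients; the first change in a window starts the timer.
  add(clientIds: number[]): void {
    clientIds.forEach((id) => this.pending.add(id));
    if (this.timer === null) {
      this.timer = setTimeout(() => this.flush(), this.delayMs);
    }
  }

  // Send everything collected so far (also safe to call manually, e.g. before leaving).
  flush(): void {
    if (this.timer !== null) {
      clearTimeout(this.timer);
      this.timer = null;
    }
    if (this.pending.size === 0) return;
    const ids = Array.from(this.pending);
    this.pending.clear();
    this.send(ids);
  }
}
```

In the handler above, the body would shrink to `batcher.add([...added, ...updated, ...removed])`.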

Receive Awareness Updates

socket.on('space:broadcast-awareness-update', (message) => {
  const { docId, awarenessUpdate } = message;
  const update = Buffer.from(awarenessUpdate, 'base64');
  
  applyAwarenessUpdate(awareness, update, 'remote');
});

Load All Awareness States

Request current awareness from all connected clients:
socket.emit('space:load-awarenesses', {
  spaceType: 'workspace',
  spaceId: 'workspace-123',
  docId: 'doc-456'
});

// Other clients will receive 'space:collect-awareness' event
socket.on('space:collect-awareness', ({ docId }) => {
  // Respond by broadcasting your current awareness state
  const update = encodeAwarenessUpdate(awareness, [awareness.clientID]);
  socket.emit('space:update-awareness', {
    spaceType: 'workspace',
    spaceId: 'workspace-123',
    docId,
    awarenessUpdate: Buffer.from(update).toString('base64')
  });
});

Sync Architecture

Sync Adapters

WorkspaceSyncAdapter: Handles shared workspace documents with access control
class WorkspaceSyncAdapter {
  async assertAccessible(spaceId, userId, action) {
    await accessController.user(userId).workspace(spaceId).assert(action);
  }
  
  async push(spaceId, docId, updates, editorId) {
    // Check if doc is blocked
    const meta = await models.doc.getMeta(spaceId, docId);
    if (meta?.blocked) {
      throw new DocUpdateBlocked({ spaceId, docId });
    }
    return await storage.pushDocUpdates(spaceId, docId, updates, editorId);
  }
}
UserspaceSyncAdapter: Handles private user documents
class UserspaceSyncAdapter {
  async assertAccessible(spaceId, userId, action) {
    if (spaceId !== userId) {
      throw new SpaceAccessDenied({ spaceId });
    }
  }
}

Sync Rooms

Clients are organized into Socket.IO rooms:
  • {spaceId}:sync - General space membership
  • {spaceId}:sync-025 - Protocol 0.25 clients
  • {spaceId}:sync-026 - Protocol 0.26 clients
  • {spaceId}:{docId}:awareness - Document-specific presence
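
The naming scheme above can be expressed as a few string helpers. This is a sketch rather than the server's code: the function names are hypothetical, and two details are assumptions, namely that version 0.26.0 is the cut-over to the `sync-026` room and that a client sits in both the general room and one protocol room.

```typescript
type SyncProtocol = '025' | '026';

// Assumption: 0.26.0 and later use protocol 0.26; anything older uses 0.25.
function protocolFor(clientVersion: string): SyncProtocol {
  const parts = clientVersion.split('.').map((p) => Number.parseInt(p, 10) || 0);
  const major = parts[0] ?? 0;
  const minor = parts[1] ?? 0;
  return major > 0 || minor >= 26 ? '026' : '025';
}

// Rooms a client would occupy after joining a space (assumed: general + protocol room).
function spaceRooms(spaceId: string, clientVersion: string): string[] {
  return [`${spaceId}:sync`, `${spaceId}:sync-${protocolFor(clientVersion)}`];
}

// Room used for presence on a single document.
function awarenessRoom(spaceId: string, docId: string): string {
  return `${spaceId}:${docId}:awareness`;
}
```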

Update Compression

The server automatically compresses multiple updates before broadcast:
// Multiple small updates from clients
const updates = [
  update1, // 100 bytes
  update2, // 50 bytes
  update3  // 75 bytes
];

// Server merges using native binding
const merged = mergeUpdatesInApplyWay(updates); // ~180 bytes

// Broadcast single merged update
socket.emit('space:broadcast-doc-updates', {
  updates: [Buffer.from(merged).toString('base64')],
  compressed: true
});
Update compression reduces bandwidth usage by up to 60% in high-collaboration scenarios.
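
For the figures in the example above, the saving works out to 20%: three updates totalling 225 bytes are replaced by one of roughly 180 bytes. The short calculation below makes that explicit. `bandwidthSaving` is a hypothetical helper, and the byte counts are the illustrative ones from the example, not measurements.

```typescript
// Fraction of bytes saved by sending one merged update instead of the originals.
function bandwidthSaving(originalSizes: number[], mergedSize: number): number {
  const total = originalSizes.reduce((sum, size) => sum + size, 0);
  if (total === 0) return 0;
  return (total - mergedSize) / total;
}

// Sizes taken from the example: 100 + 50 + 75 = 225 bytes before, ~180 bytes after.
const exampleSaving = bandwidthSaving([100, 50, 75], 180); // 45 / 225 = 0.2
```

The 60% figure is the stated upper bound; the example sits at the low end of that range.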

Error Handling

Common Errors

socket.on('error', (error) => {
  switch (error.code) {
    case 'NOT_IN_SPACE':
      console.error('Must join space before accessing docs');
      break;
    case 'DOC_NOT_FOUND':
      console.error('Document does not exist');
      break;
    case 'DOC_UPDATE_BLOCKED':
      console.error('Document is locked or blocked');
      break;
    case 'SPACE_ACCESS_DENIED':
      console.error('Insufficient permissions');
      break;
  }
});

Disconnection Handling

socket.on('disconnect', (reason) => {
  if (reason === 'io server disconnect') {
    // Server terminated connection
    socket.connect();
  }
  // Otherwise, client will auto-reconnect
});

// In Socket.IO v3 and later, 'reconnect' is emitted by the Manager (socket.io), not by the socket
socket.io.on('reconnect', (attemptNumber) => {
  console.log(`Reconnected after ${attemptNumber} attempts`);
  
  // Rejoin spaces
  socket.emit('space:join', { ... });
  
  // Resync documents
  socket.emit('space:load-doc', { ... });
});
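
Rejoining after a reconnect requires the client to remember which spaces it had joined. A minimal sketch of such a registry follows. `SpaceRef` and `JoinedSpaces` are hypothetical names, and the payload shape is the one shown for `space:join` earlier on this page.

```typescript
interface SpaceRef {
  spaceType: 'workspace' | 'userspace';
  spaceId: string;
}

// Hypothetical registry of joined spaces, replayed after a reconnect.
class JoinedSpaces {
  private readonly spaces = new Map<string, SpaceRef>();

  private static key(ref: SpaceRef): string {
    return `${ref.spaceType}:${ref.spaceId}`;
  }

  join(ref: SpaceRef): void {
    this.spaces.set(JoinedSpaces.key(ref), ref);
  }

  leave(ref: SpaceRef): void {
    this.spaces.delete(JoinedSpaces.key(ref));
  }

  // Builds one 'space:join' payload per space that is still joined.
  rejoinPayloads(clientVersion: string): Array<SpaceRef & { clientVersion: string }> {
    return Array.from(this.spaces.values()).map((ref) => ({ ...ref, clientVersion }));
  }
}
```

In the reconnect handler, each payload from `rejoinPayloads('0.26.0')` would be emitted as `space:join`, followed by `space:load-doc` for every open document.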

Metrics and Monitoring

The sync gateway tracks key metrics:
// Active connections
metrics.socketio.gauge('connections').record(connectionCount);

// Update broadcasts
metrics.socketio.counter('doc_updates_broadcast').add(updateCount, {
  mode: compressed ? 'compressed' : 'batch'
});

// Compression efficiency
metrics.socketio.counter('doc_updates_compressed').add(1);

Active User Tracking

The server tracks active users per minute:
// Server-side tracking
await models.workspaceAnalytics.upsertSyncActiveUsersMinute(
  new Date(),
  uniqueActiveUsers
);

Best Practices

  1. Always send state vector with load-doc to minimize data transfer
  2. Debounce awareness updates to avoid flooding the network
  3. Handle reconnection gracefully by rejoining and resyncing
  4. Use document timestamps to detect out-of-sync state
  5. Implement exponential backoff for retry logic
  6. Clean up awareness on disconnect to remove stale presence
  7. Use protocol 0.26 for better performance with compression
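
For item 5, a retry delay that doubles on each attempt, is capped, and is randomized to avoid synchronized retries could look like the sketch below. `backoffDelay` is a hypothetical helper and the 500 ms base and 30 s cap are arbitrary. Socket.IO handles transport-level reconnection itself, so this would apply to application-level retries such as re-sending a rejected or timed-out request.

```typescript
// Hypothetical helper: exponential backoff with "full jitter".
function backoffDelay(
  attempt: number, // 0 for the first retry
  baseMs = 500, // assumed base delay
  maxMs = 30000, // assumed cap
  random: () => number = Math.random // injectable for deterministic tests
): number {
  // Double the window on every attempt, but never exceed the cap.
  const windowMs = Math.min(maxMs, baseMs * 2 ** attempt);
  // Pick a delay uniformly within the window to spread out competing clients.
  return Math.floor(random() * windowMs);
}
```

A retry loop would wait `backoffDelay(attempt)` milliseconds before attempt `attempt + 1` and stop after a fixed number of tries.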

Example: Full Sync Implementation

import * as Y from 'yjs';
import { Awareness, encodeAwarenessUpdate, applyAwarenessUpdate } from 'y-protocols/awareness';
import { io, Socket } from 'socket.io-client';

class AFFiNESync {
  private socket: Socket;
  private ydoc: Y.Doc;
  private awareness: Awareness;
  
  constructor(workspaceId: string, docId: string) {
    this.ydoc = new Y.Doc();
    this.awareness = new Awareness(this.ydoc);
    this.socket = io('wss://app.affine.pro', {
      auth: { token: 'YOUR_TOKEN' }
    });
    
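    // Fire-and-forget: surface setup failures instead of leaving an unhandled rejection
    this.setupSync(workspaceId, docId).catch((err) => console.error('Sync setup failed:', err));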
  }
  
  private async setupSync(workspaceId: string, docId: string) {
    // Join space
    await new Promise<void>((resolve) => {
      this.socket.emit('space:join', {
        spaceType: 'workspace',
        spaceId: workspaceId,
        clientVersion: '0.26.0'
      }, () => resolve());
    });
    
    // Load initial state
    const stateVector = Y.encodeStateVector(this.ydoc);
    await new Promise<void>((resolve) => {
      this.socket.emit('space:load-doc', {
        spaceType: 'workspace',
        spaceId: workspaceId,
        docId,
        stateVector: Buffer.from(stateVector).toString('base64')
      }, (response) => {
        const missing = Buffer.from(response.missing, 'base64');
        Y.applyUpdate(this.ydoc, missing, 'remote');
        resolve();
      });
    });
    
    // Subscribe to updates
    this.ydoc.on('update', (update, origin) => {
      if (origin === 'remote') return;
      
      this.socket.emit('space:push-doc-update', {
        spaceType: 'workspace',
        spaceId: workspaceId,
        docId,
        update: Buffer.from(update).toString('base64')
      });
    });
    
    this.socket.on('space:broadcast-doc-updates', (message) => {
      if (message.docId !== docId) return;
      
      for (const updateBase64 of message.updates) {
        const update = Buffer.from(updateBase64, 'base64');
        Y.applyUpdate(this.ydoc, update, 'remote');
      }
    });
    
    // Setup awareness
    await new Promise<void>((resolve) => {
      this.socket.emit('space:join-awareness', {
        spaceType: 'workspace',
        spaceId: workspaceId,
        docId,
        clientVersion: '0.26.0'
      }, () => resolve());
    });
    
    this.awareness.on('update', ({ added, updated, removed }, origin) => {
      if (origin === 'remote') return; // Don't echo awareness received from the server
      const changedClients = [...added, ...updated, ...removed];
      const update = encodeAwarenessUpdate(this.awareness, changedClients);
      
      this.socket.emit('space:update-awareness', {
        spaceType: 'workspace',
        spaceId: workspaceId,
        docId,
        awarenessUpdate: Buffer.from(update).toString('base64')
      });
    });
    
    this.socket.on('space:broadcast-awareness-update', (message) => {
      if (message.docId !== docId) return;
      
      const update = Buffer.from(message.awarenessUpdate, 'base64');
      applyAwarenessUpdate(this.awareness, update, 'remote');
    });
  }
  
  destroy() {
    this.socket.disconnect();
    this.awareness.destroy();
    this.ydoc.destroy();
  }
}