Skip to content
4 changes: 1 addition & 3 deletions backend/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import * as Y from 'yjs';
import * as WebSocket from 'ws';
import http, { Server } from 'http';
const ywsUtils = require('y-websocket/bin/utils');
const setupWSConnection = ywsUtils.setupWSConnection;
const docs = ywsUtils.docs as Map<string, Y.Doc & { conns: Map<any, any> }>;
import { docs, setupWSConnection } from './utils';
const port = (process.env['PORT'] || 3000) as number;

const server: Server = http.createServer((request, response) => {
Expand Down
364 changes: 364 additions & 0 deletions backend/src/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,364 @@
import * as Y from 'yjs';
const syncProtocol = require('y-protocols/sync');
const awarenessProtocol = require('y-protocols/awareness');

const encoding = require('lib0/encoding');
const decoding = require('lib0/decoding');
const map = require('lib0/map');

const debounce = require('lodash.debounce');

// const callbackHandler = require('./callback.cjs').callbackHandler;
// const isCallbackSet = require('./callback.cjs').isCallbackSet;

const CALLBACK_DEBOUNCE_WAIT = parseInt(process.env['CALLBACK_DEBOUNCE_WAIT'] || '2000');
const CALLBACK_DEBOUNCE_MAXWAIT = parseInt(process.env['CALLBACK_DEBOUNCE_MAXWAIT'] || '10000');

const wsReadyStateConnecting = 0;
const wsReadyStateOpen = 1;
const wsReadyStateClosing = 2; // eslint-disable-line
const wsReadyStateClosed = 3; // eslint-disable-line

// disable gc when using snapshots!
const gcEnabled = process.env['GC'] !== 'false' && process.env['GC'] !== '0';
const persistenceDir = process.env['YPERSISTENCE'];
/**
* @type {{bindState: function(string,WSSharedDoc):void, writeState:function(string,WSSharedDoc):Promise<any>, provider: any}|null}
*/
let persistence = null;
if (typeof persistenceDir === 'string') {
console.info('Persisting documents to "' + persistenceDir + '"');
// @ts-ignore
const LeveldbPersistence = require('y-leveldb').LeveldbPersistence;
const ldb = new LeveldbPersistence(persistenceDir);
persistence = {
provider: ldb,
bindState: async (docName, ydoc) => {
const persistedYdoc = await ldb.getYDoc(docName);
const newUpdates = Y.encodeStateAsUpdate(ydoc);
ldb.storeUpdate(docName, newUpdates);
Y.applyUpdate(ydoc, Y.encodeStateAsUpdate(persistedYdoc));
ydoc.on('update', (update) => {
ldb.storeUpdate(docName, update);
});
},
writeState: async (_docName, _ydoc) => {}
};
}

/**
* @param {{bindState: function(string,WSSharedDoc):void,
* writeState:function(string,WSSharedDoc):Promise<any>,provider:any}|null} persistence_
*/
exports.setPersistence = (persistence_) => {
persistence = persistence_;
};

/**
* @return {null|{bindState: function(string,WSSharedDoc):void,
* writeState:function(string,WSSharedDoc):Promise<any>}|null} used persistence layer
*/
exports.getPersistence = () => persistence;

/**
* @type {Map<string,WSSharedDoc>}
*/
export const docs = new Map();

const messageSync = 0;
const messageAwareness = 1;
// const messageAuth = 2

/**
* @type {(ydoc: Y.Doc) => Promise<void>}
*/
let contentInitializor = (_ydoc) => Promise.resolve();

/**
* This function is called once every time a Yjs document is created. You can
* use it to pull data from an external source or initialize content.
*
* @param {(ydoc: Y.Doc) => Promise<void>} f
*/
exports.setContentInitializor = (f) => {
contentInitializor = f;
};

class WSSharedDoc extends Y.Doc {
name: any;
conns: any;
awareness: any;
whenInitialized: any;

liveBlocks = new Map<string, LiveBlock>();

/**
* @param {string} name
*/
constructor(name) {
super({ gc: gcEnabled, guid: name });
this.name = name;
/**
* Maps from conn to set of controlled user ids. Delete all user ids from awareness when this conn is closed
* @type {Map<Object, Set<number>>}
*/
this.conns = new Map();
/**
* @type {awarenessProtocol.Awareness}
*/
this.awareness = new awarenessProtocol.Awareness(this);
this.awareness.setLocalState(null);
/**
* @param {{ added: Array<number>, updated: Array<number>, removed: Array<number> }} changes
* @param {Object | null} conn Origin is the connection that made the change
*/
const awarenessChangeHandler = ({ added, updated, removed }, conn) => {
const changedClients = added.concat(updated, removed);
if (conn !== null) {
const connControlledIDs = /** @type {Set<number>} */ this.conns.get(conn);
if (connControlledIDs !== undefined) {
added.forEach((clientID) => {
connControlledIDs.add(clientID);
});
removed.forEach((clientID) => {
connControlledIDs.delete(clientID);
});
}
}
// broadcast awareness update
const encoder = encoding.createEncoder();
encoding.writeVarUint(encoder, messageAwareness);
encoding.writeVarString(encoder, this.guid);
encoding.writeVarUint8Array(encoder, awarenessProtocol.encodeAwarenessUpdate(this.awareness, changedClients));
const buff = encoding.toUint8Array(encoder);
this.conns.forEach((_, c) => {
send(this, c, buff);
});
};
this.awareness.on('update', awarenessChangeHandler);
this.on(
'update',
/** @type {any} */ (update, _origin, doc: WSSharedDoc, _tr) => {
const encoder = encoding.createEncoder();
encoding.writeVarUint(encoder, messageSync);
encoding.writeVarString(encoder, doc.guid);
syncProtocol.writeUpdate(encoder, update);
const message = encoding.toUint8Array(encoder);
doc.conns.forEach((_, conn) => send(doc, conn, message));
}
);
// if (isCallbackSet) {
// this.on('update', /** @type {any} */ debounce(callbackHandler, CALLBACK_DEBOUNCE_WAIT, { maxWait: CALLBACK_DEBOUNCE_MAXWAIT }));
// }
this.whenInitialized = contentInitializor(this);
}
}

class LiveBlock extends Y.Doc {
rootDoc: WSSharedDoc;
constructor(guid: string, rootDoc: WSSharedDoc) {
super({ gc: gcEnabled, guid, autoLoad: true });
this.on(
'update',
/** @type {any} */ (update, _origin, doc: WSSharedDoc, _tr) => {
const encoder = encoding.createEncoder();
encoding.writeVarUint(encoder, messageSync);
encoding.writeVarString(encoder, doc.guid);
syncProtocol.writeUpdate(encoder, update);
const message = encoding.toUint8Array(encoder);
rootDoc.conns.forEach((_, conn) => send(this, conn, message));
}
);
this.rootDoc = rootDoc;
}

sync(conn) {
// send sync step 1
const encoder = encoding.createEncoder();
encoding.writeVarUint(encoder, messageSync);
encoding.writeVarString(encoder, this.guid);
syncProtocol.writeSyncStep1(encoder, this);
send(this, conn, encoding.toUint8Array(encoder));
}
}

exports.WSSharedDoc = WSSharedDoc;

/**
* Gets a Y.Doc by name, whether in memory or on disk
*
* @param {string} docname - the name of the Y.Doc to find or create
* @param {boolean} gc - whether to allow gc on the doc (applies only when created)
* @return {WSSharedDoc}
*/
const getYDoc = (docname, gc = true) => {
return map.setIfUndefined(docs, docname, () => {
const doc = new WSSharedDoc(docname);
doc.gc = gc;
if (persistence !== null) {
persistence.bindState(docname, doc);
}
docs.set(docname, doc);
return doc;
}) as WSSharedDoc;
};

/**
* @param {any} conn
* @param {WSSharedDoc} doc
* @param {Uint8Array} message
*/
const messageListener = (conn, rootDoc: WSSharedDoc, message) => {
try {
const encoder = encoding.createEncoder();
const decoder = decoding.createDecoder(message);
const messageType = decoding.readVarUint(decoder);
const guid = decoding.readVarString(decoder);
switch (messageType) {
case messageSync:
encoding.writeVarUint(encoder, messageSync);
encoding.writeVarString(encoder, guid);
let targetDoc: Y.Doc = null;
if (guid === rootDoc.guid) {
targetDoc = rootDoc;
} else {
let liveBlock = rootDoc.liveBlocks.get(guid);
if (!liveBlock) {
liveBlock = new LiveBlock(guid, rootDoc);
rootDoc.liveBlocks.set(guid, liveBlock);
liveBlock.sync(conn);
}
targetDoc = liveBlock;
}

const syncMessageType = syncProtocol.readSyncMessage(decoder, encoder, targetDoc, conn);

if (guid === rootDoc.guid) {
console.log(`root doc(${guid}): ${JSON.stringify(rootDoc.getMap('ai-table').toJSON())}`);
} else {
console.log(`sub doc(${guid})-${syncMessageType}: ${JSON.stringify(targetDoc.getArray('').toJSON())}`);
}

// If the `encoder` only contains the type of reply message and no
// message, there is no need to send the message. When `encoder` only
// contains the type of reply, its length is 1.
if (syncMessageType === syncProtocol.messageYjsSyncStep1) {
send(targetDoc, conn, encoding.toUint8Array(encoder));
}
break;
case messageAwareness: {
awarenessProtocol.applyAwarenessUpdate(rootDoc.awareness, decoding.readVarUint8Array(decoder), conn);
break;
}
}
} catch (err) {
console.error(err);
// @ts-ignore
doc.emit('error', [err]);
}
};

/**
* @param {WSSharedDoc} doc
* @param {any} conn
*/
const closeConn = (doc, conn) => {
if (doc.conns.has(conn)) {
/**
* @type {Set<number>}
*/
// @ts-ignore
const controlledIds = doc.conns.get(conn);
doc.conns.delete(conn);
awarenessProtocol.removeAwarenessStates(doc.awareness, Array.from(controlledIds), null);
if (doc.conns.size === 0 && persistence !== null) {
// if persisted, we store state and destroy ydocument
persistence.writeState(doc.name, doc).then(() => {
doc.destroy();
});
docs.delete(doc.name);
}
}
conn.close();
};

/**
* @param {WSSharedDoc} doc
* @param {import('ws').WebSocket} conn
* @param {Uint8Array} m
*/
const send = (doc, conn, m) => {
if (conn.readyState !== wsReadyStateConnecting && conn.readyState !== wsReadyStateOpen) {
closeConn(doc, conn);
}
try {
conn.send(m, {}, (err) => {
err != null && closeConn(doc, conn);
});
} catch (e) {
closeConn(doc, conn);
}
};

const pingTimeout = 30000;

/**
* @param {import('ws').WebSocket} conn
* @param {import('http').IncomingMessage} req
* @param {any} opts
*/
export const setupWSConnection = (conn: any, req, { docName = (req.url || '').slice(1).split('?')[0], gc = true } = {}) => {
conn.binaryType = 'arraybuffer';
// get doc, initialize if it does not exist yet
const doc = getYDoc(docName, gc);
doc.conns.set(conn, new Set());
// listen and reply to events
conn.on('message', /** @param {ArrayBuffer} message */ (message) => messageListener(conn, doc, new Uint8Array(message)));

// Check if connection is still alive
let pongReceived = true;
const pingInterval = setInterval(() => {
if (!pongReceived) {
if (doc.conns.has(conn)) {
closeConn(doc, conn);
}
clearInterval(pingInterval);
} else if (doc.conns.has(conn)) {
pongReceived = false;
try {
conn.ping();
} catch (e) {
closeConn(doc, conn);
clearInterval(pingInterval);
}
}
}, pingTimeout);
conn.on('close', () => {
closeConn(doc, conn);
clearInterval(pingInterval);
});
conn.on('pong', () => {
pongReceived = true;
});
// put the following in a variables in a block so the interval handlers don't keep in in
// scope
{
// send sync step 1
const encoder = encoding.createEncoder();
encoding.writeVarUint(encoder, messageSync);
encoding.writeVarString(encoder, doc.guid);
syncProtocol.writeSyncStep1(encoder, doc);
send(doc, conn, encoding.toUint8Array(encoder));
const awarenessStates = doc.awareness.getStates();
if (awarenessStates.size > 0) {
const encoder = encoding.createEncoder();
encoding.writeVarUint(encoder, messageAwareness);
encoding.writeVarString(encoder, doc.guid);
encoding.writeVarUint8Array(
encoder,
awarenessProtocol.encodeAwarenessUpdate(doc.awareness, Array.from(awarenessStates.keys()))
);
send(doc, conn, encoding.toUint8Array(encoder));
}
}
};
Loading