Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions settings.json.template
Original file line number Diff line number Diff line change
Expand Up @@ -781,6 +781,18 @@
*/
"loadTest": false,

/*
* Pack multiple NEW_CHANGES revisions into a single NEW_CHANGES_BATCH emit
* per recipient when a fan-out has more than one revision to catch up
* (#7756 lever 3b). Reduces engine.io packet count under high pad
* concurrency, especially on the WebSocket transport.
*
* WARNING: enabling this requires all connected clients to understand
* NEW_CHANGES_BATCH. Old clients will silently fail to apply batched
* revisions. Coordinate the rollout.
*/
"newChangesBatch": false,

/**
* Disable dump of objects preventing a clean exit
*/
Expand Down
41 changes: 41 additions & 0 deletions src/node/handler/NewChangesPacker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Wire-format decision for NEW_CHANGES vs NEW_CHANGES_BATCH (#7756 lever 3b).
//
// Lives in its own tiny module rather than inside PadMessageHandler so the
// pure decision can be unit-tested without standing up the full pad / DB /
// socket.io stack. PadMessageHandler.updatePadClients calls this function
// once per recipient with the queued revisions for that recipient.

export type NewChangesItem = {
newRev: number;
changeset: string;
apool: unknown;
author: string;
currentTime: number;
timeDelta: number;
};

export type NewChangesEmit =
| {type: 'COLLABROOM'; data: {type: 'NEW_CHANGES'} & NewChangesItem}
| {type: 'COLLABROOM'; data: {type: 'NEW_CHANGES_BATCH'; changes: NewChangesItem[]}};

/**
* Decide what to put on the wire for one recipient.
* - No queued revisions: nothing.
* - Batching disabled, or exactly one rev: emit one NEW_CHANGES per rev
* (legacy behaviour; preserves bytes-on-wire for the steady state).
* - Batching enabled and multiple revs: emit one NEW_CHANGES_BATCH with
* the array of revisions.
*/
export const buildNewChangesEmits = (
pending: NewChangesItem[],
batchEnabled: boolean,
): NewChangesEmit[] => {
if (pending.length === 0) return [];
if (batchEnabled && pending.length > 1) {
return [{type: 'COLLABROOM', data: {type: 'NEW_CHANGES_BATCH', changes: pending}}];
}
return pending.map((change) => ({
type: 'COLLABROOM',
data: {type: 'NEW_CHANGES', ...change},
} as NewChangesEmit));
};
86 changes: 59 additions & 27 deletions src/node/handler/PadMessageHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ const hooks = require('../../static/js/pluginfw/hooks');
const stats = require('../stats')
const assert = require('assert').strict;
import {recordChangesetApply, recordSocketEmit} from '../prom-instruments';
import {buildNewChangesEmits, type NewChangesItem} from './NewChangesPacker';
import {RateLimiterMemory} from 'rate-limiter-flexible';
import {ChangesetRequest, PadUserInfo, SocketClientRequest} from "../types/SocketClientRequest";
import {APool, AText, PadAuthor, PadType} from "../types/PadType";
Expand Down Expand Up @@ -1011,45 +1012,76 @@ exports.updatePadClients = async (pad: PadType) => {
// but benefit of reusing cached revision object is HUGE
const revCache:MapArrayType<any> = {};

// When `settings.newChangesBatch` is true and a recipient is more than one
// revision behind, pack the queued revisions into a single NEW_CHANGES_BATCH
// emit per recipient. The engine.io WebSocket transport sends one frame per
// packet (the polling transport already batches at the HTTP-response layer),
// so reducing the packet count translates directly into fewer system calls
// on the server and fewer onmessage callbacks on the client.
const batchEnabled = settings.newChangesBatch === true;

await Promise.all(roomSockets.map(async (socket) => {
const sessioninfo = sessioninfos[socket.id];
// The user might have disconnected since _getRoomSockets() was called.
if (sessioninfo == null) return;

while (sessioninfo.rev < pad.getHeadRevisionNumber()) {
const r = sessioninfo.rev + 1;
let revision = revCache[r];
if (!revision) {
revision = await pad.getRevision(r);
revCache[r] = revision;
}

const author = revision.meta.author;
const revChangeset = revision.changeset;
const currentTime = revision.meta.timestamp;
// Snapshot the local state so a concurrent updatePadClients() can't make
// us double-emit. We hold the "I'm responsible for revs (startRev,
// headRev]" claim by reading sessioninfo.rev once and overwriting it
// before any await. A second invocation arriving mid-loop will see the
// bumped rev and skip those revisions; if our emit fails the catch
// below rolls sessioninfo.rev back so they aren't lost.
const startRev = sessioninfo.rev;
const headRev = pad.getHeadRevisionNumber();
if (startRev >= headRev) return;
const startTime = sessioninfo.time;
// Claim the range immediately so concurrent runs skip it.
sessioninfo.rev = headRev;

Comment on lines +1028 to +1040
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Action required

1. Lost revisions on race 🐞 Bug ≡ Correctness

updatePadClients() sets sessioninfo.rev=headRev before awaiting revision loads and emitting, so a
concurrent updatePadClients() can advance sessioninfo.rev past headRev and prevent the rollback,
permanently skipping some revisions for that socket. The client drops out-of-sequence
NEW_CHANGES/NEW_CHANGES_BATCH (warn+return) and will not recover because the server believes the
client is already at the newer rev.
Agent Prompt
## Issue description
`updatePadClients()` pre-claims `sessioninfo.rev` (`sessioninfo.rev = headRev`) before any `await` and before `socket.emit()`. If another `updatePadClients()` runs concurrently for the same socket (from code paths not serialized by `padChannels`), it can advance `sessioninfo.rev` beyond `headRev` while the first call is still in-flight. If the first call then throws (revision fetch or emit), the rollback guard `if (sessioninfo.rev === headRev)` prevents restoring `startRev`, leaving a permanent gap: the server will never resend the skipped revisions, and the client rejects subsequent revisions as out-of-sequence.

## Issue Context
Some call sites invoke `updatePadClients()` outside the per-pad `padChannels` queue (and at least one does not `await` it), so overlap is possible.

## Fix Focus Areas
- src/node/handler/PadMessageHandler.ts[1023-1086]

### Suggested fix approach
- Introduce per-socket serialization for fan-out (similar to the existing `Channels` pattern), e.g. a `socketChannels.enqueue(socket.id, async () => { ... })` wrapper around the per-socket catch-up logic.
- With serialization in place, remove the early `sessioninfo.rev = headRev` claim.
- Only update `sessioninfo.rev`/`sessioninfo.time` after successful emits; on failure, leave them unchanged so the next run retries from the last confirmed sent revision.
- Ensure ordering guarantees: for a given socket, never emit revision N+1 before N has been successfully emitted (and ideally, treat the emit as “committed” only after the send loop completes without throwing).

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools

Comment on lines +1034 to +1040
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Action required

1. Premature rev claim 🐞 Bug ≡ Correctness

updatePadClients() sets sessioninfo.rev=headRev before emitting any NEW_CHANGES/NEW_CHANGES_BATCH,
so other server logic can believe a client is caught up when it is not. This can cause ACCEPT_COMMIT
(or subsequent NEW_CHANGES) to arrive with newRev > clientRev+1, which the client treats as a bad
revision and stops processing messages.
Agent Prompt
## Issue description
`updatePadClients()` preemptively assigns `sessioninfo.rev = headRev` to “claim” a range before doing any `await` or `socket.emit()`. This changes `sessioninfo.rev` semantics from “last revision actually delivered to this client” to “range I plan to deliver”, breaking existing ordering guarantees (notably the `handleUserChanges()` ACCEPT_COMMIT ordering assertion) and can lead to clients rejecting messages.

## Issue Context
The client enforces strict revision sequencing for both NEW_CHANGES and ACCEPT_COMMIT. The server currently relies on `thisSession.rev` to ensure all prior updates have been delivered before sending ACCEPT_COMMIT.

## Fix Focus Areas
- src/node/handler/PadMessageHandler.ts[1028-1085]

### Suggested fix direction
- Do **not** mutate `sessioninfo.rev` until after the corresponding message(s) have been emitted successfully.
- If you need to prevent concurrent fan-out per socket, introduce a separate per-socket serialization mechanism (e.g., a per-socket promise chain/mutex or `sessioninfo._fanoutInProgress` + queued target rev) instead of reusing `sessioninfo.rev`.
- If partial sending is possible (non-batched path), track `lastSuccessfullyEmittedRev`/`lastSuccessfullyEmittedTime` and roll back to that on error (never to `startRev` after some messages may have been emitted).

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools

// Collect all queued revisions for this socket.
const pending: Array<{
newRev: number;
changeset: string;
apool: unknown;
author: string;
currentTime: number;
timeDelta: number;
}> = [];

const forWire = prepareForWire(revChangeset, pad.pool);
const msg = {
type: 'COLLABROOM',
data: {
type: 'NEW_CHANGES',
try {
let previousTime = startTime;
for (let r = startRev + 1; r <= headRev; r++) {
let revision = revCache[r];
if (!revision) {
revision = await pad.getRevision(r);
revCache[r] = revision;
}
const author = revision.meta.author;
const revChangeset = revision.changeset;
const currentTime = revision.meta.timestamp;
const forWire = prepareForWire(revChangeset, pad.pool);
pending.push({
newRev: r,
changeset: forWire.translated,
apool: forWire.pool,
author,
currentTime,
timeDelta: currentTime - sessioninfo.time,
},
};
try {
socket.emit('message', msg);
recordSocketEmit('NEW_CHANGES');
} catch (err:any) {
messageLogger.error(`Failed to notify user of new revision: ${err.stack || err}`);
return;
timeDelta: currentTime - previousTime,
});
previousTime = currentTime;
}

for (const emit of buildNewChangesEmits(pending, batchEnabled)) {
socket.emit('message', emit);
recordSocketEmit(emit.data.type);
}
sessioninfo.time = currentTime;
sessioninfo.rev = r;
// Only after the wire send succeeds do we commit the new time.
sessioninfo.time = previousTime;
} catch (err: any) {
// Roll back the claim so the next updatePadClients retries these revs.
// Only set rev back if no one else has advanced past us in the meantime.
if (sessioninfo.rev === headRev) sessioninfo.rev = startRev;
messageLogger.error(`Failed to notify user of new revision: ${err.stack || err}`);
}
}));
};
Expand Down
1 change: 1 addition & 0 deletions src/node/prom-instruments.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ export const padUsersGauge = new client.Gauge({
// state.
const KNOWN_TYPES = new Set([
'NEW_CHANGES',
'NEW_CHANGES_BATCH',
'ACCEPT_COMMIT',
'CHAT_MESSAGE',
'CLIENT_VARS',
Expand Down
16 changes: 16 additions & 0 deletions src/node/utils/Settings.ts
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ export type SettingsType = {
automaticReconnectionTimeout: number,
loadTest: boolean,
scalingDiveMetrics: boolean,
newChangesBatch: boolean,
dumpOnUncleanExit: boolean,
indentationOnNewLine: boolean,
logconfig: any | null,
Expand Down Expand Up @@ -705,6 +706,21 @@ const settings: SettingsType = {
* production deployments aren't paying for instrumentation they don't use.
*/
scalingDiveMetrics: false,
/**
* Pack multiple NEW_CHANGES revisions into a single NEW_CHANGES_BATCH emit
* per recipient when a fan-out has more than one revision to catch up
* (#7756 lever 3b). Reduces engine.io packet count under high pad
* concurrency, especially on the WebSocket transport, which sends one
* frame per packet (the polling transport already batches naturally).
*
* Requires clients to recognise the NEW_CHANGES_BATCH message type. New
* clients are forward-compatible (they handle both NEW_CHANGES and
* NEW_CHANGES_BATCH). Old clients connecting to a server with this enabled
* would fail to apply batched revisions, so coordinate the rollout.
*
* 0/false (default) preserves legacy per-revision emit behaviour.
*/
newChangesBatch: false,
/**
* Disable dump of objects preventing a clean exit
*/
Expand Down
28 changes: 17 additions & 11 deletions src/static/js/broadcast.ts
Original file line number Diff line number Diff line change
Expand Up @@ -493,17 +493,23 @@ const loadBroadcastJS = (socket, sendSocketMsg, fireWhenAllScriptsAreLoaded, Bro
if (obj.type === 'COLLABROOM') {
obj = obj.data;

if (obj.type === 'NEW_CHANGES') {
const changeset = moveOpsToNewPool(
obj.changeset, (new AttribPool()).fromJsonable(obj.apool), padContents.apool);

let changesetBack = inverse(
obj.changeset, padContents.currentLines, padContents.alines, padContents.apool);

changesetBack = moveOpsToNewPool(
changesetBack, (new AttribPool()).fromJsonable(obj.apool), padContents.apool);

loadedNewChangeset(changeset, changesetBack, obj.newRev - 1, obj.timeDelta);
if (obj.type === 'NEW_CHANGES' || obj.type === 'NEW_CHANGES_BATCH') {
// NEW_CHANGES_BATCH (#7756 lever 3b) carries an array of revisions
// in one emit. Each revision has the same shape as the legacy
// single-rev message; apply in order.
const changes = obj.type === 'NEW_CHANGES_BATCH' ? obj.changes : [obj];
for (const change of changes) {
const changeset = moveOpsToNewPool(
change.changeset, (new AttribPool()).fromJsonable(change.apool), padContents.apool);

let changesetBack = inverse(
change.changeset, padContents.currentLines, padContents.alines, padContents.apool);

changesetBack = moveOpsToNewPool(
changesetBack, (new AttribPool()).fromJsonable(change.apool), padContents.apool);

loadedNewChangeset(changeset, changesetBack, change.newRev - 1, change.timeDelta);
}
} else if (obj.type === 'NEW_AUTHORDATA') {
const authorMap = {};
authorMap[obj.author] = obj.data;
Expand Down
23 changes: 15 additions & 8 deletions src/static/js/collab_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,12 @@ const getCollabClient = (ace2editor, serverVars, initialUserInfo, options, _pad)
if (wrapper.type !== 'COLLABROOM' && wrapper.type !== 'CUSTOM') return;
const msg = wrapper.data;

if (msg.type === 'NEW_CHANGES') {
if (msg.type === 'NEW_CHANGES' || msg.type === 'NEW_CHANGES_BATCH') {
// NEW_CHANGES_BATCH (added in #7756 lever 3b) carries an array of
// revisions in one emit. Each revision has the same shape as the
// legacy single-rev message, so we normalise to a list and apply in
// order, sharing the same compose-safety await.
const changes = msg.type === 'NEW_CHANGES_BATCH' ? msg.changes : [msg];
serverMessageTaskQueue.enqueue(async () => {
// Avoid updating the DOM while the user is composing a character. Notes about this `await`:
// * `await null;` is equivalent to `await Promise.resolve(null);`, so if the user is not
Expand All @@ -198,14 +203,16 @@ const getCollabClient = (ace2editor, serverVars, initialUserInfo, options, _pad)
// possible, that the chances are so small or the consequences so minor that it's not
// worth addressing).
await editor.getInInternationalComposition();
const {newRev, changeset, author = '', apool} = msg;
if (newRev !== (rev + 1)) {
window.console.warn(`bad message revision on NEW_CHANGES: ${newRev} not ${rev + 1}`);
// setChannelState("DISCONNECTED", "badmessage_newchanges");
return;
for (const change of changes) {
const {newRev, changeset, author = '', apool} = change;
if (newRev !== (rev + 1)) {
window.console.warn(`bad message revision on ${msg.type}: ${newRev} not ${rev + 1}`);
// setChannelState("DISCONNECTED", "badmessage_newchanges");
return;
}
rev = newRev;
editor.applyChangesToBase(changeset, author, apool);
}
rev = newRev;
editor.applyChangesToBase(changeset, author, apool);
});
} else if (msg.type === 'ACCEPT_COMMIT') {
serverMessageTaskQueue.enqueue(() => {
Expand Down
14 changes: 14 additions & 0 deletions src/static/js/types/SocketIOMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,20 @@ export type ClientNewChanges = {
payload?: ClientNewChanges
}

export type NewChangesItem = {
apool: AttributePool,
author: string,
changeset: string,
currentTime: number,
newRev: number,
timeDelta: number,
}

export type ClientNewChangesBatch = {
type: 'NEW_CHANGES_BATCH',
changes: NewChangesItem[],
}

export type ClientAcceptCommitMessage = {
type: 'ACCEPT_COMMIT'
newRev: number
Expand Down
48 changes: 48 additions & 0 deletions src/tests/backend-new/specs/new-changes-batch.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Regression test for the NEW_CHANGES_BATCH wire-format decision
// (#7756 lever 3b). Imports the real implementation from
// PadMessageHandler so removing or breaking the production batching
// logic fails this test.

import {describe, it, expect, beforeEach, afterEach} from 'vitest';
import settings from '../../../node/utils/Settings';
import {buildNewChangesEmits, type NewChangesItem} from '../../../node/handler/NewChangesPacker';

const ORIGINAL_FLAG = settings.newChangesBatch;

beforeEach(() => { settings.newChangesBatch = false; });
afterEach(() => { settings.newChangesBatch = ORIGINAL_FLAG; });

const fakePending = (n: number): NewChangesItem[] =>
Array.from({length: n}, (_, i) => ({
newRev: i + 1, changeset: `=${i}`, apool: {}, author: 'a.1',
currentTime: 1_000 * (i + 1), timeDelta: 1_000,
}));

describe('buildNewChangesEmits', () => {
it('flag OFF: one NEW_CHANGES per rev regardless of count', () => {
const emits = buildNewChangesEmits(fakePending(5), false);
expect(emits).toHaveLength(5);
expect(emits.every((e) => e.data.type === 'NEW_CHANGES')).toBe(true);
});

it('flag ON, single rev: still NEW_CHANGES (no batch overhead for the steady state)', () => {
const emits = buildNewChangesEmits(fakePending(1), true);
expect(emits).toHaveLength(1);
expect(emits[0]!.data.type).toBe('NEW_CHANGES');
});

it('flag ON, multiple revs: a single NEW_CHANGES_BATCH carrying all of them', () => {
const emits = buildNewChangesEmits(fakePending(5), true);
expect(emits).toHaveLength(1);
expect(emits[0]!.data.type).toBe('NEW_CHANGES_BATCH');
const batch = emits[0]!.data as {type: 'NEW_CHANGES_BATCH'; changes: NewChangesItem[]};
expect(batch.changes).toHaveLength(5);
expect(batch.changes[0]!.newRev).toBe(1);
expect(batch.changes[4]!.newRev).toBe(5);
});

it('empty pending list emits nothing', () => {
expect(buildNewChangesEmits([], true)).toEqual([]);
expect(buildNewChangesEmits([], false)).toEqual([]);
});
});