Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions packages/sync/src/providers/http-polling/polling-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -447,15 +447,19 @@ function poll(): void {
roomState.processAwarenessUpdate( room.awareness );

// If there is another collaborator on the primary entity,
// resume the queue for the next poll and increase polling
// frequency. We only check the primary room to avoid false
// positives from shared collection rooms (e.g. taxonomy/category).
// resume all room queues for the next poll and increase
// polling frequency. We only check the primary room to
// avoid false positives from shared collection rooms
// (e.g. taxonomy/category), but resume all queues so
// collection rooms (e.g. root/comment) can also sync.
if (
roomState.isPrimaryRoom &&
Object.keys( room.awareness ).length > 1
) {
hasCollaborators = true;
roomState.updateQueue.resume();
roomStates.forEach( ( state ) => {
state.updateQueue.resume();
} );
}

// Process each incoming update and collect any responses.
Expand Down
273 changes: 262 additions & 11 deletions packages/sync/src/providers/http-polling/test/polling-manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,17 @@ function createMockDoc( clientID = 1 ) {
return { clientID, on: jest.fn(), off: jest.fn() };
}

// Helper to extract the onDocUpdate callback registered via doc.on('updateV2', ...).
function getOnDocUpdate( doc: ReturnType< typeof createMockDoc > ) {
const call = doc.on.mock.calls.find(
( args: unknown[] ) => args[ 0 ] === 'updateV2'
);
if ( ! call ) {
throw new Error( 'onDocUpdate not registered' );
}
return call[ 1 ] as ( update: Uint8Array, origin: unknown ) => void;
}

function createMockAwareness() {
return {
clientID: 1,
Expand Down Expand Up @@ -141,17 +152,6 @@ describe( 'polling-manager', () => {
} );

describe( 'document size limit', () => {
// Helper to extract the onDocUpdate callback registered via doc.on('updateV2', ...).
function getOnDocUpdate( doc: ReturnType< typeof createMockDoc > ) {
const call = doc.on.mock.calls.find(
( args: unknown[] ) => args[ 0 ] === 'updateV2'
);
if ( ! call ) {
throw new Error( 'onDocUpdate not registered' );
}
return call[ 1 ] as ( update: Uint8Array, origin: unknown ) => void;
}

it( 'emits document-size-limit-exceeded error when an update exceeds the size limit', async () => {
mockPostSyncUpdate.mockResolvedValue( syncResponse );

Expand Down Expand Up @@ -552,6 +552,257 @@ describe( 'polling-manager', () => {
} );
} );

describe( 'collaborator queue resumption', () => {
it( 'resumes non-primary room queues when collaborators are detected on primary room', async () => {
// First poll: primary room has collaborators, collection room has none.
mockPostSyncUpdate.mockResolvedValue( {
rooms: [
{
room: 'primary-room',
end_cursor: 1,
awareness: {
1: { collaboratorInfo: { id: 100 } },
2: { collaboratorInfo: { id: 200 } },
},
updates: [],
},
{
room: 'collection-room',
end_cursor: 1,
awareness: {},
updates: [],
},
],
} );

// Register primary room first (becomes isPrimaryRoom).
pollingManager.registerRoom( {
room: 'primary-room',
doc: createMockDoc( 1 ),
awareness: createMockAwareness(),
log: jest.fn(),
onStatusChange: jest.fn(),
onSync: jest.fn(),
} );

pollingManager.registerRoom( {
room: 'collection-room',
doc: createMockDoc( 2 ),
awareness: createMockAwareness(),
log: jest.fn(),
onStatusChange: jest.fn(),
onSync: jest.fn(),
} );

// First poll: detects collaborators on primary room, resumes all queues.
await jest.advanceTimersByTimeAsync( 0 );

// Second poll: collection room queue should now be unpaused,
// so its initial sync_step1 update should be included.
mockPostSyncUpdate.mockResolvedValue( {
rooms: [
{
room: 'primary-room',
end_cursor: 2,
awareness: {
1: { collaboratorInfo: { id: 100 } },
2: { collaboratorInfo: { id: 200 } },
},
updates: [],
},
{
room: 'collection-room',
end_cursor: 2,
awareness: {},
updates: [],
},
],
} );

await jest.advanceTimersByTimeAsync( 1000 );

// The second call should include non-empty updates for the collection room.
const secondCallPayload = mockPostSyncUpdate.mock.calls[ 1 ][ 0 ];
const collectionRoom = secondCallPayload.rooms.find(
( r: { room: string } ) => r.room === 'collection-room'
);
expect( collectionRoom!.updates.length ).toBeGreaterThan( 0 );
} );

it( 'does not resume non-primary room queues when no collaborators are detected', async () => {
// Only 1 client (self) — no collaborators.
mockPostSyncUpdate.mockResolvedValue( {
rooms: [
{
room: 'primary-room',
end_cursor: 1,
awareness: { 1: { collaboratorInfo: { id: 100 } } },
updates: [],
},
{
room: 'collection-room',
end_cursor: 1,
awareness: {},
updates: [],
},
],
} );

pollingManager.registerRoom( {
room: 'primary-room',
doc: createMockDoc( 1 ),
awareness: createMockAwareness(),
log: jest.fn(),
onStatusChange: jest.fn(),
onSync: jest.fn(),
} );

pollingManager.registerRoom( {
room: 'collection-room',
doc: createMockDoc( 2 ),
awareness: createMockAwareness(),
log: jest.fn(),
onStatusChange: jest.fn(),
onSync: jest.fn(),
} );

// First poll: no collaborators.
await jest.advanceTimersByTimeAsync( 0 );

// Second poll: collection room queue should still be paused.
await jest.advanceTimersByTimeAsync( 4000 );

const secondCallPayload = mockPostSyncUpdate.mock.calls[ 1 ][ 0 ];
const collectionRoom = secondCallPayload.rooms.find(
( r: { room: string } ) => r.room === 'collection-room'
);
expect( collectionRoom!.updates ).toEqual( [] );
} );

it( 'sends accumulated collection room updates after collaborator detection', async () => {
// First poll: no collaborators.
mockPostSyncUpdate.mockResolvedValue( {
rooms: [
{
room: 'primary-room',
end_cursor: 1,
awareness: { 1: { collaboratorInfo: { id: 100 } } },
updates: [],
},
{
room: 'collection-room',
end_cursor: 1,
awareness: {},
updates: [],
},
],
} );

const collectionDoc = createMockDoc( 2 );

pollingManager.registerRoom( {
room: 'primary-room',
doc: createMockDoc( 1 ),
awareness: createMockAwareness(),
log: jest.fn(),
onStatusChange: jest.fn(),
onSync: jest.fn(),
} );

pollingManager.registerRoom( {
room: 'collection-room',
doc: collectionDoc,
awareness: createMockAwareness(),
log: jest.fn(),
onStatusChange: jest.fn(),
onSync: jest.fn(),
} );

// First poll: no collaborators, queues stay paused.
await jest.advanceTimersByTimeAsync( 0 );

// Simulate a local doc update on the collection room (e.g., a note was saved).
const onDocUpdate = getOnDocUpdate( collectionDoc );
onDocUpdate( new Uint8Array( [ 1, 2, 3 ] ), 'local-origin' );

// Second poll: still no collaborators, collection room updates should be empty.
mockPostSyncUpdate.mockResolvedValue( {
rooms: [
{
room: 'primary-room',
end_cursor: 2,
awareness: { 1: { collaboratorInfo: { id: 100 } } },
updates: [],
},
{
room: 'collection-room',
end_cursor: 2,
awareness: {},
updates: [],
},
],
} );
await jest.advanceTimersByTimeAsync( 4000 );

const secondCallPayload = mockPostSyncUpdate.mock.calls[ 1 ][ 0 ];
const collectionRoomPoll2 = secondCallPayload.rooms.find(
( r: { room: string } ) => r.room === 'collection-room'
);
expect( collectionRoomPoll2!.updates ).toEqual( [] );

// Third poll: collaborator joins — queues should be resumed.
mockPostSyncUpdate.mockResolvedValue( {
rooms: [
{
room: 'primary-room',
end_cursor: 3,
awareness: {
1: { collaboratorInfo: { id: 100 } },
2: { collaboratorInfo: { id: 200 } },
},
updates: [],
},
{
room: 'collection-room',
end_cursor: 3,
awareness: {},
updates: [],
},
],
} );
await jest.advanceTimersByTimeAsync( 4000 );

// Fourth poll: collection room should now send accumulated updates.
mockPostSyncUpdate.mockResolvedValue( {
rooms: [
{
room: 'primary-room',
end_cursor: 4,
awareness: {
1: { collaboratorInfo: { id: 100 } },
2: { collaboratorInfo: { id: 200 } },
},
updates: [],
},
{
room: 'collection-room',
end_cursor: 4,
awareness: {},
updates: [],
},
],
} );
await jest.advanceTimersByTimeAsync( 1000 );

const fourthCallPayload = mockPostSyncUpdate.mock.calls[ 3 ][ 0 ];
const collectionRoomPoll4 = fourthCallPayload.rooms.find(
( r: { room: string } ) => r.room === 'collection-room'
);
// Should include the initial sync_step1 update + the local update.
expect( collectionRoomPoll4!.updates.length ).toBeGreaterThan( 0 );
} );
} );

describe( 'visibility change', () => {
it( 'does not spawn a duplicate poll when a request is in-flight', () => {
// Keep the first postSyncUpdate pending so we can simulate
Expand Down
Loading
Loading