diff --git a/.changeset/cursor-pagination-loadsubset.md b/.changeset/cursor-pagination-loadsubset.md new file mode 100644 index 000000000..fc4d5bbc7 --- /dev/null +++ b/.changeset/cursor-pagination-loadsubset.md @@ -0,0 +1,23 @@ +--- +"@tanstack/db": patch +"@tanstack/electric-db-collection": patch +"@tanstack/query-db-collection": patch +--- + +Enhanced LoadSubsetOptions with separate cursor expressions and offset for flexible pagination. + +**Changes:** + +- Added `CursorExpressions` type with `whereFrom`, `whereCurrent`, and `lastKey` properties +- `LoadSubsetOptions.where` no longer includes cursor expressions - these are now passed separately via `cursor` +- Added `offset` to `LoadSubsetOptions` for offset-based pagination support +- Electric sync layer now makes two parallel `requestSnapshot` calls when cursor is present: + - One for `whereCurrent` (all ties at boundary, no limit) + - One for `whereFrom` (rows after cursor, with limit) +- Query collection serialization now includes `offset` for query key generation + +**Benefits:** + +- Sync layers can choose between cursor-based or offset-based pagination strategies +- Electric can efficiently handle tie-breaking with two targeted requests +- Better separation of concerns between filtering (`where`) and pagination (`cursor`/`offset`) diff --git a/.changeset/deterministic-collection-ordering.md b/.changeset/deterministic-collection-ordering.md new file mode 100644 index 000000000..2c7ec3f7f --- /dev/null +++ b/.changeset/deterministic-collection-ordering.md @@ -0,0 +1,23 @@ +--- +"@tanstack/db": patch +--- + +Ensure deterministic iteration order for collections and indexes. 
+ +**SortedMap improvements:** + +- Added key-based tie-breaking when values compare as equal, ensuring deterministic ordering +- Optimized to skip value comparison entirely when no comparator is provided (key-only sorting) +- Extracted `compareKeys` utility to `utils/comparison.ts` for reuse + +**BTreeIndex improvements:** + +- Keys within the same indexed value are now returned in deterministic sorted order +- Optimized with fast paths for empty sets and single-key sets to avoid unnecessary allocations + +**CollectionStateManager changes:** + +- Collections now always use `SortedMap` for `syncedData`, ensuring deterministic iteration order +- When no `compare` function is provided, entries are sorted by key only + +This ensures that live queries with `orderBy` and `limit` produce stable, deterministic results even when multiple rows have equal sort values. diff --git a/.changeset/fix-livequery-loading-status-ondemand.md b/.changeset/fix-livequery-loading-status-ondemand.md new file mode 100644 index 000000000..734dcc965 --- /dev/null +++ b/.changeset/fix-livequery-loading-status-ondemand.md @@ -0,0 +1,9 @@ +--- +"@tanstack/db": patch +--- + +fix(db): show loading status during initial loadSubset for on-demand sync + +Fixed an issue where live queries using on-demand sync mode would immediately show `isLoading: false` and `status: 'ready'` even while the initial data was still being fetched. Now the live query correctly shows `isLoading: true` and `status: 'loading'` until the first `loadSubset` completes. + +This ensures that UI components can properly display loading indicators while waiting for the initial data to arrive from on-demand sync sources. Subsequent `loadSubset` calls (e.g., from pagination or windowing) do not affect the ready status. 
diff --git a/.changeset/multi-column-orderby-loadsubset.md b/.changeset/multi-column-orderby-loadsubset.md new file mode 100644 index 000000000..7a74a4ac8 --- /dev/null +++ b/.changeset/multi-column-orderby-loadsubset.md @@ -0,0 +1,18 @@ +--- +"@tanstack/db": patch +--- + +Enhanced multi-column orderBy support with lazy loading and composite cursor optimization. + +**Changes:** + +- Create index on first orderBy column even for multi-column orderBy queries, enabling lazy loading with first-column ordering +- Pass multi-column orderBy to loadSubset with precise composite cursors (e.g., `or(gt(col1, v1), and(eq(col1, v1), gt(col2, v2)))`) for backend optimization +- Use wide bounds (first column only) for local index operations to ensure no rows are missed +- Use precise composite cursor for sync layer loadSubset to minimize data transfer + +**Benefits:** + +- Multi-column orderBy queries with limit now support lazy loading (previously disabled) +- Sync implementations (like Electric) can optimize queries using composite indexes on the backend +- Local collection uses first-column index efficiently while backend gets precise cursor diff --git a/.changeset/stable-order-tiebreaker.md b/.changeset/stable-order-tiebreaker.md new file mode 100644 index 000000000..ea25f679e --- /dev/null +++ b/.changeset/stable-order-tiebreaker.md @@ -0,0 +1,9 @@ +--- +"@tanstack/db-ivm": patch +--- + +Use row keys for stable tie-breaking in ORDER BY operations instead of hash-based object IDs. + +Previously, when multiple rows had equal ORDER BY values, tie-breaking used `globalObjectIdGenerator.getId(key)` which could produce hash collisions and wasn't stable across page reloads for object references. Now, the row key (which is always `string | number` and unique per row) is used directly for tie-breaking, ensuring deterministic and stable ordering. + +This also simplifies the internal `TaggedValue` type from a 3-tuple `[K, V, Tag]` to a 2-tuple `[K, V]`, removing unnecessary complexity. 
diff --git a/packages/db-collection-e2e/src/suites/pagination.suite.ts b/packages/db-collection-e2e/src/suites/pagination.suite.ts index c0b7badcb..486b147bd 100644 --- a/packages/db-collection-e2e/src/suites/pagination.suite.ts +++ b/packages/db-collection-e2e/src/suites/pagination.suite.ts @@ -131,6 +131,586 @@ export function createPaginationTestSuite( await query.cleanup() }) + + it(`should sort by multiple fields with chained orderBy`, async () => { + const config = await getConfig() + const usersCollection = config.collections.onDemand.users + + const query = createLiveQueryCollection((q) => + q + .from({ user: usersCollection }) + .orderBy(({ user }) => user.isActive, `desc`) + .orderBy(({ user }) => user.age, `asc`) + ) + + await query.preload() + await waitForQueryData(query, { minSize: 1 }) + + const results = Array.from(query.state.values()) + expect(results.length).toBeGreaterThan(0) + + // Verify multi-field sort (isActive desc first, then age asc within each isActive) + for (let i = 1; i < results.length; i++) { + const prev = results[i - 1]! + const curr = results[i]! + + // isActive should be descending (true before false) + if (prev.isActive !== curr.isActive) { + // true (1) should come before false (0) in desc order + expect(prev.isActive ? 1 : 0).toBeGreaterThanOrEqual( + curr.isActive ? 
1 : 0 + ) + } else { + // If isActive is same, age should be ascending + expect(prev.age).toBeLessThanOrEqual(curr.age) + } + } + + await query.cleanup() + }) + }) + + describe(`Multi-Column OrderBy with Incremental Loading`, () => { + it(`should correctly paginate with multi-column orderBy and limit`, async () => { + const config = await getConfig() + const usersCollection = config.collections.onDemand.users + + // First page - get first 10 users sorted by isActive desc, age asc + const query = createLiveQueryCollection((q) => + q + .from({ user: usersCollection }) + .orderBy(({ user }) => user.isActive, `desc`) + .orderBy(({ user }) => user.age, `asc`) + .limit(10) + ) + + await query.preload() + await waitForQueryData(query, { minSize: 10 }) + + const results = Array.from(query.state.values()) + expect(results).toHaveLength(10) + + // Verify the ordering is correct + for (let i = 1; i < results.length; i++) { + const prev = results[i - 1]! + const curr = results[i]! + + if (prev.isActive !== curr.isActive) { + expect(prev.isActive ? 1 : 0).toBeGreaterThanOrEqual( + curr.isActive ? 
1 : 0 + ) + } else { + expect(prev.age).toBeLessThanOrEqual(curr.age) + } + } + + await query.cleanup() + }) + + it(`should load subsequent pages correctly with multi-column orderBy`, async () => { + const config = await getConfig() + const usersCollection = config.collections.onDemand.users + + // Get first 15 users + const query1 = createLiveQueryCollection((q) => + q + .from({ user: usersCollection }) + .orderBy(({ user }) => user.isActive, `desc`) + .orderBy(({ user }) => user.age, `asc`) + .limit(15) + ) + + await query1.preload() + await waitForQueryData(query1, { minSize: 15 }) + + const firstPage = Array.from(query1.state.values()) + expect(firstPage).toHaveLength(15) + + // Get first 30 users (expanding the window) + const query2 = createLiveQueryCollection((q) => + q + .from({ user: usersCollection }) + .orderBy(({ user }) => user.isActive, `desc`) + .orderBy(({ user }) => user.age, `asc`) + .limit(30) + ) + + await query2.preload() + await waitForQueryData(query2, { minSize: 30 }) + + const expandedPage = Array.from(query2.state.values()) + expect(expandedPage).toHaveLength(30) + + // The first 15 items should be the same in both queries + for (let i = 0; i < 15; i++) { + expect(expandedPage[i]!.id).toBe(firstPage[i]!.id) + } + + // Verify ordering is maintained throughout + for (let i = 1; i < expandedPage.length; i++) { + const prev = expandedPage[i - 1]! + const curr = expandedPage[i]! + + if (prev.isActive !== curr.isActive) { + expect(prev.isActive ? 1 : 0).toBeGreaterThanOrEqual( + curr.isActive ? 
1 : 0 + ) + } else { + expect(prev.age).toBeLessThanOrEqual(curr.age) + } + } + + await query1.cleanup() + await query2.cleanup() + }) + + it(`should handle multi-column orderBy with duplicate values in first column`, async () => { + const config = await getConfig() + const usersCollection = config.collections.onDemand.users + + // Sort by isActive (only 2 values: true/false) then by age + // This tests the case where many rows have the same first column value + const query = createLiveQueryCollection((q) => + q + .from({ user: usersCollection }) + .orderBy(({ user }) => user.isActive, `desc`) + .orderBy(({ user }) => user.age, `asc`) + .limit(50) + ) + + await query.preload() + await waitForQueryData(query, { minSize: 50 }) + + const results = Array.from(query.state.values()) + expect(results).toHaveLength(50) + + // Count how many active users we got + const activeUsers = results.filter((u) => u.isActive) + const inactiveUsers = results.filter((u) => !u.isActive) + + // Since isActive desc, all active users should come first + // All active users should be at the start + let foundInactive = false + for (const user of results) { + if (!user.isActive) { + foundInactive = true + } else if (foundInactive) { + // Found active after inactive - this is wrong + throw new Error( + `Found active user after inactive user in desc order` + ) + } + } + + // Verify age is ascending within each group + if (activeUsers.length > 1) { + for (let i = 1; i < activeUsers.length; i++) { + expect(activeUsers[i - 1]!.age).toBeLessThanOrEqual( + activeUsers[i]!.age + ) + } + } + + if (inactiveUsers.length > 1) { + for (let i = 1; i < inactiveUsers.length; i++) { + expect(inactiveUsers[i - 1]!.age).toBeLessThanOrEqual( + inactiveUsers[i]!.age + ) + } + } + + await query.cleanup() + }) + + it(`should handle multi-column orderBy with mixed directions`, async () => { + const config = await getConfig() + const postsCollection = config.collections.onDemand.posts + + // Sort by userId ascending, 
viewCount descending + const query = createLiveQueryCollection((q) => + q + .from({ post: postsCollection }) + .orderBy(({ post }) => post.userId, `asc`) + .orderBy(({ post }) => post.viewCount, `desc`) + .limit(20) + ) + + await query.preload() + await waitForQueryData(query, { minSize: 20 }) + + const results = Array.from(query.state.values()) + expect(results).toHaveLength(20) + + // Verify ordering + for (let i = 1; i < results.length; i++) { + const prev = results[i - 1]! + const curr = results[i]! + + if (prev.userId < curr.userId) { + // userId ascending - this is correct + continue + } else if (prev.userId === curr.userId) { + // Same userId, viewCount should be descending + expect(prev.viewCount).toBeGreaterThanOrEqual(curr.viewCount) + } else { + // userId decreased - this is wrong + throw new Error( + `userId should be ascending but ${prev.userId} > ${curr.userId}` + ) + } + } + + await query.cleanup() + }) + + it(`should handle three-column orderBy`, async () => { + const config = await getConfig() + const usersCollection = config.collections.onDemand.users + + // Sort by isActive desc, age asc, name asc + const query = createLiveQueryCollection((q) => + q + .from({ user: usersCollection }) + .orderBy(({ user }) => user.isActive, `desc`) + .orderBy(({ user }) => user.age, `asc`) + .orderBy(({ user }) => user.name, `asc`) + .limit(25) + ) + + await query.preload() + await waitForQueryData(query, { minSize: 25 }) + + const results = Array.from(query.state.values()) + expect(results).toHaveLength(25) + + // Verify basic ordering (isActive desc, age asc) + for (let i = 1; i < results.length; i++) { + const prev = results[i - 1]! + const curr = results[i]! + + if (prev.isActive !== curr.isActive) { + expect(prev.isActive ? 1 : 0).toBeGreaterThanOrEqual( + curr.isActive ? 
1 : 0 + ) + } else if (prev.age !== curr.age) { + expect(prev.age).toBeLessThanOrEqual(curr.age) + } + // For name, we don't strictly check due to collation differences + } + + await query.cleanup() + }) + + it(`should use setWindow to page through multi-column orderBy results`, async () => { + const config = await getConfig() + const usersCollection = config.collections.onDemand.users + + // Create query with multi-column orderBy and limit + // Using age (number) and name (string) to avoid boolean comparison issues + const query = createLiveQueryCollection((q) => + q + .from({ user: usersCollection }) + .orderBy(({ user }) => user.age, `asc`) + .orderBy(({ user }) => user.name, `asc`) + .limit(10) + ) + + await query.preload() + await waitForQueryData(query, { minSize: 10 }) + + // Get first page + const firstPage = Array.from(query.state.values()) + expect(firstPage).toHaveLength(10) + + // Verify first page ordering (age asc, then name asc) + for (let i = 1; i < firstPage.length; i++) { + const prev = firstPage[i - 1]! + const curr = firstPage[i]! + if (prev.age !== curr.age) { + expect(prev.age).toBeLessThanOrEqual(curr.age) + } else { + expect(prev.name.localeCompare(curr.name)).toBeLessThanOrEqual(0) + } + } + + // Move to second page using setWindow + // IMPORTANT: setWindow returns a Promise when loading is required, + // or `true` if data is already available. We verify loading occurs. 
+ const setWindowResult = query.utils.setWindow({ offset: 10, limit: 10 }) + + // In on-demand mode, moving to offset 10 should trigger loading + // since only the first 10 records were initially loaded + if (setWindowResult !== true) { + // Loading was triggered - wait for it to complete + await setWindowResult + } + await waitForQueryData(query, { minSize: 10 }) + + // Get second page + const secondPage = Array.from(query.state.values()) + expect(secondPage).toHaveLength(10) + + // Verify second page ordering + for (let i = 1; i < secondPage.length; i++) { + const prev = secondPage[i - 1]! + const curr = secondPage[i]! + if (prev.age !== curr.age) { + expect(prev.age).toBeLessThanOrEqual(curr.age) + } else { + expect(prev.name.localeCompare(curr.name)).toBeLessThanOrEqual(0) + } + } + + // CRITICAL: Pages should not overlap - this proves new data was loaded + // If the backend didn't return new data, we'd see duplicates or missing records + const firstPageIds = new Set(firstPage.map((u) => u.id)) + const secondPageIds = new Set(secondPage.map((u) => u.id)) + for (const id of secondPageIds) { + expect(firstPageIds.has(id)).toBe(false) + } + + await query.cleanup() + }) + + it(`should use setWindow to move backwards with multi-column orderBy`, async () => { + const config = await getConfig() + const usersCollection = config.collections.onDemand.users + + // Start at offset 20 + const query = createLiveQueryCollection((q) => + q + .from({ user: usersCollection }) + .orderBy(({ user }) => user.age, `asc`) + .orderBy(({ user }) => user.name, `asc`) + .limit(10) + .offset(20) + ) + + await query.preload() + await waitForQueryData(query, { minSize: 10 }) + + const laterPage = Array.from(query.state.values()) + expect(laterPage).toHaveLength(10) + + // Move backwards to offset 10 + const setWindowResult = query.utils.setWindow({ offset: 10, limit: 10 }) + if (setWindowResult !== true) { + await setWindowResult + } + await waitForQueryData(query, { minSize: 10 }) + + 
const earlierPage = Array.from(query.state.values()) + expect(earlierPage).toHaveLength(10) + + // Earlier page should have different users + const laterPageIds = new Set(laterPage.map((u) => u.id)) + const earlierPageIds = new Set(earlierPage.map((u) => u.id)) + for (const id of earlierPageIds) { + expect(laterPageIds.has(id)).toBe(false) + } + + // Verify ordering on earlier page (age asc, name asc) + for (let i = 1; i < earlierPage.length; i++) { + const prev = earlierPage[i - 1]! + const curr = earlierPage[i]! + if (prev.age !== curr.age) { + expect(prev.age).toBeLessThanOrEqual(curr.age) + } else { + expect(prev.name.localeCompare(curr.name)).toBeLessThanOrEqual(0) + } + } + + await query.cleanup() + }) + + it(`should use setWindow with mixed direction multi-column orderBy`, async () => { + const config = await getConfig() + const postsCollection = config.collections.onDemand.posts + + // Sort by userId ascending, viewCount descending + const query = createLiveQueryCollection((q) => + q + .from({ post: postsCollection }) + .orderBy(({ post }) => post.userId, `asc`) + .orderBy(({ post }) => post.viewCount, `desc`) + .limit(10) + ) + + await query.preload() + await waitForQueryData(query, { minSize: 10 }) + + const firstPage = Array.from(query.state.values()) + expect(firstPage).toHaveLength(10) + + // Move to second page + const setWindowResult = query.utils.setWindow({ offset: 10, limit: 10 }) + if (setWindowResult !== true) { + await setWindowResult + } + await waitForQueryData(query, { minSize: 10 }) + + const secondPage = Array.from(query.state.values()) + expect(secondPage).toHaveLength(10) + + // Verify ordering on second page (userId asc, viewCount desc) + for (let i = 1; i < secondPage.length; i++) { + const prev = secondPage[i - 1]! + const curr = secondPage[i]! 
+ + if (prev.userId < curr.userId) { + // userId ascending - correct + continue + } else if (prev.userId === curr.userId) { + // Same userId, viewCount should be descending + expect(prev.viewCount).toBeGreaterThanOrEqual(curr.viewCount) + } else { + throw new Error( + `userId should be ascending but ${prev.userId} > ${curr.userId}` + ) + } + } + + await query.cleanup() + }) + + it(`should handle setWindow across duplicate first-column values`, async () => { + const config = await getConfig() + const usersCollection = config.collections.onDemand.users + + // Age has limited unique values in test data, so many duplicates in first column + // This tests that the composite cursor correctly handles paging across duplicates + const query = createLiveQueryCollection((q) => + q + .from({ user: usersCollection }) + .orderBy(({ user }) => user.age, `asc`) + .orderBy(({ user }) => user.name, `asc`) + .limit(20) + ) + + await query.preload() + await waitForQueryData(query, { minSize: 20 }) + + const firstPage = Array.from(query.state.values()) + expect(firstPage).toHaveLength(20) + + // Move to second page - this crosses the boundary where first column value changes + const setWindowResult = query.utils.setWindow({ offset: 20, limit: 20 }) + if (setWindowResult !== true) { + await setWindowResult + } + await waitForQueryData(query, { minSize: 20 }) + + const secondPage = Array.from(query.state.values()) + expect(secondPage).toHaveLength(20) + + // Verify ordering is maintained across the page boundary + for (let i = 1; i < secondPage.length; i++) { + const prev = secondPage[i - 1]! + const curr = secondPage[i]! 
+ if (prev.age !== curr.age) { + expect(prev.age).toBeLessThanOrEqual(curr.age) + } else { + expect(prev.name.localeCompare(curr.name)).toBeLessThanOrEqual(0) + } + } + + // Pages should not overlap + const firstPageIds = new Set(firstPage.map((u) => u.id)) + for (const user of secondPage) { + expect(firstPageIds.has(user.id)).toBe(false) + } + + // Move to third page to ensure continued paging works + const setWindowResult2 = query.utils.setWindow({ + offset: 40, + limit: 20, + }) + if (setWindowResult2 !== true) { + await setWindowResult2 + } + await waitForQueryData(query, { minSize: 1 }) + + const thirdPage = Array.from(query.state.values()) + expect(thirdPage.length).toBeGreaterThan(0) + expect(thirdPage.length).toBeLessThanOrEqual(20) + + // Third page should not overlap with first or second + const secondPageIds = new Set(secondPage.map((u) => u.id)) + for (const user of thirdPage) { + expect(firstPageIds.has(user.id)).toBe(false) + expect(secondPageIds.has(user.id)).toBe(false) + } + + await query.cleanup() + }) + + it(`should trigger backend loading when paging with multi-column orderBy`, async () => { + const config = await getConfig() + const usersCollection = config.collections.onDemand.users + + // Use a small limit to ensure we need to load more data when paging + const query = createLiveQueryCollection((q) => + q + .from({ user: usersCollection }) + .orderBy(({ user }) => user.age, `asc`) + .orderBy(({ user }) => user.name, `asc`) + .limit(5) + ) + + await query.preload() + await waitForQueryData(query, { minSize: 5 }) + + // Get first page - only 5 records loaded + const firstPage = Array.from(query.state.values()) + expect(firstPage).toHaveLength(5) + const lastItemFirstPage = firstPage[firstPage.length - 1]! 
+ + // Move to second page - this MUST trigger backend loading + // since we only have 5 records and need records at offset 5 + const setWindowResult = query.utils.setWindow({ offset: 5, limit: 5 }) + + // CRITICAL ASSERTION: In on-demand mode, setWindow should return a Promise + // when we need to load data we don't have yet. This proves loading was triggered. + // If it returned `true`, it would mean data was already available (no loading needed). + expect( + setWindowResult === true || setWindowResult instanceof Promise + ).toBe(true) + + if (setWindowResult !== true) { + // Wait for loading to complete + await setWindowResult + } + await waitForQueryData(query, { minSize: 5 }) + + // Get second page + const secondPage = Array.from(query.state.values()) + expect(secondPage).toHaveLength(5) + + // Verify we got different records (proves new data was loaded from backend) + const firstPageIds = new Set(firstPage.map((u) => u.id)) + for (const user of secondPage) { + expect(firstPageIds.has(user.id)).toBe(false) + } + + // Verify ordering continues correctly from where first page ended + const firstItemSecondPage = secondPage[0]! 
+ + // The first item of page 2 should come after the last item of page 1 + // in the sort order (age asc, name asc) + if (lastItemFirstPage.age === firstItemSecondPage.age) { + // Same age value, so name should be greater or equal + expect( + firstItemSecondPage.name.localeCompare(lastItemFirstPage.name) + ).toBeGreaterThanOrEqual(0) + } else { + // Different age, page 2 first should have greater or equal age + expect(firstItemSecondPage.age).toBeGreaterThanOrEqual( + lastItemFirstPage.age + ) + } + + await query.cleanup() + }) }) describe(`Limit`, () => { diff --git a/packages/db-ivm/src/operators/topKWithFractionalIndex.ts b/packages/db-ivm/src/operators/topKWithFractionalIndex.ts index 858503f6f..3d21ee9d0 100644 --- a/packages/db-ivm/src/operators/topKWithFractionalIndex.ts +++ b/packages/db-ivm/src/operators/topKWithFractionalIndex.ts @@ -2,11 +2,7 @@ import { generateKeyBetween } from "fractional-indexing" import { DifferenceStreamWriter, UnaryOperator } from "../graph.js" import { StreamBuilder } from "../d2.js" import { MultiSet } from "../multiset.js" -import { - binarySearch, - diffHalfOpen, - globalObjectIdGenerator, -} from "../utils.js" +import { binarySearch, diffHalfOpen } from "../utils.js" import type { HRange } from "../utils.js" import type { DifferenceStreamReader } from "../graph.js" import type { IStreamBuilder, PipedOperator } from "../types.js" @@ -248,8 +244,9 @@ export class TopKWithFractionalIndexOperator extends UnaryOperator< /** * topK data structure that supports insertions and deletions * and returns changes to the topK. + * Elements are stored as [key, value] tuples for stable tie-breaking. */ - #topK: TopK> + #topK: TopK<[K, T]> constructor( id: number, @@ -261,21 +258,19 @@ export class TopKWithFractionalIndexOperator extends UnaryOperator< super(id, inputA, output) const limit = options.limit ?? Infinity const offset = options.offset ?? 
0 - const compareTaggedValues = ( - a: TaggedValue, - b: TaggedValue - ) => { + const compareKeyedValues = ([aKey, aVal]: [K, T], [bKey, bVal]: [K, T]) => { // First compare on the value - const valueComparison = comparator(getVal(a), getVal(b)) + const valueComparison = comparator(aVal, bVal) if (valueComparison !== 0) { return valueComparison } - // If the values are equal, compare on the tag (object identity) - const tieBreakerA = getTag(a) - const tieBreakerB = getTag(b) - return tieBreakerA - tieBreakerB + // If the values are equal, use the row key as tie-breaker + // This provides stable, deterministic ordering since keys are string | number + if (aKey === bKey) return 0 + if (aKey < bKey) return -1 + return 1 } - this.#topK = this.createTopK(offset, limit, compareTaggedValues) + this.#topK = this.createTopK(offset, limit, compareKeyedValues) options.setSizeCallback?.(() => this.#topK.size) options.setWindowFn?.(this.moveTopK.bind(this)) } @@ -283,8 +278,8 @@ export class TopKWithFractionalIndexOperator extends UnaryOperator< protected createTopK( offset: number, limit: number, - comparator: (a: TaggedValue, b: TaggedValue) => number - ): TopK> { + comparator: (a: [K, T], b: [K, T]) => number + ): TopK<[K, T]> { return new TopKArray(offset, limit, comparator) } @@ -336,20 +331,18 @@ export class TopKWithFractionalIndexOperator extends UnaryOperator< ): void { const { oldMultiplicity, newMultiplicity } = this.addKey(key, multiplicity) - let res: TopKChanges> = { + let res: TopKChanges<[K, T]> = { moveIn: null, moveOut: null, } if (oldMultiplicity <= 0 && newMultiplicity > 0) { // The value was invisible but should now be visible // Need to insert it into the array of sorted values - const taggedValue = tagValue(key, value) - res = this.#topK.insert(taggedValue) + res = this.#topK.insert([key, value]) } else if (oldMultiplicity > 0 && newMultiplicity <= 0) { // The value was visible but should now be invisible // Need to remove it from the array of sorted 
values - const taggedValue = tagValue(key, value) - res = this.#topK.delete(taggedValue) + res = this.#topK.delete([key, value]) } else { // The value was invisible and it remains invisible // or it was visible and remains visible @@ -363,28 +356,22 @@ export class TopKWithFractionalIndexOperator extends UnaryOperator< } private handleMoveIn( - moveIn: IndexedValue> | null, + moveIn: IndexedValue<[K, T]> | null, result: Array<[[K, IndexedValue], number]> ) { if (moveIn) { - const index = getIndex(moveIn) - const taggedValue = getValue(moveIn) - const k = getKey(taggedValue) - const val = getVal(taggedValue) - result.push([[k, [val, index]], 1]) + const [[key, value], index] = moveIn + result.push([[key, [value, index]], 1]) } } private handleMoveOut( - moveOut: IndexedValue> | null, + moveOut: IndexedValue<[K, T]> | null, result: Array<[[K, IndexedValue], number]> ) { if (moveOut) { - const index = getIndex(moveOut) - const taggedValue = getValue(moveOut) - const k = getKey(taggedValue) - const val = getVal(taggedValue) - result.push([[k, [val, index]], -1]) + const [[key, value], index] = moveOut + result.push([[key, [value, index]], -1]) } } @@ -460,22 +447,3 @@ export function getValue(indexedVal: IndexedValue): V { export function getIndex(indexedVal: IndexedValue): FractionalIndex { return indexedVal[1] } - -export type Tag = number -export type TaggedValue = [K, V, Tag] - -function tagValue(key: K, value: V): TaggedValue { - return [key, value, globalObjectIdGenerator.getId(key)] -} - -function getKey(tieBreakerTaggedValue: TaggedValue): K { - return tieBreakerTaggedValue[0] -} - -function getVal(tieBreakerTaggedValue: TaggedValue): V { - return tieBreakerTaggedValue[1] -} - -function getTag(tieBreakerTaggedValue: TaggedValue): Tag { - return tieBreakerTaggedValue[2] -} diff --git a/packages/db-ivm/src/operators/topKWithFractionalIndexBTree.ts b/packages/db-ivm/src/operators/topKWithFractionalIndexBTree.ts index 7b094d7c4..d962812b7 100644 --- 
a/packages/db-ivm/src/operators/topKWithFractionalIndexBTree.ts +++ b/packages/db-ivm/src/operators/topKWithFractionalIndexBTree.ts @@ -10,7 +10,6 @@ import { import type { IStreamBuilder, PipedOperator } from "../types.js" import type { IndexedValue, - TaggedValue, TopK, TopKChanges, TopKWithFractionalIndexOptions, @@ -249,8 +248,8 @@ export class TopKWithFractionalIndexBTreeOperator< protected override createTopK( offset: number, limit: number, - comparator: (a: TaggedValue, b: TaggedValue) => number - ): TopK> { + comparator: (a: [K, T], b: [K, T]) => number + ): TopK<[K, T]> { if (BTree === undefined) { throw new Error( `B+ tree not loaded. You need to call loadBTree() before using TopKWithFractionalIndexBTreeOperator.` diff --git a/packages/db/src/SortedMap.ts b/packages/db/src/SortedMap.ts index 268b614f0..cd26ad937 100644 --- a/packages/db/src/SortedMap.ts +++ b/packages/db/src/SortedMap.ts @@ -1,62 +1,81 @@ +import { compareKeys } from "./utils/comparison.js" + /** * A Map implementation that keeps its entries sorted based on a comparator function - * @template TKey - The type of keys in the map + * @template TKey - The type of keys in the map (must be string | number) * @template TValue - The type of values in the map */ -export class SortedMap { +export class SortedMap { private map: Map private sortedKeys: Array - private comparator: (a: TValue, b: TValue) => number + private comparator: ((a: TValue, b: TValue) => number) | undefined /** * Creates a new SortedMap instance * - * @param comparator - Optional function to compare values for sorting + * @param comparator - Optional function to compare values for sorting. + * If not provided, entries are sorted by key only. 
*/ constructor(comparator?: (a: TValue, b: TValue) => number) { this.map = new Map() this.sortedKeys = [] - this.comparator = comparator || this.defaultComparator - } - - /** - * Default comparator function used when none is provided - * - * @param a - First value to compare - * @param b - Second value to compare - * @returns -1 if a < b, 1 if a > b, 0 if equal - */ - private defaultComparator(a: TValue, b: TValue): number { - if (a < b) return -1 - if (a > b) return 1 - return 0 + this.comparator = comparator } /** * Finds the index where a key-value pair should be inserted to maintain sort order. - * Uses binary search to find the correct position based on the value. - * Hence, it is in O(log n) time. + * Uses binary search to find the correct position based on the value (if comparator provided), + * with key-based tie-breaking for deterministic ordering when values compare as equal. + * If no comparator is provided, sorts by key only. + * Runs in O(log n) time. * - * @param key - The key to find position for - * @param value - The value to compare against + * @param key - The key to find position for (used as tie-breaker or primary sort when no comparator) + * @param value - The value to compare against (only used if comparator is provided) * @returns The index where the key should be inserted */ - private indexOf(value: TValue): number { + private indexOf(key: TKey, value: TValue): number { let left = 0 let right = this.sortedKeys.length + // Fast path: no comparator means sort by key only + if (!this.comparator) { + while (left < right) { + const mid = Math.floor((left + right) / 2) + const midKey = this.sortedKeys[mid]! 
+ const keyComparison = compareKeys(key, midKey) + if (keyComparison < 0) { + right = mid + } else if (keyComparison > 0) { + left = mid + 1 + } else { + return mid + } + } + return left + } + + // With comparator: sort by value first, then key as tie-breaker while (left < right) { const mid = Math.floor((left + right) / 2) const midKey = this.sortedKeys[mid]! const midValue = this.map.get(midKey)! - const comparison = this.comparator(value, midValue) + const valueComparison = this.comparator(value, midValue) - if (comparison < 0) { + if (valueComparison < 0) { right = mid - } else if (comparison > 0) { + } else if (valueComparison > 0) { left = mid + 1 } else { - return mid + // Values are equal, use key as tie-breaker for deterministic ordering + const keyComparison = compareKeys(key, midKey) + if (keyComparison < 0) { + right = mid + } else if (keyComparison > 0) { + left = mid + 1 + } else { + // Same key (shouldn't happen during insert, but handle for lookups) + return mid + } } } @@ -74,12 +93,12 @@ export class SortedMap { if (this.map.has(key)) { // Need to remove the old key from the sorted keys array const oldValue = this.map.get(key)! - const oldIndex = this.indexOf(oldValue) + const oldIndex = this.indexOf(key, oldValue) this.sortedKeys.splice(oldIndex, 1) } // Insert the new key at the correct position - const index = this.indexOf(value) + const index = this.indexOf(key, value) this.sortedKeys.splice(index, 0, key) this.map.set(key, value) @@ -106,7 +125,7 @@ export class SortedMap { delete(key: TKey): boolean { if (this.map.has(key)) { const oldValue = this.map.get(key) - const index = this.indexOf(oldValue!) + const index = this.indexOf(key, oldValue!) 
this.sortedKeys.splice(index, 1) return this.map.delete(key) } diff --git a/packages/db/src/collection/state.ts b/packages/db/src/collection/state.ts index f7b03da33..6d58d4099 100644 --- a/packages/db/src/collection/state.ts +++ b/packages/db/src/collection/state.ts @@ -43,7 +43,7 @@ export class CollectionStateManager< public pendingSyncedTransactions: Array< PendingSyncedTransaction > = [] - public syncedData: Map | SortedMap + public syncedData: SortedMap public syncedMetadata = new Map() // Optimistic state tracking - make public for testing @@ -69,12 +69,9 @@ export class CollectionStateManager< a.compareCreatedAt(b) ) - // Set up data storage with optional comparison function - if (config.compare) { - this.syncedData = new SortedMap(config.compare) - } else { - this.syncedData = new Map() - } + // Set up data storage - always use SortedMap for deterministic iteration. + // If a custom compare function is provided, use it; otherwise entries are sorted by key only. + this.syncedData = new SortedMap(config.compare) } setDeps(deps: { diff --git a/packages/db/src/collection/subscription.ts b/packages/db/src/collection/subscription.ts index 1c23d7b04..98f80b7f0 100644 --- a/packages/db/src/collection/subscription.ts +++ b/packages/db/src/collection/subscription.ts @@ -1,5 +1,5 @@ import { ensureIndexForExpression } from "../indexes/auto-index.js" -import { and, eq, gt, gte, lt } from "../query/builder/functions.js" +import { and, eq, gt, gte, lt, or } from "../query/builder/functions.js" import { Value } from "../query/ir.js" import { EventEmitter } from "../event-emitter.js" import { @@ -22,12 +22,19 @@ type RequestSnapshotOptions = { where?: BasicExpression optimizedOnly?: boolean trackLoadSubsetPromise?: boolean + /** Optional orderBy to pass to loadSubset for backend optimization */ + orderBy?: OrderBy + /** Optional limit to pass to loadSubset for backend optimization */ + limit?: number } type RequestLimitedSnapshotOptions = { orderBy: OrderBy limit: number 
- minValue?: any + /** All column values for cursor (first value used for local index, all values for sync layer) */ + minValues?: Array + /** Row offset for offset-based pagination (passed to sync layer) */ + offset?: number } type CollectionSubscriptionOptions = { @@ -38,6 +45,75 @@ type CollectionSubscriptionOptions = { onUnsubscribe?: (event: SubscriptionUnsubscribedEvent) => void } +/** + * Builds a composite cursor expression for multi-column orderBy. + * For [col1 ASC, col2 DESC] with values [v1, v2], produces: + * or( + * gt(col1, v1), // col1 > v1 + * and(eq(col1, v1), lt(col2, v2)) // col1 = v1 AND col2 < v2 (DESC) + * ) + * + * This creates a precise cursor that works with composite indexes on the backend. + */ +function buildCompositeCursor( + orderBy: OrderBy, + values: Array +): BasicExpression | undefined { + if (values.length === 0 || orderBy.length === 0) { + return undefined + } + + // For single column, just use simple gt/lt + if (orderBy.length === 1) { + const { expression, compareOptions } = orderBy[0]! + const operator = compareOptions.direction === `asc` ? gt : lt + return operator(expression, new Value(values[0])) + } + + // For multi-column, build the composite cursor: + // or( + // gt(col1, v1), + // and(eq(col1, v1), gt(col2, v2)), + // and(eq(col1, v1), eq(col2, v2), gt(col3, v3)), + // ... + // ) + const clauses: Array> = [] + + for (let i = 0; i < orderBy.length && i < values.length; i++) { + const clause = orderBy[i]! + const value = values[i] + + // Build equality conditions for all previous columns + const eqConditions: Array> = [] + for (let j = 0; j < i; j++) { + const prevClause = orderBy[j]! + const prevValue = values[j] + eqConditions.push(eq(prevClause.expression, new Value(prevValue))) + } + + // Add the comparison for the current column (respecting direction) + const operator = clause.compareOptions.direction === `asc` ? 
gt : lt + const comparison = operator(clause.expression, new Value(value)) + + if (eqConditions.length === 0) { + // First column: just the comparison + clauses.push(comparison) + } else { + // Subsequent columns: and(eq(prev...), comparison) + // We need to spread into and() which expects at least 2 args + const allConditions = [...eqConditions, comparison] + clauses.push(allConditions.reduce((acc, cond) => and(acc, cond))) + } + } + + // Combine all clauses with OR + if (clauses.length === 1) { + return clauses[0]! + } + // Use reduce to combine with or() which expects exactly 2 args + return clauses.reduce((acc, clause) => or(acc, clause)) +} + export class CollectionSubscription extends EventEmitter implements Subscription @@ -57,6 +133,12 @@ export class CollectionSubscription // Keep track of the keys we've sent (needed for join and orderBy optimizations) private sentKeys = new Set() + // Track the count of rows sent via requestLimitedSnapshot for offset-based pagination + private limitedSnapshotRowCount = 0 + + // Track the last key sent via requestLimitedSnapshot for cursor-based pagination + private lastSentKey: string | number | undefined + private filteredCallback: (changes: Array>) => void private orderByIndex: IndexInterface | undefined @@ -203,6 +285,9 @@ export class CollectionSubscription const loadOptions: LoadSubsetOptions = { where: stateOpts.where, subscription: this, + // Include orderBy and limit if provided so sync layer can optimize the query + orderBy: opts?.orderBy, + limit: opts?.limit, } const syncResult = this.collection._sync.loadSubset(loadOptions) @@ -233,17 +318,23 @@ export class CollectionSubscription } /** - * Sends a snapshot that fulfills the `where` clause and all rows are bigger or equal to `minValue`. + * Sends a snapshot that fulfills the `where` clause and all rows are bigger or equal to the cursor. * Requires a range index to be set with `setOrderByIndex` prior to calling this method. 
* It uses that range index to load the items in the order of the index. - * Note 1: it may load more rows than the provided LIMIT because it loads all values equal to `minValue` + limit values greater than `minValue`. + * + * For multi-column orderBy: + * - Uses first value from `minValues` for LOCAL index operations (wide bounds, ensures no missed rows) + * - Uses all `minValues` to build a precise composite cursor for SYNC layer loadSubset + * + * Note 1: it may load more rows than the provided LIMIT because it loads all values equal to the first cursor value + limit values greater. * This is needed to ensure that it does not accidentally skip duplicate values when the limit falls in the middle of some duplicated values. * Note 2: it does not send keys that have already been sent before. */ requestLimitedSnapshot({ orderBy, limit, - minValue, + minValues, + offset, }: RequestLimitedSnapshotOptions) { if (!limit) throw new Error(`limit is required`) @@ -253,6 +344,11 @@ export class CollectionSubscription ) } + // Derive first column value from minValues (used for local index operations) + const minValue = minValues?.[0] + // Cast for index operations (index expects string | number) + const minValueForIndex = minValue as string | number | undefined + const index = this.orderByIndex const where = this.options.whereExpression const whereFilterFn = where @@ -272,7 +368,7 @@ export class CollectionSubscription return whereFilterFn?.(value) ?? true } - let biggestObservedValue = minValue + let biggestObservedValue = minValueForIndex const changes: Array> = [] // If we have a minValue we need to handle the case @@ -281,12 +377,16 @@ export class CollectionSubscription // so if minValue is 3 then the previous snapshot may not have included all 3s // e.g. 
if it was offset 0 and limit 3 it would only have loaded the first 3 // so we load all rows equal to minValue first, to be sure we don't skip any duplicate values + // + // For multi-column orderBy, we use the first column value for index operations (wide bounds) + // This may load some duplicates but ensures we never miss any rows. let keys: Array = [] - if (minValue !== undefined) { - // First, get all items with the same value as minValue + if (minValueForIndex !== undefined) { + // First, get all items with the same FIRST COLUMN value as minValue + // This provides wide bounds for the local index const { expression } = orderBy[0]! const allRowsWithMinValue = this.collection.currentStateAsChanges({ - where: eq(expression, new Value(minValue)), + where: eq(expression, new Value(minValueForIndex)), }) if (allRowsWithMinValue) { @@ -300,15 +400,15 @@ export class CollectionSubscription // Then get items greater than minValue const keysGreaterThanMin = index.take( limit - keys.length, - minValue, + minValueForIndex, filterFn ) keys.push(...keysGreaterThanMin) } else { - keys = index.take(limit, minValue, filterFn) + keys = index.take(limit, minValueForIndex, filterFn) } } else { - keys = index.take(limit, minValue, filterFn) + keys = index.take(limit, minValueForIndex, filterFn) } const valuesNeeded = () => Math.max(limit - changes.length, 0) @@ -331,76 +431,79 @@ export class CollectionSubscription keys = index.take(valuesNeeded(), biggestObservedValue, filterFn) } + // Track row count for offset-based pagination (before sending to callback) + // Use the current count as the offset for this load + const currentOffset = this.limitedSnapshotRowCount + this.callback(changes) - let whereWithValueFilter = where - if (typeof minValue !== `undefined`) { - // Only request data that we haven't seen yet (i.e. is bigger than the minValue) - const { expression, compareOptions } = orderBy[0]! - const operator = compareOptions.direction === `asc` ? 
gt : lt - const valueFilter = operator(expression, new Value(minValue)) - whereWithValueFilter = where ? and(where, valueFilter) : valueFilter + // Update the row count and last key after sending (for next call's offset/cursor) + this.limitedSnapshotRowCount += changes.length + if (changes.length > 0) { + this.lastSentKey = changes[changes.length - 1]!.key + } + + // Build cursor expressions for sync layer loadSubset + // The cursor expressions are separate from the main where clause + // so the sync layer can choose cursor-based or offset-based pagination + let cursorExpressions: + | { + whereFrom: BasicExpression + whereCurrent: BasicExpression + lastKey: string | number + } + | undefined + + if ( + minValues !== undefined && + minValues.length > 0 && + this.lastSentKey !== undefined + ) { + const whereFromCursor = buildCompositeCursor(orderBy, minValues) + + if (whereFromCursor) { + const { expression } = orderBy[0]! + const minValue = minValues[0] + + // Build the whereCurrent expression for the first orderBy column + // For Date values, we need to handle precision differences between JS (ms) and backends (μs) + // A JS Date represents a 1ms range, so we query for all values within that range + let whereCurrentCursor: BasicExpression + if (minValue instanceof Date) { + const minValuePlus1ms = new Date(minValue.getTime() + 1) + whereCurrentCursor = and( + gte(expression, new Value(minValue)), + lt(expression, new Value(minValuePlus1ms)) + ) + } else { + whereCurrentCursor = eq(expression, new Value(minValue)) + } + + cursorExpressions = { + whereFrom: whereFromCursor, + whereCurrent: whereCurrentCursor, + lastKey: this.lastSentKey, + } + } } // Request the sync layer to load more data // don't await it, we will load the data into the collection when it comes in - const loadOptions1: LoadSubsetOptions = { - where: whereWithValueFilter, + // Note: `where` does NOT include cursor expressions - they are passed separately + // The sync layer can choose to use 
cursor-based or offset-based pagination + const loadOptions: LoadSubsetOptions = { + where, // Main filter only, no cursor limit, orderBy, + cursor: cursorExpressions, // Cursor expressions passed separately + offset: offset ?? currentOffset, // Use provided offset, or auto-tracked offset subscription: this, } - const syncResult = this.collection._sync.loadSubset(loadOptions1) + const syncResult = this.collection._sync.loadSubset(loadOptions) // Track this loadSubset call - this.loadedSubsets.push(loadOptions1) - - // Make parallel loadSubset calls for values equal to minValue and values greater than minValue - const promises: Array> = [] - - // First promise: load all values equal to minValue - if (typeof minValue !== `undefined`) { - const { expression } = orderBy[0]! - - // For Date values, we need to handle precision differences between JS (ms) and backends (μs) - // A JS Date represents a 1ms range, so we query for all values within that range - let exactValueFilter - if (minValue instanceof Date) { - const minValuePlus1ms = new Date(minValue.getTime() + 1) - exactValueFilter = and( - gte(expression, new Value(minValue)), - lt(expression, new Value(minValuePlus1ms)) - ) - } else { - exactValueFilter = eq(expression, new Value(minValue)) - } - - const loadOptions2: LoadSubsetOptions = { - where: exactValueFilter, - subscription: this, - } - const equalValueResult = this.collection._sync.loadSubset(loadOptions2) - - // Track this loadSubset call - this.loadedSubsets.push(loadOptions2) - - if (equalValueResult instanceof Promise) { - promises.push(equalValueResult) - } - } - - // Second promise: load values greater than minValue - if (syncResult instanceof Promise) { - promises.push(syncResult) - } - - // Track the combined promise - if (promises.length > 0) { - const combinedPromise = Promise.all(promises).then(() => {}) - this.trackLoadSubsetPromise(combinedPromise) - } else { - this.trackLoadSubsetPromise(syncResult) - } + this.loadedSubsets.push(loadOptions) 
+ this.trackLoadSubsetPromise(syncResult) } // TODO: also add similar test but that checks that it can also load it from the collection's loadSubset function diff --git a/packages/db/src/indexes/btree-index.ts b/packages/db/src/indexes/btree-index.ts index 467d5e1cc..8a45ab86a 100644 --- a/packages/db/src/indexes/btree-index.ts +++ b/packages/db/src/indexes/btree-index.ts @@ -1,5 +1,9 @@ import { BTree } from "../utils/btree.js" -import { defaultComparator, normalizeValue } from "../utils/comparison.js" +import { + compareKeys, + defaultComparator, + normalizeValue, +} from "../utils/comparison.js" import { BaseIndex } from "./base-index.js" import type { CompareOptions } from "../query/builder/types.js" import type { BasicExpression } from "../query/ir.js" @@ -261,7 +265,8 @@ export class BTreeIndex< n: number, nextPair: (k?: any) => [any, any] | undefined, from?: any, - filterFn?: (key: TKey) => boolean + filterFn?: (key: TKey) => boolean, + reversed: boolean = false ): Array { const keysInResult: Set = new Set() const result: Array = [] @@ -271,14 +276,25 @@ export class BTreeIndex< while ((pair = nextPair(key)) !== undefined && result.length < n) { key = pair[0] const keys = this.valueMap.get(key) - if (keys) { - const it = keys.values() - let ks: TKey | undefined - while (result.length < n && (ks = it.next().value)) { + if (keys && keys.size > 0) { + // Fast path: single key doesn't need sorting + if (keys.size === 1) { + const ks = keys.values().next().value as TKey if (!keysInResult.has(ks) && (filterFn?.(ks) ?? true)) { result.push(ks) keysInResult.add(ks) } + } else { + // Sort keys for deterministic order, reverse if needed + const sorted = Array.from(keys).sort(compareKeys) + if (reversed) sorted.reverse() + for (const ks of sorted) { + if (result.length >= n) break + if (!keysInResult.has(ks) && (filterFn?.(ks) ?? 
true)) { + result.push(ks) + keysInResult.add(ks) + } + } } } } @@ -309,7 +325,7 @@ export class BTreeIndex< filterFn?: (key: TKey) => boolean ): Array { const nextPair = (k?: any) => this.orderedEntries.nextLowerPair(k) - return this.takeInternal(n, nextPair, from, filterFn) + return this.takeInternal(n, nextPair, from, filterFn, true) } /** diff --git a/packages/db/src/query/compiler/order-by.ts b/packages/db/src/query/compiler/order-by.ts index 355f922bf..159129f97 100644 --- a/packages/db/src/query/compiler/order-by.ts +++ b/packages/db/src/query/compiler/order-by.ts @@ -27,8 +27,12 @@ export type OrderByOptimizationInfo = { a: Record | null | undefined, b: Record | null | undefined ) => number - valueExtractorForRawRow: (row: Record) => any - index: IndexInterface + /** Extracts all orderBy column values from a raw row (array for multi-column) */ + valueExtractorForRawRow: (row: Record) => unknown + /** Extracts only the first column value - used for index-based cursor */ + firstColumnValueExtractor: (row: Record) => unknown + /** Index on the first orderBy column - used for lazy loading */ + index?: IndexInterface dataNeeded?: () => number } @@ -117,76 +121,165 @@ export function processOrderBy( let orderByOptimizationInfo: OrderByOptimizationInfo | undefined - // Optimize the orderBy operator to lazily load elements - // by using the range index of the collection. - // Only for orderBy clause on a single column for now (no composite ordering) - if (limit && orderByClause.length === 1) { - const clause = orderByClause[0]! - const orderByExpression = clause.expression + // When there's a limit, we create orderByOptimizationInfo to pass orderBy/limit + // to loadSubset so the sync layer can optimize the query. + // We try to use an index on the FIRST orderBy column for lazy loading, + // even for multi-column orderBy (using wider bounds on first column). 
+ if (limit) { + let index: IndexInterface | undefined + let followRefCollection: Collection | undefined + let firstColumnValueExtractor: CompiledSingleRowExpression | undefined + let orderByAlias: string = rawQuery.from.alias - if (orderByExpression.type === `ref`) { + // Try to create/find an index on the FIRST orderBy column for lazy loading + const firstClause = orderByClause[0]! + const firstOrderByExpression = firstClause.expression + + if (firstOrderByExpression.type === `ref`) { const followRefResult = followRef( rawQuery, - orderByExpression, + firstOrderByExpression, collection - )! - - const followRefCollection = followRefResult.collection - const fieldName = followRefResult.path[0] - const compareOpts = buildCompareOptions(clause, followRefCollection) - if (fieldName) { - ensureIndexForField( - fieldName, - followRefResult.path, - followRefCollection, - compareOpts, - compare + ) + + if (followRefResult) { + followRefCollection = followRefResult.collection + const fieldName = followRefResult.path[0] + const compareOpts = buildCompareOptions( + firstClause, + followRefCollection ) - } - const valueExtractorForRawRow = compileExpression( - new PropRef(followRefResult.path), - true - ) as CompiledSingleRowExpression + if (fieldName) { + ensureIndexForField( + fieldName, + followRefResult.path, + followRefCollection, + compareOpts, + compare + ) + } - const comparator = ( - a: Record | null | undefined, - b: Record | null | undefined - ) => { - const extractedA = a ? valueExtractorForRawRow(a) : a - const extractedB = b ? 
valueExtractorForRawRow(b) : b - return compare(extractedA, extractedB) - } + // First column value extractor - used for index cursor + firstColumnValueExtractor = compileExpression( + new PropRef(followRefResult.path), + true + ) as CompiledSingleRowExpression - const index: IndexInterface | undefined = - findIndexForField( + index = findIndexForField( followRefCollection, followRefResult.path, compareOpts ) - if (index && index.supports(`gt`)) { - // We found an index that we can use to lazily load ordered data - const orderByAlias = - orderByExpression.path.length > 1 - ? String(orderByExpression.path[0]) + // Only use the index if it supports range queries + if (!index?.supports(`gt`)) { + index = undefined + } + + orderByAlias = + firstOrderByExpression.path.length > 1 + ? String(firstOrderByExpression.path[0]) : rawQuery.from.alias + } + } - orderByOptimizationInfo = { - alias: orderByAlias, - offset: offset ?? 0, - limit, - comparator, - valueExtractorForRawRow, - index, - orderBy: orderByClause, + // Only create comparator and value extractors if the first column is a ref expression + // For aggregate or computed expressions, we can't extract values from raw collection rows + if (!firstColumnValueExtractor) { + // Skip optimization for non-ref expressions (aggregates, computed values, etc.) + // The query will still work, but without lazy loading optimization + } else { + // Build value extractors for all columns (must all be ref expressions for multi-column) + // Check if all orderBy expressions are ref types (required for multi-column extraction) + const allColumnsAreRefs = orderByClause.every( + (clause) => clause.expression.type === `ref` + ) + + // Create extractors for all columns if they're all refs + const allColumnExtractors: + | Array + | undefined = allColumnsAreRefs + ? 
orderByClause.map((clause) => { + // We know it's a ref since we checked allColumnsAreRefs + const refExpr = clause.expression as PropRef + const followResult = followRef(rawQuery, refExpr, collection) + if (followResult) { + return compileExpression( + new PropRef(followResult.path), + true + ) as CompiledSingleRowExpression + } + // Fallback for refs that don't follow + return compileExpression( + clause.expression, + true + ) as CompiledSingleRowExpression + }) + : undefined + + // Create a comparator for raw rows (used for tracking sent values) + // This compares ALL orderBy columns for proper ordering + const comparator = ( + a: Record | null | undefined, + b: Record | null | undefined + ) => { + if (orderByClause.length === 1) { + // Single column: extract and compare + const extractedA = a ? firstColumnValueExtractor(a) : a + const extractedB = b ? firstColumnValueExtractor(b) : b + return compare(extractedA, extractedB) + } + if (allColumnExtractors) { + // Multi-column with all refs: extract all values and compare + const extractAll = ( + row: Record | null | undefined + ) => { + if (!row) return row + return allColumnExtractors.map((extractor) => extractor(row)) + } + return compare(extractAll(a), extractAll(b)) + } + // Fallback: can't compare (shouldn't happen since we skip non-ref cases) + return 0 + } + + // Create a value extractor for raw rows that extracts ALL orderBy column values + // This is used for tracking sent values and building composite cursors + const rawRowValueExtractor = (row: Record): unknown => { + if (orderByClause.length === 1) { + // Single column: return single value + return firstColumnValueExtractor(row) } + if (allColumnExtractors) { + // Multi-column: return array of all values + return allColumnExtractors.map((extractor) => extractor(row)) + } + // Fallback (shouldn't happen) + return undefined + } + + orderByOptimizationInfo = { + alias: orderByAlias, + offset: offset ?? 
0, + limit, + comparator, + valueExtractorForRawRow: rawRowValueExtractor, + firstColumnValueExtractor: firstColumnValueExtractor, + index, + orderBy: orderByClause, + } - optimizableOrderByCollections[followRefCollection.id] = - orderByOptimizationInfo + // Store the optimization info keyed by collection ID + // Use the followed collection if available, otherwise use the main collection + const targetCollectionId = followRefCollection?.id ?? collection.id + optimizableOrderByCollections[targetCollectionId] = + orderByOptimizationInfo + // Set up lazy loading callback if we have an index + if (index) { setSizeCallback = (getSize: () => number) => { - optimizableOrderByCollections[followRefCollection.id]![`dataNeeded`] = + optimizableOrderByCollections[targetCollectionId]![`dataNeeded`] = () => { const size = getSize() return Math.max(0, orderByOptimizationInfo!.limit - size) diff --git a/packages/db/src/query/live/collection-config-builder.ts b/packages/db/src/query/live/collection-config-builder.ts index dd28be356..053233ff0 100644 --- a/packages/db/src/query/live/collection-config-builder.ts +++ b/packages/db/src/query/live/collection-config-builder.ts @@ -99,6 +99,14 @@ export class CollectionConfigBuilder< // Error state tracking private isInErrorState = false + // Track whether we've already marked the live query as ready + // Used to ensure we only wait for the first loadSubset, not subsequent ones + private hasMarkedReady = false + + // Track whether we've set up the loadingSubset listener + // Prevents duplicate listeners when updateLiveQueryStatus is called multiple times + private hasSetupLoadingListener = false + // Reference to the live query collection for error state transitions public liveQueryCollection?: Collection @@ -611,6 +619,10 @@ export class CollectionConfigBuilder< // The scheduler's listener Set would otherwise keep a strong reference to this builder this.unsubscribeFromSchedulerClears?.() this.unsubscribeFromSchedulerClears = undefined + 
+ // Reset ready state tracking for potential restart + this.hasMarkedReady = false + this.hasSetupLoadingListener = false } } @@ -788,15 +800,42 @@ export class CollectionConfigBuilder< private updateLiveQueryStatus(config: SyncMethods) { const { markReady } = config - // Don't update status if already in error - if (this.isInErrorState) { + // Don't update status if already in error or already marked ready + if (this.isInErrorState || this.hasMarkedReady) { return } - // Mark ready when all source collections are ready - if (this.allCollectionsReady()) { - markReady() + // Check if all source collections are ready + if (!this.allCollectionsReady()) { + return } + + // If the live query is currently loading a subset (e.g., initial on-demand load), + // wait for it to complete before marking ready. This ensures that for on-demand + // sync mode, the live query isn't marked ready until the first data is loaded. + // We only wait for the FIRST loadSubset - subsequent loads (pagination/windowing) + // should not affect the ready status. 
+ if (this.liveQueryCollection?.isLoadingSubset) { + // Set up a one-time listener if we haven't already + if (!this.hasSetupLoadingListener) { + this.hasSetupLoadingListener = true + const unsubscribe = this.liveQueryCollection.on( + `loadingSubset:change`, + (event) => { + if (!event.isLoadingSubset) { + unsubscribe() + // Re-check and mark ready now that loading is complete + this.updateLiveQueryStatus(config) + } + } + ) + } + return + } + + // Mark ready when all source collections are ready and no initial loading is in progress + this.hasMarkedReady = true + markReady() } /** diff --git a/packages/db/src/query/live/collection-subscriber.ts b/packages/db/src/query/live/collection-subscriber.ts index 38614cb0a..466ec2025 100644 --- a/packages/db/src/query/live/collection-subscriber.ts +++ b/packages/db/src/query/live/collection-subscriber.ts @@ -162,11 +162,23 @@ export class CollectionSubscriber< this.sendChangesToPipeline(changes) } + // For on-demand sync mode, we need to track the initial loadSubset promise + // so that the live query collection shows isLoading=true until data arrives. + // For eager sync mode, data is already available so we don't need to track it. 
+ const isOnDemand = this.collection.config.syncMode === `on-demand` + + // Create subscription without includeInitialState - we'll handle it manually + // to control whether the loadSubset promise is tracked const subscription = this.collection.subscribeChanges(sendChanges, { - includeInitialState, + includeInitialState: !isOnDemand && includeInitialState, whereExpression, }) + // For on-demand sources with initial state, manually request snapshot with tracking + if (isOnDemand && includeInitialState) { + subscription.requestSnapshot({ trackLoadSubsetPromise: true }) + } + return subscription } @@ -190,17 +202,30 @@ export class CollectionSubscriber< whereExpression, }) - subscription.setOrderByIndex(index) - // Normalize the orderBy clauses such that the references are relative to the collection const normalizedOrderBy = normalizeOrderByPaths(orderBy, this.alias) - // Load the first `offset + limit` values from the index - // i.e. the K items from the collection that fall into the requested range: [offset, offset + limit[ - subscription.requestLimitedSnapshot({ - limit: offset + limit, - orderBy: normalizedOrderBy, - }) + if (index) { + // We have an index on the first orderBy column - use lazy loading optimization + // This works for both single-column and multi-column orderBy: + // - Single-column: index provides exact ordering + // - Multi-column: index provides ordering on first column, secondary sort in memory + subscription.setOrderByIndex(index) + + // Load the first `offset + limit` values from the index + // i.e. 
the K items from the collection that fall into the requested range: [offset, offset + limit[ + subscription.requestLimitedSnapshot({ + limit: offset + limit, + orderBy: normalizedOrderBy, + }) + } else { + // No index available (e.g., non-ref expression): pass orderBy/limit to loadSubset + // so the sync layer can optimize if the backend supports it + subscription.requestSnapshot({ + orderBy: normalizedOrderBy, + limit: offset + limit, + }) + } return subscription } @@ -220,11 +245,10 @@ export class CollectionSubscriber< const { dataNeeded } = orderByInfo if (!dataNeeded) { - // This should never happen because the topK operator should always set the size callback - // which in turn should lead to the orderBy operator setting the dataNeeded callback - throw new Error( - `Missing dataNeeded callback for collection ${this.collectionId}` - ) + // dataNeeded is not set when there's no index (e.g., non-ref expression). + // In this case, we've already loaded all data via requestSnapshot + // and don't need to lazily load more. + return true } // `dataNeeded` probes the orderBy operator to see if it needs more data @@ -275,9 +299,20 @@ export class CollectionSubscriber< } const { orderBy, valueExtractorForRawRow } = orderByInfo const biggestSentRow = this.biggest - const biggestSentValue = biggestSentRow + + // Extract all orderBy column values from the biggest sent row + // For single-column: returns single value, for multi-column: returns array + const extractedValues = biggestSentRow ? valueExtractorForRawRow(biggestSentRow) - : biggestSentRow + : undefined + + // Normalize to array format for minValues + const minValues = + extractedValues !== undefined + ? Array.isArray(extractedValues) + ? 
extractedValues + : [extractedValues] + : undefined // Normalize the orderBy clauses such that the references are relative to the collection const normalizedOrderBy = normalizeOrderByPaths(orderBy, this.alias) @@ -286,7 +321,7 @@ export class CollectionSubscriber< subscription.requestLimitedSnapshot({ orderBy: normalizedOrderBy, limit: n, - minValue: biggestSentValue, + minValues, }) } diff --git a/packages/db/src/types.ts b/packages/db/src/types.ts index f41492ec7..d76a47229 100644 --- a/packages/db/src/types.ts +++ b/packages/db/src/types.ts @@ -253,13 +253,52 @@ export interface Subscription extends EventEmitter { readonly status: SubscriptionStatus } +/** + * Cursor expressions for pagination, passed separately from the main `where` clause. + * The sync layer can choose to use cursor-based pagination (combining these with the where) + * or offset-based pagination (ignoring these and using the `offset` parameter). + * + * Neither expression includes the main `where` clause - they are cursor-specific only. + */ +export type CursorExpressions = { + /** + * Expression for rows greater than (after) the cursor value. + * For multi-column orderBy, this is a composite cursor using OR of conditions. + * Example for [col1 ASC, col2 DESC] with values [v1, v2]: + * or(gt(col1, v1), and(eq(col1, v1), lt(col2, v2))) + */ + whereFrom: BasicExpression + /** + * Expression for rows equal to the current cursor value (first orderBy column only). + * Used to handle tie-breaking/duplicates at the boundary. + * Example: eq(col1, v1) or for Dates: and(gte(col1, v1), lt(col1, v1+1ms)) + */ + whereCurrent: BasicExpression + /** + * The key of the last item that was loaded. + * Can be used by sync layers for tracking or deduplication. 
+ */ + lastKey: string | number +} + export type LoadSubsetOptions = { - /** The where expression to filter the data */ + /** The where expression to filter the data (does NOT include cursor expressions) */ where?: BasicExpression /** The order by clause to sort the data */ orderBy?: OrderBy /** The limit of the data to load */ limit?: number + /** + * Cursor expressions for cursor-based pagination. + * These are separate from `where` - the sync layer should combine them if using cursor-based pagination. + * Neither expression includes the main `where` clause. + */ + cursor?: CursorExpressions + /** + * Row offset for offset-based pagination. + * The sync layer can use this instead of `cursor` if it prefers offset-based pagination. + */ + offset?: number /** * The subscription that triggered the load. * Advanced sync implementations can use this for: diff --git a/packages/db/src/utils/comparison.ts b/packages/db/src/utils/comparison.ts index 1e4ead81f..f4c3a7a8b 100644 --- a/packages/db/src/utils/comparison.ts +++ b/packages/db/src/utils/comparison.ts @@ -189,3 +189,21 @@ export function areValuesEqual(a: any, b: any): boolean { // Different types or not Uint8Arrays return false } + +/** + * Compares two keys (string | number) for deterministic ordering. + * Strings come before numbers, then sorted within type. + */ +export function compareKeys( + a: TKey, + b: TKey +): number { + // Same type: compare directly + if (typeof a === typeof b) { + if (a < b) return -1 + if (a > b) return 1 + return 0 + } + // Different types: strings come before numbers + return typeof a === `string` ? 
-1 : 1 +} diff --git a/packages/db/tests/deterministic-ordering.test.ts b/packages/db/tests/deterministic-ordering.test.ts new file mode 100644 index 000000000..ef6409453 --- /dev/null +++ b/packages/db/tests/deterministic-ordering.test.ts @@ -0,0 +1,398 @@ +import { describe, expect, it } from "vitest" +import { SortedMap } from "../src/SortedMap" +import { BTreeIndex } from "../src/indexes/btree-index" +import { createCollection } from "../src/collection/index.js" +import { PropRef } from "../src/query/ir" +import { mockSyncCollectionOptions } from "./utils" + +/** + * These tests verify deterministic ordering behavior when values compare as equal. + * + * The issue: When multiple items have the same "sort value" (e.g., same priority), + * their relative ordering should be deterministic and stable based on their key. + * Without key-based tie-breaking, the order depends on insertion order, which + * can vary between page loads, sync operations, etc. + */ + +describe(`Deterministic Ordering`, () => { + describe(`SortedMap`, () => { + it(`should maintain deterministic order when values are equal`, () => { + // All values are the same (priority = 1), so they compare as equal + const map = new SortedMap( + (a, b) => a.priority - b.priority + ) + + // Insert in "random" order + map.set(`c`, { priority: 1 }) + map.set(`a`, { priority: 1 }) + map.set(`b`, { priority: 1 }) + + // With key-based tie-breaking, should always iterate in key order: a, b, c + const keys = Array.from(map.keys()) + expect(keys).toEqual([`a`, `b`, `c`]) + }) + + it(`should maintain deterministic order with mixed equal and different values`, () => { + const map = new SortedMap( + (a, b) => a.priority - b.priority + ) + + // Mix of equal and different priorities + map.set(`d`, { priority: 2 }) + map.set(`c`, { priority: 1 }) + map.set(`a`, { priority: 1 }) + map.set(`e`, { priority: 2 }) + map.set(`b`, { priority: 1 }) + + // Expected: priority 1 items (a, b, c) sorted by key, then priority 2 items 
(d, e) sorted by key + const keys = Array.from(map.keys()) + expect(keys).toEqual([`a`, `b`, `c`, `d`, `e`]) + }) + + it(`should maintain deterministic order with numeric keys`, () => { + const map = new SortedMap( + (a, b) => a.priority - b.priority + ) + + map.set(30, { priority: 1 }) + map.set(10, { priority: 1 }) + map.set(20, { priority: 1 }) + + const keys = Array.from(map.keys()) + expect(keys).toEqual([10, 20, 30]) + }) + + it(`should maintain deterministic order after updates`, () => { + const map = new SortedMap( + (a, b) => a.priority - b.priority + ) + + map.set(`c`, { priority: 1 }) + map.set(`a`, { priority: 1 }) + map.set(`b`, { priority: 1 }) + + // Update 'b' with same priority + map.set(`b`, { priority: 1 }) + + const keys = Array.from(map.keys()) + expect(keys).toEqual([`a`, `b`, `c`]) + }) + + it(`should maintain deterministic order after delete and re-insert`, () => { + const map = new SortedMap( + (a, b) => a.priority - b.priority + ) + + map.set(`c`, { priority: 1 }) + map.set(`a`, { priority: 1 }) + map.set(`b`, { priority: 1 }) + + map.delete(`b`) + map.set(`b`, { priority: 1 }) + + const keys = Array.from(map.keys()) + expect(keys).toEqual([`a`, `b`, `c`]) + }) + + it(`should use key as tie-breaker even without custom comparator`, () => { + // When no comparator is provided, all items have "equal" sort value (default behavior) + // They should still be ordered by key + const map = new SortedMap() + + map.set(`c`, { name: `Charlie` }) + map.set(`a`, { name: `Alice` }) + map.set(`b`, { name: `Bob` }) + + const keys = Array.from(map.keys()) + expect(keys).toEqual([`a`, `b`, `c`]) + }) + }) + + describe(`BTreeIndex`, () => { + it(`should return keys in deterministic order when indexed values are equal`, () => { + const index = new BTreeIndex( + 1, + new PropRef([`priority`]), + `priority_index` + ) + + // All have same priority + index.add(`c`, { priority: 1 }) + index.add(`a`, { priority: 1 }) + index.add(`b`, { priority: 1 }) + + // take() 
should return keys in key-sorted order when priorities are equal + const keys = index.take(3) + expect(keys).toEqual([`a`, `b`, `c`]) + }) + + it(`should return keys in deterministic order with mixed equal and different values`, () => { + const index = new BTreeIndex( + 1, + new PropRef([`priority`]), + `priority_index` + ) + + index.add(`d`, { priority: 2 }) + index.add(`c`, { priority: 1 }) + index.add(`a`, { priority: 1 }) + index.add(`e`, { priority: 2 }) + index.add(`b`, { priority: 1 }) + + // take() should return priority 1 keys sorted by key, then priority 2 keys sorted by key + const keys = index.take(5) + expect(keys).toEqual([`a`, `b`, `c`, `d`, `e`]) + }) + + it(`should return keys in deterministic order with numeric keys`, () => { + const index = new BTreeIndex( + 1, + new PropRef([`priority`]), + `priority_index` + ) + + index.add(30, { priority: 1 }) + index.add(10, { priority: 1 }) + index.add(20, { priority: 1 }) + + const keys = index.take(3) + expect(keys).toEqual([10, 20, 30]) + }) + + it(`should return keys in deterministic order for takeReversed`, () => { + const index = new BTreeIndex( + 1, + new PropRef([`priority`]), + `priority_index` + ) + + index.add(`c`, { priority: 1 }) + index.add(`a`, { priority: 1 }) + index.add(`b`, { priority: 1 }) + + // takeReversed should return keys in reverse key order when priorities are equal + const keys = index.takeReversed(3) + expect(keys).toEqual([`c`, `b`, `a`]) + }) + + it(`should maintain deterministic order after remove and re-add`, () => { + const index = new BTreeIndex( + 1, + new PropRef([`priority`]), + `priority_index` + ) + + index.add(`c`, { priority: 1 }) + index.add(`a`, { priority: 1 }) + index.add(`b`, { priority: 1 }) + + index.remove(`b`, { priority: 1 }) + index.add(`b`, { priority: 1 }) + + const keys = index.take(3) + expect(keys).toEqual([`a`, `b`, `c`]) + }) + + it(`should return keys in deterministic order with take from cursor across different values`, () => { + const index = 
new BTreeIndex( + 1, + new PropRef([`priority`]), + `priority_index` + ) + + // Add keys with different priorities + index.add(`e`, { priority: 2 }) + index.add(`c`, { priority: 1 }) + index.add(`a`, { priority: 1 }) + index.add(`f`, { priority: 2 }) + index.add(`d`, { priority: 2 }) + index.add(`b`, { priority: 1 }) + + // First batch - should get priority 1 keys in key order + const firstBatch = index.take(3) + expect(firstBatch).toEqual([`a`, `b`, `c`]) + + // Continue from cursor value 1 (exclusive) - should get priority 2 keys in key order + const secondBatch = index.take(3, 1) + expect(secondBatch).toEqual([`d`, `e`, `f`]) + }) + }) + + describe(`Collection iteration`, () => { + it(`should iterate in deterministic order when compare function returns equal`, () => { + type Item = { id: string; priority: number } + + const options = mockSyncCollectionOptions({ + id: `test-collection`, + getKey: (item) => item.id, + initialData: [], + }) + + const collection = createCollection({ + ...options, + // Compare by priority only - items with same priority compare as equal + compare: (a, b) => a.priority - b.priority, + }) + + // Insert via sync in "random" order + options.utils.begin() + options.utils.write({ type: `insert`, value: { id: `c`, priority: 1 } }) + options.utils.write({ type: `insert`, value: { id: `a`, priority: 1 } }) + options.utils.write({ type: `insert`, value: { id: `b`, priority: 1 } }) + options.utils.commit() + + // Should iterate in key order when priorities are equal + const keys = [...collection.keys()] + expect(keys).toEqual([`a`, `b`, `c`]) + }) + + it(`should iterate in deterministic order with mixed priorities`, () => { + type Item = { id: string; priority: number } + + const options = mockSyncCollectionOptions({ + id: `test-collection-mixed`, + getKey: (item) => item.id, + initialData: [], + }) + + const collection = createCollection({ + ...options, + compare: (a, b) => a.priority - b.priority, + }) + + options.utils.begin() + 
options.utils.write({ type: `insert`, value: { id: `d`, priority: 2 } }) + options.utils.write({ type: `insert`, value: { id: `c`, priority: 1 } }) + options.utils.write({ type: `insert`, value: { id: `a`, priority: 1 } }) + options.utils.write({ type: `insert`, value: { id: `e`, priority: 2 } }) + options.utils.write({ type: `insert`, value: { id: `b`, priority: 1 } }) + options.utils.commit() + + // Priority 1 items sorted by key, then priority 2 items sorted by key + const keys = [...collection.keys()] + expect(keys).toEqual([`a`, `b`, `c`, `d`, `e`]) + }) + + it(`should maintain deterministic order after incremental sync`, () => { + type Item = { id: string; priority: number } + + const options = mockSyncCollectionOptions({ + id: `test-collection-incremental`, + getKey: (item) => item.id, + initialData: [], + }) + + const collection = createCollection({ + ...options, + compare: (a, b) => a.priority - b.priority, + }) + + // First sync batch + options.utils.begin() + options.utils.write({ type: `insert`, value: { id: `c`, priority: 1 } }) + options.utils.write({ type: `insert`, value: { id: `a`, priority: 1 } }) + options.utils.commit() + + // Second sync batch (simulating incremental load) + options.utils.begin() + options.utils.write({ type: `insert`, value: { id: `b`, priority: 1 } }) + options.utils.commit() + + // Order should be deterministic regardless of sync batch order + const keys = [...collection.keys()] + expect(keys).toEqual([`a`, `b`, `c`]) + }) + + it(`should maintain deterministic order when collection has no compare function`, () => { + // Even without a compare function, iteration order should be deterministic (by key) + type Item = { id: string; name: string } + + const options = mockSyncCollectionOptions({ + id: `test-collection-no-compare`, + getKey: (item) => item.id, + initialData: [], + }) + + const collection = createCollection(options) + + options.utils.begin() + options.utils.write({ + type: `insert`, + value: { id: `c`, name: 
`Charlie` }, + }) + options.utils.write({ type: `insert`, value: { id: `a`, name: `Alice` } }) + options.utils.write({ type: `insert`, value: { id: `b`, name: `Bob` } }) + options.utils.commit() + + // Without compare function, should still iterate in key order + const keys = [...collection.keys()] + expect(keys).toEqual([`a`, `b`, `c`]) + }) + }) + + describe(`Collection currentStateAsChanges with orderBy`, () => { + it(`should return changes in deterministic order when orderBy values are equal`, () => { + type Item = { id: string; priority: number } + + const options = mockSyncCollectionOptions({ + id: `test-collection-changes`, + getKey: (item) => item.id, + initialData: [], + }) + + const collection = createCollection(options) + + options.utils.begin() + options.utils.write({ type: `insert`, value: { id: `c`, priority: 1 } }) + options.utils.write({ type: `insert`, value: { id: `a`, priority: 1 } }) + options.utils.write({ type: `insert`, value: { id: `b`, priority: 1 } }) + options.utils.commit() + + const changes = collection.currentStateAsChanges({ + orderBy: [ + { + expression: new PropRef([`priority`]), + compareOptions: { direction: `asc`, nulls: `last` }, + }, + ], + }) + + const keys = changes?.map((c) => c.key) + expect(keys).toEqual([`a`, `b`, `c`]) + }) + + it(`should return changes in deterministic order with limit`, () => { + type Item = { id: string; priority: number } + + const options = mockSyncCollectionOptions({ + id: `test-collection-changes-limit`, + getKey: (item) => item.id, + initialData: [], + }) + + const collection = createCollection(options) + + options.utils.begin() + options.utils.write({ type: `insert`, value: { id: `e`, priority: 1 } }) + options.utils.write({ type: `insert`, value: { id: `c`, priority: 1 } }) + options.utils.write({ type: `insert`, value: { id: `a`, priority: 1 } }) + options.utils.write({ type: `insert`, value: { id: `d`, priority: 1 } }) + options.utils.write({ type: `insert`, value: { id: `b`, priority: 1 } }) 
+ options.utils.commit() + + const changes = collection.currentStateAsChanges({ + orderBy: [ + { + expression: new PropRef([`priority`]), + compareOptions: { direction: `asc`, nulls: `last` }, + }, + ], + limit: 3, + }) + + // First 3 in key order: a, b, c + const keys = changes?.map((c) => c.key) + expect(keys).toEqual([`a`, `b`, `c`]) + }) + }) +}) diff --git a/packages/db/tests/query/live-query-collection.test.ts b/packages/db/tests/query/live-query-collection.test.ts index ff90f6d5d..0b4222b4a 100644 --- a/packages/db/tests/query/live-query-collection.test.ts +++ b/packages/db/tests/query/live-query-collection.test.ts @@ -1117,11 +1117,13 @@ describe(`createLiveQueryCollection`, () => { expect(liveQuery.isLoadingSubset).toBe(false) }) - it(`source collection isLoadingSubset is independent`, async () => { - let resolveLoadSubset: () => void - const loadSubsetPromise = new Promise((resolve) => { - resolveLoadSubset = resolve - }) + it(`source collection isLoadingSubset is independent from live query after initial load`, async () => { + // This test verifies that AFTER the initial subscription load completes, + // direct loadSubset calls on the source collection don't affect the live query's + // isLoadingSubset state. 
+ + let loadSubsetCallCount = 0 + const resolvers: Array<() => void> = [] const sourceCollection = createCollection<{ id: string; value: number }>({ id: `source`, @@ -1134,7 +1136,12 @@ describe(`createLiveQueryCollection`, () => { commit() markReady() return { - loadSubset: () => loadSubsetPromise, + loadSubset: () => { + loadSubsetCallCount++ + return new Promise((resolve) => { + resolvers.push(resolve) + }) + }, } }, }, @@ -1145,22 +1152,185 @@ describe(`createLiveQueryCollection`, () => { startSync: true, }) - await liveQuery.preload() + // Wait for the subscription to be set up + await flushPromises() + + // The initial load is in progress + expect(loadSubsetCallCount).toBe(1) + expect(liveQuery.isLoadingSubset).toBe(true) + + // Resolve the initial load + resolvers[0]!() + await flushPromises() - // Calling loadSubset directly on source collection sets its own isLoadingSubset + // Now the live query's initial load is complete + expect(liveQuery.isLoadingSubset).toBe(false) + expect(liveQuery.isReady()).toBe(true) + + // Calling loadSubset DIRECTLY on source collection sets its own isLoadingSubset sourceCollection._sync.loadSubset({}) + expect(loadSubsetCallCount).toBe(2) expect(sourceCollection.isLoadingSubset).toBe(true) - // But live query isLoadingSubset tracks subscription-driven loads, not direct loadSubset calls - // so it remains false unless subscriptions trigger loads via predicate pushdown + // But live query isLoadingSubset tracks subscription-driven loads, not direct calls + // so it remains false (the second loadSubset was not via the live query subscription) expect(liveQuery.isLoadingSubset).toBe(false) - resolveLoadSubset!() - await new Promise((resolve) => setTimeout(resolve, 10)) + // Resolve the direct call + resolvers[1]!() + await flushPromises() expect(sourceCollection.isLoadingSubset).toBe(false) expect(liveQuery.isLoadingSubset).toBe(false) }) + + it(`live query should not be ready until first loadSubset completes for on-demand sync`, 
async () => { + // This test verifies that when using on-demand sync mode, the live query + // collection stays in 'loading' status until the first loadSubset completes, + // rather than immediately becoming 'ready' when the source collection is ready. + + let resolveLoadSubset: () => void + const loadSubsetPromise = new Promise((resolve) => { + resolveLoadSubset = resolve + }) + + const sourceCollection = createCollection<{ id: number; value: number }>({ + id: `on-demand-ready-test`, + getKey: (item) => item.id, + syncMode: `on-demand`, + startSync: true, + sync: { + sync: ({ markReady, begin, write, commit }) => { + // For on-demand sync, markReady is called immediately + // but no data is loaded yet + markReady() + + return { + loadSubset: () => { + // Return a promise that simulates async data loading + return loadSubsetPromise.then(() => { + begin() + write({ type: `insert`, value: { id: 1, value: 100 } }) + write({ type: `insert`, value: { id: 2, value: 200 } }) + commit() + }) + }, + } + }, + }, + }) + + const liveQuery = createLiveQueryCollection({ + query: (q) => q.from({ item: sourceCollection }), + startSync: true, + }) + + // Wait a tick for the subscription to be set up and loadSubset to be called + await flushPromises() + + // The source collection is ready, but the live query should NOT be ready yet + // because the first loadSubset is still in progress + expect(sourceCollection.isReady()).toBe(true) + expect(liveQuery.status).toBe(`loading`) + expect(liveQuery.isReady()).toBe(false) + expect(liveQuery.isLoadingSubset).toBe(true) + + // Now resolve the loadSubset promise + resolveLoadSubset!() + await flushPromises() + + // Now the live query should be ready with data + expect(liveQuery.status).toBe(`ready`) + expect(liveQuery.isReady()).toBe(true) + expect(liveQuery.isLoadingSubset).toBe(false) + expect(liveQuery.size).toBe(2) + }) + + it(`subsequent loadSubset calls should not affect live query ready status`, async () => { + // This test verifies 
that after the first loadSubset completes, + // subsequent loadSubset calls (e.g., from windowing) do NOT change + // the live query's ready status back to loading. + + let loadSubsetCount = 0 + let resolveLoadSubset: () => void + + const sourceCollection = createCollection<{ id: number; value: number }>({ + id: `subsequent-loadsubset-test`, + getKey: (item) => item.id, + syncMode: `on-demand`, + startSync: true, + sync: { + sync: ({ markReady, begin, write, commit }) => { + markReady() + + return { + loadSubset: () => { + loadSubsetCount++ + const promise = new Promise((resolve) => { + resolveLoadSubset = resolve + }) + + return promise.then(() => { + begin() + // Add more items for each loadSubset call + const baseId = (loadSubsetCount - 1) * 2 + write({ + type: `insert`, + value: { id: baseId + 1, value: baseId + 1 }, + }) + write({ + type: `insert`, + value: { id: baseId + 2, value: baseId + 2 }, + }) + commit() + }) + }, + } + }, + }, + }) + + const liveQuery = createLiveQueryCollection({ + query: (q) => + q + .from({ item: sourceCollection }) + .orderBy(({ item }) => item.value, `asc`) + .limit(2) + .offset(0), + startSync: true, + }) + + await flushPromises() + + // First loadSubset is in progress + expect(liveQuery.status).toBe(`loading`) + expect(loadSubsetCount).toBe(1) + + // Complete the first loadSubset + resolveLoadSubset!() + await flushPromises() + + // Now live query should be ready + expect(liveQuery.status).toBe(`ready`) + expect(liveQuery.isReady()).toBe(true) + + // Trigger a second loadSubset by changing the window + liveQuery.utils.setWindow({ offset: 2, limit: 2 }) + await flushPromises() + + // Even though a second loadSubset is in progress, status should stay 'ready' + expect(loadSubsetCount).toBeGreaterThan(1) + expect(liveQuery.status).toBe(`ready`) + expect(liveQuery.isReady()).toBe(true) + + // Complete the second loadSubset + resolveLoadSubset!() + await flushPromises() + + // Status should still be ready + 
expect(liveQuery.status).toBe(`ready`) + expect(liveQuery.isReady()).toBe(true) + }) }) describe(`move functionality`, () => { @@ -2041,5 +2211,121 @@ describe(`createLiveQueryCollection`, () => { resolveLoadSubset!() await flushPromises() }) + + it(`passes single orderBy clause to loadSubset when using limit`, async () => { + const capturedOptions: Array = [] + let resolveLoadSubset: () => void + const loadSubsetPromise = new Promise((resolve) => { + resolveLoadSubset = resolve + }) + + const baseCollection = createCollection<{ + id: number + name: string + age: number + }>({ + id: `test-base-orderby`, + getKey: (item) => item.id, + syncMode: `on-demand`, + sync: { + sync: ({ markReady }) => { + markReady() + return { + loadSubset: (options: LoadSubsetOptions) => { + capturedOptions.push(options) + return loadSubsetPromise + }, + } + }, + }, + }) + + // Create a live query collection with orderBy and limit + const liveQueryCollection = createLiveQueryCollection((q) => + q + .from({ item: baseCollection }) + .orderBy(({ item }) => item.age, `asc`) + .limit(10) + ) + + // Trigger sync which will call loadSubset + await liveQueryCollection.preload() + await flushPromises() + + expect(capturedOptions.length).toBeGreaterThan(0) + + // Find the call that has orderBy (the limited snapshot request) + const callWithOrderBy = capturedOptions.find( + (opt) => opt.orderBy !== undefined + ) + expect(callWithOrderBy).toBeDefined() + expect(callWithOrderBy?.orderBy).toHaveLength(1) + expect(callWithOrderBy?.orderBy?.[0]?.expression.type).toBe(`ref`) + expect(callWithOrderBy?.limit).toBe(10) + + resolveLoadSubset!() + await flushPromises() + }) + + it(`passes multiple orderBy columns to loadSubset when using limit`, async () => { + const capturedOptions: Array = [] + let resolveLoadSubset: () => void + const loadSubsetPromise = new Promise((resolve) => { + resolveLoadSubset = resolve + }) + + const baseCollection = createCollection<{ + id: number + name: string + age: number + 
department: string + }>({ + id: `test-base-multi-orderby`, + getKey: (item) => item.id, + syncMode: `on-demand`, + sync: { + sync: ({ markReady }) => { + markReady() + return { + loadSubset: (options: LoadSubsetOptions) => { + capturedOptions.push(options) + return loadSubsetPromise + }, + } + }, + }, + }) + + // Create a live query collection with multiple orderBy columns and limit + const liveQueryCollection = createLiveQueryCollection((q) => + q + .from({ item: baseCollection }) + .orderBy(({ item }) => item.department, `asc`) + .orderBy(({ item }) => item.age, `desc`) + .limit(10) + ) + + // Trigger sync which will call loadSubset + await liveQueryCollection.preload() + await flushPromises() + + expect(capturedOptions.length).toBeGreaterThan(0) + + // Find the call that has orderBy with multiple columns + const callWithMultiOrderBy = capturedOptions.find( + (opt) => opt.orderBy !== undefined && opt.orderBy.length > 1 + ) + + // Multi-column orderBy should be passed to loadSubset so the sync layer + // can optimize the query if the backend supports composite ordering + expect(callWithMultiOrderBy).toBeDefined() + expect(callWithMultiOrderBy?.orderBy).toHaveLength(2) + expect(callWithMultiOrderBy?.orderBy?.[0]?.expression.type).toBe(`ref`) + expect(callWithMultiOrderBy?.orderBy?.[1]?.expression.type).toBe(`ref`) + expect(callWithMultiOrderBy?.limit).toBe(10) + + resolveLoadSubset!() + await flushPromises() + }) }) }) diff --git a/packages/db/tests/query/order-by.test.ts b/packages/db/tests/query/order-by.test.ts index d7c11b848..203137a55 100644 --- a/packages/db/tests/query/order-by.test.ts +++ b/packages/db/tests/query/order-by.test.ts @@ -2568,6 +2568,7 @@ describe(`OrderBy with duplicate values`, () => { it(`should correctly advance window when there are duplicate values loaded from sync layer`, async () => { // Create test data that reproduces the specific bug described: // Items with many duplicates at value 5, then normal progression + // Note: loadSubset 
now receives cursor expressions (whereFrom/whereCurrent) separately from where const allTestData: Array = [ { id: 1, a: 1, keep: true }, { id: 2, a: 2, keep: true }, @@ -2640,11 +2641,56 @@ describe(`OrderBy with duplicate values`, () => { } } - // Return a slice from 0 to limit + // Apply cursor expressions if present (cursor-based pagination) + // For proper cursor-based pagination: + // - whereCurrent should load ALL ties (no limit) + // - whereFrom should load with remaining limit + if (options.cursor) { + const { whereFrom, whereCurrent } = options.cursor + const { limit } = options + try { + // Get ALL rows matching whereCurrent (no limit for ties) + const whereCurrentFn = + createFilterFunctionFromExpression(whereCurrent) + const currentData = + filteredData.filter(whereCurrentFn) + + // Get rows matching whereFrom with limit (for next page data) + const whereFromFn = + createFilterFunctionFromExpression(whereFrom) + const fromData = filteredData.filter(whereFromFn) + const limitedFromData = limit + ? fromData.slice(0, limit) + : fromData + + // Combine: current rows + from rows (deduplicated) + const seenIds = new Set() + filteredData = [] + for (const item of currentData) { + if (!seenIds.has(item.id)) { + seenIds.add(item.id) + filteredData.push(item) + } + } + for (const item of limitedFromData) { + if (!seenIds.has(item.id)) { + seenIds.add(item.id) + filteredData.push(item) + } + } + // Re-sort after combining + filteredData.sort((a, b) => a.a - b.a) + } catch (error) { + console.log(`Error applying cursor:`, error) + } + } + + // Return data (limit already applied when cursor is present) const { limit } = options - const dataToLoad = limit - ? filteredData.slice(0, limit) - : filteredData + const dataToLoad = + limit && !options.cursor + ? 
filteredData.slice(0, limit) + : filteredData dataToLoad.forEach((item) => { write({ @@ -2710,8 +2756,8 @@ describe(`OrderBy with duplicate values`, () => { { id: 9, a: 5, keep: true }, { id: 10, a: 5, keep: true }, ]) - // we expect 2 new loadSubset calls (1 for data equal to max value and one for data greater than max value) - expect(loadSubsetCallCount).toBe(3) + // we expect 1 new loadSubset call (cursor expressions for whereFrom/whereCurrent are now combined in single call) + expect(loadSubsetCallCount).toBe(2) // Now move to third page (offset 10, limit 5) // It should advance past the duplicate 5s @@ -2739,12 +2785,13 @@ describe(`OrderBy with duplicate values`, () => { // We expect no more loadSubset calls because when we loaded the previous page // we asked for all data equal to max value and LIMIT values greater than max value // and the LIMIT values greater than max value already loaded the next page - expect(loadSubsetCallCount).toBe(3) + expect(loadSubsetCallCount).toBe(2) }) it(`should correctly advance window when there are duplicate values loaded from both local collection and sync layer`, async () => { // Create test data that reproduces the specific bug described: // Items with many duplicates at value 5, then normal progression + // Note: loadSubset now receives cursor expressions (whereFrom/whereCurrent) separately from where const allTestData: Array = [ { id: 1, a: 1, keep: true }, { id: 2, a: 2, keep: true }, @@ -2764,7 +2811,7 @@ describe(`OrderBy with duplicate values`, () => { { id: 16, a: 16, keep: true }, ] - // Start with only the first 5 items in the local collection + // Start with the first 10 items in the local collection (includes all duplicates) const initialData = allTestData.slice(0, 10) let loadSubsetCallCount = 0 @@ -2817,11 +2864,56 @@ describe(`OrderBy with duplicate values`, () => { } } - // Return a slice from 0 to limit + // Apply cursor expressions if present (cursor-based pagination) + // For proper cursor-based 
pagination: + // - whereCurrent should load ALL ties (no limit) + // - whereFrom should load with remaining limit + if (options.cursor) { + const { whereFrom, whereCurrent } = options.cursor + const { limit } = options + try { + // Get ALL rows matching whereCurrent (no limit for ties) + const whereCurrentFn = + createFilterFunctionFromExpression(whereCurrent) + const currentData = + filteredData.filter(whereCurrentFn) + + // Get rows matching whereFrom with limit (for next page data) + const whereFromFn = + createFilterFunctionFromExpression(whereFrom) + const fromData = filteredData.filter(whereFromFn) + const limitedFromData = limit + ? fromData.slice(0, limit) + : fromData + + // Combine: current rows + from rows (deduplicated) + const seenIds = new Set() + filteredData = [] + for (const item of currentData) { + if (!seenIds.has(item.id)) { + seenIds.add(item.id) + filteredData.push(item) + } + } + for (const item of limitedFromData) { + if (!seenIds.has(item.id)) { + seenIds.add(item.id) + filteredData.push(item) + } + } + // Re-sort after combining + filteredData.sort((a, b) => a.a - b.a) + } catch (error) { + console.log(`Error applying cursor:`, error) + } + } + + // Return data (limit already applied when cursor is present) const { limit } = options - const dataToLoad = limit - ? filteredData.slice(0, limit) - : filteredData + const dataToLoad = + limit && !options.cursor + ? 
filteredData.slice(0, limit) + : filteredData dataToLoad.forEach((item) => { write({ @@ -2887,8 +2979,8 @@ describe(`OrderBy with duplicate values`, () => { { id: 9, a: 5, keep: true }, { id: 10, a: 5, keep: true }, ]) - // we expect 2 new loadSubset calls (1 for data equal to max value and one for data greater than max value) - expect(loadSubsetCallCount).toBe(3) + // we expect 1 new loadSubset call (cursor expressions for whereFrom/whereCurrent are now combined in single call) + expect(loadSubsetCallCount).toBe(2) // Now move to third page (offset 10, limit 5) // It should advance past the duplicate 5s @@ -2916,7 +3008,7 @@ describe(`OrderBy with duplicate values`, () => { // We expect no more loadSubset calls because when we loaded the previous page // we asked for all data equal to max value and LIMIT values greater than max value // and the LIMIT values greater than max value already loaded the next page - expect(loadSubsetCallCount).toBe(3) + expect(loadSubsetCallCount).toBe(2) }) }) } @@ -2960,8 +3052,9 @@ describe(`OrderBy with Date values and precision differences`, () => { const initialData = testData.slice(0, 5) - // Track the WHERE clauses sent to loadSubset - const loadSubsetWhereClauses: Array = [] + // Track the cursor expressions sent to loadSubset + // Note: cursor expressions are now passed separately from where (whereFrom/whereCurrent/lastKey) + const loadSubsetCursors: Array = [] const sourceCollection = createCollection( mockSyncCollectionOptions({ @@ -2981,8 +3074,8 @@ describe(`OrderBy with Date values and precision differences`, () => { return { loadSubset: (options) => { - // Capture the WHERE clause for inspection - loadSubsetWhereClauses.push(options.where) + // Capture the cursor for inspection (now contains whereFrom/whereCurrent/lastKey) + loadSubsetCursors.push(options.cursor) return new Promise((resolve) => { setTimeout(() => { @@ -3003,6 +3096,42 @@ describe(`OrderBy with Date values and precision differences`, () => { } } + // 
Apply cursor expressions if present + if (options.cursor) { + const { whereFrom, whereCurrent } = options.cursor + try { + const whereFromFn = + createFilterFunctionFromExpression(whereFrom) + const fromData = filteredData.filter(whereFromFn) + + const whereCurrentFn = + createFilterFunctionFromExpression(whereCurrent) + const currentData = filteredData.filter(whereCurrentFn) + + // Combine and deduplicate + const seenIds = new Set() + filteredData = [] + for (const item of currentData) { + if (!seenIds.has(item.id)) { + seenIds.add(item.id) + filteredData.push(item) + } + } + for (const item of fromData) { + if (!seenIds.has(item.id)) { + seenIds.add(item.id) + filteredData.push(item) + } + } + filteredData.sort( + (a, b) => + a.createdAt.getTime() - b.createdAt.getTime() + ) + } catch (error) { + console.log(`Error applying cursor:`, error) + } + } + const { limit } = options const dataToLoad = limit ? filteredData.slice(0, limit) @@ -3042,21 +3171,22 @@ describe(`OrderBy with Date values and precision differences`, () => { const results = Array.from(collection.values()).sort((a, b) => a.id - b.id) expect(results.map((r) => r.id)).toEqual([1, 2, 3, 4, 5]) - // Clear tracked clauses before moving to next page - loadSubsetWhereClauses.length = 0 + // Clear tracked cursors before moving to next page + loadSubsetCursors.length = 0 // Move to next page - this should trigger the Date precision handling const moveToSecondPage = collection.utils.setWindow({ offset: 5, limit: 5 }) await moveToSecondPage - // Find the WHERE clause that queries for the "equal values" (the minValue query) - // With the fix, this should be: and(gte(createdAt, baseTime), lt(createdAt, baseTime+1ms)) + // Find the cursor that contains the "whereCurrent" expression (the minValue query) + // With the fix, whereCurrent should be: and(gte(createdAt, baseTime), lt(createdAt, baseTime+1ms)) // Without the fix, this would be: eq(createdAt, baseTime) - const equalValuesQuery = 
loadSubsetWhereClauses.find((clause) => { - if (!clause) return false - // Check if it's an 'and' with 'gte' and 'lt' (the fix) - if (clause.name === `and` && clause.args?.length === 2) { - const [first, second] = clause.args + const cursorWithDateRange = loadSubsetCursors.find((cursor) => { + if (!cursor?.whereCurrent) return false + const whereCurrent = cursor.whereCurrent + // Check if whereCurrent is an 'and' with 'gte' and 'lt' (the fix) + if (whereCurrent.name === `and` && whereCurrent.args?.length === 2) { + const [first, second] = whereCurrent.args return first?.name === `gte` && second?.name === `lt` } return false @@ -3064,7 +3194,8 @@ describe(`OrderBy with Date values and precision differences`, () => { // The fix should produce a range query (and(gte, lt)) for Date values // instead of an exact equality query (eq) - expect(equalValuesQuery).toBeDefined() + expect(cursorWithDateRange).toBeDefined() + const equalValuesQuery = cursorWithDateRange.whereCurrent expect(equalValuesQuery.name).toBe(`and`) expect(equalValuesQuery.args[0].name).toBe(`gte`) expect(equalValuesQuery.args[1].name).toBe(`lt`) diff --git a/packages/electric-db-collection/src/electric.ts b/packages/electric-db-collection/src/electric.ts index 1ec043fd2..7a9fd1e92 100644 --- a/packages/electric-db-collection/src/electric.ts +++ b/packages/electric-db-collection/src/electric.ts @@ -6,7 +6,7 @@ import { } from "@electric-sql/client" import { Store } from "@tanstack/store" import DebugModule from "debug" -import { DeduplicatedLoadSubset } from "@tanstack/db" +import { DeduplicatedLoadSubset, and, or } from "@tanstack/db" import { ExpectedNumberInAwaitTxIdError, StreamAbortedError, @@ -307,7 +307,12 @@ function hasTxids>( * Creates a deduplicated loadSubset handler for progressive/on-demand modes * Returns null for eager mode, or a DeduplicatedLoadSubset instance for other modes. 
 * Handles fetching snapshots in progressive mode during buffering phase,
- * and requesting snapshots in on-demand mode
+ * and requesting snapshots in on-demand mode.
+ *
+ * When cursor expressions are provided (whereFrom/whereCurrent), makes a
+ * single requestSnapshot call whose WHERE clause ORs the two expressions:
+ * - whereFrom (rows strictly after the cursor), with the requested limit
+ * - whereCurrent (rows equal to the cursor boundary, for tie-breaking)
 */
function createLoadSubsetDedupe>({
 stream,
@@ -382,8 +387,35 @@ function createLoadSubsetDedupe>({
    return
  } else {
    // On-demand mode: use requestSnapshot
-      const snapshotParams = compileSQL(opts)
-      await stream.requestSnapshot(snapshotParams)
+      // When cursor is provided, combine its two expressions into one request:
+      // whereCurrent (all ties at the boundary) OR whereFrom (rows after the cursor)
+      const { cursor, where, orderBy, limit } = opts
+
+      if (cursor) {
+        // Combine whereFrom and whereCurrent into a single request using OR
+        // This gets: (rows > cursor) OR (rows = cursor for ties)
+        // Using a single request avoids potential issues with multiple sequential snapshots
+        const combinedCursor = or(cursor.whereFrom, cursor.whereCurrent)
+        const cursorOpts: LoadSubsetOptions = {
+          where: where ? and(where, combinedCursor) : combinedCursor,
+          orderBy,
+          // Note: limit applies to combined result, which may include ties
+          // This matches the original behavior where cursor was combined with where
+          limit,
+        }
+        const cursorParams = compileSQL(cursorOpts)
+
+        debug(
+          `${collectionId ?
`[${collectionId}] ` : ``}Requesting cursor snapshot (whereFrom OR whereCurrent, limit ${limit})` + ) + + await stream.requestSnapshot(cursorParams) + } else { + // No cursor - standard single request + const snapshotParams = compileSQL(opts) + await stream.requestSnapshot(snapshotParams) + } } } diff --git a/packages/electric-db-collection/src/sql-compiler.ts b/packages/electric-db-collection/src/sql-compiler.ts index c941c7fef..c7a728895 100644 --- a/packages/electric-db-collection/src/sql-compiler.ts +++ b/packages/electric-db-collection/src/sql-compiler.ts @@ -172,6 +172,120 @@ function compileFunction( throw new Error(`Binary operator ${name} expects 2 arguments`) } const [lhs, rhs] = compiledArgs + + // Special case for comparison operators with boolean values + // PostgreSQL doesn't support < > <= >= on booleans + // Transform to equivalent equality checks or constant expressions + if (isComparisonOp(name)) { + const lhsArg = args[0] + const rhsArg = args[1] + + // Check if RHS is a boolean literal value + if ( + rhsArg && + rhsArg.type === `val` && + typeof rhsArg.value === `boolean` + ) { + const boolValue = rhsArg.value + // Remove the boolean param we just added since we'll transform the expression + params.pop() + + // Transform based on operator and boolean value + // Boolean ordering: false < true + if (name === `lt`) { + if (boolValue === true) { + // lt(col, true) → col = false (only false is less than true) + params.push(false) + return `${lhs} = $${params.length}` + } else { + // lt(col, false) → nothing is less than false + return `false` + } + } else if (name === `gt`) { + if (boolValue === false) { + // gt(col, false) → col = true (only true is greater than false) + params.push(true) + return `${lhs} = $${params.length}` + } else { + // gt(col, true) → nothing is greater than true + return `false` + } + } else if (name === `lte`) { + if (boolValue === true) { + // lte(col, true) → everything is ≤ true + return `true` + } else { + // lte(col, 
false) → col = false + params.push(false) + return `${lhs} = $${params.length}` + } + } else if (name === `gte`) { + if (boolValue === false) { + // gte(col, false) → everything is ≥ false + return `true` + } else { + // gte(col, true) → col = true + params.push(true) + return `${lhs} = $${params.length}` + } + } + } + + // Check if LHS is a boolean literal value (less common but handle it) + if ( + lhsArg && + lhsArg.type === `val` && + typeof lhsArg.value === `boolean` + ) { + const boolValue = lhsArg.value + // Remove params for this expression and rebuild + params.pop() // remove RHS + params.pop() // remove LHS (boolean) + + // Recompile RHS to get fresh param + const rhsCompiled = compileBasicExpression(rhsArg!, params) + + // Transform: flip the comparison (val op col → col flipped_op val) + if (name === `lt`) { + // lt(true, col) → gt(col, true) → col > true → nothing is greater than true + if (boolValue === true) { + return `false` + } else { + // lt(false, col) → gt(col, false) → col = true + params.push(true) + return `${rhsCompiled} = $${params.length}` + } + } else if (name === `gt`) { + // gt(true, col) → lt(col, true) → col = false + if (boolValue === true) { + params.push(false) + return `${rhsCompiled} = $${params.length}` + } else { + // gt(false, col) → lt(col, false) → nothing is less than false + return `false` + } + } else if (name === `lte`) { + if (boolValue === false) { + // lte(false, col) → gte(col, false) → everything + return `true` + } else { + // lte(true, col) → gte(col, true) → col = true + params.push(true) + return `${rhsCompiled} = $${params.length}` + } + } else if (name === `gte`) { + if (boolValue === true) { + // gte(true, col) → lte(col, true) → everything + return `true` + } else { + // gte(false, col) → lte(col, false) → col = false + params.push(false) + return `${rhsCompiled} = $${params.length}` + } + } + } + } + // Special case for = ANY operator which needs parentheses around the array parameter if (name === `in`) { 
return `${lhs} ${opName}(${rhs})` @@ -198,6 +312,14 @@ function isBinaryOp(name: string): boolean { return binaryOps.includes(name) } +/** + * Checks if the operator is a comparison operator (excluding eq) + * These operators don't work on booleans in PostgreSQL without casting + */ +function isComparisonOp(name: string): boolean { + return [`gt`, `gte`, `lt`, `lte`].includes(name) +} + function getOpName(name: string): string { const opNames = { eq: `=`, diff --git a/packages/electric-db-collection/tests/electric-live-query.test.ts b/packages/electric-db-collection/tests/electric-live-query.test.ts index ece48b667..4c0c1d1e0 100644 --- a/packages/electric-db-collection/tests/electric-live-query.test.ts +++ b/packages/electric-db-collection/tests/electric-live-query.test.ts @@ -544,6 +544,9 @@ describe.each([ .limit(2), }) + // Wait for the initial loadSubset to complete + await new Promise((resolve) => setTimeout(resolve, 0)) + expect(limitedLiveQuery.status).toBe(`ready`) expect(limitedLiveQuery.size).toBe(2) // Only first 2 active users expect(mockRequestSnapshot).toHaveBeenCalledTimes(1) @@ -609,9 +612,8 @@ describe.each([ // Limited queries are only deduplicated when their where clauses are equal. // Both queries have the same where clause (active = true), but the second query // with limit 6 needs more data than the first query with limit 2 provided. - // The internal query system makes additional requests as it processes the data. - // TODO: Once we have cursor based pagination with the PK as a tiebreaker, we can reduce this. - expect(mockRequestSnapshot).toHaveBeenCalledTimes(6) + // With cursor-based pagination, initial loads (without cursor) make 1 requestSnapshot call each. 
+ expect(mockRequestSnapshot).toHaveBeenCalledTimes(2) // Check that first it requested a limit of 2 users (from first query) expect(callArgs(0)).toMatchObject({ @@ -877,9 +879,8 @@ describe(`Electric Collection with Live Query - syncMode integration`, () => { ) // For limited queries, only requests with identical where clauses can be deduplicated. - // The internal query system may make additional requests as it processes the data. - // TODO: Once we have cursor based pagination with the PK as a tiebreaker, we can reduce this. - expect(mockRequestSnapshot).toHaveBeenCalledTimes(3) + // With cursor-based pagination, initial loads (without cursor) make 1 requestSnapshot call. + expect(mockRequestSnapshot).toHaveBeenCalledTimes(1) }) it(`should pass correct WHERE clause to requestSnapshot when live query has filters`, async () => { @@ -1189,9 +1190,8 @@ describe(`Electric Collection - loadSubset deduplication`, () => { await new Promise((resolve) => setTimeout(resolve, 0)) // For limited queries, only requests with identical where clauses can be deduplicated. - // The internal query system may make additional requests as it processes data. - // TODO: Once we have cursor based pagination with the PK as a tiebreaker, we can reduce this. - expect(mockRequestSnapshot).toHaveBeenCalledTimes(3) + // With cursor-based pagination, initial loads (without cursor) make 1 requestSnapshot call. + expect(mockRequestSnapshot).toHaveBeenCalledTimes(1) // Simulate a must-refetch (which triggers truncate and reset) subscriber([{ headers: { control: `must-refetch` } }]) @@ -1201,8 +1201,8 @@ describe(`Electric Collection - loadSubset deduplication`, () => { await new Promise((resolve) => setTimeout(resolve, 0)) // The existing live query re-requests its data after truncate - // TODO: Once we have cursor based pagination with the PK as a tiebreaker, we can reduce this. 
- expect(mockRequestSnapshot).toHaveBeenCalledTimes(5) + // After must-refetch, the query requests data again (1 initial + 1 after truncate) + expect(mockRequestSnapshot).toHaveBeenCalledTimes(2) // Create the same live query again after reset // This should NOT be deduped because the reset cleared the deduplication state, @@ -1221,8 +1221,8 @@ describe(`Electric Collection - loadSubset deduplication`, () => { await new Promise((resolve) => setTimeout(resolve, 0)) // Should have more calls - the different query triggered a new request - // TODO: Once we have cursor based pagination with the PK as a tiebreaker, we can reduce this. - expect(mockRequestSnapshot).toHaveBeenCalledTimes(6) + // 1 initial + 1 after must-refetch + 1 for new query = 3 + expect(mockRequestSnapshot).toHaveBeenCalledTimes(3) }) it(`should deduplicate unlimited queries regardless of orderBy`, async () => { diff --git a/packages/powersync-db-collection/tests/powersync.test.ts b/packages/powersync-db-collection/tests/powersync.test.ts index 5d71af023..e2e51d236 100644 --- a/packages/powersync-db-collection/tests/powersync.test.ts +++ b/packages/powersync-db-collection/tests/powersync.test.ts @@ -81,10 +81,11 @@ describe(`PowerSync Integration`, () => { // Verify the collection state contains our items expect(collection.size).toBe(3) - expect(collection.toArray.map((entry) => entry.name)).deep.equals([ + // Sort by name since keys are random UUIDs + expect(collection.toArray.map((entry) => entry.name).sort()).deep.equals([ `one`, - `two`, `three`, + `two`, ]) }) @@ -110,12 +111,10 @@ describe(`PowerSync Integration`, () => { await vi.waitFor( () => { expect(collection.size).toBe(4) - expect(collection.toArray.map((entry) => entry.name)).deep.equals([ - `one`, - `two`, - `three`, - `four`, - ]) + // Sort by name since keys are random UUIDs + expect( + collection.toArray.map((entry) => entry.name).sort() + ).deep.equals([`four`, `one`, `three`, `two`]) }, { timeout: 1000 } ) @@ -129,11 +128,10 @@ 
describe(`PowerSync Integration`, () => { await vi.waitFor( () => { expect(collection.size).toBe(3) - expect(collection.toArray.map((entry) => entry.name)).deep.equals([ - `one`, - `three`, - `four`, - ]) + // Sort by name since keys are random UUIDs + expect( + collection.toArray.map((entry) => entry.name).sort() + ).deep.equals([`four`, `one`, `three`]) }, { timeout: 1000 } ) @@ -148,11 +146,10 @@ describe(`PowerSync Integration`, () => { await vi.waitFor( () => { expect(collection.size).toBe(3) - expect(collection.toArray.map((entry) => entry.name)).deep.equals([ - `updated`, - `three`, - `four`, - ]) + // Sort by name since keys are random UUIDs + expect( + collection.toArray.map((entry) => entry.name).sort() + ).deep.equals([`four`, `three`, `updated`]) }, { timeout: 1000 } ) diff --git a/packages/query-db-collection/src/serialization.ts b/packages/query-db-collection/src/serialization.ts index ba22dc75a..cd54899f2 100644 --- a/packages/query-db-collection/src/serialization.ts +++ b/packages/query-db-collection/src/serialization.ts @@ -1,7 +1,9 @@ import type { IR, LoadSubsetOptions } from "@tanstack/db" /** - * Serializes LoadSubsetOptions into a stable, hashable format for query keys + * Serializes LoadSubsetOptions into a stable, hashable format for query keys. + * Includes where, orderBy, limit, and offset for pagination support. + * Note: cursor expressions are not serialized as they are backend-specific. * @internal */ export function serializeLoadSubsetOptions( @@ -43,6 +45,11 @@ export function serializeLoadSubsetOptions( result.limit = options.limit } + // Include offset for pagination support + if (options.offset !== undefined) { + result.offset = options.offset + } + return Object.keys(result).length === 0 ? 
undefined : JSON.stringify(result) } diff --git a/packages/react-db/tests/useLiveInfiniteQuery.test.tsx b/packages/react-db/tests/useLiveInfiniteQuery.test.tsx index 5dbb69c1b..8aea15322 100644 --- a/packages/react-db/tests/useLiveInfiniteQuery.test.tsx +++ b/packages/react-db/tests/useLiveInfiniteQuery.test.tsx @@ -898,8 +898,44 @@ describe(`useLiveInfiniteQuery`, () => { }) } - // Apply limit if provided - if (opts.limit !== undefined) { + // Apply cursor expressions if present (new cursor-based pagination) + if (opts.cursor) { + const { whereFrom, whereCurrent } = opts.cursor + try { + const whereFromFn = + createFilterFunctionFromExpression(whereFrom) + const fromData = filtered.filter(whereFromFn) + + const whereCurrentFn = + createFilterFunctionFromExpression(whereCurrent) + const currentData = filtered.filter(whereCurrentFn) + + // Combine current (ties) with from (next page), deduplicate + const seenIds = new Set() + filtered = [] + for (const item of currentData) { + if (!seenIds.has(item.id)) { + seenIds.add(item.id) + filtered.push(item) + } + } + // Apply limit only to fromData + const limitedFromData = opts.limit + ? 
fromData.slice(0, opts.limit) + : fromData + for (const item of limitedFromData) { + if (!seenIds.has(item.id)) { + seenIds.add(item.id) + filtered.push(item) + } + } + // Re-sort after combining + filtered.sort((a, b) => b.createdAt - a.createdAt) + } catch { + // Fallback to original filtered if cursor parsing fails + } + } else if (opts.limit !== undefined) { + // Apply limit only if no cursor (cursor handles limit internally) filtered = filtered.slice(0, opts.limit) } diff --git a/packages/react-db/tests/useLiveQuery.test.tsx b/packages/react-db/tests/useLiveQuery.test.tsx index fb7ab5dbc..5164aa48e 100644 --- a/packages/react-db/tests/useLiveQuery.test.tsx +++ b/packages/react-db/tests/useLiveQuery.test.tsx @@ -2246,17 +2246,18 @@ describe(`Query Collections`, () => { expect(result.current.collection).toBeDefined() expect(result.current.status).toBeDefined() + // Results are in deterministic key order (id: 1 before id: 2) expect(result.current.data).toMatchObject([ - { - id: `2`, - name: `Jane Doe`, - age: 25, - }, { id: `1`, name: `John Doe`, age: 30, }, + { + id: `2`, + name: `Jane Doe`, + age: 25, + }, ]) // Switch to using pre-created collection