Skip to content

Commit 46450e7

Browse files
KyleAMathewsclaudeautofix-ci[bot]
authored
Fix isWhereSubset to handle AND subset with OR superset (#1275)
* fix(db): fix isNull predicate causing LiveQuery to never become ready when offline The `isWhereSubsetInternal` function had a critical ordering issue in its predicate checking logic. When a query with `and(eq(...), isNull(...))` was checked against a union superset like `or(and(eq(...), isNull(...)), ...)`, the AND decomposition ran before the OR superset check. This caused the function to decompose the AND into individual conjuncts (eq and isNull), neither of which could independently prove subset of the OR expression, even though the full AND expression was structurally equal to one of the OR disjuncts. The fix reorders the checks so that: 1. superset AND is handled first (unchanged) 2. subset OR is handled next (moved up from after AND) 3. superset OR is handled next (moved up from after IN) 4. subset AND decomposition happens last This ensures that when subset=AND and superset=OR, the full AND expression is checked against each OR disjunct (where structural equality catches it), before being decomposed into individual conjuncts that lose the ability to match compound disjuncts. Reported scenario: on-demand sync with `isNull(soft_deleted_at)` in a where clause caused `isLoadingSubset` to stay true after going offline and remounting, because the DeduplicatedLoadSubset couldn't recognize that data for the predicate was already loaded. https://claude.ai/code/session_01TmGem9Pj1NnXfaWY5FZJdb * ci: apply automated fixes * refactor: improve error handling and test coverage for review feedback Separate forceDisconnectAndRefresh into its own try-catch with correct error attribution, add negative guard test for isUpToDate check, add rejection path test, trim verbose comments, and clarify wording. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * chore: add changeset for isNull predicate fix Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix: handle forceDisconnectAndRefresh PauseLock error gracefully When loadSubset is called during subscriber message processing (e.g., join pipelines via D2.step), the stream's PauseLock is held and forceDisconnectAndRefresh deadlocks. Since the refresh is only an optimization to break out of long-poll waits, log the error and fall through to requestSnapshot which still works. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> --------- Co-authored-by: Claude <noreply@anthropic.com> Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
1 parent 59958af commit 46450e7

5 files changed

Lines changed: 173 additions & 17 deletions

File tree

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
'@tanstack/db': patch
3+
'@tanstack/electric-db-collection': patch
4+
---
5+
6+
Fix isNull predicate causing LiveQuery to never become ready when offline. Reorder predicate checks in `isWhereSubsetInternal` so OR superset handling runs before AND subset decomposition, allowing `and(eq, isNull)` to match structurally equal disjuncts. Also separate `forceDisconnectAndRefresh` error handling into its own try-catch with correct error attribution.

packages/db/src/query/predicate-utils.ts

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,24 @@ function isWhereSubsetInternal(
8787
)
8888
}
8989

90+
// Handle OR in subset: (A OR B) ⊆ C only if both A ⊆ C and B ⊆ C.
91+
// Must be checked before OR superset so that or(A, B) ⊆ or(C, D)
92+
// decomposes the subset first: A ⊆ or(C, D) AND B ⊆ or(C, D).
93+
if (subset.type === `func` && subset.name === `or`) {
94+
return subset.args.every((arg) =>
95+
isWhereSubsetInternal(arg as BasicExpression<boolean>, superset),
96+
)
97+
}
98+
99+
// Handle OR in superset: subset ⊆ (A OR B) if subset ⊆ A or subset ⊆ B.
100+
// Must be checked before decomposing AND subsets so that and(A, B) can
101+
// match a structurally equal disjunct via areExpressionsEqual.
102+
if (superset.type === `func` && superset.name === `or`) {
103+
return superset.args.some((arg) =>
104+
isWhereSubsetInternal(subset, arg as BasicExpression<boolean>),
105+
)
106+
}
107+
90108
// Handle subset being an AND: (A AND B) implies both A and B
91109
if (subset.type === `func` && subset.name === `and`) {
92110
// For (A AND B) ⊆ C, since (A AND B) implies A, we check if any conjunct implies C
@@ -111,22 +129,6 @@ function isWhereSubsetInternal(
111129
}
112130
}
113131

114-
// Handle OR in subset: (A OR B) is subset of C only if both A and B are subsets of C
115-
if (subset.type === `func` && subset.name === `or`) {
116-
return subset.args.every((arg) =>
117-
isWhereSubsetInternal(arg as BasicExpression<boolean>, superset),
118-
)
119-
}
120-
121-
// Handle OR in superset: subset ⊆ (A OR B) if subset ⊆ A or subset ⊆ B
122-
// (A OR B) as superset means data can satisfy A or B
123-
// If subset is contained in any disjunct, it's contained in the union
124-
if (superset.type === `func` && superset.name === `or`) {
125-
return superset.args.some((arg) =>
126-
isWhereSubsetInternal(subset, arg as BasicExpression<boolean>),
127-
)
128-
}
129-
130132
// Handle comparison operators on the same field
131133
if (subset.type === `func` && superset.type === `func`) {
132134
const subsetFunc = subset as Func

packages/db/tests/query/predicate-utils.test.ts

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,71 @@ describe(`isWhereSubset`, () => {
345345
})
346346
})
347347

348+
describe(`AND subset with OR superset`, () => {
349+
it(`should recognize and(eq, isNull) as subset of or(and(eq, isNull), and(eq, isNull))`, () => {
350+
const projectX = `4e164373-31b4-4b42-95c9-9c395cfb4916`
351+
const projectY = `2fd4c147-2547-4b02-9554-9cd067187409`
352+
353+
const queryX = and(
354+
eq(ref(`project_id`), val(projectX)),
355+
func(`isNull`, ref(`soft_deleted_at`)),
356+
)
357+
const queryY = and(
358+
eq(ref(`project_id`), val(projectY)),
359+
func(`isNull`, ref(`soft_deleted_at`)),
360+
)
361+
362+
const unionPredicate = or(queryX, queryY)
363+
364+
expect(isWhereSubset(queryX, unionPredicate)).toBe(true)
365+
expect(isWhereSubset(queryY, unionPredicate)).toBe(true)
366+
})
367+
368+
it(`should recognize and(A, B) as subset of or(and(A, B), and(C, D))`, () => {
369+
const subsetExpr = and(eq(ref(`id`), val(1)), gt(ref(`age`), val(20)))
370+
const supersetExpr = or(
371+
and(eq(ref(`id`), val(1)), gt(ref(`age`), val(20))),
372+
and(eq(ref(`id`), val(2)), gt(ref(`age`), val(30))),
373+
)
374+
expect(isWhereSubset(subsetExpr, supersetExpr)).toBe(true)
375+
})
376+
377+
it(`should return false when and(A, B) matches no disjunct`, () => {
378+
const subsetExpr = and(eq(ref(`id`), val(3)), gt(ref(`age`), val(20)))
379+
const supersetExpr = or(
380+
and(eq(ref(`id`), val(1)), gt(ref(`age`), val(20))),
381+
and(eq(ref(`id`), val(2)), gt(ref(`age`), val(30))),
382+
)
383+
expect(isWhereSubset(subsetExpr, supersetExpr)).toBe(false)
384+
})
385+
})
386+
387+
describe(`isNull predicates`, () => {
388+
it(`should return true for identical isNull expressions`, () => {
389+
const a = func(`isNull`, ref(`deleted_at`))
390+
const b = func(`isNull`, ref(`deleted_at`))
391+
expect(isWhereSubset(a, b)).toBe(true)
392+
})
393+
394+
it(`should return false for isNull on different fields`, () => {
395+
const a = func(`isNull`, ref(`deleted_at`))
396+
const b = func(`isNull`, ref(`created_at`))
397+
expect(isWhereSubset(a, b)).toBe(false)
398+
})
399+
400+
it(`should return true for and(eq, isNull) subset of identical and(eq, isNull)`, () => {
401+
const subset = and(
402+
eq(ref(`project_id`), val(`abc`)),
403+
func(`isNull`, ref(`soft_deleted_at`)),
404+
)
405+
const superset = and(
406+
eq(ref(`project_id`), val(`abc`)),
407+
func(`isNull`, ref(`soft_deleted_at`)),
408+
)
409+
expect(isWhereSubset(subset, superset)).toBe(true)
410+
})
411+
})
412+
348413
describe(`different fields`, () => {
349414
it(`should return false for different fields with no relationship`, () => {
350415
expect(

packages/electric-db-collection/src/electric.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -431,6 +431,25 @@ function createLoadSubsetDedupe<T extends Row<unknown>>({
431431

432432
const { cursor, where, orderBy, limit } = opts
433433

434+
// When the stream is already up-to-date, it may be in a long-poll wait.
435+
// Forcing a disconnect-and-refresh ensures requestSnapshot gets a response
436+
// from a fresh server round-trip rather than waiting for the current poll to end.
437+
// If the refresh fails (e.g., PauseLock held during subscriber processing in
438+
// join pipelines), we fall through to requestSnapshot which still works.
439+
if (stream.isUpToDate) {
440+
try {
441+
await stream.forceDisconnectAndRefresh()
442+
} catch (error) {
443+
if (handleSnapshotError(error, `forceDisconnectAndRefresh`)) {
444+
return
445+
}
446+
debug(
447+
`${logPrefix}forceDisconnectAndRefresh failed, proceeding to requestSnapshot: %o`,
448+
error,
449+
)
450+
}
451+
}
452+
434453
try {
435454
if (cursor) {
436455
const whereCurrentOpts: LoadSubsetOptions = {

packages/electric-db-collection/tests/electric.test.ts

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,13 @@ import type { StandardSchemaV1 } from '@standard-schema/spec'
2121
const mockSubscribe = vi.fn()
2222
const mockRequestSnapshot = vi.fn()
2323
const mockFetchSnapshot = vi.fn()
24+
const mockForceDisconnectAndRefresh = vi.fn()
2425
const mockStream = {
2526
subscribe: mockSubscribe,
2627
requestSnapshot: mockRequestSnapshot,
2728
fetchSnapshot: mockFetchSnapshot,
29+
forceDisconnectAndRefresh: mockForceDisconnectAndRefresh,
30+
isUpToDate: false,
2831
}
2932

3033
vi.mock(`@electric-sql/client`, async () => {
@@ -56,6 +59,8 @@ describe(`Electric Integration`, () => {
5659

5760
// Reset mock requestSnapshot
5861
mockRequestSnapshot.mockResolvedValue(undefined)
62+
mockForceDisconnectAndRefresh.mockResolvedValue(undefined)
63+
mockStream.isUpToDate = false
5964

6065
// Create collection with Electric configuration
6166
const config = {
@@ -2376,7 +2381,7 @@ describe(`Electric Integration`, () => {
23762381
// In on-demand mode, calling loadSubset should request a snapshot
23772382
await testCollection._sync.loadSubset({ limit: 10 })
23782383

2379-
// Verify requestSnapshot was called
2384+
expect(mockForceDisconnectAndRefresh).not.toHaveBeenCalled()
23802385
expect(mockRequestSnapshot).toHaveBeenCalledWith(
23812386
expect.objectContaining({
23822387
limit: 10,
@@ -2385,6 +2390,65 @@ describe(`Electric Integration`, () => {
23852390
)
23862391
})
23872392

2393+
it(`should refresh the stream before requesting on-demand snapshots when already up-to-date`, async () => {
2394+
vi.clearAllMocks()
2395+
2396+
const config = {
2397+
id: `on-demand-refresh-before-snapshot-test`,
2398+
shapeOptions: {
2399+
url: `http://test-url`,
2400+
params: {
2401+
table: `test_table`,
2402+
},
2403+
},
2404+
syncMode: `on-demand` as const,
2405+
getKey: (item: Row) => item.id as number,
2406+
startSync: true,
2407+
}
2408+
2409+
const testCollection = createCollection(electricCollectionOptions(config))
2410+
2411+
mockStream.isUpToDate = true
2412+
2413+
await testCollection._sync.loadSubset({ limit: 10 })
2414+
2415+
expect(mockForceDisconnectAndRefresh).toHaveBeenCalledTimes(1)
2416+
expect(mockRequestSnapshot).toHaveBeenCalledTimes(1)
2417+
const refreshCall =
2418+
mockForceDisconnectAndRefresh.mock.invocationCallOrder[0]!
2419+
const snapshotCall = mockRequestSnapshot.mock.invocationCallOrder[0]!
2420+
expect(refreshCall).toBeLessThan(snapshotCall)
2421+
})
2422+
2423+
it(`should fall through to requestSnapshot when forceDisconnectAndRefresh fails`, async () => {
2424+
vi.clearAllMocks()
2425+
2426+
const config = {
2427+
id: `on-demand-refresh-fallthrough-test`,
2428+
shapeOptions: {
2429+
url: `http://test-url`,
2430+
params: {
2431+
table: `test_table`,
2432+
},
2433+
},
2434+
syncMode: `on-demand` as const,
2435+
getKey: (item: Row) => item.id as number,
2436+
startSync: true,
2437+
}
2438+
2439+
const testCollection = createCollection(electricCollectionOptions(config))
2440+
2441+
mockStream.isUpToDate = true
2442+
mockForceDisconnectAndRefresh.mockImplementationOnce(async () => {
2443+
throw new Error(`PauseLock held`)
2444+
})
2445+
2446+
await testCollection._sync.loadSubset({ limit: 10 })
2447+
2448+
expect(mockForceDisconnectAndRefresh).toHaveBeenCalledTimes(1)
2449+
expect(mockRequestSnapshot).toHaveBeenCalledTimes(1)
2450+
})
2451+
23882452
it(`should fetch snapshots in progressive mode when loadSubset is called before sync completes`, async () => {
23892453
vi.clearAllMocks()
23902454

0 commit comments

Comments
 (0)