From 45f1065015c3685d8b4a9b648b2c21016b71fba8 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Tue, 24 Feb 2026 14:06:42 +0100 Subject: [PATCH 01/19] Add support for subqueries in select --- packages/db/src/query/builder/index.ts | 122 +++++- packages/db/src/query/compiler/group-by.ts | 8 +- packages/db/src/query/compiler/index.ts | 183 ++++++++- packages/db/src/query/compiler/select.ts | 9 + packages/db/src/query/ir.ts | 17 +- .../query/live/collection-config-builder.ts | 371 +++++++++++++++++- 6 files changed, 688 insertions(+), 22 deletions(-) diff --git a/packages/db/src/query/builder/index.ts b/packages/db/src/query/builder/index.ts index c43f3ad48..7b7b6d3e8 100644 --- a/packages/db/src/query/builder/index.ts +++ b/packages/db/src/query/builder/index.ts @@ -3,6 +3,7 @@ import { Aggregate as AggregateExpr, CollectionRef, Func as FuncExpr, + IncludesSubquery, PropRef, QueryRef, Value as ValueExpr, @@ -476,7 +477,7 @@ export class BaseQueryBuilder { const aliases = this._getCurrentAliases() const refProxy = createRefProxy(aliases) as RefsForContext const selectObject = callback(refProxy) - const select = buildNestedSelect(selectObject) + const select = buildNestedSelect(selectObject, aliases) return new BaseQueryBuilder({ ...this.query, @@ -852,7 +853,7 @@ function isPlainObject(value: any): value is Record { ) } -function buildNestedSelect(obj: any): any { +function buildNestedSelect(obj: any, parentAliases: Array = []): any { if (!isPlainObject(obj)) return toExpr(obj) const out: Record = {} for (const [k, v] of Object.entries(obj)) { @@ -861,11 +862,126 @@ function buildNestedSelect(obj: any): any { out[k] = v continue } - out[k] = buildNestedSelect(v) + if (v instanceof BaseQueryBuilder) { + out[k] = buildIncludesSubquery(v, k, parentAliases) + continue + } + out[k] = buildNestedSelect(v, parentAliases) } return out } +/** + * Builds an IncludesSubquery IR node from a child query builder. + * Extracts the correlation condition from the child's WHERE clauses by finding + * an eq() predicate that references both a parent alias and a child alias. + */ +function buildIncludesSubquery( + childBuilder: BaseQueryBuilder, + fieldName: string, + parentAliases: Array, +): IncludesSubquery { + const childQuery = childBuilder._getQuery() + + // Collect child's own aliases + const childAliases: Array = [childQuery.from.alias] + if (childQuery.join) { + for (const j of childQuery.join) { + childAliases.push(j.from.alias) + } + } + + // Walk child's WHERE clauses to find the correlation condition + let parentRef: PropRef | undefined + let childRef: PropRef | undefined + let correlationWhereIndex = -1 + + if (childQuery.where) { + for (let i = 0; i < childQuery.where.length; i++) { + const where = childQuery.where[i]! + const expr = + typeof where === `object` && `expression` in where + ? where.expression + : where + + // Look for eq(a, b) where one side references parent and other references child + if ( + expr.type === `func` && + expr.name === `eq` && + expr.args.length === 2 + ) { + const [argA, argB] = expr.args + const result = extractCorrelation( + argA!, + argB!, + parentAliases, + childAliases, + ) + if (result) { + parentRef = result.parentRef + childRef = result.childRef + correlationWhereIndex = i + break + } + } + } + } + + if (!parentRef || !childRef || correlationWhereIndex === -1) { + throw new Error( + `Includes subquery for "${fieldName}" must have a WHERE clause with an eq() condition ` + + `that correlates a parent field with a child field. ` + + `Example: .where(({child}) => eq(child.parentId, parent.id))`, + ) + } + + // Remove the correlation WHERE from the child query + const modifiedWhere = [...childQuery.where!] + modifiedWhere.splice(correlationWhereIndex, 1) + const modifiedQuery: QueryIR = { + ...childQuery, + where: modifiedWhere.length > 0 ? modifiedWhere : undefined, + } + + return new IncludesSubquery(modifiedQuery, parentRef, childRef, fieldName) +} + +/** + * Checks if two eq() arguments form a parent-child correlation. + * Returns the parent and child PropRefs if found, undefined otherwise. + */ +function extractCorrelation( + argA: BasicExpression, + argB: BasicExpression, + parentAliases: Array, + childAliases: Array, +): { parentRef: PropRef; childRef: PropRef } | undefined { + if (argA.type === `ref` && argB.type === `ref`) { + const aAlias = argA.path[0] + const bAlias = argB.path[0] + + if ( + aAlias && + bAlias && + parentAliases.includes(aAlias) && + childAliases.includes(bAlias) + ) { + return { parentRef: argA, childRef: argB } + } + + if ( + aAlias && + bAlias && + parentAliases.includes(bAlias) && + childAliases.includes(aAlias) + ) { + return { parentRef: argB, childRef: argA } + } + } + + return undefined +} + // Internal function to build a query from a callback // used by liveQueryCollectionOptions.query export function buildQuery( diff --git a/packages/db/src/query/compiler/group-by.ts b/packages/db/src/query/compiler/group-by.ts index 10d83a11b..595d277ae 100644 --- a/packages/db/src/query/compiler/group-by.ts +++ b/packages/db/src/query/compiler/group-by.ts @@ -519,7 +519,7 @@ function evaluateWrappedAggregates( * contain an Aggregate. Safely returns false for nested Select objects. */ export function containsAggregate( - expr: BasicExpression | Aggregate | Select, + expr: BasicExpression | Aggregate | Select | { type: string }, ): boolean { if (!isExpressionLike(expr)) { return false @@ -527,9 +527,9 @@ export function containsAggregate( if (expr.type === `agg`) { return true } - if (expr.type === `func`) { - return expr.args.some((arg: BasicExpression | Aggregate) => - containsAggregate(arg), + if (expr.type === `func` && `args` in expr) { + return (expr.args as Array).some( + (arg: BasicExpression | Aggregate) => containsAggregate(arg), ) } return false diff --git a/packages/db/src/query/compiler/index.ts b/packages/db/src/query/compiler/index.ts index d0d7469e2..a6629d66a 100644 --- a/packages/db/src/query/compiler/index.ts +++ b/packages/db/src/query/compiler/index.ts @@ -1,4 +1,9 @@ -import { distinct, filter, map } from '@tanstack/db-ivm' +import { + distinct, + filter, + join as joinOperator, + map, +} from '@tanstack/db-ivm' import { optimizeQuery } from '../optimizer.js' import { CollectionInputNotFoundError, @@ -8,7 +13,12 @@ import { LimitOffsetRequireOrderByError, UnsupportedFromTypeError, } from '../../errors.js' -import { PropRef, Value as ValClass, getWhereExpression } from '../ir.js' +import { + IncludesSubquery, + PropRef, + Value as ValClass, + getWhereExpression, +} from '../ir.js' import { compileExpression, toBooleanPredicate } from './evaluators.js' import { processJoins } from './joins.js' import { containsAggregate, processGroupBy } from './group-by.js' @@ -33,6 +43,23 @@ import type { QueryCache, QueryMapping, WindowOptions } from './types.js' export type { WindowOptions } from './types.js' +/** + * Result of compiling an includes subquery, including the child pipeline + * and metadata needed to route child results to parent-scoped Collections. + */ +export interface IncludesCompilationResult { + /** Filtered child pipeline (post inner-join with parent keys) */ + pipeline: ResultStream + /** Result field name on parent (e.g., "issues") */ + fieldName: string + /** Parent-side correlation ref (e.g., project.id) */ + correlationField: PropRef + /** Child-side correlation ref (e.g., issue.projectId) */ + childCorrelationField: PropRef + /** Full compilation result for the child query (for nested includes + alias tracking) */ + childCompilationResult: CompilationResult +} + /** * Result of query compilation including both the pipeline and source-specific WHERE clauses */ @@ -67,6 +94,9 @@ export interface CompilationResult { * the inner aliases where collection subscriptions were created. */ aliasRemapping: Record + + /** Child pipelines for includes subqueries */ + includes?: Array } /** @@ -93,6 +123,9 @@ export function compileQuery( setWindowFn: (windowFn: (options: WindowOptions) => void) => void, cache: QueryCache = new WeakMap(), queryMapping: QueryMapping = new WeakMap(), + // For includes: parent key stream to inner-join with this query's FROM + parentKeyStream?: KeyedStream, + childCorrelationField?: PropRef, ): CompilationResult { // Check if the original raw query has already been compiled const cachedResult = cache.get(rawQuery) @@ -152,8 +185,44 @@ export function compileQuery( ) sources[mainSource] = mainInput + // If this is an includes child query, inner-join the raw input with parent keys. + // This filters the child collection to only rows matching parents in the result set. + // The inner join happens BEFORE namespace wrapping / WHERE / SELECT / ORDER BY, + // so the child pipeline only processes rows that match parents. + let filteredMainInput = mainInput + if (parentKeyStream && childCorrelationField) { + // Re-key child input by correlation field: [correlationValue, [childKey, childRow]] + const childFieldPath = childCorrelationField.path.slice(1) // remove alias prefix + const childRekeyed = mainInput.pipe( + map(([key, row]: [unknown, any]) => { + const correlationValue = getNestedValue(row, childFieldPath) + return [correlationValue, [key, row]] as [unknown, [unknown, any]] + }), + ) + + // Inner join: only children whose correlation key exists in parent keys pass through + const joined = childRekeyed.pipe( + joinOperator(parentKeyStream, `inner`), + ) + + // Extract: [correlationValue, [[childKey, childRow], null]] → [childKey, childRow] + // Tag the row with __correlationKey for output routing + filteredMainInput = joined.pipe( + filter(([_correlationValue, [childSide]]: any) => { + return childSide != null + }), + map(([correlationValue, [childSide, _parentSide]]: any) => { + const [childKey, childRow] = childSide + return [childKey, { ...childRow, __correlationKey: correlationValue }] + }), + ) + + // Update sources so the rest of the pipeline uses the filtered input + sources[mainSource] = filteredMainInput + } + // Prepare the initial pipeline with the main source wrapped in its alias - let pipeline: NamespacedAndKeyedStream = mainInput.pipe( + let pipeline: NamespacedAndKeyedStream = filteredMainInput.pipe( map(([key, row]) => { // Initialize the record with a nested structure const ret = [key, { [mainSource]: row }] as [ @@ -214,6 +283,52 @@ export function compileQuery( } } + // Extract includes from SELECT, compile child pipelines, and replace with placeholders. + // This must happen AFTER WHERE (so parent pipeline is filtered) but BEFORE processSelect + // (so IncludesSubquery nodes are stripped before select compilation). + const includesResults: Array = [] + if (query.select) { + const includesEntries = extractIncludesFromSelect(query.select) + for (const { key, subquery } of includesEntries) { + // Branch parent pipeline: map to [correlationValue, null] + const compiledCorrelation = compileExpression(subquery.correlationField) + const parentKeys = pipeline.pipe( + map(([_key, nsRow]: any) => [compiledCorrelation(nsRow), null] as any), + ) + + // Recursively compile child query WITH the parent key stream + const childResult = compileQuery( + subquery.query, + allInputs, + collections, + subscriptions, + callbacks, + lazySources, + optimizableOrderByCollections, + setWindowFn, + cache, + queryMapping, + parentKeys, + subquery.childCorrelationField, + ) + + // Merge child's alias metadata into parent's + Object.assign(aliasToCollectionId, childResult.aliasToCollectionId) + Object.assign(aliasRemapping, childResult.aliasRemapping) + + includesResults.push({ + pipeline: childResult.pipeline, + fieldName: subquery.fieldName, + correlationField: subquery.correlationField, + childCorrelationField: subquery.childCorrelationField, + childCompilationResult: childResult, + }) + + // Replace includes entry in select with a null placeholder + replaceIncludesInSelect(query.select, key) + } + } + if (query.distinct && !query.fnSelect && !query.select) { throw new DistinctRequiresSelectError() } @@ -330,23 +445,30 @@ export function compileQuery( ) // Final step: extract the $selected and include orderBy index - const resultPipeline = orderedPipeline.pipe( + const resultPipeline: ResultStream = orderedPipeline.pipe( map(([key, [row, orderByIndex]]) => { // Extract the final results from $selected and include orderBy index const raw = (row as any).$selected const finalResults = unwrapValue(raw) + // When in includes mode, embed the correlation key as third element + if (parentKeyStream) { + const correlationKey = (row as any)[mainSource]?.__correlationKey + return [key, [finalResults, orderByIndex, correlationKey]] as any + } return [key, [finalResults, orderByIndex]] as [unknown, [any, string]] }), - ) + ) as ResultStream const result = resultPipeline // Cache the result before returning (use original query as key) - const compilationResult = { + const compilationResult: CompilationResult = { collectionId: mainCollectionId, pipeline: result, sourceWhereClauses, aliasToCollectionId, aliasRemapping, + includes: + includesResults.length > 0 ? includesResults : undefined, } cache.set(rawQuery, compilationResult) @@ -362,6 +484,11 @@ export function compileQuery( // Extract the final results from $selected and return [key, [results, undefined]] const raw = (row as any).$selected const finalResults = unwrapValue(raw) + // When in includes mode, embed the correlation key as third element + if (parentKeyStream) { + const correlationKey = (row as any)[mainSource]?.__correlationKey + return [key, [finalResults, undefined, correlationKey]] as any + } return [key, [finalResults, undefined]] as [ unknown, [any, string | undefined], @@ -371,12 +498,14 @@ export function compileQuery( const result = resultPipeline // Cache the result before returning (use original query as key) - const compilationResult = { + const compilationResult: CompilationResult = { collectionId: mainCollectionId, pipeline: result, sourceWhereClauses, aliasToCollectionId, aliasRemapping, + includes: + includesResults.length > 0 ? includesResults : undefined, } cache.set(rawQuery, compilationResult) @@ -704,4 +833,44 @@ export function followRef( } } +/** + * Walks a Select object to find IncludesSubquery entries. + * Returns array of {key, subquery} for each found includes. + */ +function extractIncludesFromSelect( + select: Record, +): Array<{ key: string; subquery: IncludesSubquery }> { + const results: Array<{ key: string; subquery: IncludesSubquery }> = [] + for (const [key, value] of Object.entries(select)) { + if (value instanceof IncludesSubquery) { + results.push({ key, subquery: value }) + } + } + return results +} + +/** + * Replaces an IncludesSubquery entry in the select object with a null Value placeholder. + * This ensures processSelect() doesn't encounter it. + */ +function replaceIncludesInSelect( + select: Record, + key: string, +): void { + select[key] = new ValClass(null) +} + +/** + * Gets a nested value from an object by path segments. + * For v1 with single-level correlation fields (e.g., `projectId`), it's just `obj[path[0]]`. + */ +function getNestedValue(obj: any, path: Array): any { + let value = obj + for (const segment of path) { + if (value == null) return value + value = value[segment] + } + return value +} + export type CompileQueryFn = typeof compileQuery diff --git a/packages/db/src/query/compiler/select.ts b/packages/db/src/query/compiler/select.ts index c5baccb68..6eb866ce8 100644 --- a/packages/db/src/query/compiler/select.ts +++ b/packages/db/src/query/compiler/select.ts @@ -221,6 +221,15 @@ function addFromObject( } const expression = value as any + if (expression && expression.type === `includesSubquery`) { + // Placeholder — field will be set to a child Collection by the output layer + ops.push({ + kind: `field`, + alias: [...prefixPath, key].join(`.`), + compiled: () => null, + }) + continue + } if (isNestedSelectObject(expression)) { // Nested selection object addFromObject([...prefixPath, key], expression, ops) diff --git a/packages/db/src/query/ir.ts b/packages/db/src/query/ir.ts index b1e3d1e07..64ddd22c7 100644 --- a/packages/db/src/query/ir.ts +++ b/packages/db/src/query/ir.ts @@ -28,7 +28,7 @@ export interface QueryIR { export type From = CollectionRef | QueryRef export type Select = { - [alias: string]: BasicExpression | Aggregate | Select + [alias: string]: BasicExpression | Aggregate | Select | IncludesSubquery } export type Join = Array @@ -132,6 +132,18 @@ export class Aggregate extends BaseExpression { } } +export class IncludesSubquery extends BaseExpression { + public type = `includesSubquery` as const + constructor( + public query: QueryIR, // Child query (correlation WHERE removed) + public correlationField: PropRef, // Parent-side ref (e.g., project.id) + public childCorrelationField: PropRef, // Child-side ref (e.g., issue.projectId) + public fieldName: string, // Result field name (e.g., "issues") + ) { + super() + } +} + /** * Runtime helper to detect IR expression-like objects. * Prefer this over ad-hoc local implementations to keep behavior consistent. @@ -141,7 +153,8 @@ export function isExpressionLike(value: any): boolean { value instanceof Aggregate || value instanceof Func || value instanceof PropRef || - value instanceof Value + value instanceof Value || + value instanceof IncludesSubquery ) } diff --git a/packages/db/src/query/live/collection-config-builder.ts b/packages/db/src/query/live/collection-config-builder.ts index 72efb75d8..33ed1bf92 100644 --- a/packages/db/src/query/live/collection-config-builder.ts +++ b/packages/db/src/query/live/collection-config-builder.ts @@ -1,5 +1,7 @@ import { D2, output } from '@tanstack/db-ivm' import { compileQuery } from '../compiler/index.js' +import { createCollection } from '../../collection/index.js' +import { IncludesSubquery } from '../ir.js' import { buildQuery, getQueryIR } from '../builder/index.js' import { MissingAliasInputsError, @@ -11,7 +13,10 @@ import { CollectionSubscriber } from './collection-subscriber.js' import { getCollectionBuilder } from './collection-registry.js' import { LIVE_QUERY_INTERNAL } from './internal.js' import type { LiveQueryInternalUtils } from './internal.js' -import type { WindowOptions } from '../compiler/index.js' +import type { + IncludesCompilationResult, + WindowOptions, +} from '../compiler/index.js' import type { SchedulerContextId } from '../../scheduler.js' import type { CollectionSubscription } from '../../collection/subscription.js' import type { RootStreamBuilder } from '@tanstack/db-ivm' @@ -26,7 +31,7 @@ import type { UtilsRecord, } from '../../types.js' import type { Context, GetResult } from '../builder/types.js' -import type { BasicExpression, QueryIR } from '../ir.js' +import type { BasicExpression, PropRef, QueryIR } from '../ir.js' import type { LazyCollectionCallbacks } from '../compiler/joins.js' import type { Changes, @@ -135,6 +140,7 @@ export class CollectionConfigBuilder< public sourceWhereClausesCache: | Map> | undefined + private includesCache: Array | undefined // Map of source alias to subscription readonly subscriptions: Record = {} @@ -627,6 +633,7 @@ export class CollectionConfigBuilder< this.inputsCache = undefined this.pipelineCache = undefined this.sourceWhereClausesCache = undefined + this.includesCache = undefined // Reset lazy source alias state this.lazySources.clear() @@ -675,6 +682,7 @@ export class CollectionConfigBuilder< this.pipelineCache = compilation.pipeline this.sourceWhereClausesCache = compilation.sourceWhereClauses this.compiledAliasToCollectionId = compilation.aliasToCollectionId + this.includesCache = compilation.includes // Defensive check: verify all compiled aliases have corresponding inputs // This should never happen since all aliases come from user declarations, @@ -722,10 +730,18 @@ export class CollectionConfigBuilder< }), ) + // Set up includes output routing and child collection lifecycle + const includesState = this.setupIncludesOutput(this.includesCache, syncState) + // Flush pending changes and reset the accumulator. // Called at the end of each graph run to commit all accumulated changes. syncState.flushPendingChanges = () => { - if (pendingChanges.size === 0) { + const hasParentChanges = pendingChanges.size > 0 + const hasChildChanges = includesState.some( + (s) => s.pendingChildChanges.size > 0, + ) + + if (!hasParentChanges && !hasChildChanges) { return } @@ -757,10 +773,21 @@ export class CollectionConfigBuilder< changesToApply = merged } - begin() - changesToApply.forEach(this.applyChanges.bind(this, config)) - commit() + // 1. Flush parent changes + if (hasParentChanges) { + begin() + changesToApply.forEach(this.applyChanges.bind(this, config)) + commit() + } pendingChanges = new Map() + + // 2. Process includes: create/dispose child Collections, route child changes + flushIncludesState( + includesState, + config.collection, + this.id, + hasParentChanges ? changesToApply : null, + ) } graph.finalize() @@ -773,6 +800,79 @@ export class CollectionConfigBuilder< return syncState as FullSyncState } + /** + * Sets up output callbacks for includes child pipelines. + * Each includes entry gets its own output callback that accumulates child changes, + * and a child registry that maps correlation key → child Collection. + */ + private setupIncludesOutput( + includesEntries: Array | undefined, + syncState: SyncState, + ): Array { + if (!includesEntries || includesEntries.length === 0) { + return [] + } + + return includesEntries.map((entry) => { + const state: IncludesOutputState = { + fieldName: entry.fieldName, + correlationField: entry.correlationField, + childCorrelationField: entry.childCorrelationField, + childRegistry: new Map(), + pendingChildChanges: new Map(), + } + + // Attach output callback on the child pipeline + entry.pipeline.pipe( + output((data) => { + const messages = data.getInner() + syncState.messagesCount += messages.length + + for (const [ + [childKey, tupleData], + multiplicity, + ] of messages) { + const [childResult, _orderByIndex, correlationKey] = + tupleData as unknown as [any, string | undefined, unknown] + + // Accumulate by [correlationKey, childKey] + let byChild = state.pendingChildChanges.get(correlationKey) + if (!byChild) { + byChild = new Map() + state.pendingChildChanges.set(correlationKey, byChild) + } + + const existing = byChild.get(childKey) || { + deletes: 0, + inserts: 0, + value: childResult, + orderByIndex: _orderByIndex, + } + + if (multiplicity < 0) { + existing.deletes += Math.abs(multiplicity) + } else if (multiplicity > 0) { + existing.inserts += multiplicity + existing.value = childResult + } + + byChild.set(childKey, existing) + } + }), + ) + + // Recursively set up nested includes (e.g., comments inside issues) + if (entry.childCompilationResult.includes) { + state.nestedIncludesState = this.setupIncludesOutput( + entry.childCompilationResult.includes, + syncState, + ) + } + + return state + }) + } + private applyChanges( config: SyncMethods, changes: { @@ -1053,6 +1153,24 @@ function extractCollectionsFromQuery( } } } + + // Extract from SELECT (for IncludesSubquery) + if (q.select) { + extractFromSelect(q.select) + } + } + + function extractFromSelect(select: any) { + for (const [key, value] of Object.entries(select)) { + if (typeof key === `string` && key.startsWith(`__SPREAD_SENTINEL__`)) { + continue + } + if (value instanceof IncludesSubquery) { + extractFromQuery(value.query) + } else if (isNestedSelectObject(value)) { + extractFromSelect(value) + } + } } // Start extraction from the root query @@ -1122,6 +1240,19 @@ function extractCollectionAliases(query: QueryIR): Map> { } } + function traverseSelect(select: any) { + for (const [key, value] of Object.entries(select)) { + if (typeof key === `string` && key.startsWith(`__SPREAD_SENTINEL__`)) { + continue + } + if (value instanceof IncludesSubquery) { + traverse(value.query) + } else if (isNestedSelectObject(value)) { + traverseSelect(value) + } + } + } + function traverse(q?: QueryIR) { if (!q) return @@ -1132,6 +1263,10 @@ function extractCollectionAliases(query: QueryIR): Map> { recordAlias(joinClause.from) } } + + if (q.select) { + traverseSelect(q.select) + } } traverse(query) @@ -1139,6 +1274,230 @@ function extractCollectionAliases(query: QueryIR): Map> { return aliasesById } +/** + * Check if a value is a nested select object (plain object, not an expression) + */ +function isNestedSelectObject(obj: any): boolean { + if (obj === null || typeof obj !== `object`) return false + if (obj instanceof IncludesSubquery) return false + // Expression-like objects have a .type property + if (`type` in obj && typeof obj.type === `string`) return false + // Ref proxies from spread operations + if (obj.__refProxy) return false + return true +} + +/** + * State tracked per includes entry for output routing and child lifecycle + */ +type IncludesOutputState = { + fieldName: string + correlationField: PropRef + childCorrelationField: PropRef + /** Maps correlation key value → child Collection entry */ + childRegistry: Map + /** Pending child changes: correlationKey → Map */ + pendingChildChanges: Map>> + /** Nested includes state (for projects → issues → comments) */ + nestedIncludesState?: Array +} + +type ChildCollectionEntry = { + collection: Collection + syncMethods: SyncMethods | null + resultKeys: WeakMap +} + +/** + * Creates a child Collection entry for includes subqueries. + * The child Collection is a full-fledged Collection instance that starts syncing immediately. + */ +function createChildCollectionEntry( + parentId: string, + fieldName: string, + correlationKey: unknown, +): ChildCollectionEntry { + const resultKeys = new WeakMap() + let syncMethods: SyncMethods | null = null + + const collection = createCollection({ + id: `${parentId}-${fieldName}-${String(correlationKey)}`, + getKey: (item: any) => resultKeys.get(item) as string | number, + sync: { + rowUpdateMode: `full`, + sync: (methods) => { + syncMethods = methods + return () => { + syncMethods = null + } + }, + }, + startSync: true, + }) + + return { collection, get syncMethods() { return syncMethods }, resultKeys } +} + +/** + * Recursively flushes includes state, processing child changes and creating + * child Collections. Handles nested includes (e.g., comments inside issues) + * by recursing into nested state after flushing each level. + */ +function flushIncludesState( + includesState: Array, + parentCollection: Collection, + parentId: string, + parentChanges: Map> | null, +): void { + for (const state of includesState) { + // For parent INSERTs: ensure a child Collection exists for every parent, + // even those with no children (produces an empty child Collection). + if (parentChanges) { + const fieldPath = state.correlationField.path.slice(1) // remove alias prefix + for (const [_key, changes] of parentChanges) { + if (changes.inserts > 0) { + const parentResult = changes.value + // Extract the correlation key value from the parent result + let correlationKey: unknown = parentResult + for (const segment of fieldPath) { + if (correlationKey == null) break + correlationKey = (correlationKey as any)[segment] + } + + if (correlationKey != null) { + // Ensure child Collection exists for this correlation key + if (!state.childRegistry.has(correlationKey)) { + const entry = createChildCollectionEntry( + parentId, + state.fieldName, + correlationKey, + ) + state.childRegistry.set(correlationKey, entry) + } + // Attach child Collection to the parent result + parentResult[state.fieldName] = + state.childRegistry.get(correlationKey)!.collection + } + } + } + } + + // Flush child changes: route to correct child Collections + if (state.pendingChildChanges.size > 0) { + for (const [ + correlationKey, + childChanges, + ] of state.pendingChildChanges) { + // Ensure child Collection exists for this correlation key + let entry = state.childRegistry.get(correlationKey) + if (!entry) { + entry = createChildCollectionEntry( + parentId, + state.fieldName, + correlationKey, + ) + state.childRegistry.set(correlationKey, entry) + } + + // Attach the child Collection to ANY parent that has this correlation key + // by scanning the parent result collection + attachChildCollectionToParent( + parentCollection, + state.fieldName, + correlationKey, + state.correlationField, + entry.collection, + ) + + // Apply child changes to the child Collection + if (entry.syncMethods) { + entry.syncMethods.begin() + for (const [childKey, change] of childChanges) { + entry.resultKeys.set(change.value, childKey) + if (change.inserts > 0 && change.deletes === 0) { + entry.syncMethods.write({ value: change.value, type: `insert` }) + } else if ( + change.inserts > change.deletes || + (change.inserts === change.deletes && + entry.syncMethods.collection.has( + entry.syncMethods.collection.getKeyFromItem(change.value), + )) + ) { + entry.syncMethods.write({ value: change.value, type: `update` }) + } else if (change.deletes > 0) { + entry.syncMethods.write({ value: change.value, type: `delete` }) + } + } + entry.syncMethods.commit() + } + + // Recursively process nested includes (e.g., comments inside issues) + if (state.nestedIncludesState) { + flushIncludesState( + state.nestedIncludesState, + entry.collection, + entry.collection.id, + childChanges, + ) + } + } + state.pendingChildChanges.clear() + } + + // For parent DELETEs: dispose child Collections so re-added parents + // get a fresh empty child Collection instead of reusing stale data. + if (parentChanges) { + const fieldPath = state.correlationField.path.slice(1) + for (const [_key, changes] of parentChanges) { + if (changes.deletes > 0 && changes.inserts === 0) { + let correlationKey: unknown = changes.value + for (const segment of fieldPath) { + if (correlationKey == null) break + correlationKey = (correlationKey as any)[segment] + } + if (correlationKey != null) { + state.childRegistry.delete(correlationKey) + } + } + } + } + } +} + +/** + * Attaches a child Collection to parent rows that match a given correlation key. + * Scans the parent collection to find matching parents and sets the field. + */ +function attachChildCollectionToParent( + parentCollection: Collection, + fieldName: string, + correlationKey: unknown, + correlationField: PropRef, + childCollection: Collection, +): void { + // Walk the parent collection's items to find those matching this correlation key + // The correlation field path has the alias prefix (e.g., ['project', 'id']), + // but at this point the parent result is the selected object, not namespaced. + // We need to find parents by their correlation value. + // Since the parent correlation field is e.g. project.id, and the selected result + // might have 'id' as a field, we use the correlation field path (minus alias). + const fieldPath = correlationField.path.slice(1) // remove alias prefix + + for (const [_key, item] of parentCollection) { + // Navigate to the correlation value on the parent result + let value: any = item + for (const segment of fieldPath) { + if (value == null) break + value = value[segment] + } + + if (value === correlationKey) { + // Set the child Collection on this parent row + ;(item)[fieldName] = childCollection + } + } +} + function accumulateChanges( acc: Map>, [[key, tupleData], multiplicity]: [ From 2467505d8fdc5659a6c1433577cd1db83e00d561 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Tue, 24 Feb 2026 14:07:04 +0100 Subject: [PATCH 02/19] Unit tests for includes --- packages/db/tests/query/includes.test.ts | 365 +++++++++++++++++++++++ 1 file changed, 365 insertions(+) create mode 100644 packages/db/tests/query/includes.test.ts diff --git a/packages/db/tests/query/includes.test.ts b/packages/db/tests/query/includes.test.ts new file mode 100644 index 000000000..c09a0ca9c --- /dev/null +++ b/packages/db/tests/query/includes.test.ts @@ -0,0 +1,365 @@ +import { beforeEach, describe, expect, it } from 'vitest' +import { createLiveQueryCollection, eq } from '../../src/query/index.js' +import { createCollection } from '../../src/collection/index.js' +import { mockSyncCollectionOptions } from '../utils.js' + +type Project = { + id: number + name: string +} + +type Issue = { + id: number + projectId: number + title: string +} + +type Comment = { + id: number + issueId: number + body: string +} + +const sampleProjects: Array = [ + { id: 1, name: `Alpha` }, + { id: 2, name: `Beta` }, + { id: 3, name: `Gamma` }, +] + +const sampleIssues: Array = [ + { id: 10, projectId: 1, title: `Bug in Alpha` }, + { id: 11, projectId: 1, title: `Feature for Alpha` }, + { id: 20, projectId: 2, title: `Bug in Beta` }, + // No issues for project 3 +] + +const sampleComments: Array = [ + { id: 100, issueId: 10, body: `Looks bad` }, + { id: 101, issueId: 10, body: `Fixed it` }, + { id: 200, issueId: 20, body: `Same bug` }, + // No comments for issue 11 +] + +function createProjectsCollection() { + return createCollection( + mockSyncCollectionOptions({ + id: `includes-projects`, + getKey: (p) => p.id, + initialData: sampleProjects, + }), + ) +} + +function createIssuesCollection() { + return createCollection( + mockSyncCollectionOptions({ + id: `includes-issues`, + getKey: (i) => i.id, + initialData: sampleIssues, + }), + ) +} + +function createCommentsCollection() { + return createCollection( + mockSyncCollectionOptions({ + id: `includes-comments`, + getKey: (c) => c.id, + initialData: sampleComments, + }), + ) +} + +/** + * Extracts child collection items as a sorted plain array for comparison. + */ +function childItems(collection: any, sortKey = `id`): Array { + return [...collection.toArray].sort( + (a: any, b: any) => a[sortKey] - b[sortKey], + ) +} + +/** + * Recursively converts a live query collection (or child Collection) into a + * plain sorted array, turning any nested child Collections into nested arrays. + * This lets tests compare the full hierarchical result as a single literal. + */ +function toTree(collection: any, sortKey = `id`): Array { + const rows = [...collection.toArray].sort( + (a: any, b: any) => a[sortKey] - b[sortKey], + ) + return rows.map((row: any) => { + const out: Record = {} + for (const [key, value] of Object.entries(row)) { + out[key] = + value && typeof value === `object` && `toArray` in (value as any) + ? toTree(value, sortKey) + : value + } + return out + }) +} + +describe(`includes subqueries`, () => { + let projects: ReturnType + let issues: ReturnType + let comments: ReturnType + + beforeEach(() => { + projects = createProjectsCollection() + issues = createIssuesCollection() + comments = createCommentsCollection() + }) + + function buildIncludesQuery() { + return createLiveQueryCollection((q) => + q + .from({ p: projects }) + .select(({ p }) => ({ + id: p.id, + name: p.name, + issues: q + .from({ i: issues }) + .where(({ i }) => eq(i.projectId, p.id)) + .select(({ i }) => ({ + id: i.id, + title: i.title, + })), + })), + ) + } + + describe(`basic includes`, () => { + it(`produces child Collections on parent rows`, async () => { + const collection = buildIncludesQuery() + await collection.preload() + + expect(toTree(collection)).toEqual([ + { + id: 1, + name: `Alpha`, + issues: [ + { id: 10, title: `Bug in Alpha` }, + { id: 11, title: `Feature for Alpha` }, + ], + }, + { + id: 2, + name: `Beta`, + issues: [{ id: 20, title: `Bug in Beta` }], + }, + { + id: 3, + name: `Gamma`, + issues: [], + }, + ]) + }) + +}) + + describe(`reactivity`, () => { + it(`adding a child updates the parent's child collection`, async () => { + const collection = buildIncludesQuery() + await collection.preload() + + expect(childItems((collection.get(1) as any).issues)).toHaveLength(2) + + issues.utils.begin() + issues.utils.write({ + type: `insert`, + value: { id: 12, projectId: 1, title: `New Alpha issue` }, + }) + issues.utils.commit() + + expect(childItems((collection.get(1) as any).issues)).toEqual([ + { id: 10, title: `Bug in Alpha` }, + { id: 11, title: `Feature for Alpha` }, + { id: 12, title: `New Alpha issue` }, + ]) + }) + + it(`removing a child updates the parent's child collection`, async () => { + const collection = buildIncludesQuery() + await collection.preload() + + expect(childItems((collection.get(1) as any).issues)).toHaveLength(2) + + issues.utils.begin() + issues.utils.write({ + type: `delete`, + value: sampleIssues.find((i) => i.id === 10)!, + }) + issues.utils.commit() + + expect(childItems((collection.get(1) as any).issues)).toEqual([ + { id: 11, title: `Feature for Alpha` }, + ]) + }) + + it(`removing and re-adding a parent resets its child collection`, async () => { + const collection = buildIncludesQuery() + await collection.preload() + + expect(childItems((collection.get(1) as any).issues)).toHaveLength(2) + + // Remove project Alpha + projects.utils.begin() + projects.utils.write({ + type: `delete`, + value: sampleProjects.find((p) => p.id === 1)!, + }) + projects.utils.commit() + + expect(collection.get(1)).toBeUndefined() + + // Re-add project Alpha — should get a fresh child collection + projects.utils.begin() + projects.utils.write({ + type: `insert`, + value: { id: 1, name: `Alpha Reborn` }, + }) + projects.utils.commit() + + const alpha = collection.get(1) as any + expect(alpha).toMatchObject({ id: 1, name: `Alpha Reborn` }) + expect(childItems(alpha.issues)).toEqual([ + { id: 10, title: `Bug in Alpha` }, + { id: 11, title: `Feature for Alpha` }, + ]) + + // New children should flow into the child collection + issues.utils.begin() + issues.utils.write({ + type: `insert`, + value: { id: 99, projectId: 1, title: `Post-rebirth issue` }, + }) + issues.utils.commit() + + expect(childItems((collection.get(1) as any).issues)).toEqual([ + { id: 10, title: `Bug in Alpha` }, + { id: 11, title: `Feature for Alpha` }, + { id: 99, title: `Post-rebirth issue` }, + ]) + }) + + it(`adding a child to a previously empty parent works`, async () => { + const collection = buildIncludesQuery() + await collection.preload() + + expect(childItems((collection.get(3) as any).issues)).toEqual([]) + + issues.utils.begin() + issues.utils.write({ + type: `insert`, + value: { id: 30, projectId: 3, title: `Gamma issue` }, + }) + issues.utils.commit() + + expect(childItems((collection.get(3) as any).issues)).toEqual([ + { id: 30, title: `Gamma issue` }, + ]) + }) + }) + + describe(`inner join filtering`, () => { + it(`only shows children for parents matching a WHERE clause`, async () => { + const collection = createLiveQueryCollection((q) => + q + .from({ p: projects }) + .where(({ p }) => eq(p.name, `Alpha`)) + .select(({ p }) => ({ + id: p.id, + name: p.name, + issues: q + .from({ i: issues }) + .where(({ i }) => eq(i.projectId, p.id)) + .select(({ i }) => ({ + id: i.id, + title: i.title, + })), + })), + ) + + await collection.preload() + + expect(toTree(collection)).toEqual([ + { + id: 1, + name: `Alpha`, + issues: [ + { id: 10, title: `Bug in Alpha` }, + { id: 11, title: `Feature for Alpha` }, + ], + }, + ]) + }) + }) + + describe(`nested includes`, () => { + it(`supports two levels of includes`, async () => { + const collection = createLiveQueryCollection((q) => + q + .from({ p: projects }) + .select(({ p }) => ({ + id: p.id, + name: p.name, + issues: q + .from({ i: issues }) + .where(({ i }) => eq(i.projectId, p.id)) + .select(({ i }) => ({ + id: i.id, + title: i.title, + comments: q + .from({ c: comments }) + .where(({ c }) => eq(c.issueId, i.id)) + .select(({ c }) => ({ + id: c.id, + body: c.body, + })), + })), + })), + ) + + await collection.preload() + + expect(toTree(collection)).toEqual([ + { + id: 1, + name: `Alpha`, + issues: [ + { + id: 10, + title: `Bug in Alpha`, + comments: [ + { id: 100, body: `Looks bad` }, + { id: 101, body: `Fixed it` }, + ], + }, + { + id: 11, + title: `Feature for Alpha`, + comments: [], + }, + ], + }, + { + id: 2, + name: `Beta`, + issues: [ + { + id: 20, + title: `Bug in Beta`, + comments: [{ id: 200, body: `Same bug` }], + }, + ], + }, + { + id: 3, + name: `Gamma`, + issues: [], + }, + ]) + }) + }) +}) From e702f8051dee7c716ad828d26d7c9e6d205d657e Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Tue, 24 Feb 2026 15:51:46 +0100 Subject: [PATCH 03/19] Unit tests for ordered subqueries --- packages/db/tests/query/includes.test.ts | 83 ++++++++++++++++++++++++ 1 file changed, 83 insertions(+) diff --git a/packages/db/tests/query/includes.test.ts b/packages/db/tests/query/includes.test.ts index c09a0ca9c..a214ed13e 100644 --- a/packages/db/tests/query/includes.test.ts +++ b/packages/db/tests/query/includes.test.ts @@ -296,6 +296,89 @@ describe(`includes subqueries`, () => { }) }) + describe(`ordered child queries`, () => { + it(`child collection respects orderBy on the child query`, async () => { + const collection = createLiveQueryCollection((q) => + q + .from({ p: projects }) + .select(({ p }) => ({ + id: p.id, + name: p.name, + issues: q + .from({ i: issues }) + .where(({ i }) => eq(i.projectId, p.id)) + .orderBy(({ i }) => i.title, `desc`) + .select(({ i }) => ({ + id: i.id, + title: i.title, + })), + })), + ) + + await collection.preload() + + // Alpha's issues should be sorted by title descending: + // "Feature for Alpha" before "Bug in Alpha" + const alpha = collection.get(1) as any + const alphaIssues = [...alpha.issues.toArray] + expect(alphaIssues).toEqual([ + { id: 11, title: `Feature for Alpha` }, + { id: 10, title: `Bug in Alpha` }, + ]) + + // Beta has one issue, order doesn't matter but it should still work + const beta = collection.get(2) as any + const betaIssues = [...beta.issues.toArray] + expect(betaIssues).toEqual([{ id: 20, title: `Bug in Beta` }]) + + // Gamma has no issues + const gamma = collection.get(3) as any + expect([...gamma.issues.toArray]).toEqual([]) + }) + + it(`newly inserted children appear in the correct order`, async () => { + const collection = createLiveQueryCollection((q) => + q + .from({ p: projects }) + .select(({ p }) => ({ + id: p.id, + name: p.name, + issues: q + .from({ i: issues }) + .where(({ i }) => eq(i.projectId, p.id)) + .orderBy(({ i }) => i.title, `asc`) + .select(({ i }) => ({ + id: i.id, + title: i.title, + })), + })), + ) + + await collection.preload() + + // Alpha issues sorted ascending: "Bug in Alpha", "Feature for Alpha" + expect([...(collection.get(1) as any).issues.toArray]).toEqual([ + { id: 10, title: `Bug in Alpha` }, + { id: 11, title: `Feature for Alpha` }, + ]) + + // Insert an issue that should appear between the existing two + issues.utils.begin() + issues.utils.write({ + type: `insert`, + value: { id: 12, projectId: 1, title: `Docs for Alpha` }, + }) + issues.utils.commit() + + // Should maintain ascending order: Bug, Docs, Feature + expect([...(collection.get(1) as any).issues.toArray]).toEqual([ + { id: 10, title: `Bug in Alpha` }, + { id: 12, title: `Docs for Alpha` }, + { id: 11, title: `Feature for Alpha` }, + ]) + }) + }) + describe(`nested includes`, () => { it(`supports two levels of includes`, async () => { const collection = createLiveQueryCollection((q) => From f592b81fb699e053b4f294f98a973b8b0aa0c452 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Tue, 24 Feb 2026 16:01:47 +0100 Subject: [PATCH 04/19] Add support for ordered subquery --- packages/db/src/query/compiler/index.ts | 3 +++ .../query/live/collection-config-builder.ts | 18 +++++++++++++++++- 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/packages/db/src/query/compiler/index.ts b/packages/db/src/query/compiler/index.ts index a6629d66a..c2bc1d0de 100644 --- a/packages/db/src/query/compiler/index.ts +++ b/packages/db/src/query/compiler/index.ts @@ -56,6 +56,8 @@ export interface IncludesCompilationResult { correlationField: PropRef /** Child-side correlation ref (e.g., issue.projectId) */ childCorrelationField: PropRef + /** Whether the child query has an ORDER BY clause */ + hasOrderBy: boolean /** Full compilation result for the child query (for nested includes + alias tracking) */ childCompilationResult: CompilationResult } @@ -321,6 +323,7 @@ export function compileQuery( fieldName: subquery.fieldName, correlationField: subquery.correlationField, childCorrelationField: subquery.childCorrelationField, + hasOrderBy: !!(subquery.query.orderBy && subquery.query.orderBy.length > 0), childCompilationResult: childResult, }) diff --git a/packages/db/src/query/live/collection-config-builder.ts b/packages/db/src/query/live/collection-config-builder.ts index 33ed1bf92..10e3c041f 100644 --- a/packages/db/src/query/live/collection-config-builder.ts +++ b/packages/db/src/query/live/collection-config-builder.ts @@ -818,6 +818,7 @@ export class CollectionConfigBuilder< fieldName: entry.fieldName, correlationField: entry.correlationField, childCorrelationField: entry.childCorrelationField, + hasOrderBy: entry.hasOrderBy, childRegistry: new Map(), pendingChildChanges: new Map(), } @@ -1294,6 +1295,8 @@ type IncludesOutputState = { fieldName: string correlationField: PropRef childCorrelationField: PropRef + /** Whether the child query has an ORDER BY clause */ + hasOrderBy: boolean /** Maps correlation key value → child Collection entry */ childRegistry: Map /** Pending child changes: correlationKey → Map */ @@ -1306,6 +1309,7 @@ type ChildCollectionEntry = { collection: Collection syncMethods: SyncMethods | null resultKeys: WeakMap + orderByIndices: WeakMap | null } /** @@ -1316,13 +1320,20 @@ function createChildCollectionEntry( parentId: string, fieldName: string, correlationKey: unknown, + hasOrderBy: boolean, ): ChildCollectionEntry { const resultKeys = new WeakMap() + const orderByIndices = hasOrderBy ? new WeakMap() : null let syncMethods: SyncMethods | null = null + const compare = orderByIndices + ? createOrderByComparator(orderByIndices) + : undefined + const collection = createCollection({ id: `${parentId}-${fieldName}-${String(correlationKey)}`, getKey: (item: any) => resultKeys.get(item) as string | number, + compare, sync: { rowUpdateMode: `full`, sync: (methods) => { @@ -1335,7 +1346,7 @@ function createChildCollectionEntry( startSync: true, }) - return { collection, get syncMethods() { return syncMethods }, resultKeys } + return { collection, get syncMethods() { return syncMethods }, resultKeys, orderByIndices } } /** @@ -1371,6 +1382,7 @@ function flushIncludesState( parentId, state.fieldName, correlationKey, + state.hasOrderBy, ) state.childRegistry.set(correlationKey, entry) } @@ -1395,6 +1407,7 @@ function flushIncludesState( parentId, state.fieldName, correlationKey, + state.hasOrderBy, ) state.childRegistry.set(correlationKey, entry) } @@ -1414,6 +1427,9 @@ function flushIncludesState( entry.syncMethods.begin() for (const [childKey, change] of childChanges) { entry.resultKeys.set(change.value, childKey) + if (entry.orderByIndices && change.orderByIndex !== undefined) { + entry.orderByIndices.set(change.value, change.orderByIndex) + } if (change.inserts > 0 && change.deletes === 0) { entry.syncMethods.write({ value: change.value, type: `insert` }) } else if ( From 697502cedf5fb2807ba76346e08527d6fb02df22 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Tue, 24 Feb 2026 16:16:41 +0100 Subject: [PATCH 05/19] Unit tests for subqueries with limit --- packages/db/tests/query/includes.test.ts | 87 ++++++++++++++++++++++++ 1 file changed, 87 insertions(+) diff --git a/packages/db/tests/query/includes.test.ts b/packages/db/tests/query/includes.test.ts index a214ed13e..248c15ecd 100644 --- a/packages/db/tests/query/includes.test.ts +++ b/packages/db/tests/query/includes.test.ts @@ -379,6 +379,93 @@ describe(`includes subqueries`, () => { }) }) + describe(`ordered child queries with limit`, () => { + it(`limits child collection to N items per parent`, async () => { + const collection = createLiveQueryCollection((q) => + q + .from({ p: projects }) + .select(({ p }) => ({ + id: p.id, + name: p.name, + issues: q + .from({ i: issues }) + .where(({ i }) => eq(i.projectId, p.id)) + .orderBy(({ i }) => i.title, `asc`) + .limit(1) + .select(({ i }) => ({ + id: i.id, + title: i.title, + })), + })), + ) + + await collection.preload() + + // Alpha has 2 issues; limit(1) with asc title should keep only "Bug in Alpha" + const alpha = collection.get(1) as any + expect([...alpha.issues.toArray]).toEqual([ + { id: 10, title: `Bug in Alpha` }, + ]) + + // Beta has 1 issue; limit(1) keeps it + const beta = collection.get(2) as any + expect([...beta.issues.toArray]).toEqual([ + { id: 20, title: `Bug in Beta` }, + ]) + + // Gamma has 0 issues; limit(1) still empty + const gamma = collection.get(3) as any + expect([...gamma.issues.toArray]).toEqual([]) + }) + + it(`inserting a child that displaces an existing one respects the limit`, async () => { + const collection = createLiveQueryCollection((q) => + q + .from({ p: projects }) + .select(({ p }) => ({ + id: p.id, + name: p.name, + issues: q + .from({ i: issues }) + .where(({ i }) => eq(i.projectId, p.id)) + .orderBy(({ i }) => i.title, `asc`) + .limit(1) + .select(({ i }) => ({ + id: i.id, + title: i.title, + })), + })), + ) + + await collection.preload() + + // Alpha should have exactly 1 issue (limit 1): "Bug in Alpha" + const alphaIssues = [...(collection.get(1) as any).issues.toArray] + expect(alphaIssues).toHaveLength(1) + expect(alphaIssues).toEqual([ + { id: 10, title: `Bug in Alpha` }, + ]) + + // Insert an issue that comes before "Bug" alphabetically + issues.utils.begin() + issues.utils.write({ + type: `insert`, + value: { id: 12, projectId: 1, title: `Alpha priority issue` }, + }) + issues.utils.commit() + + // The new issue should displace "Bug in Alpha" since it sorts first + expect([...(collection.get(1) as any).issues.toArray]).toEqual([ + { id: 12, title: `Alpha priority issue` }, + ]) + + // Beta should still have its 1 issue (limit is per-parent) + expect([...(collection.get(2) as any).issues.toArray]).toEqual([ + { id: 20, title: `Bug in Beta` }, + ]) + }) + }) + describe(`nested includes`, () => { it(`supports two levels of includes`, async () => { const collection = createLiveQueryCollection((q) => From 1c2728c5bc9278c831395606961d60ef5c801cbf Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Tue, 24 Feb 2026 16:33:05 +0100 Subject: [PATCH 06/19] Support LIMIT and OFFSET in subqueries --- packages/db/src/query/compiler/index.ts | 9 ++++++ packages/db/src/query/compiler/order-by.ts | 36 ++++++++++++++++++++-- 2 files changed, 43 insertions(+), 2 deletions(-) diff --git a/packages/db/src/query/compiler/index.ts b/packages/db/src/query/compiler/index.ts index c2bc1d0de..a78a86ba6 100644 --- a/packages/db/src/query/compiler/index.ts +++ b/packages/db/src/query/compiler/index.ts @@ -435,6 +435,14 @@ export function compileQuery( // Process orderBy parameter if it exists if (query.orderBy && query.orderBy.length > 0) { + // When in includes mode with limit/offset, use grouped ordering so that + // the limit is applied per parent (per correlation key), not globally. + const includesGroupKeyFn = + parentKeyStream && (query.limit !== undefined || query.offset !== undefined) + ? (_key: unknown, row: unknown) => + (row as any)?.[mainSource]?.__correlationKey + : undefined + const orderedPipeline = processOrderBy( rawQuery, pipeline, @@ -445,6 +453,7 @@ export function compileQuery( setWindowFn, query.limit, query.offset, + includesGroupKeyFn, ) // Final step: extract the $selected and include orderBy index diff --git a/packages/db/src/query/compiler/order-by.ts b/packages/db/src/query/compiler/order-by.ts index 0ced0081c..2107413b1 100644 --- a/packages/db/src/query/compiler/order-by.ts +++ b/packages/db/src/query/compiler/order-by.ts @@ -1,4 +1,4 @@ -import { orderByWithFractionalIndex } from '@tanstack/db-ivm' +import { groupedOrderByWithFractionalIndex, orderByWithFractionalIndex } from '@tanstack/db-ivm' import { defaultComparator, makeComparator } from '../../utils/comparison.js' import { PropRef, followRef } from '../ir.js' import { ensureIndexForField } from '../../indexes/auto-index.js' @@ -51,6 +51,7 @@ export function processOrderBy( setWindowFn: (windowFn: (options: WindowOptions) => void) => void, limit?: number, offset?: number, + groupKeyFn?: (key: unknown, value: unknown) => unknown, ): IStreamBuilder> { // Pre-compile all order by expressions const compiledOrderBy = orderByClause.map((clause) => { @@ -126,7 +127,9 @@ export function processOrderBy( // to loadSubset so the sync layer can optimize the query. // We try to use an index on the FIRST orderBy column for lazy loading, // even for multi-column orderBy (using wider bounds on first column). - if (limit) { + // Skip this optimization when using grouped ordering (includes with limit), + // because the limit is per-group, not global — the child collection needs all data loaded. + if (limit && !groupKeyFn) { let index: IndexInterface | undefined let followRefCollection: Collection | undefined let firstColumnValueExtractor: CompiledSingleRowExpression | undefined @@ -290,6 +293,35 @@ export function processOrderBy( } } + // Use grouped ordering when a groupKeyFn is provided (includes with limit/offset), + // otherwise use the standard global ordering operator. + if (groupKeyFn) { + return pipeline.pipe( + groupedOrderByWithFractionalIndex(valueExtractor, { + limit, + offset, + comparator: compare, + setSizeCallback, + groupKeyFn, + setWindowFn: ( + windowFn: (options: { offset?: number; limit?: number }) => void, + ) => { + setWindowFn( + (options) => { + windowFn(options) + if (orderByOptimizationInfo) { + orderByOptimizationInfo.offset = + options.offset ?? orderByOptimizationInfo.offset + orderByOptimizationInfo.limit = + options.limit ?? orderByOptimizationInfo.limit + } + }, + ) + }, + }), + ) + } + // Use fractional indexing and return the tuple [value, index] return pipeline.pipe( orderByWithFractionalIndex(valueExtractor, { From 16ac62b13730b3d8ca0dc02b26c68b2b696aebd0 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Wed, 25 Feb 2026 08:46:01 +0000 Subject: [PATCH 07/19] ci: apply automated fixes --- packages/db/src/query/compiler/index.ts | 24 +-- packages/db/src/query/compiler/order-by.ts | 25 +-- .../query/live/collection-config-builder.ts | 26 +-- packages/db/tests/query/includes.test.ts | 177 ++++++++---------- 4 files changed, 118 insertions(+), 134 deletions(-) diff --git a/packages/db/src/query/compiler/index.ts b/packages/db/src/query/compiler/index.ts index a78a86ba6..d749e6e3a 100644 --- a/packages/db/src/query/compiler/index.ts +++ b/packages/db/src/query/compiler/index.ts @@ -1,9 +1,4 @@ -import { - distinct, - filter, - join as joinOperator, - map, -} from '@tanstack/db-ivm' +import { distinct, filter, join as joinOperator, map } from '@tanstack/db-ivm' import { optimizeQuery } from '../optimizer.js' import { CollectionInputNotFoundError, @@ -203,9 +198,7 @@ export function compileQuery( ) // Inner join: only children whose correlation key exists in parent keys pass through - const joined = childRekeyed.pipe( - joinOperator(parentKeyStream, `inner`), - ) + const joined = childRekeyed.pipe(joinOperator(parentKeyStream, `inner`)) // Extract: [correlationValue, [[childKey, childRow], null]] → [childKey, childRow] // Tag the row with __correlationKey for output routing @@ -323,7 +316,9 @@ export function compileQuery( fieldName: subquery.fieldName, correlationField: subquery.correlationField, childCorrelationField: subquery.childCorrelationField, - hasOrderBy: !!(subquery.query.orderBy && subquery.query.orderBy.length > 0), + hasOrderBy: !!( + subquery.query.orderBy && subquery.query.orderBy.length > 0 + ), childCompilationResult: childResult, }) @@ -438,7 +433,8 @@ export function compileQuery( // When in includes mode with limit/offset, use grouped ordering so that // the limit is applied per parent (per correlation key), not globally. const includesGroupKeyFn = - parentKeyStream && (query.limit !== undefined || query.offset !== undefined) + parentKeyStream && + (query.limit !== undefined || query.offset !== undefined) ? (_key: unknown, row: unknown) => (row as any)?.[mainSource]?.__correlationKey : undefined @@ -479,8 +475,7 @@ export function compileQuery( sourceWhereClauses, aliasToCollectionId, aliasRemapping, - includes: - includesResults.length > 0 ? includesResults : undefined, + includes: includesResults.length > 0 ? includesResults : undefined, } cache.set(rawQuery, compilationResult) @@ -516,8 +511,7 @@ export function compileQuery( sourceWhereClauses, aliasToCollectionId, aliasRemapping, - includes: - includesResults.length > 0 ? includesResults : undefined, + includes: includesResults.length > 0 ? includesResults : undefined, } cache.set(rawQuery, compilationResult) diff --git a/packages/db/src/query/compiler/order-by.ts b/packages/db/src/query/compiler/order-by.ts index 2107413b1..6ed5f958c 100644 --- a/packages/db/src/query/compiler/order-by.ts +++ b/packages/db/src/query/compiler/order-by.ts @@ -1,4 +1,7 @@ -import { groupedOrderByWithFractionalIndex, orderByWithFractionalIndex } from '@tanstack/db-ivm' +import { + groupedOrderByWithFractionalIndex, + orderByWithFractionalIndex, +} from '@tanstack/db-ivm' import { defaultComparator, makeComparator } from '../../utils/comparison.js' import { PropRef, followRef } from '../ir.js' import { ensureIndexForField } from '../../indexes/auto-index.js' @@ -306,17 +309,15 @@ export function processOrderBy( setWindowFn: ( windowFn: (options: { offset?: number; limit?: number }) => void, ) => { - setWindowFn( - (options) => { - windowFn(options) - if (orderByOptimizationInfo) { - orderByOptimizationInfo.offset = - options.offset ?? orderByOptimizationInfo.offset - orderByOptimizationInfo.limit = - options.limit ?? orderByOptimizationInfo.limit - } - }, - ) + setWindowFn((options) => { + windowFn(options) + if (orderByOptimizationInfo) { + orderByOptimizationInfo.offset = + options.offset ?? orderByOptimizationInfo.offset + orderByOptimizationInfo.limit = + options.limit ?? orderByOptimizationInfo.limit + } + }) }, }), ) diff --git a/packages/db/src/query/live/collection-config-builder.ts b/packages/db/src/query/live/collection-config-builder.ts index 10e3c041f..fac585829 100644 --- a/packages/db/src/query/live/collection-config-builder.ts +++ b/packages/db/src/query/live/collection-config-builder.ts @@ -731,7 +731,10 @@ export class CollectionConfigBuilder< ) // Set up includes output routing and child collection lifecycle - const includesState = this.setupIncludesOutput(this.includesCache, syncState) + const includesState = this.setupIncludesOutput( + this.includesCache, + syncState, + ) // Flush pending changes and reset the accumulator. // Called at the end of each graph run to commit all accumulated changes. @@ -829,10 +832,7 @@ export class CollectionConfigBuilder< const messages = data.getInner() syncState.messagesCount += messages.length - for (const [ - [childKey, tupleData], - multiplicity, - ] of messages) { + for (const [[childKey, tupleData], multiplicity] of messages) { const [childResult, _orderByIndex, correlationKey] = tupleData as unknown as [any, string | undefined, unknown] @@ -1346,7 +1346,14 @@ function createChildCollectionEntry( startSync: true, }) - return { collection, get syncMethods() { return syncMethods }, resultKeys, orderByIndices } + return { + collection, + get syncMethods() { + return syncMethods + }, + resultKeys, + orderByIndices, + } } /** @@ -1396,10 +1403,7 @@ function flushIncludesState( // Flush child changes: route to correct child Collections if (state.pendingChildChanges.size > 0) { - for (const [ - correlationKey, - childChanges, - ] of state.pendingChildChanges) { + for (const [correlationKey, childChanges] of state.pendingChildChanges) { // Ensure child Collection exists for this correlation key let entry = state.childRegistry.get(correlationKey) if (!entry) { @@ -1509,7 +1513,7 @@ function attachChildCollectionToParent( if (value === correlationKey) { // Set the child Collection on this parent row - ;(item)[fieldName] = childCollection + item[fieldName] = childCollection } } } diff --git a/packages/db/tests/query/includes.test.ts b/packages/db/tests/query/includes.test.ts index 248c15ecd..8d95b65f9 100644 --- a/packages/db/tests/query/includes.test.ts +++ b/packages/db/tests/query/includes.test.ts @@ -113,19 +113,17 @@ describe(`includes subqueries`, () => { function buildIncludesQuery() { return createLiveQueryCollection((q) => - q - .from({ p: projects }) - .select(({ p }) => ({ - id: p.id, - name: p.name, - issues: q - .from({ i: issues }) - .where(({ i }) => eq(i.projectId, p.id)) - .select(({ i }) => ({ - id: i.id, - title: i.title, - })), - })), + q.from({ p: projects }).select(({ p }) => ({ + id: p.id, + name: p.name, + issues: q + .from({ i: issues }) + .where(({ i }) => eq(i.projectId, p.id)) + .select(({ i }) => ({ + id: i.id, + title: i.title, + })), + })), ) } @@ -155,8 +153,7 @@ describe(`includes subqueries`, () => { }, ]) }) - -}) + }) describe(`reactivity`, () => { it(`adding a child updates the parent's child collection`, async () => { @@ -299,20 +296,18 @@ describe(`includes subqueries`, () => { describe(`ordered child queries`, () => { it(`child collection respects orderBy on the child query`, async () => { const collection = createLiveQueryCollection((q) => - q - .from({ p: projects }) - .select(({ p }) => ({ - id: p.id, - name: p.name, - issues: q - .from({ i: issues }) - .where(({ i }) => eq(i.projectId, p.id)) - .orderBy(({ i }) => i.title, `desc`) - .select(({ i }) => ({ - id: i.id, - title: i.title, - })), - })), + q.from({ p: projects }).select(({ p }) => ({ + id: p.id, + name: p.name, + issues: q + .from({ i: issues }) + .where(({ i }) => eq(i.projectId, p.id)) + .orderBy(({ i }) => i.title, `desc`) + .select(({ i }) => ({ + id: i.id, + title: i.title, + })), + })), ) await collection.preload() @@ -338,20 +333,18 @@ describe(`includes subqueries`, () => { it(`newly inserted children appear in the correct order`, async () => { const collection = createLiveQueryCollection((q) => - q - .from({ p: projects }) - .select(({ p }) => ({ - id: p.id, - name: p.name, - issues: q - .from({ i: issues }) - .where(({ i }) => eq(i.projectId, p.id)) - .orderBy(({ i }) => i.title, `asc`) - .select(({ i }) => ({ - id: i.id, - title: i.title, - })), - })), + q.from({ p: projects }).select(({ p }) => ({ + id: p.id, + name: p.name, + issues: q + .from({ i: issues }) + .where(({ i }) => eq(i.projectId, p.id)) + .orderBy(({ i }) => i.title, `asc`) + .select(({ i }) => ({ + id: i.id, + title: i.title, + })), + })), ) await collection.preload() @@ -382,21 +375,19 @@ describe(`includes subqueries`, () => { describe(`ordered child queries with limit`, () => { it(`limits child collection to N items per parent`, async () => { const collection = createLiveQueryCollection((q) => - q - .from({ p: projects }) - .select(({ p }) => ({ - id: p.id, - name: p.name, - issues: q - .from({ i: issues }) - .where(({ i }) => eq(i.projectId, p.id)) - .orderBy(({ i }) => i.title, `asc`) - .limit(1) - .select(({ i }) => ({ - id: i.id, - title: i.title, - })), - })), + q.from({ p: projects }).select(({ p }) => ({ + id: p.id, + name: p.name, + issues: q + .from({ i: issues }) + .where(({ i }) => eq(i.projectId, p.id)) + .orderBy(({ i }) => i.title, `asc`) + .limit(1) + .select(({ i }) => ({ + id: i.id, + title: i.title, + })), + })), ) await collection.preload() @@ -420,21 +411,19 @@ describe(`includes subqueries`, () => { it(`inserting a child that displaces an existing one respects the limit`, async () => { const collection = createLiveQueryCollection((q) => - q - .from({ p: projects }) - .select(({ p }) => ({ - id: p.id, - name: p.name, - issues: q - .from({ i: issues }) - .where(({ i }) => eq(i.projectId, p.id)) - .orderBy(({ i }) => i.title, `asc`) - .limit(1) - .select(({ i }) => ({ - id: i.id, - title: i.title, - })), - })), + q.from({ p: projects }).select(({ p }) => ({ + id: p.id, + name: p.name, + issues: q + .from({ i: issues }) + .where(({ i }) => eq(i.projectId, p.id)) + .orderBy(({ i }) => i.title, `asc`) + .limit(1) + .select(({ i }) => ({ + id: i.id, + title: i.title, + })), + })), ) await collection.preload() @@ -442,9 +431,7 @@ describe(`includes subqueries`, () => { // Alpha should have exactly 1 issue (limit 1): "Bug in Alpha" const alphaIssues = [...(collection.get(1) as any).issues.toArray] expect(alphaIssues).toHaveLength(1) - expect(alphaIssues).toEqual([ - { id: 10, title: `Bug in Alpha` }, - ]) + expect(alphaIssues).toEqual([{ id: 10, title: `Bug in Alpha` }]) // Insert an issue that comes before "Bug" alphabetically issues.utils.begin() @@ -469,26 +456,24 @@ describe(`includes subqueries`, () => { describe(`nested includes`, () => { it(`supports two levels of includes`, async () => { const collection = createLiveQueryCollection((q) => - q - .from({ p: projects }) - .select(({ p }) => ({ - id: p.id, - name: p.name, - issues: q - .from({ i: issues }) - .where(({ i }) => eq(i.projectId, p.id)) - .select(({ i }) => ({ - id: i.id, - title: i.title, - comments: q - .from({ c: comments }) - .where(({ c }) => eq(c.issueId, i.id)) - .select(({ c }) => ({ - id: c.id, - body: c.body, - })), - })), - })), + q.from({ p: projects }).select(({ p }) => ({ + id: p.id, + name: p.name, + issues: q + .from({ i: issues }) + .where(({ i }) => eq(i.projectId, p.id)) + .select(({ i }) => ({ + id: i.id, + title: i.title, + comments: q + .from({ c: comments }) + .where(({ c }) => eq(c.issueId, i.id)) + .select(({ c }) => ({ + id: c.id, + body: c.body, + })), + })), + })), ) await collection.preload() From d117feaf4e82372d564d328ff5070267c9d01723 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Wed, 25 Feb 2026 10:44:20 +0100 Subject: [PATCH 08/19] Add changeset for includes subqueries Co-Authored-By: Claude Opus 4.6 --- .changeset/includes-subqueries.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/includes-subqueries.md diff --git a/.changeset/includes-subqueries.md b/.changeset/includes-subqueries.md new file mode 100644 index 000000000..caa9f44d4 --- /dev/null +++ b/.changeset/includes-subqueries.md @@ -0,0 +1,5 @@ +--- +'@tanstack/db': minor +--- + +feat: support for subqueries for including hierarchical data in live queries From e210f2365800331ddb99514b3c0af004697bacdd Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Wed, 25 Feb 2026 11:16:07 +0100 Subject: [PATCH 09/19] Use reverse index for parent lookup in includes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace O(n) parent collection scans with a reverse index (correlationKey → Set) for attaching child Collections to parent rows. The index is populated during parent INSERTs and cleaned up on parent DELETEs. Co-Authored-By: Claude Opus 4.6 --- .../query/live/collection-config-builder.ts | 57 +++++++++++-------- 1 file changed, 32 insertions(+), 25 deletions(-) diff --git a/packages/db/src/query/live/collection-config-builder.ts b/packages/db/src/query/live/collection-config-builder.ts index fac585829..5ac2e52ad 100644 --- a/packages/db/src/query/live/collection-config-builder.ts +++ b/packages/db/src/query/live/collection-config-builder.ts @@ -824,6 +824,7 @@ export class CollectionConfigBuilder< hasOrderBy: entry.hasOrderBy, childRegistry: new Map(), pendingChildChanges: new Map(), + correlationToParentKeys: new Map(), } // Attach output callback on the child pipeline @@ -1301,6 +1302,8 @@ type IncludesOutputState = { childRegistry: Map /** Pending child changes: correlationKey → Map */ pendingChildChanges: Map>> + /** Reverse index: correlation key → Set of parent collection keys */ + correlationToParentKeys: Map> /** Nested includes state (for projects → issues → comments) */ nestedIncludesState?: Array } @@ -1372,7 +1375,7 @@ function flushIncludesState( // even those with no children (produces an empty child Collection). if (parentChanges) { const fieldPath = state.correlationField.path.slice(1) // remove alias prefix - for (const [_key, changes] of parentChanges) { + for (const [parentKey, changes] of parentChanges) { if (changes.inserts > 0) { const parentResult = changes.value // Extract the correlation key value from the parent result @@ -1393,6 +1396,14 @@ function flushIncludesState( ) state.childRegistry.set(correlationKey, entry) } + // Update reverse index: correlation key → parent keys + let parentKeys = state.correlationToParentKeys.get(correlationKey) + if (!parentKeys) { + parentKeys = new Set() + state.correlationToParentKeys.set(correlationKey, parentKeys) + } + parentKeys.add(parentKey) + // Attach child Collection to the parent result parentResult[state.fieldName] = state.childRegistry.get(correlationKey)!.collection @@ -1417,12 +1428,11 @@ function flushIncludesState( } // Attach the child Collection to ANY parent that has this correlation key - // by scanning the parent result collection attachChildCollectionToParent( parentCollection, state.fieldName, correlationKey, - state.correlationField, + state.correlationToParentKeys, entry.collection, ) @@ -1464,11 +1474,11 @@ function flushIncludesState( state.pendingChildChanges.clear() } - // For parent DELETEs: dispose child Collections so re-added parents - // get a fresh empty child Collection instead of reusing stale data. + // For parent DELETEs: dispose child Collections and clean up reverse index + // so re-added parents get a fresh empty child Collection instead of reusing stale data. if (parentChanges) { const fieldPath = state.correlationField.path.slice(1) - for (const [_key, changes] of parentChanges) { + for (const [parentKey, changes] of parentChanges) { if (changes.deletes > 0 && changes.inserts === 0) { let correlationKey: unknown = changes.value for (const segment of fieldPath) { @@ -1477,6 +1487,15 @@ function flushIncludesState( } if (correlationKey != null) { state.childRegistry.delete(correlationKey) + // Clean up reverse index + const parentKeys = + state.correlationToParentKeys.get(correlationKey) + if (parentKeys) { + parentKeys.delete(parentKey) + if (parentKeys.size === 0) { + state.correlationToParentKeys.delete(correlationKey) + } + } } } } @@ -1486,33 +1505,21 @@ function flushIncludesState( /** * Attaches a child Collection to parent rows that match a given correlation key. - * Scans the parent collection to find matching parents and sets the field. + * Uses the reverse index to look up parent keys directly instead of scanning. */ function attachChildCollectionToParent( parentCollection: Collection, fieldName: string, correlationKey: unknown, - correlationField: PropRef, + correlationToParentKeys: Map>, childCollection: Collection, ): void { - // Walk the parent collection's items to find those matching this correlation key - // The correlation field path has the alias prefix (e.g., ['project', 'id']), - // but at this point the parent result is the selected object, not namespaced. - // We need to find parents by their correlation value. - // Since the parent correlation field is e.g. project.id, and the selected result - // might have 'id' as a field, we use the correlation field path (minus alias). - const fieldPath = correlationField.path.slice(1) // remove alias prefix - - for (const [_key, item] of parentCollection) { - // Navigate to the correlation value on the parent result - let value: any = item - for (const segment of fieldPath) { - if (value == null) break - value = value[segment] - } + const parentKeys = correlationToParentKeys.get(correlationKey) + if (!parentKeys) return - if (value === correlationKey) { - // Set the child Collection on this parent row + for (const parentKey of parentKeys) { + const item = parentCollection.get(parentKey as any) + if (item) { item[fieldName] = childCollection } } From 54830d4639822a453c7e122bdbe775358aba1d03 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Wed, 25 Feb 2026 10:17:06 +0000 Subject: [PATCH 10/19] ci: apply automated fixes --- packages/db/src/query/live/collection-config-builder.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/db/src/query/live/collection-config-builder.ts b/packages/db/src/query/live/collection-config-builder.ts index 5ac2e52ad..e9fe21627 100644 --- a/packages/db/src/query/live/collection-config-builder.ts +++ b/packages/db/src/query/live/collection-config-builder.ts @@ -1488,8 +1488,7 @@ function flushIncludesState( if (correlationKey != null) { state.childRegistry.delete(correlationKey) // Clean up reverse index - const parentKeys = - state.correlationToParentKeys.get(correlationKey) + const parentKeys = state.correlationToParentKeys.get(correlationKey) if (parentKeys) { parentKeys.delete(parentKey) if (parentKeys.size === 0) { From d3249418d7f75bfa31b9f492707fd96a7733e721 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Wed, 25 Feb 2026 13:20:13 +0100 Subject: [PATCH 11/19] Unit tests for changes to deeply nested collections --- packages/db/tests/query/includes.test.ts | 52 +++++++++++++++++++++++- 1 file changed, 50 insertions(+), 2 deletions(-) diff --git a/packages/db/tests/query/includes.test.ts b/packages/db/tests/query/includes.test.ts index 8d95b65f9..07a76a5c8 100644 --- a/packages/db/tests/query/includes.test.ts +++ b/packages/db/tests/query/includes.test.ts @@ -454,8 +454,8 @@ describe(`includes subqueries`, () => { }) describe(`nested includes`, () => { - it(`supports two levels of includes`, async () => { - const collection = createLiveQueryCollection((q) => + function buildNestedQuery() { + return createLiveQueryCollection((q) => q.from({ p: projects }).select(({ p }) => ({ id: p.id, name: p.name, @@ -475,7 +475,10 @@ describe(`includes subqueries`, () => { })), })), ) + } + it(`supports two levels of includes`, async () => { + const collection = buildNestedQuery() await collection.preload() expect(toTree(collection)).toEqual([ @@ -516,5 +519,50 @@ describe(`includes subqueries`, () => { }, ]) }) + + it(`adding a grandchild (comment) updates the nested child collection`, async () => { + const collection = buildNestedQuery() + await collection.preload() + + // Issue 11 (Feature for Alpha) has no comments initially + const alpha = collection.get(1) as any + const issue11 = alpha.issues.get(11) + expect(childItems(issue11.comments)).toEqual([]) + + // Add a comment to issue 11 — no issue or project changes + comments.utils.begin() + comments.utils.write({ + type: `insert`, + value: { id: 110, issueId: 11, body: `Great feature` }, + }) + comments.utils.commit() + + const issue11After = (collection.get(1) as any).issues.get(11) + expect(childItems(issue11After.comments)).toEqual([ + { id: 110, body: `Great feature` }, + ]) + }) + + it(`removing a grandchild (comment) updates the nested child collection`, async () => { + const collection = buildNestedQuery() + await collection.preload() + + // Issue 10 (Bug in Alpha) has 2 comments + const issue10 = (collection.get(1) as any).issues.get(10) + expect(childItems(issue10.comments)).toHaveLength(2) + + // Remove one comment + comments.utils.begin() + comments.utils.write({ + type: `delete`, + value: sampleComments.find((c) => c.id === 100)!, + }) + comments.utils.commit() + + const issue10After = (collection.get(1) as any).issues.get(10) + expect(childItems(issue10After.comments)).toEqual([ + { id: 101, body: `Fixed it` }, + ]) + }) }) }) From f8095265225ed545b95578633dedad0e50be3719 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Wed, 25 Feb 2026 13:20:55 +0100 Subject: [PATCH 12/19] Move fro top-down to bottom-up approach for flushing changes to nested collections. --- .../query/live/collection-config-builder.ts | 364 ++++++++++++++++-- 1 file changed, 339 insertions(+), 25 deletions(-) diff --git a/packages/db/src/query/live/collection-config-builder.ts b/packages/db/src/query/live/collection-config-builder.ts index e9fe21627..4c6976350 100644 --- a/packages/db/src/query/live/collection-config-builder.ts +++ b/packages/db/src/query/live/collection-config-builder.ts @@ -740,9 +740,7 @@ export class CollectionConfigBuilder< // Called at the end of each graph run to commit all accumulated changes. syncState.flushPendingChanges = () => { const hasParentChanges = pendingChanges.size > 0 - const hasChildChanges = includesState.some( - (s) => s.pendingChildChanges.size > 0, - ) + const hasChildChanges = hasPendingIncludesChanges(includesState) if (!hasParentChanges && !hasChildChanges) { return @@ -863,12 +861,14 @@ export class CollectionConfigBuilder< }), ) - // Recursively set up nested includes (e.g., comments inside issues) + // Set up shared buffers for nested includes (e.g., comments inside issues) if (entry.childCompilationResult.includes) { - state.nestedIncludesState = this.setupIncludesOutput( + state.nestedSetups = setupNestedPipelines( entry.childCompilationResult.includes, syncState, ) + state.nestedRoutingIndex = new Map() + state.nestedRoutingReverseIndex = new Map() } return state @@ -1289,6 +1289,19 @@ function isNestedSelectObject(obj: any): boolean { return true } +/** + * Shared buffer setup for a single nested includes level. + * Pipeline output writes into the buffer; during flush the buffer is drained + * into per-entry states via the routing index. + */ +type NestedIncludesSetup = { + compilationResult: IncludesCompilationResult + /** Shared buffer: nestedCorrelationKey → Map */ + buffer: Map>> + /** For 3+ levels of nesting */ + nestedSetups?: Array +} + /** * State tracked per includes entry for output routing and child lifecycle */ @@ -1304,8 +1317,12 @@ type IncludesOutputState = { pendingChildChanges: Map>> /** Reverse index: correlation key → Set of parent collection keys */ correlationToParentKeys: Map> - /** Nested includes state (for projects → issues → comments) */ - nestedIncludesState?: Array + /** Shared nested pipeline setups (one per nested includes level) */ + nestedSetups?: Array + /** nestedCorrelationKey → parentCorrelationKey */ + nestedRoutingIndex?: Map + /** parentCorrelationKey → Set */ + nestedRoutingReverseIndex?: Map> } type ChildCollectionEntry = { @@ -1313,6 +1330,250 @@ type ChildCollectionEntry = { syncMethods: SyncMethods | null resultKeys: WeakMap orderByIndices: WeakMap | null + /** Per-entry nested includes states (one per nested includes level) */ + includesStates?: Array +} + +/** + * Sets up shared buffers for nested includes pipelines. + * Instead of writing directly into a single shared IncludesOutputState, + * each nested pipeline writes into a buffer that is later drained per-entry. + */ +function setupNestedPipelines( + includes: Array, + syncState: SyncState, +): Array { + return includes.map((entry) => { + const buffer: Map>> = new Map() + + // Attach output callback that writes into the shared buffer + entry.pipeline.pipe( + output((data) => { + const messages = data.getInner() + syncState.messagesCount += messages.length + + for (const [[childKey, tupleData], multiplicity] of messages) { + const [childResult, _orderByIndex, correlationKey] = + tupleData as unknown as [any, string | undefined, unknown] + + let byChild = buffer.get(correlationKey) + if (!byChild) { + byChild = new Map() + buffer.set(correlationKey, byChild) + } + + const existing = byChild.get(childKey) || { + deletes: 0, + inserts: 0, + value: childResult, + orderByIndex: _orderByIndex, + } + + if (multiplicity < 0) { + existing.deletes += Math.abs(multiplicity) + } else if (multiplicity > 0) { + existing.inserts += multiplicity + existing.value = childResult + } + + byChild.set(childKey, existing) + } + }), + ) + + const setup: NestedIncludesSetup = { + compilationResult: entry, + buffer, + } + + // Recursively set up deeper levels + if (entry.childCompilationResult.includes) { + setup.nestedSetups = setupNestedPipelines( + entry.childCompilationResult.includes, + syncState, + ) + } + + return setup + }) +} + +/** + * Creates fresh per-entry IncludesOutputState array from NestedIncludesSetup array. + * Each entry gets its own isolated state for nested includes. + */ +function createPerEntryIncludesStates( + setups: Array, +): Array { + return setups.map((setup) => { + const state: IncludesOutputState = { + fieldName: setup.compilationResult.fieldName, + correlationField: setup.compilationResult.correlationField, + childCorrelationField: setup.compilationResult.childCorrelationField, + hasOrderBy: setup.compilationResult.hasOrderBy, + childRegistry: new Map(), + pendingChildChanges: new Map(), + correlationToParentKeys: new Map(), + } + + if (setup.nestedSetups) { + state.nestedSetups = setup.nestedSetups + state.nestedRoutingIndex = new Map() + state.nestedRoutingReverseIndex = new Map() + } + + return state + }) +} + +/** + * Drains shared buffers into per-entry states using the routing index. + * Returns the set of parent correlation keys that had changes routed to them. + */ +function drainNestedBuffers( + state: IncludesOutputState, +): Set { + const dirtyCorrelationKeys = new Set() + + if (!state.nestedSetups) return dirtyCorrelationKeys + + for (let i = 0; i < state.nestedSetups.length; i++) { + const setup = state.nestedSetups[i]! + const toDelete: Array = [] + + for (const [nestedCorrelationKey, childChanges] of setup.buffer) { + const parentCorrelationKey = state.nestedRoutingIndex!.get(nestedCorrelationKey) + if (parentCorrelationKey === undefined) { + // Unroutable — parent not yet seen; keep in buffer + continue + } + + const entry = state.childRegistry.get(parentCorrelationKey) + if (!entry || !entry.includesStates) { + continue + } + + // Route changes into this entry's per-entry state at position i + const entryState = entry.includesStates[i]! + for (const [childKey, changes] of childChanges) { + let byChild = entryState.pendingChildChanges.get(nestedCorrelationKey) + if (!byChild) { + byChild = new Map() + entryState.pendingChildChanges.set(nestedCorrelationKey, byChild) + } + const existing = byChild.get(childKey) + if (existing) { + existing.inserts += changes.inserts + existing.deletes += changes.deletes + if (changes.inserts > 0) { + existing.value = changes.value + if (changes.orderByIndex !== undefined) { + existing.orderByIndex = changes.orderByIndex + } + } + } else { + byChild.set(childKey, { ...changes }) + } + } + + dirtyCorrelationKeys.add(parentCorrelationKey) + toDelete.push(nestedCorrelationKey) + } + + for (const key of toDelete) { + setup.buffer.delete(key) + } + } + + return dirtyCorrelationKeys +} + +/** + * Updates the routing index after processing child changes. + * Maps nested correlation keys to parent correlation keys so that + * grandchild changes can be routed to the correct per-entry state. + */ +function updateRoutingIndex( + state: IncludesOutputState, + correlationKey: unknown, + childChanges: Map>, +): void { + if (!state.nestedSetups) return + + for (const setup of state.nestedSetups) { + const nestedFieldPath = setup.compilationResult.correlationField.path.slice(1) + + for (const [, change] of childChanges) { + if (change.inserts > 0) { + // Extract nested correlation key from child result + let nestedCorrelationKey: unknown = change.value + for (const segment of nestedFieldPath) { + if (nestedCorrelationKey == null) break + nestedCorrelationKey = (nestedCorrelationKey as any)[segment] + } + + if (nestedCorrelationKey != null) { + state.nestedRoutingIndex!.set(nestedCorrelationKey, correlationKey) + let reverseSet = state.nestedRoutingReverseIndex!.get(correlationKey) + if (!reverseSet) { + reverseSet = new Set() + state.nestedRoutingReverseIndex!.set(correlationKey, reverseSet) + } + reverseSet.add(nestedCorrelationKey) + } + } else if (change.deletes > 0 && change.inserts === 0) { + // Remove from routing index + let nestedCorrelationKey: unknown = change.value + for (const segment of nestedFieldPath) { + if (nestedCorrelationKey == null) break + nestedCorrelationKey = (nestedCorrelationKey as any)[segment] + } + + if (nestedCorrelationKey != null) { + state.nestedRoutingIndex!.delete(nestedCorrelationKey) + const reverseSet = state.nestedRoutingReverseIndex!.get(correlationKey) + if (reverseSet) { + reverseSet.delete(nestedCorrelationKey) + if (reverseSet.size === 0) { + state.nestedRoutingReverseIndex!.delete(correlationKey) + } + } + } + } + } + } +} + +/** + * Cleans routing index entries when a parent is deleted. + * Uses the reverse index to find and remove all nested routing entries. + */ +function cleanRoutingIndexOnDelete( + state: IncludesOutputState, + correlationKey: unknown, +): void { + if (!state.nestedRoutingReverseIndex) return + + const nestedKeys = state.nestedRoutingReverseIndex.get(correlationKey) + if (nestedKeys) { + for (const nestedKey of nestedKeys) { + state.nestedRoutingIndex!.delete(nestedKey) + } + state.nestedRoutingReverseIndex.delete(correlationKey) + } +} + +/** + * Recursively checks whether any nested buffer has pending changes. + */ +function hasNestedBufferChanges( + setups: Array, +): boolean { + for (const setup of setups) { + if (setup.buffer.size > 0) return true + if (setup.nestedSetups && hasNestedBufferChanges(setup.nestedSetups)) return true + } + return false } /** @@ -1324,6 +1585,7 @@ function createChildCollectionEntry( fieldName: string, correlationKey: unknown, hasOrderBy: boolean, + nestedSetups?: Array, ): ChildCollectionEntry { const resultKeys = new WeakMap() const orderByIndices = hasOrderBy ? new WeakMap() : null @@ -1349,7 +1611,7 @@ function createChildCollectionEntry( startSync: true, }) - return { + const entry: ChildCollectionEntry = { collection, get syncMethods() { return syncMethods @@ -1357,12 +1619,22 @@ function createChildCollectionEntry( resultKeys, orderByIndices, } + + if (nestedSetups) { + entry.includesStates = createPerEntryIncludesStates(nestedSetups) + } + + return entry } /** - * Recursively flushes includes state, processing child changes and creating - * child Collections. Handles nested includes (e.g., comments inside issues) - * by recursing into nested state after flushing each level. + * Flushes includes state using a bottom-up per-entry approach. + * Five phases ensure correct ordering: + * 1. Parent INSERTs — create child entries with per-entry nested states + * 2. Child changes — apply to child Collections, update routing index + * 3. Drain nested buffers — route buffered grandchild changes to per-entry states + * 4. Flush per-entry states — recursively flush nested includes on each entry + * 5. Parent DELETEs — clean up child entries and routing index */ function flushIncludesState( includesState: Array, @@ -1371,8 +1643,7 @@ function flushIncludesState( parentChanges: Map> | null, ): void { for (const state of includesState) { - // For parent INSERTs: ensure a child Collection exists for every parent, - // even those with no children (produces an empty child Collection). + // Phase 1: Parent INSERTs — ensure a child Collection exists for every parent if (parentChanges) { const fieldPath = state.correlationField.path.slice(1) // remove alias prefix for (const [parentKey, changes] of parentChanges) { @@ -1393,6 +1664,7 @@ function flushIncludesState( state.fieldName, correlationKey, state.hasOrderBy, + state.nestedSetups, ) state.childRegistry.set(correlationKey, entry) } @@ -1412,7 +1684,10 @@ function flushIncludesState( } } - // Flush child changes: route to correct child Collections + // Phase 2: Child changes — apply to child Collections + // Track which entries had child changes and capture their childChanges maps + const entriesWithChildChanges = new Map> }>() + if (state.pendingChildChanges.size > 0) { for (const [correlationKey, childChanges] of state.pendingChildChanges) { // Ensure child Collection exists for this correlation key @@ -1423,6 +1698,7 @@ function flushIncludesState( state.fieldName, correlationKey, state.hasOrderBy, + state.nestedSetups, ) state.childRegistry.set(correlationKey, entry) } @@ -1461,21 +1737,44 @@ function flushIncludesState( entry.syncMethods.commit() } - // Recursively process nested includes (e.g., comments inside issues) - if (state.nestedIncludesState) { - flushIncludesState( - state.nestedIncludesState, - entry.collection, - entry.collection.id, - childChanges, - ) - } + // Update routing index for nested includes + updateRoutingIndex(state, correlationKey, childChanges) + + entriesWithChildChanges.set(correlationKey, { entry, childChanges }) } state.pendingChildChanges.clear() } - // For parent DELETEs: dispose child Collections and clean up reverse index - // so re-added parents get a fresh empty child Collection instead of reusing stale data. + // Phase 3: Drain nested buffers — route buffered grandchild changes to per-entry states + const dirtyFromBuffers = drainNestedBuffers(state) + + // Phase 4: Flush per-entry states + // First: entries that had child changes in Phase 2 + for (const [, { entry, childChanges }] of entriesWithChildChanges) { + if (entry.includesStates) { + flushIncludesState( + entry.includesStates, + entry.collection, + entry.collection.id, + childChanges, + ) + } + } + // Then: entries that only had buffer-routed changes (no child changes at this level) + for (const correlationKey of dirtyFromBuffers) { + if (entriesWithChildChanges.has(correlationKey)) continue + const entry = state.childRegistry.get(correlationKey) + if (entry?.includesStates) { + flushIncludesState( + entry.includesStates, + entry.collection, + entry.collection.id, + null, + ) + } + } + + // Phase 5: Parent DELETEs — dispose child Collections and clean up if (parentChanges) { const fieldPath = state.correlationField.path.slice(1) for (const [parentKey, changes] of parentChanges) { @@ -1486,6 +1785,7 @@ function flushIncludesState( correlationKey = (correlationKey as any)[segment] } if (correlationKey != null) { + cleanRoutingIndexOnDelete(state, correlationKey) state.childRegistry.delete(correlationKey) // Clean up reverse index const parentKeys = state.correlationToParentKeys.get(correlationKey) @@ -1502,6 +1802,20 @@ function flushIncludesState( } } +/** + * Checks whether any includes state has pending changes that need to be flushed. + * Checks direct pending child changes and shared nested buffers. + */ +function hasPendingIncludesChanges( + states: Array, +): boolean { + for (const state of states) { + if (state.pendingChildChanges.size > 0) return true + if (state.nestedSetups && hasNestedBufferChanges(state.nestedSetups)) return true + } + return false +} + /** * Attaches a child Collection to parent rows that match a given correlation key. * Uses the reverse index to look up parent keys directly instead of scanning. From 15b98621076a4fb5031d330c7c984f56e63c5746 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Wed, 25 Feb 2026 12:25:15 +0000 Subject: [PATCH 13/19] ci: apply automated fixes --- .../query/live/collection-config-builder.ts | 28 +++++++++++-------- 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/packages/db/src/query/live/collection-config-builder.ts b/packages/db/src/query/live/collection-config-builder.ts index 4c6976350..c2d7387d8 100644 --- a/packages/db/src/query/live/collection-config-builder.ts +++ b/packages/db/src/query/live/collection-config-builder.ts @@ -1430,9 +1430,7 @@ function createPerEntryIncludesStates( * Drains shared buffers into per-entry states using the routing index. * Returns the set of parent correlation keys that had changes routed to them. */ -function drainNestedBuffers( - state: IncludesOutputState, -): Set { +function drainNestedBuffers(state: IncludesOutputState): Set { const dirtyCorrelationKeys = new Set() if (!state.nestedSetups) return dirtyCorrelationKeys @@ -1442,7 +1440,8 @@ function drainNestedBuffers( const toDelete: Array = [] for (const [nestedCorrelationKey, childChanges] of setup.buffer) { - const parentCorrelationKey = state.nestedRoutingIndex!.get(nestedCorrelationKey) + const parentCorrelationKey = + state.nestedRoutingIndex!.get(nestedCorrelationKey) if (parentCorrelationKey === undefined) { // Unroutable — parent not yet seen; keep in buffer continue @@ -1501,7 +1500,8 @@ function updateRoutingIndex( if (!state.nestedSetups) return for (const setup of state.nestedSetups) { - const nestedFieldPath = setup.compilationResult.correlationField.path.slice(1) + const nestedFieldPath = + setup.compilationResult.correlationField.path.slice(1) for (const [, change] of childChanges) { if (change.inserts > 0) { @@ -1531,7 +1531,8 @@ function updateRoutingIndex( if (nestedCorrelationKey != null) { state.nestedRoutingIndex!.delete(nestedCorrelationKey) - const reverseSet = state.nestedRoutingReverseIndex!.get(correlationKey) + const reverseSet = + state.nestedRoutingReverseIndex!.get(correlationKey) if (reverseSet) { reverseSet.delete(nestedCorrelationKey) if (reverseSet.size === 0) { @@ -1566,12 +1567,11 @@ function cleanRoutingIndexOnDelete( /** * Recursively checks whether any nested buffer has pending changes. */ -function hasNestedBufferChanges( - setups: Array, -): boolean { +function hasNestedBufferChanges(setups: Array): boolean { for (const setup of setups) { if (setup.buffer.size > 0) return true - if (setup.nestedSetups && hasNestedBufferChanges(setup.nestedSetups)) return true + if (setup.nestedSetups && hasNestedBufferChanges(setup.nestedSetups)) + return true } return false } @@ -1686,7 +1686,10 @@ function flushIncludesState( // Phase 2: Child changes — apply to child Collections // Track which entries had child changes and capture their childChanges maps - const entriesWithChildChanges = new Map> }>() + const entriesWithChildChanges = new Map< + unknown, + { entry: ChildCollectionEntry; childChanges: Map> } + >() if (state.pendingChildChanges.size > 0) { for (const [correlationKey, childChanges] of state.pendingChildChanges) { @@ -1811,7 +1814,8 @@ function hasPendingIncludesChanges( ): boolean { for (const state of states) { if (state.pendingChildChanges.size > 0) return true - if (state.nestedSetups && hasNestedBufferChanges(state.nestedSetups)) return true + if (state.nestedSetups && hasNestedBufferChanges(state.nestedSetups)) + return true } return false } From ec21dbf81f6d79d6cf028ea4e9318fe2ae5a8321 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Thu, 26 Feb 2026 14:45:52 +0100 Subject: [PATCH 14/19] Prefix child collection names to avoid clashes --- packages/db/src/query/live/collection-config-builder.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/db/src/query/live/collection-config-builder.ts b/packages/db/src/query/live/collection-config-builder.ts index c2d7387d8..933cb7ced 100644 --- a/packages/db/src/query/live/collection-config-builder.ts +++ b/packages/db/src/query/live/collection-config-builder.ts @@ -1596,7 +1596,7 @@ function createChildCollectionEntry( : undefined const collection = createCollection({ - id: `${parentId}-${fieldName}-${String(correlationKey)}`, + id: `__child-collection:${parentId}-${fieldName}-${String(correlationKey)}`, getKey: (item: any) => resultKeys.get(item) as string | number, compare, sync: { From 3d74cf5b29d7c18cb002dd9d40d21e37804c58b5 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Thu, 26 Feb 2026 15:30:21 +0100 Subject: [PATCH 15/19] Properly serialize correlation key before using it in collection ID --- packages/db/src/query/live/collection-config-builder.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/db/src/query/live/collection-config-builder.ts b/packages/db/src/query/live/collection-config-builder.ts index 933cb7ced..a1a2d30ed 100644 --- a/packages/db/src/query/live/collection-config-builder.ts +++ b/packages/db/src/query/live/collection-config-builder.ts @@ -1,4 +1,4 @@ -import { D2, output } from '@tanstack/db-ivm' +import { D2, output, serializeValue } from '@tanstack/db-ivm' import { compileQuery } from '../compiler/index.js' import { createCollection } from '../../collection/index.js' import { IncludesSubquery } from '../ir.js' @@ -1596,7 +1596,7 @@ function createChildCollectionEntry( : undefined const collection = createCollection({ - id: `__child-collection:${parentId}-${fieldName}-${String(correlationKey)}`, + id: `__child-collection:${parentId}-${fieldName}-${serializeValue(correlationKey)}`, getKey: (item: any) => resultKeys.get(item) as string | number, compare, sync: { From f5909177701af6619ded13ad49f70949ab16f057 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Thu, 26 Feb 2026 15:46:51 +0100 Subject: [PATCH 16/19] Additional test as suggested by Codex review --- packages/db/tests/query/includes.test.ts | 136 +++++++++++++++++++++++ 1 file changed, 136 insertions(+) diff --git a/packages/db/tests/query/includes.test.ts b/packages/db/tests/query/includes.test.ts index 07a76a5c8..ff37e863d 100644 --- a/packages/db/tests/query/includes.test.ts +++ b/packages/db/tests/query/includes.test.ts @@ -453,6 +453,142 @@ describe(`includes subqueries`, () => { }) }) + describe(`shared correlation key`, () => { + // Multiple parents share the same correlationKey value. + // e.g., two teams in the same department — both should see the same department members. + type Team = { id: number; name: string; departmentId: number } + type Member = { id: number; departmentId: number; name: string } + + const sampleTeams: Array = [ + { id: 1, name: `Frontend`, departmentId: 100 }, + { id: 2, name: `Backend`, departmentId: 100 }, + { id: 3, name: `Marketing`, departmentId: 200 }, + ] + + const sampleMembers: Array = [ + { id: 10, departmentId: 100, name: `Alice` }, + { id: 11, departmentId: 100, name: `Bob` }, + { id: 20, departmentId: 200, name: `Charlie` }, + ] + + function createTeamsCollection() { + return createCollection( + mockSyncCollectionOptions({ + id: `includes-teams`, + getKey: (t) => t.id, + initialData: sampleTeams, + }), + ) + } + + function createMembersCollection() { + return createCollection( + mockSyncCollectionOptions({ + id: `includes-members`, + getKey: (m) => m.id, + initialData: sampleMembers, + }), + ) + } + + it(`multiple parents with the same correlationKey each get the shared children`, async () => { + const teams = createTeamsCollection() + const members = createMembersCollection() + + const collection = createLiveQueryCollection((q) => + q.from({ t: teams }).select(({ t }) => ({ + id: t.id, + name: t.name, + departmentId: t.departmentId, + members: q + .from({ m: members }) + .where(({ m }) => eq(m.departmentId, t.departmentId)) + .select(({ m }) => ({ + id: m.id, + name: m.name, + })), + })), + ) + + await collection.preload() + + // Both Frontend and Backend teams share departmentId 100 + expect(toTree(collection)).toEqual([ + { + id: 1, + name: `Frontend`, + departmentId: 100, + members: [ + { id: 10, name: `Alice` }, + { id: 11, name: `Bob` }, + ], + }, + { + id: 2, + name: `Backend`, + departmentId: 100, + members: [ + { id: 10, name: `Alice` }, + { id: 11, name: `Bob` }, + ], + }, + { + id: 3, + name: `Marketing`, + departmentId: 200, + members: [{ id: 20, name: `Charlie` }], + }, + ]) + }) + + it(`adding a child updates all parents that share the correlation key`, async () => { + const teams = createTeamsCollection() + const members = createMembersCollection() + + const collection = createLiveQueryCollection((q) => + q.from({ t: teams }).select(({ t }) => ({ + id: t.id, + name: t.name, + departmentId: t.departmentId, + members: q + .from({ m: members }) + .where(({ m }) => eq(m.departmentId, t.departmentId)) + .select(({ m }) => ({ + id: m.id, + name: m.name, + })), + })), + ) + + await collection.preload() + + // Add a new member to department 100 + members.utils.begin() + members.utils.write({ + type: `insert`, + value: { id: 12, departmentId: 100, name: `Dave` }, + }) + members.utils.commit() + + // Both Frontend and Backend should see the new member + expect(childItems((collection.get(1) as any).members)).toEqual([ + { id: 10, name: `Alice` }, + { id: 11, name: `Bob` }, + { id: 12, name: `Dave` }, + ]) + expect(childItems((collection.get(2) as any).members)).toEqual([ + { id: 10, name: `Alice` }, + { id: 11, name: `Bob` }, + { id: 12, name: `Dave` }, + ]) + + // Marketing unaffected + expect(childItems((collection.get(3) as any).members)).toEqual([ + { id: 20, name: `Charlie` }, + ]) + }) + }) + describe(`nested includes`, () => { function buildNestedQuery() { return createLiveQueryCollection((q) => From 3ea52ef495bfda4970eb6e2dd6db6c23fae52680 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Thu, 26 Feb 2026 15:51:10 +0100 Subject: [PATCH 17/19] Unit test to ensure that correlation field does not need to be in the parent select --- packages/db/tests/query/includes.test.ts | 46 ++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/packages/db/tests/query/includes.test.ts b/packages/db/tests/query/includes.test.ts index ff37e863d..221ff6b0c 100644 --- a/packages/db/tests/query/includes.test.ts +++ b/packages/db/tests/query/includes.test.ts @@ -587,6 +587,52 @@ describe(`includes subqueries`, () => { { id: 20, name: `Charlie` }, ]) }) + + it(`correlation field does not need to be in the parent select`, async () => { + const teams = createTeamsCollection() + const members = createMembersCollection() + + // departmentId is used for correlation but NOT selected in the parent output + const collection = createLiveQueryCollection((q) => + q.from({ t: teams }).select(({ t }) => ({ + id: t.id, + name: t.name, + members: q + .from({ m: members }) + .where(({ m }) => eq(m.departmentId, t.departmentId)) + .select(({ m }) => ({ + id: m.id, + name: m.name, + })), + })), + ) + + await collection.preload() + + expect(toTree(collection)).toEqual([ + { + id: 1, + name: `Frontend`, + members: [ + { id: 10, name: `Alice` }, + { id: 11, name: `Bob` }, + ], + }, + { + id: 2, + name: `Backend`, + members: [ + { id: 10, name: `Alice` }, + { id: 11, name: `Bob` }, + ], + }, + { + id: 3, + name: `Marketing`, + members: [{ id: 20, name: `Charlie` }], + }, + ]) + }) }) describe(`nested includes`, () => { From 3ca70b9f0291cbd0a4e72304b981979d2262072a Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Thu, 26 Feb 2026 16:05:26 +0100 Subject: [PATCH 18/19] Stamp __includesCorrelationKeys on the result before output, and flushIncludesState reads from that stamp. The stamp is cleaned up at the end of flush so it never leaks to the user --- packages/db/src/query/compiler/index.ts | 29 +++++++++++ .../query/live/collection-config-builder.ts | 51 ++++++++----------- 2 files changed, 50 insertions(+), 30 deletions(-) diff --git a/packages/db/src/query/compiler/index.ts b/packages/db/src/query/compiler/index.ts index d749e6e3a..434f10c04 100644 --- a/packages/db/src/query/compiler/index.ts +++ b/packages/db/src/query/compiler/index.ts @@ -325,6 +325,25 @@ export function compileQuery( // Replace includes entry in select with a null placeholder replaceIncludesInSelect(query.select, key) } + + // Stamp correlation key values onto the namespaced row so they survive + // select extraction. This allows flushIncludesState to read them directly + // without requiring the correlation field to be in the user's select. + if (includesEntries.length > 0) { + const compiledCorrelations = includesEntries.map(({ subquery }) => ({ + fieldName: subquery.fieldName, + compiled: compileExpression(subquery.correlationField), + })) + pipeline = pipeline.pipe( + map(([key, nsRow]: any) => { + const correlationKeys: Record = {} + for (const { fieldName: fn, compiled } of compiledCorrelations) { + correlationKeys[fn] = compiled(nsRow) + } + return [key, { ...nsRow, __includesCorrelationKeys: correlationKeys }] + }), + ) + } } if (query.distinct && !query.fnSelect && !query.select) { @@ -458,6 +477,11 @@ export function compileQuery( // Extract the final results from $selected and include orderBy index const raw = (row as any).$selected const finalResults = unwrapValue(raw) + // Stamp includes correlation keys onto the result for child routing + if ((row as any).__includesCorrelationKeys) { + finalResults.__includesCorrelationKeys = + (row as any).__includesCorrelationKeys + } // When in includes mode, embed the correlation key as third element if (parentKeyStream) { const correlationKey = (row as any)[mainSource]?.__correlationKey @@ -491,6 +515,11 @@ export function compileQuery( // Extract the final results from $selected and return [key, [results, undefined]] const raw = (row as any).$selected const finalResults = unwrapValue(raw) + // Stamp includes correlation keys onto the result for child routing + if ((row as any).__includesCorrelationKeys) { + finalResults.__includesCorrelationKeys = + (row as any).__includesCorrelationKeys + } // When in includes mode, embed the correlation key as third element if (parentKeyStream) { const correlationKey = (row as any)[mainSource]?.__correlationKey diff --git a/packages/db/src/query/live/collection-config-builder.ts b/packages/db/src/query/live/collection-config-builder.ts index a1a2d30ed..8abf7518a 100644 --- a/packages/db/src/query/live/collection-config-builder.ts +++ b/packages/db/src/query/live/collection-config-builder.ts @@ -817,7 +817,6 @@ export class CollectionConfigBuilder< return includesEntries.map((entry) => { const state: IncludesOutputState = { fieldName: entry.fieldName, - correlationField: entry.correlationField, childCorrelationField: entry.childCorrelationField, hasOrderBy: entry.hasOrderBy, childRegistry: new Map(), @@ -1307,7 +1306,6 @@ type NestedIncludesSetup = { */ type IncludesOutputState = { fieldName: string - correlationField: PropRef childCorrelationField: PropRef /** Whether the child query has an ORDER BY clause */ hasOrderBy: boolean @@ -1408,7 +1406,6 @@ function createPerEntryIncludesStates( return setups.map((setup) => { const state: IncludesOutputState = { fieldName: setup.compilationResult.fieldName, - correlationField: setup.compilationResult.correlationField, childCorrelationField: setup.compilationResult.childCorrelationField, hasOrderBy: setup.compilationResult.hasOrderBy, childRegistry: new Map(), @@ -1500,17 +1497,13 @@ function updateRoutingIndex( if (!state.nestedSetups) return for (const setup of state.nestedSetups) { - const nestedFieldPath = - setup.compilationResult.correlationField.path.slice(1) - for (const [, change] of childChanges) { if (change.inserts > 0) { - // Extract nested correlation key from child result - let nestedCorrelationKey: unknown = change.value - for (const segment of nestedFieldPath) { - if (nestedCorrelationKey == null) break - nestedCorrelationKey = (nestedCorrelationKey as any)[segment] - } + // Read the pre-computed nested correlation key from the compiler stamp + const nestedCorrelationKey = + (change.value).__includesCorrelationKeys?.[ + setup.compilationResult.fieldName + ] if (nestedCorrelationKey != null) { state.nestedRoutingIndex!.set(nestedCorrelationKey, correlationKey) @@ -1523,11 +1516,10 @@ function updateRoutingIndex( } } else if (change.deletes > 0 && change.inserts === 0) { // Remove from routing index - let nestedCorrelationKey: unknown = change.value - for (const segment of nestedFieldPath) { - if (nestedCorrelationKey == null) break - nestedCorrelationKey = (nestedCorrelationKey as any)[segment] - } + const nestedCorrelationKey = + (change.value).__includesCorrelationKeys?.[ + setup.compilationResult.fieldName + ] if (nestedCorrelationKey != null) { state.nestedRoutingIndex!.delete(nestedCorrelationKey) @@ -1645,16 +1637,12 @@ function flushIncludesState( for (const state of includesState) { // Phase 1: Parent INSERTs — ensure a child Collection exists for every parent if (parentChanges) { - const fieldPath = state.correlationField.path.slice(1) // remove alias prefix for (const [parentKey, changes] of parentChanges) { if (changes.inserts > 0) { const parentResult = changes.value - // Extract the correlation key value from the parent result - let correlationKey: unknown = parentResult - for (const segment of fieldPath) { - if (correlationKey == null) break - correlationKey = (correlationKey as any)[segment] - } + // Read the pre-computed correlation key from the compiler stamp + const correlationKey = + (parentResult).__includesCorrelationKeys?.[state.fieldName] if (correlationKey != null) { // Ensure child Collection exists for this correlation key @@ -1779,14 +1767,10 @@ function flushIncludesState( // Phase 5: Parent DELETEs — dispose child Collections and clean up if (parentChanges) { - const fieldPath = state.correlationField.path.slice(1) for (const [parentKey, changes] of parentChanges) { if (changes.deletes > 0 && changes.inserts === 0) { - let correlationKey: unknown = changes.value - for (const segment of fieldPath) { - if (correlationKey == null) break - correlationKey = (correlationKey as any)[segment] - } + const correlationKey = + (changes.value).__includesCorrelationKeys?.[state.fieldName] if (correlationKey != null) { cleanRoutingIndexOnDelete(state, correlationKey) state.childRegistry.delete(correlationKey) @@ -1803,6 +1787,13 @@ function flushIncludesState( } } } + + // Clean up the internal stamp from parent/child results so it doesn't leak to the user + if (parentChanges) { + for (const [, changes] of parentChanges) { + delete (changes.value).__includesCorrelationKeys + } + } } /** From fc269d5206f14f7bbf9058229c4acb7cd28e7ed4 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Thu, 26 Feb 2026 15:06:38 +0000 Subject: [PATCH 19/19] ci: apply automated fixes --- packages/db/src/query/compiler/index.ts | 10 ++++++---- .../db/src/query/live/collection-config-builder.ts | 10 +++++----- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/packages/db/src/query/compiler/index.ts b/packages/db/src/query/compiler/index.ts index 434f10c04..59c1246c3 100644 --- a/packages/db/src/query/compiler/index.ts +++ b/packages/db/src/query/compiler/index.ts @@ -479,8 +479,9 @@ export function compileQuery( const finalResults = unwrapValue(raw) // Stamp includes correlation keys onto the result for child routing if ((row as any).__includesCorrelationKeys) { - finalResults.__includesCorrelationKeys = - (row as any).__includesCorrelationKeys + finalResults.__includesCorrelationKeys = ( + row as any + ).__includesCorrelationKeys } // When in includes mode, embed the correlation key as third element if (parentKeyStream) { @@ -517,8 +518,9 @@ export function compileQuery( const finalResults = unwrapValue(raw) // Stamp includes correlation keys onto the result for child routing if ((row as any).__includesCorrelationKeys) { - finalResults.__includesCorrelationKeys = - (row as any).__includesCorrelationKeys + finalResults.__includesCorrelationKeys = ( + row as any + ).__includesCorrelationKeys } // When in includes mode, embed the correlation key as third element if (parentKeyStream) { diff --git a/packages/db/src/query/live/collection-config-builder.ts b/packages/db/src/query/live/collection-config-builder.ts index 8abf7518a..d2fdcb7e6 100644 --- a/packages/db/src/query/live/collection-config-builder.ts +++ b/packages/db/src/query/live/collection-config-builder.ts @@ -1501,7 +1501,7 @@ function updateRoutingIndex( if (change.inserts > 0) { // Read the pre-computed nested correlation key from the compiler stamp const nestedCorrelationKey = - (change.value).__includesCorrelationKeys?.[ + change.value.__includesCorrelationKeys?.[ setup.compilationResult.fieldName ] @@ -1517,7 +1517,7 @@ function updateRoutingIndex( } else if (change.deletes > 0 && change.inserts === 0) { // Remove from routing index const nestedCorrelationKey = - (change.value).__includesCorrelationKeys?.[ + change.value.__includesCorrelationKeys?.[ setup.compilationResult.fieldName ] @@ -1642,7 +1642,7 @@ function flushIncludesState( const parentResult = changes.value // Read the pre-computed correlation key from the compiler stamp const correlationKey = - (parentResult).__includesCorrelationKeys?.[state.fieldName] + parentResult.__includesCorrelationKeys?.[state.fieldName] if (correlationKey != null) { // Ensure child Collection exists for this correlation key @@ -1770,7 +1770,7 @@ function flushIncludesState( for (const [parentKey, changes] of parentChanges) { if (changes.deletes > 0 && changes.inserts === 0) { const correlationKey = - (changes.value).__includesCorrelationKeys?.[state.fieldName] + changes.value.__includesCorrelationKeys?.[state.fieldName] if (correlationKey != null) { cleanRoutingIndexOnDelete(state, correlationKey) state.childRegistry.delete(correlationKey) @@ -1791,7 +1791,7 @@ function flushIncludesState( // Clean up the internal stamp from parent/child results so it doesn't leak to the user if (parentChanges) { for (const [, changes] of parentChanges) { - delete (changes.value).__includesCorrelationKeys + delete changes.value.__includesCorrelationKeys } } }