diff --git a/.changeset/includes-subqueries.md b/.changeset/includes-subqueries.md new file mode 100644 index 000000000..caa9f44d4 --- /dev/null +++ b/.changeset/includes-subqueries.md @@ -0,0 +1,5 @@ +--- +'@tanstack/db': minor +--- + +feat: support for subqueries for including hierarchical data in live queries diff --git a/packages/db/src/query/builder/index.ts b/packages/db/src/query/builder/index.ts index c43f3ad48..7b7b6d3e8 100644 --- a/packages/db/src/query/builder/index.ts +++ b/packages/db/src/query/builder/index.ts @@ -3,6 +3,7 @@ import { Aggregate as AggregateExpr, CollectionRef, Func as FuncExpr, + IncludesSubquery, PropRef, QueryRef, Value as ValueExpr, @@ -476,7 +477,7 @@ export class BaseQueryBuilder { const aliases = this._getCurrentAliases() const refProxy = createRefProxy(aliases) as RefsForContext const selectObject = callback(refProxy) - const select = buildNestedSelect(selectObject) + const select = buildNestedSelect(selectObject, aliases) return new BaseQueryBuilder({ ...this.query, @@ -852,7 +853,7 @@ function isPlainObject(value: any): value is Record { ) } -function buildNestedSelect(obj: any): any { +function buildNestedSelect(obj: any, parentAliases: Array = []): any { if (!isPlainObject(obj)) return toExpr(obj) const out: Record = {} for (const [k, v] of Object.entries(obj)) { @@ -861,11 +862,126 @@ function buildNestedSelect(obj: any): any { out[k] = v continue } - out[k] = buildNestedSelect(v) + if (v instanceof BaseQueryBuilder) { + out[k] = buildIncludesSubquery(v, k, parentAliases) + continue + } + out[k] = buildNestedSelect(v, parentAliases) } return out } +/** + * Builds an IncludesSubquery IR node from a child query builder. + * Extracts the correlation condition from the child's WHERE clauses by finding + * an eq() predicate that references both a parent alias and a child alias. + */ +function buildIncludesSubquery( + childBuilder: BaseQueryBuilder, + fieldName: string, + parentAliases: Array, +): IncludesSubquery { + const childQuery = childBuilder._getQuery() + + // Collect child's own aliases + const childAliases: Array = [childQuery.from.alias] + if (childQuery.join) { + for (const j of childQuery.join) { + childAliases.push(j.from.alias) + } + } + + // Walk child's WHERE clauses to find the correlation condition + let parentRef: PropRef | undefined + let childRef: PropRef | undefined + let correlationWhereIndex = -1 + + if (childQuery.where) { + for (let i = 0; i < childQuery.where.length; i++) { + const where = childQuery.where[i]! + const expr = + typeof where === `object` && `expression` in where + ? where.expression + : where + + // Look for eq(a, b) where one side references parent and other references child + if ( + expr.type === `func` && + expr.name === `eq` && + expr.args.length === 2 + ) { + const [argA, argB] = expr.args + const result = extractCorrelation( + argA!, + argB!, + parentAliases, + childAliases, + ) + if (result) { + parentRef = result.parentRef + childRef = result.childRef + correlationWhereIndex = i + break + } + } + } + } + + if (!parentRef || !childRef || correlationWhereIndex === -1) { + throw new Error( + `Includes subquery for "${fieldName}" must have a WHERE clause with an eq() condition ` + + `that correlates a parent field with a child field. ` + + `Example: .where(({child}) => eq(child.parentId, parent.id))`, + ) + } + + // Remove the correlation WHERE from the child query + const modifiedWhere = [...childQuery.where!] + modifiedWhere.splice(correlationWhereIndex, 1) + const modifiedQuery: QueryIR = { + ...childQuery, + where: modifiedWhere.length > 0 ? modifiedWhere : undefined, + } + + return new IncludesSubquery(modifiedQuery, parentRef, childRef, fieldName) +} + +/** + * Checks if two eq() arguments form a parent-child correlation. + * Returns the parent and child PropRefs if found, undefined otherwise. + */ +function extractCorrelation( + argA: BasicExpression, + argB: BasicExpression, + parentAliases: Array, + childAliases: Array, +): { parentRef: PropRef; childRef: PropRef } | undefined { + if (argA.type === `ref` && argB.type === `ref`) { + const aAlias = argA.path[0] + const bAlias = argB.path[0] + + if ( + aAlias && + bAlias && + parentAliases.includes(aAlias) && + childAliases.includes(bAlias) + ) { + return { parentRef: argA, childRef: argB } + } + + if ( + aAlias && + bAlias && + parentAliases.includes(bAlias) && + childAliases.includes(aAlias) + ) { + return { parentRef: argB, childRef: argA } + } + } + + return undefined +} + // Internal function to build a query from a callback // used by liveQueryCollectionOptions.query export function buildQuery( diff --git a/packages/db/src/query/compiler/group-by.ts b/packages/db/src/query/compiler/group-by.ts index 10d83a11b..595d277ae 100644 --- a/packages/db/src/query/compiler/group-by.ts +++ b/packages/db/src/query/compiler/group-by.ts @@ -519,7 +519,7 @@ function evaluateWrappedAggregates( * contain an Aggregate. Safely returns false for nested Select objects. */ export function containsAggregate( - expr: BasicExpression | Aggregate | Select, + expr: BasicExpression | Aggregate | Select | { type: string }, ): boolean { if (!isExpressionLike(expr)) { return false @@ -527,9 +527,9 @@ export function containsAggregate( if (expr.type === `agg`) { return true } - if (expr.type === `func`) { - return expr.args.some((arg: BasicExpression | Aggregate) => - containsAggregate(arg), + if (expr.type === `func` && `args` in expr) { + return (expr.args as Array).some( + (arg: BasicExpression | Aggregate) => containsAggregate(arg), ) } return false diff --git a/packages/db/src/query/compiler/index.ts b/packages/db/src/query/compiler/index.ts index d0d7469e2..59c1246c3 100644 --- a/packages/db/src/query/compiler/index.ts +++ b/packages/db/src/query/compiler/index.ts @@ -1,4 +1,4 @@ -import { distinct, filter, map } from '@tanstack/db-ivm' +import { distinct, filter, join as joinOperator, map } from '@tanstack/db-ivm' import { optimizeQuery } from '../optimizer.js' import { CollectionInputNotFoundError, @@ -8,7 +8,12 @@ import { LimitOffsetRequireOrderByError, UnsupportedFromTypeError, } from '../../errors.js' -import { PropRef, Value as ValClass, getWhereExpression } from '../ir.js' +import { + IncludesSubquery, + PropRef, + Value as ValClass, + getWhereExpression, +} from '../ir.js' import { compileExpression, toBooleanPredicate } from './evaluators.js' import { processJoins } from './joins.js' import { containsAggregate, processGroupBy } from './group-by.js' @@ -33,6 +38,25 @@ import type { QueryCache, QueryMapping, WindowOptions } from './types.js' export type { WindowOptions } from './types.js' +/** + * Result of compiling an includes subquery, including the child pipeline + * and metadata needed to route child results to parent-scoped Collections. + */ +export interface IncludesCompilationResult { + /** Filtered child pipeline (post inner-join with parent keys) */ + pipeline: ResultStream + /** Result field name on parent (e.g., "issues") */ + fieldName: string + /** Parent-side correlation ref (e.g., project.id) */ + correlationField: PropRef + /** Child-side correlation ref (e.g., issue.projectId) */ + childCorrelationField: PropRef + /** Whether the child query has an ORDER BY clause */ + hasOrderBy: boolean + /** Full compilation result for the child query (for nested includes + alias tracking) */ + childCompilationResult: CompilationResult +} + /** * Result of query compilation including both the pipeline and source-specific WHERE clauses */ @@ -67,6 +91,9 @@ export interface CompilationResult { * the inner aliases where collection subscriptions were created. */ aliasRemapping: Record + + /** Child pipelines for includes subqueries */ + includes?: Array } /** @@ -93,6 +120,9 @@ export function compileQuery( setWindowFn: (windowFn: (options: WindowOptions) => void) => void, cache: QueryCache = new WeakMap(), queryMapping: QueryMapping = new WeakMap(), + // For includes: parent key stream to inner-join with this query's FROM + parentKeyStream?: KeyedStream, + childCorrelationField?: PropRef, ): CompilationResult { // Check if the original raw query has already been compiled const cachedResult = cache.get(rawQuery) @@ -152,8 +182,42 @@ export function compileQuery( ) sources[mainSource] = mainInput + // If this is an includes child query, inner-join the raw input with parent keys. + // This filters the child collection to only rows matching parents in the result set. + // The inner join happens BEFORE namespace wrapping / WHERE / SELECT / ORDER BY, + // so the child pipeline only processes rows that match parents. + let filteredMainInput = mainInput + if (parentKeyStream && childCorrelationField) { + // Re-key child input by correlation field: [correlationValue, [childKey, childRow]] + const childFieldPath = childCorrelationField.path.slice(1) // remove alias prefix + const childRekeyed = mainInput.pipe( + map(([key, row]: [unknown, any]) => { + const correlationValue = getNestedValue(row, childFieldPath) + return [correlationValue, [key, row]] as [unknown, [unknown, any]] + }), + ) + + // Inner join: only children whose correlation key exists in parent keys pass through + const joined = childRekeyed.pipe(joinOperator(parentKeyStream, `inner`)) + + // Extract: [correlationValue, [[childKey, childRow], null]] → [childKey, childRow] + // Tag the row with __correlationKey for output routing + filteredMainInput = joined.pipe( + filter(([_correlationValue, [childSide]]: any) => { + return childSide != null + }), + map(([correlationValue, [childSide, _parentSide]]: any) => { + const [childKey, childRow] = childSide + return [childKey, { ...childRow, __correlationKey: correlationValue }] + }), + ) + + // Update sources so the rest of the pipeline uses the filtered input + sources[mainSource] = filteredMainInput + } + // Prepare the initial pipeline with the main source wrapped in its alias - let pipeline: NamespacedAndKeyedStream = mainInput.pipe( + let pipeline: NamespacedAndKeyedStream = filteredMainInput.pipe( map(([key, row]) => { // Initialize the record with a nested structure const ret = [key, { [mainSource]: row }] as [ @@ -214,6 +278,74 @@ export function compileQuery( } } + // Extract includes from SELECT, compile child pipelines, and replace with placeholders. + // This must happen AFTER WHERE (so parent pipeline is filtered) but BEFORE processSelect + // (so IncludesSubquery nodes are stripped before select compilation). + const includesResults: Array = [] + if (query.select) { + const includesEntries = extractIncludesFromSelect(query.select) + for (const { key, subquery } of includesEntries) { + // Branch parent pipeline: map to [correlationValue, null] + const compiledCorrelation = compileExpression(subquery.correlationField) + const parentKeys = pipeline.pipe( + map(([_key, nsRow]: any) => [compiledCorrelation(nsRow), null] as any), + ) + + // Recursively compile child query WITH the parent key stream + const childResult = compileQuery( + subquery.query, + allInputs, + collections, + subscriptions, + callbacks, + lazySources, + optimizableOrderByCollections, + setWindowFn, + cache, + queryMapping, + parentKeys, + subquery.childCorrelationField, + ) + + // Merge child's alias metadata into parent's + Object.assign(aliasToCollectionId, childResult.aliasToCollectionId) + Object.assign(aliasRemapping, childResult.aliasRemapping) + + includesResults.push({ + pipeline: childResult.pipeline, + fieldName: subquery.fieldName, + correlationField: subquery.correlationField, + childCorrelationField: subquery.childCorrelationField, + hasOrderBy: !!( + subquery.query.orderBy && subquery.query.orderBy.length > 0 + ), + childCompilationResult: childResult, + }) + + // Replace includes entry in select with a null placeholder + replaceIncludesInSelect(query.select, key) + } + + // Stamp correlation key values onto the namespaced row so they survive + // select extraction. This allows flushIncludesState to read them directly + // without requiring the correlation field to be in the user's select. + if (includesEntries.length > 0) { + const compiledCorrelations = includesEntries.map(({ subquery }) => ({ + fieldName: subquery.fieldName, + compiled: compileExpression(subquery.correlationField), + })) + pipeline = pipeline.pipe( + map(([key, nsRow]: any) => { + const correlationKeys: Record = {} + for (const { fieldName: fn, compiled } of compiledCorrelations) { + correlationKeys[fn] = compiled(nsRow) + } + return [key, { ...nsRow, __includesCorrelationKeys: correlationKeys }] + }), + ) + } + } + if (query.distinct && !query.fnSelect && !query.select) { throw new DistinctRequiresSelectError() } @@ -317,6 +449,15 @@ export function compileQuery( // Process orderBy parameter if it exists if (query.orderBy && query.orderBy.length > 0) { + // When in includes mode with limit/offset, use grouped ordering so that + // the limit is applied per parent (per correlation key), not globally. + const includesGroupKeyFn = + parentKeyStream && + (query.limit !== undefined || query.offset !== undefined) + ? (_key: unknown, row: unknown) => + (row as any)?.[mainSource]?.__correlationKey + : undefined + const orderedPipeline = processOrderBy( rawQuery, pipeline, @@ -327,26 +468,39 @@ export function compileQuery( setWindowFn, query.limit, query.offset, + includesGroupKeyFn, ) // Final step: extract the $selected and include orderBy index - const resultPipeline = orderedPipeline.pipe( + const resultPipeline: ResultStream = orderedPipeline.pipe( map(([key, [row, orderByIndex]]) => { // Extract the final results from $selected and include orderBy index const raw = (row as any).$selected const finalResults = unwrapValue(raw) + // Stamp includes correlation keys onto the result for child routing + if ((row as any).__includesCorrelationKeys) { + finalResults.__includesCorrelationKeys = ( + row as any + ).__includesCorrelationKeys + } + // When in includes mode, embed the correlation key as third element + if (parentKeyStream) { + const correlationKey = (row as any)[mainSource]?.__correlationKey + return [key, [finalResults, orderByIndex, correlationKey]] as any + } return [key, [finalResults, orderByIndex]] as [unknown, [any, string]] }), - ) + ) as ResultStream const result = resultPipeline // Cache the result before returning (use original query as key) - const compilationResult = { + const compilationResult: CompilationResult = { collectionId: mainCollectionId, pipeline: result, sourceWhereClauses, aliasToCollectionId, aliasRemapping, + includes: includesResults.length > 0 ? includesResults : undefined, } cache.set(rawQuery, compilationResult) @@ -362,6 +516,17 @@ export function compileQuery( // Extract the final results from $selected and return [key, [results, undefined]] const raw = (row as any).$selected const finalResults = unwrapValue(raw) + // Stamp includes correlation keys onto the result for child routing + if ((row as any).__includesCorrelationKeys) { + finalResults.__includesCorrelationKeys = ( + row as any + ).__includesCorrelationKeys + } + // When in includes mode, embed the correlation key as third element + if (parentKeyStream) { + const correlationKey = (row as any)[mainSource]?.__correlationKey + return [key, [finalResults, undefined, correlationKey]] as any + } return [key, [finalResults, undefined]] as [ unknown, [any, string | undefined], @@ -371,12 +536,13 @@ export function compileQuery( const result = resultPipeline // Cache the result before returning (use original query as key) - const compilationResult = { + const compilationResult: CompilationResult = { collectionId: mainCollectionId, pipeline: result, sourceWhereClauses, aliasToCollectionId, aliasRemapping, + includes: includesResults.length > 0 ? includesResults : undefined, } cache.set(rawQuery, compilationResult) @@ -704,4 +870,44 @@ export function followRef( } } +/** + * Walks a Select object to find IncludesSubquery entries. + * Returns array of {key, subquery} for each found includes. + */ +function extractIncludesFromSelect( + select: Record, +): Array<{ key: string; subquery: IncludesSubquery }> { + const results: Array<{ key: string; subquery: IncludesSubquery }> = [] + for (const [key, value] of Object.entries(select)) { + if (value instanceof IncludesSubquery) { + results.push({ key, subquery: value }) + } + } + return results +} + +/** + * Replaces an IncludesSubquery entry in the select object with a null Value placeholder. + * This ensures processSelect() doesn't encounter it. + */ +function replaceIncludesInSelect( + select: Record, + key: string, +): void { + select[key] = new ValClass(null) +} + +/** + * Gets a nested value from an object by path segments. + * For v1 with single-level correlation fields (e.g., `projectId`), it's just `obj[path[0]]`. + */ +function getNestedValue(obj: any, path: Array): any { + let value = obj + for (const segment of path) { + if (value == null) return value + value = value[segment] + } + return value +} + export type CompileQueryFn = typeof compileQuery diff --git a/packages/db/src/query/compiler/order-by.ts b/packages/db/src/query/compiler/order-by.ts index 0ced0081c..6ed5f958c 100644 --- a/packages/db/src/query/compiler/order-by.ts +++ b/packages/db/src/query/compiler/order-by.ts @@ -1,4 +1,7 @@ -import { orderByWithFractionalIndex } from '@tanstack/db-ivm' +import { + groupedOrderByWithFractionalIndex, + orderByWithFractionalIndex, +} from '@tanstack/db-ivm' import { defaultComparator, makeComparator } from '../../utils/comparison.js' import { PropRef, followRef } from '../ir.js' import { ensureIndexForField } from '../../indexes/auto-index.js' @@ -51,6 +54,7 @@ export function processOrderBy( setWindowFn: (windowFn: (options: WindowOptions) => void) => void, limit?: number, offset?: number, + groupKeyFn?: (key: unknown, value: unknown) => unknown, ): IStreamBuilder> { // Pre-compile all order by expressions const compiledOrderBy = orderByClause.map((clause) => { @@ -126,7 +130,9 @@ export function processOrderBy( // to loadSubset so the sync layer can optimize the query. // We try to use an index on the FIRST orderBy column for lazy loading, // even for multi-column orderBy (using wider bounds on first column). - if (limit) { + // Skip this optimization when using grouped ordering (includes with limit), + // because the limit is per-group, not global — the child collection needs all data loaded. + if (limit && !groupKeyFn) { let index: IndexInterface | undefined let followRefCollection: Collection | undefined let firstColumnValueExtractor: CompiledSingleRowExpression | undefined @@ -290,6 +296,33 @@ export function processOrderBy( } } + // Use grouped ordering when a groupKeyFn is provided (includes with limit/offset), + // otherwise use the standard global ordering operator. + if (groupKeyFn) { + return pipeline.pipe( + groupedOrderByWithFractionalIndex(valueExtractor, { + limit, + offset, + comparator: compare, + setSizeCallback, + groupKeyFn, + setWindowFn: ( + windowFn: (options: { offset?: number; limit?: number }) => void, + ) => { + setWindowFn((options) => { + windowFn(options) + if (orderByOptimizationInfo) { + orderByOptimizationInfo.offset = + options.offset ?? orderByOptimizationInfo.offset + orderByOptimizationInfo.limit = + options.limit ?? orderByOptimizationInfo.limit + } + }) + }, + }), + ) + } + // Use fractional indexing and return the tuple [value, index] return pipeline.pipe( orderByWithFractionalIndex(valueExtractor, { diff --git a/packages/db/src/query/compiler/select.ts b/packages/db/src/query/compiler/select.ts index c5baccb68..6eb866ce8 100644 --- a/packages/db/src/query/compiler/select.ts +++ b/packages/db/src/query/compiler/select.ts @@ -221,6 +221,15 @@ function addFromObject( } const expression = value as any + if (expression && expression.type === `includesSubquery`) { + // Placeholder — field will be set to a child Collection by the output layer + ops.push({ + kind: `field`, + alias: [...prefixPath, key].join(`.`), + compiled: () => null, + }) + continue + } if (isNestedSelectObject(expression)) { // Nested selection object addFromObject([...prefixPath, key], expression, ops) diff --git a/packages/db/src/query/ir.ts b/packages/db/src/query/ir.ts index b1e3d1e07..64ddd22c7 100644 --- a/packages/db/src/query/ir.ts +++ b/packages/db/src/query/ir.ts @@ -28,7 +28,7 @@ export interface QueryIR { export type From = CollectionRef | QueryRef export type Select = { - [alias: string]: BasicExpression | Aggregate | Select + [alias: string]: BasicExpression | Aggregate | Select | IncludesSubquery } export type Join = Array @@ -132,6 +132,18 @@ export class Aggregate extends BaseExpression { } } +export class IncludesSubquery extends BaseExpression { + public type = `includesSubquery` as const + constructor( + public query: QueryIR, // Child query (correlation WHERE removed) + public correlationField: PropRef, // Parent-side ref (e.g., project.id) + public childCorrelationField: PropRef, // Child-side ref (e.g., issue.projectId) + public fieldName: string, // Result field name (e.g., "issues") + ) { + super() + } +} + /** * Runtime helper to detect IR expression-like objects. * Prefer this over ad-hoc local implementations to keep behavior consistent. @@ -141,7 +153,8 @@ export function isExpressionLike(value: any): boolean { value instanceof Aggregate || value instanceof Func || value instanceof PropRef || - value instanceof Value + value instanceof Value || + value instanceof IncludesSubquery ) } diff --git a/packages/db/src/query/live/collection-config-builder.ts b/packages/db/src/query/live/collection-config-builder.ts index 72efb75d8..d2fdcb7e6 100644 --- a/packages/db/src/query/live/collection-config-builder.ts +++ b/packages/db/src/query/live/collection-config-builder.ts @@ -1,5 +1,7 @@ -import { D2, output } from '@tanstack/db-ivm' +import { D2, output, serializeValue } from '@tanstack/db-ivm' import { compileQuery } from '../compiler/index.js' +import { createCollection } from '../../collection/index.js' +import { IncludesSubquery } from '../ir.js' import { buildQuery, getQueryIR } from '../builder/index.js' import { MissingAliasInputsError, @@ -11,7 +13,10 @@ import { CollectionSubscriber } from './collection-subscriber.js' import { getCollectionBuilder } from './collection-registry.js' import { LIVE_QUERY_INTERNAL } from './internal.js' import type { LiveQueryInternalUtils } from './internal.js' -import type { WindowOptions } from '../compiler/index.js' +import type { + IncludesCompilationResult, + WindowOptions, +} from '../compiler/index.js' import type { SchedulerContextId } from '../../scheduler.js' import type { CollectionSubscription } from '../../collection/subscription.js' import type { RootStreamBuilder } from '@tanstack/db-ivm' @@ -26,7 +31,7 @@ import type { UtilsRecord, } from '../../types.js' import type { Context, GetResult } from '../builder/types.js' -import type { BasicExpression, QueryIR } from '../ir.js' +import type { BasicExpression, PropRef, QueryIR } from '../ir.js' import type { LazyCollectionCallbacks } from '../compiler/joins.js' import type { Changes, @@ -135,6 +140,7 @@ export class CollectionConfigBuilder< public sourceWhereClausesCache: | Map> | undefined + private includesCache: Array | undefined // Map of source alias to subscription readonly subscriptions: Record = {} @@ -627,6 +633,7 @@ export class CollectionConfigBuilder< this.inputsCache = undefined this.pipelineCache = undefined this.sourceWhereClausesCache = undefined + this.includesCache = undefined // Reset lazy source alias state this.lazySources.clear() @@ -675,6 +682,7 @@ export class CollectionConfigBuilder< this.pipelineCache = compilation.pipeline this.sourceWhereClausesCache = compilation.sourceWhereClauses this.compiledAliasToCollectionId = compilation.aliasToCollectionId + this.includesCache = compilation.includes // Defensive check: verify all compiled aliases have corresponding inputs // This should never happen since all aliases come from user declarations, @@ -722,10 +730,19 @@ export class CollectionConfigBuilder< }), ) + // Set up includes output routing and child collection lifecycle + const includesState = this.setupIncludesOutput( + this.includesCache, + syncState, + ) + // Flush pending changes and reset the accumulator. // Called at the end of each graph run to commit all accumulated changes. syncState.flushPendingChanges = () => { - if (pendingChanges.size === 0) { + const hasParentChanges = pendingChanges.size > 0 + const hasChildChanges = hasPendingIncludesChanges(includesState) + + if (!hasParentChanges && !hasChildChanges) { return } @@ -757,10 +774,21 @@ export class CollectionConfigBuilder< changesToApply = merged } - begin() - changesToApply.forEach(this.applyChanges.bind(this, config)) - commit() + // 1. Flush parent changes + if (hasParentChanges) { + begin() + changesToApply.forEach(this.applyChanges.bind(this, config)) + commit() + } pendingChanges = new Map() + + // 2. Process includes: create/dispose child Collections, route child changes + flushIncludesState( + includesState, + config.collection, + this.id, + hasParentChanges ? changesToApply : null, + ) } graph.finalize() @@ -773,6 +801,79 @@ export class CollectionConfigBuilder< return syncState as FullSyncState } + /** + * Sets up output callbacks for includes child pipelines. + * Each includes entry gets its own output callback that accumulates child changes, + * and a child registry that maps correlation key → child Collection. + */ + private setupIncludesOutput( + includesEntries: Array | undefined, + syncState: SyncState, + ): Array { + if (!includesEntries || includesEntries.length === 0) { + return [] + } + + return includesEntries.map((entry) => { + const state: IncludesOutputState = { + fieldName: entry.fieldName, + childCorrelationField: entry.childCorrelationField, + hasOrderBy: entry.hasOrderBy, + childRegistry: new Map(), + pendingChildChanges: new Map(), + correlationToParentKeys: new Map(), + } + + // Attach output callback on the child pipeline + entry.pipeline.pipe( + output((data) => { + const messages = data.getInner() + syncState.messagesCount += messages.length + + for (const [[childKey, tupleData], multiplicity] of messages) { + const [childResult, _orderByIndex, correlationKey] = + tupleData as unknown as [any, string | undefined, unknown] + + // Accumulate by [correlationKey, childKey] + let byChild = state.pendingChildChanges.get(correlationKey) + if (!byChild) { + byChild = new Map() + state.pendingChildChanges.set(correlationKey, byChild) + } + + const existing = byChild.get(childKey) || { + deletes: 0, + inserts: 0, + value: childResult, + orderByIndex: _orderByIndex, + } + + if (multiplicity < 0) { + existing.deletes += Math.abs(multiplicity) + } else if (multiplicity > 0) { + existing.inserts += multiplicity + existing.value = childResult + } + + byChild.set(childKey, existing) + } + }), + ) + + // Set up shared buffers for nested includes (e.g., comments inside issues) + if (entry.childCompilationResult.includes) { + state.nestedSetups = setupNestedPipelines( + entry.childCompilationResult.includes, + syncState, + ) + state.nestedRoutingIndex = new Map() + state.nestedRoutingReverseIndex = new Map() + } + + return state + }) + } + private applyChanges( config: SyncMethods, changes: { @@ -1053,6 +1154,24 @@ function extractCollectionsFromQuery( } } } + + // Extract from SELECT (for IncludesSubquery) + if (q.select) { + extractFromSelect(q.select) + } + } + + function extractFromSelect(select: any) { + for (const [key, value] of Object.entries(select)) { + if (typeof key === `string` && key.startsWith(`__SPREAD_SENTINEL__`)) { + continue + } + if (value instanceof IncludesSubquery) { + extractFromQuery(value.query) + } else if (isNestedSelectObject(value)) { + extractFromSelect(value) + } + } } // Start extraction from the root query @@ -1122,6 +1241,19 @@ function extractCollectionAliases(query: QueryIR): Map> { } } + function traverseSelect(select: any) { + for (const [key, value] of Object.entries(select)) { + if (typeof key === `string` && key.startsWith(`__SPREAD_SENTINEL__`)) { + continue + } + if (value instanceof IncludesSubquery) { + traverse(value.query) + } else if (isNestedSelectObject(value)) { + traverseSelect(value) + } + } + } + function traverse(q?: QueryIR) { if (!q) return @@ -1132,6 +1264,10 @@ function extractCollectionAliases(query: QueryIR): Map> { recordAlias(joinClause.from) } } + + if (q.select) { + traverseSelect(q.select) + } } traverse(query) @@ -1139,6 +1275,564 @@ function extractCollectionAliases(query: QueryIR): Map> { return aliasesById } +/** + * Check if a value is a nested select object (plain object, not an expression) + */ +function isNestedSelectObject(obj: any): boolean { + if (obj === null || typeof obj !== `object`) return false + if (obj instanceof IncludesSubquery) return false + // Expression-like objects have a .type property + if (`type` in obj && typeof obj.type === `string`) return false + // Ref proxies from spread operations + if (obj.__refProxy) return false + return true +} + +/** + * Shared buffer setup for a single nested includes level. + * Pipeline output writes into the buffer; during flush the buffer is drained + * into per-entry states via the routing index. + */ +type NestedIncludesSetup = { + compilationResult: IncludesCompilationResult + /** Shared buffer: nestedCorrelationKey → Map */ + buffer: Map>> + /** For 3+ levels of nesting */ + nestedSetups?: Array +} + +/** + * State tracked per includes entry for output routing and child lifecycle + */ +type IncludesOutputState = { + fieldName: string + childCorrelationField: PropRef + /** Whether the child query has an ORDER BY clause */ + hasOrderBy: boolean + /** Maps correlation key value → child Collection entry */ + childRegistry: Map + /** Pending child changes: correlationKey → Map */ + pendingChildChanges: Map>> + /** Reverse index: correlation key → Set of parent collection keys */ + correlationToParentKeys: Map> + /** Shared nested pipeline setups (one per nested includes level) */ + nestedSetups?: Array + /** nestedCorrelationKey → parentCorrelationKey */ + nestedRoutingIndex?: Map + /** parentCorrelationKey → Set */ + nestedRoutingReverseIndex?: Map> +} + +type ChildCollectionEntry = { + collection: Collection + syncMethods: SyncMethods | null + resultKeys: WeakMap + orderByIndices: WeakMap | null + /** Per-entry nested includes states (one per nested includes level) */ + includesStates?: Array +} + +/** + * Sets up shared buffers for nested includes pipelines. + * Instead of writing directly into a single shared IncludesOutputState, + * each nested pipeline writes into a buffer that is later drained per-entry. + */ +function setupNestedPipelines( + includes: Array, + syncState: SyncState, +): Array { + return includes.map((entry) => { + const buffer: Map>> = new Map() + + // Attach output callback that writes into the shared buffer + entry.pipeline.pipe( + output((data) => { + const messages = data.getInner() + syncState.messagesCount += messages.length + + for (const [[childKey, tupleData], multiplicity] of messages) { + const [childResult, _orderByIndex, correlationKey] = + tupleData as unknown as [any, string | undefined, unknown] + + let byChild = buffer.get(correlationKey) + if (!byChild) { + byChild = new Map() + buffer.set(correlationKey, byChild) + } + + const existing = byChild.get(childKey) || { + deletes: 0, + inserts: 0, + value: childResult, + orderByIndex: _orderByIndex, + } + + if (multiplicity < 0) { + existing.deletes += Math.abs(multiplicity) + } else if (multiplicity > 0) { + existing.inserts += multiplicity + existing.value = childResult + } + + byChild.set(childKey, existing) + } + }), + ) + + const setup: NestedIncludesSetup = { + compilationResult: entry, + buffer, + } + + // Recursively set up deeper levels + if (entry.childCompilationResult.includes) { + setup.nestedSetups = setupNestedPipelines( + entry.childCompilationResult.includes, + syncState, + ) + } + + return setup + }) +} + +/** + * Creates fresh per-entry IncludesOutputState array from NestedIncludesSetup array. + * Each entry gets its own isolated state for nested includes. + */ +function createPerEntryIncludesStates( + setups: Array, +): Array { + return setups.map((setup) => { + const state: IncludesOutputState = { + fieldName: setup.compilationResult.fieldName, + childCorrelationField: setup.compilationResult.childCorrelationField, + hasOrderBy: setup.compilationResult.hasOrderBy, + childRegistry: new Map(), + pendingChildChanges: new Map(), + correlationToParentKeys: new Map(), + } + + if (setup.nestedSetups) { + state.nestedSetups = setup.nestedSetups + state.nestedRoutingIndex = new Map() + state.nestedRoutingReverseIndex = new Map() + } + + return state + }) +} + +/** + * Drains shared buffers into per-entry states using the routing index. + * Returns the set of parent correlation keys that had changes routed to them. + */ +function drainNestedBuffers(state: IncludesOutputState): Set { + const dirtyCorrelationKeys = new Set() + + if (!state.nestedSetups) return dirtyCorrelationKeys + + for (let i = 0; i < state.nestedSetups.length; i++) { + const setup = state.nestedSetups[i]! + const toDelete: Array = [] + + for (const [nestedCorrelationKey, childChanges] of setup.buffer) { + const parentCorrelationKey = + state.nestedRoutingIndex!.get(nestedCorrelationKey) + if (parentCorrelationKey === undefined) { + // Unroutable — parent not yet seen; keep in buffer + continue + } + + const entry = state.childRegistry.get(parentCorrelationKey) + if (!entry || !entry.includesStates) { + continue + } + + // Route changes into this entry's per-entry state at position i + const entryState = entry.includesStates[i]! + for (const [childKey, changes] of childChanges) { + let byChild = entryState.pendingChildChanges.get(nestedCorrelationKey) + if (!byChild) { + byChild = new Map() + entryState.pendingChildChanges.set(nestedCorrelationKey, byChild) + } + const existing = byChild.get(childKey) + if (existing) { + existing.inserts += changes.inserts + existing.deletes += changes.deletes + if (changes.inserts > 0) { + existing.value = changes.value + if (changes.orderByIndex !== undefined) { + existing.orderByIndex = changes.orderByIndex + } + } + } else { + byChild.set(childKey, { ...changes }) + } + } + + dirtyCorrelationKeys.add(parentCorrelationKey) + toDelete.push(nestedCorrelationKey) + } + + for (const key of toDelete) { + setup.buffer.delete(key) + } + } + + return dirtyCorrelationKeys +} + +/** + * Updates the routing index after processing child changes. + * Maps nested correlation keys to parent correlation keys so that + * grandchild changes can be routed to the correct per-entry state. + */ +function updateRoutingIndex( + state: IncludesOutputState, + correlationKey: unknown, + childChanges: Map>, +): void { + if (!state.nestedSetups) return + + for (const setup of state.nestedSetups) { + for (const [, change] of childChanges) { + if (change.inserts > 0) { + // Read the pre-computed nested correlation key from the compiler stamp + const nestedCorrelationKey = + change.value.__includesCorrelationKeys?.[ + setup.compilationResult.fieldName + ] + + if (nestedCorrelationKey != null) { + state.nestedRoutingIndex!.set(nestedCorrelationKey, correlationKey) + let reverseSet = state.nestedRoutingReverseIndex!.get(correlationKey) + if (!reverseSet) { + reverseSet = new Set() + state.nestedRoutingReverseIndex!.set(correlationKey, reverseSet) + } + reverseSet.add(nestedCorrelationKey) + } + } else if (change.deletes > 0 && change.inserts === 0) { + // Remove from routing index + const nestedCorrelationKey = + change.value.__includesCorrelationKeys?.[ + setup.compilationResult.fieldName + ] + + if (nestedCorrelationKey != null) { + state.nestedRoutingIndex!.delete(nestedCorrelationKey) + const reverseSet = + state.nestedRoutingReverseIndex!.get(correlationKey) + if (reverseSet) { + reverseSet.delete(nestedCorrelationKey) + if (reverseSet.size === 0) { + state.nestedRoutingReverseIndex!.delete(correlationKey) + } + } + } + } + } + } +} + +/** + * Cleans routing index entries when a parent is deleted. + * Uses the reverse index to find and remove all nested routing entries. + */ +function cleanRoutingIndexOnDelete( + state: IncludesOutputState, + correlationKey: unknown, +): void { + if (!state.nestedRoutingReverseIndex) return + + const nestedKeys = state.nestedRoutingReverseIndex.get(correlationKey) + if (nestedKeys) { + for (const nestedKey of nestedKeys) { + state.nestedRoutingIndex!.delete(nestedKey) + } + state.nestedRoutingReverseIndex.delete(correlationKey) + } +} + +/** + * Recursively checks whether any nested buffer has pending changes. + */ +function hasNestedBufferChanges(setups: Array): boolean { + for (const setup of setups) { + if (setup.buffer.size > 0) return true + if (setup.nestedSetups && hasNestedBufferChanges(setup.nestedSetups)) + return true + } + return false +} + +/** + * Creates a child Collection entry for includes subqueries. + * The child Collection is a full-fledged Collection instance that starts syncing immediately. + */ +function createChildCollectionEntry( + parentId: string, + fieldName: string, + correlationKey: unknown, + hasOrderBy: boolean, + nestedSetups?: Array, +): ChildCollectionEntry { + const resultKeys = new WeakMap() + const orderByIndices = hasOrderBy ? new WeakMap() : null + let syncMethods: SyncMethods | null = null + + const compare = orderByIndices + ? createOrderByComparator(orderByIndices) + : undefined + + const collection = createCollection({ + id: `__child-collection:${parentId}-${fieldName}-${serializeValue(correlationKey)}`, + getKey: (item: any) => resultKeys.get(item) as string | number, + compare, + sync: { + rowUpdateMode: `full`, + sync: (methods) => { + syncMethods = methods + return () => { + syncMethods = null + } + }, + }, + startSync: true, + }) + + const entry: ChildCollectionEntry = { + collection, + get syncMethods() { + return syncMethods + }, + resultKeys, + orderByIndices, + } + + if (nestedSetups) { + entry.includesStates = createPerEntryIncludesStates(nestedSetups) + } + + return entry +} + +/** + * Flushes includes state using a bottom-up per-entry approach. + * Five phases ensure correct ordering: + * 1. Parent INSERTs — create child entries with per-entry nested states + * 2. Child changes — apply to child Collections, update routing index + * 3. Drain nested buffers — route buffered grandchild changes to per-entry states + * 4. Flush per-entry states — recursively flush nested includes on each entry + * 5. Parent DELETEs — clean up child entries and routing index + */ +function flushIncludesState( + includesState: Array, + parentCollection: Collection, + parentId: string, + parentChanges: Map> | null, +): void { + for (const state of includesState) { + // Phase 1: Parent INSERTs — ensure a child Collection exists for every parent + if (parentChanges) { + for (const [parentKey, changes] of parentChanges) { + if (changes.inserts > 0) { + const parentResult = changes.value + // Read the pre-computed correlation key from the compiler stamp + const correlationKey = + parentResult.__includesCorrelationKeys?.[state.fieldName] + + if (correlationKey != null) { + // Ensure child Collection exists for this correlation key + if (!state.childRegistry.has(correlationKey)) { + const entry = createChildCollectionEntry( + parentId, + state.fieldName, + correlationKey, + state.hasOrderBy, + state.nestedSetups, + ) + state.childRegistry.set(correlationKey, entry) + } + // Update reverse index: correlation key → parent keys + let parentKeys = state.correlationToParentKeys.get(correlationKey) + if (!parentKeys) { + parentKeys = new Set() + state.correlationToParentKeys.set(correlationKey, parentKeys) + } + parentKeys.add(parentKey) + + // Attach child Collection to the parent result + parentResult[state.fieldName] = + state.childRegistry.get(correlationKey)!.collection + } + } + } + } + + // Phase 2: Child changes — apply to child Collections + // Track which entries had child changes and capture their childChanges maps + const entriesWithChildChanges = new Map< + unknown, + { entry: ChildCollectionEntry; childChanges: Map> } + >() + + if (state.pendingChildChanges.size > 0) { + for (const [correlationKey, childChanges] of state.pendingChildChanges) { + // Ensure child Collection exists for this correlation key + let entry = state.childRegistry.get(correlationKey) + if (!entry) { + entry = createChildCollectionEntry( + parentId, + state.fieldName, + correlationKey, + state.hasOrderBy, + state.nestedSetups, + ) + state.childRegistry.set(correlationKey, entry) + } + + // Attach the child Collection to ANY parent that has this correlation key + attachChildCollectionToParent( + parentCollection, + state.fieldName, + correlationKey, + state.correlationToParentKeys, + entry.collection, + ) + + // Apply child changes to the child Collection + if (entry.syncMethods) { + entry.syncMethods.begin() + for (const [childKey, change] of childChanges) { + entry.resultKeys.set(change.value, childKey) + if (entry.orderByIndices && change.orderByIndex !== undefined) { + entry.orderByIndices.set(change.value, change.orderByIndex) + } + if (change.inserts > 0 && change.deletes === 0) { + entry.syncMethods.write({ value: change.value, type: `insert` }) + } else if ( + change.inserts > change.deletes || + (change.inserts === change.deletes && + entry.syncMethods.collection.has( + entry.syncMethods.collection.getKeyFromItem(change.value), + )) + ) { + entry.syncMethods.write({ value: change.value, type: `update` }) + } else if (change.deletes > 0) { + entry.syncMethods.write({ value: change.value, type: `delete` }) + } + } + entry.syncMethods.commit() + } + + // Update routing index for nested includes + updateRoutingIndex(state, correlationKey, childChanges) + + entriesWithChildChanges.set(correlationKey, { entry, childChanges }) + } + state.pendingChildChanges.clear() + } + + // Phase 3: Drain nested buffers — route buffered grandchild changes to per-entry states + const dirtyFromBuffers = drainNestedBuffers(state) + + // Phase 4: Flush per-entry states + // First: entries that had child changes in Phase 2 + for (const [, { entry, childChanges }] of entriesWithChildChanges) { + if (entry.includesStates) { + flushIncludesState( + entry.includesStates, + entry.collection, + entry.collection.id, + childChanges, + ) + } + } + // Then: entries that only had buffer-routed changes (no child changes at this level) + for (const correlationKey of dirtyFromBuffers) { + if (entriesWithChildChanges.has(correlationKey)) continue + const entry = state.childRegistry.get(correlationKey) + if (entry?.includesStates) { + flushIncludesState( + entry.includesStates, + entry.collection, + entry.collection.id, + null, + ) + } + } + + // Phase 5: Parent DELETEs — dispose child Collections and clean up + if (parentChanges) { + for (const [parentKey, changes] of parentChanges) { + if (changes.deletes > 0 && changes.inserts === 0) { + const correlationKey = + changes.value.__includesCorrelationKeys?.[state.fieldName] + if (correlationKey != null) { + cleanRoutingIndexOnDelete(state, correlationKey) + state.childRegistry.delete(correlationKey) + // Clean up reverse index + const parentKeys = state.correlationToParentKeys.get(correlationKey) + if (parentKeys) { + parentKeys.delete(parentKey) + if (parentKeys.size === 0) { + state.correlationToParentKeys.delete(correlationKey) + } + } + } + } + } + } + } + + // Clean up the internal stamp from parent/child results so it doesn't leak to the user + if (parentChanges) { + for (const [, changes] of parentChanges) { + delete changes.value.__includesCorrelationKeys + } + } +} + +/** + * Checks whether any includes state has pending changes that need to be flushed. + * Checks direct pending child changes and shared nested buffers. + */ +function hasPendingIncludesChanges( + states: Array, +): boolean { + for (const state of states) { + if (state.pendingChildChanges.size > 0) return true + if (state.nestedSetups && hasNestedBufferChanges(state.nestedSetups)) + return true + } + return false +} + +/** + * Attaches a child Collection to parent rows that match a given correlation key. + * Uses the reverse index to look up parent keys directly instead of scanning. + */ +function attachChildCollectionToParent( + parentCollection: Collection, + fieldName: string, + correlationKey: unknown, + correlationToParentKeys: Map>, + childCollection: Collection, +): void { + const parentKeys = correlationToParentKeys.get(correlationKey) + if (!parentKeys) return + + for (const parentKey of parentKeys) { + const item = parentCollection.get(parentKey as any) + if (item) { + item[fieldName] = childCollection + } + } +} + function accumulateChanges( acc: Map>, [[key, tupleData], multiplicity]: [ diff --git a/packages/db/tests/query/includes.test.ts b/packages/db/tests/query/includes.test.ts new file mode 100644 index 000000000..221ff6b0c --- /dev/null +++ b/packages/db/tests/query/includes.test.ts @@ -0,0 +1,750 @@ +import { beforeEach, describe, expect, it } from 'vitest' +import { createLiveQueryCollection, eq } from '../../src/query/index.js' +import { createCollection } from '../../src/collection/index.js' +import { mockSyncCollectionOptions } from '../utils.js' + +type Project = { + id: number + name: string +} + +type Issue = { + id: number + projectId: number + title: string +} + +type Comment = { + id: number + issueId: number + body: string +} + +const sampleProjects: Array = [ + { id: 1, name: `Alpha` }, + { id: 2, name: `Beta` }, + { id: 3, name: `Gamma` }, +] + +const sampleIssues: Array = [ + { id: 10, projectId: 1, title: `Bug in Alpha` }, + { id: 11, projectId: 1, title: `Feature for Alpha` }, + { id: 20, projectId: 2, title: `Bug in Beta` }, + // No issues for project 3 +] + +const sampleComments: Array = [ + { id: 100, issueId: 10, body: `Looks bad` }, + { id: 101, issueId: 10, body: `Fixed it` }, + { id: 200, issueId: 20, body: `Same bug` }, + // No comments for issue 11 +] + +function createProjectsCollection() { + return createCollection( + mockSyncCollectionOptions({ + id: `includes-projects`, + getKey: (p) => p.id, + initialData: sampleProjects, + }), + ) +} + +function createIssuesCollection() { + return createCollection( + mockSyncCollectionOptions({ + id: `includes-issues`, + getKey: (i) => i.id, + initialData: sampleIssues, + }), + ) +} + +function createCommentsCollection() { + return createCollection( + mockSyncCollectionOptions({ + id: `includes-comments`, + getKey: (c) => c.id, + initialData: sampleComments, + }), + ) +} + +/** + * Extracts child collection items as a sorted plain array for comparison. + */ +function childItems(collection: any, sortKey = `id`): Array { + return [...collection.toArray].sort( + (a: any, b: any) => a[sortKey] - b[sortKey], + ) +} + +/** + * Recursively converts a live query collection (or child Collection) into a + * plain sorted array, turning any nested child Collections into nested arrays. + * This lets tests compare the full hierarchical result as a single literal. + */ +function toTree(collection: any, sortKey = `id`): Array { + const rows = [...collection.toArray].sort( + (a: any, b: any) => a[sortKey] - b[sortKey], + ) + return rows.map((row: any) => { + const out: Record = {} + for (const [key, value] of Object.entries(row)) { + out[key] = + value && typeof value === `object` && `toArray` in (value as any) + ? toTree(value, sortKey) + : value + } + return out + }) +} + +describe(`includes subqueries`, () => { + let projects: ReturnType + let issues: ReturnType + let comments: ReturnType + + beforeEach(() => { + projects = createProjectsCollection() + issues = createIssuesCollection() + comments = createCommentsCollection() + }) + + function buildIncludesQuery() { + return createLiveQueryCollection((q) => + q.from({ p: projects }).select(({ p }) => ({ + id: p.id, + name: p.name, + issues: q + .from({ i: issues }) + .where(({ i }) => eq(i.projectId, p.id)) + .select(({ i }) => ({ + id: i.id, + title: i.title, + })), + })), + ) + } + + describe(`basic includes`, () => { + it(`produces child Collections on parent rows`, async () => { + const collection = buildIncludesQuery() + await collection.preload() + + expect(toTree(collection)).toEqual([ + { + id: 1, + name: `Alpha`, + issues: [ + { id: 10, title: `Bug in Alpha` }, + { id: 11, title: `Feature for Alpha` }, + ], + }, + { + id: 2, + name: `Beta`, + issues: [{ id: 20, title: `Bug in Beta` }], + }, + { + id: 3, + name: `Gamma`, + issues: [], + }, + ]) + }) + }) + + describe(`reactivity`, () => { + it(`adding a child updates the parent's child collection`, async () => { + const collection = buildIncludesQuery() + await collection.preload() + + expect(childItems((collection.get(1) as any).issues)).toHaveLength(2) + + issues.utils.begin() + issues.utils.write({ + type: `insert`, + value: { id: 12, projectId: 1, title: `New Alpha issue` }, + }) + issues.utils.commit() + + expect(childItems((collection.get(1) as any).issues)).toEqual([ + { id: 10, title: `Bug in Alpha` }, + { id: 11, title: `Feature for Alpha` }, + { id: 12, title: `New Alpha issue` }, + ]) + }) + + it(`removing a child updates the parent's child collection`, async () => { + const collection = buildIncludesQuery() + await collection.preload() + + expect(childItems((collection.get(1) as any).issues)).toHaveLength(2) + + issues.utils.begin() + issues.utils.write({ + type: `delete`, + value: sampleIssues.find((i) => i.id === 10)!, + }) + issues.utils.commit() + + expect(childItems((collection.get(1) as any).issues)).toEqual([ + { id: 11, title: `Feature for Alpha` }, + ]) + }) + + it(`removing and re-adding a parent resets its child collection`, async () => { + const collection = buildIncludesQuery() + await collection.preload() + + expect(childItems((collection.get(1) as any).issues)).toHaveLength(2) + + // Remove project Alpha + projects.utils.begin() + projects.utils.write({ + type: `delete`, + value: sampleProjects.find((p) => p.id === 1)!, + }) + projects.utils.commit() + + expect(collection.get(1)).toBeUndefined() + + // Re-add project Alpha — should get a fresh child collection + projects.utils.begin() + projects.utils.write({ + type: `insert`, + value: { id: 1, name: `Alpha Reborn` }, + }) + projects.utils.commit() + + const alpha = collection.get(1) as any + expect(alpha).toMatchObject({ id: 1, name: `Alpha Reborn` }) + expect(childItems(alpha.issues)).toEqual([ + { id: 10, title: `Bug in Alpha` }, + { id: 11, title: `Feature for Alpha` }, + ]) + + // New children should flow into the child collection + issues.utils.begin() + issues.utils.write({ + type: `insert`, + value: { id: 99, projectId: 1, title: `Post-rebirth issue` }, + }) + issues.utils.commit() + + expect(childItems((collection.get(1) as any).issues)).toEqual([ + { id: 10, title: `Bug in Alpha` }, + { id: 11, title: `Feature for Alpha` }, + { id: 99, title: `Post-rebirth issue` }, + ]) + }) + + it(`adding a child to a previously empty parent works`, async () => { + const collection = buildIncludesQuery() + await collection.preload() + + expect(childItems((collection.get(3) as any).issues)).toEqual([]) + + issues.utils.begin() + issues.utils.write({ + type: `insert`, + value: { id: 30, projectId: 3, title: `Gamma issue` }, + }) + issues.utils.commit() + + expect(childItems((collection.get(3) as any).issues)).toEqual([ + { id: 30, title: `Gamma issue` }, + ]) + }) + }) + + describe(`inner join filtering`, () => { + it(`only shows children for parents matching a WHERE clause`, async () => { + const collection = createLiveQueryCollection((q) => + q + .from({ p: projects }) + .where(({ p }) => eq(p.name, `Alpha`)) + .select(({ p }) => ({ + id: p.id, + name: p.name, + issues: q + .from({ i: issues }) + .where(({ i }) => eq(i.projectId, p.id)) + .select(({ i }) => ({ + id: i.id, + title: i.title, + })), + })), + ) + + await collection.preload() + + expect(toTree(collection)).toEqual([ + { + id: 1, + name: `Alpha`, + issues: [ + { id: 10, title: `Bug in Alpha` }, + { id: 11, title: `Feature for Alpha` }, + ], + }, + ]) + }) + }) + + describe(`ordered child queries`, () => { + it(`child collection respects orderBy on the child query`, async () => { + const collection = createLiveQueryCollection((q) => + q.from({ p: projects }).select(({ p }) => ({ + id: p.id, + name: p.name, + issues: q + .from({ i: issues }) + .where(({ i }) => eq(i.projectId, p.id)) + .orderBy(({ i }) => i.title, `desc`) + .select(({ i }) => ({ + id: i.id, + title: i.title, + })), + })), + ) + + await collection.preload() + + // Alpha's issues should be sorted by title descending: + // "Feature for Alpha" before "Bug in Alpha" + const alpha = collection.get(1) as any + const alphaIssues = [...alpha.issues.toArray] + expect(alphaIssues).toEqual([ + { id: 11, title: `Feature for Alpha` }, + { id: 10, title: `Bug in Alpha` }, + ]) + + // Beta has one issue, order doesn't matter but it should still work + const beta = collection.get(2) as any + const betaIssues = [...beta.issues.toArray] + expect(betaIssues).toEqual([{ id: 20, title: `Bug in Beta` }]) + + // Gamma has no issues + const gamma = collection.get(3) as any + expect([...gamma.issues.toArray]).toEqual([]) + }) + + it(`newly inserted children appear in the correct order`, async () => { + const collection = createLiveQueryCollection((q) => + q.from({ p: projects }).select(({ p }) => ({ + id: p.id, + name: p.name, + issues: q + .from({ i: issues }) + .where(({ i }) => eq(i.projectId, p.id)) + .orderBy(({ i }) => i.title, `asc`) + .select(({ i }) => ({ + id: i.id, + title: i.title, + })), + })), + ) + + await collection.preload() + + // Alpha issues sorted ascending: "Bug in Alpha", "Feature for Alpha" + expect([...(collection.get(1) as any).issues.toArray]).toEqual([ + { id: 10, title: `Bug in Alpha` }, + { id: 11, title: `Feature for Alpha` }, + ]) + + // Insert an issue that should appear between the existing two + issues.utils.begin() + issues.utils.write({ + type: `insert`, + value: { id: 12, projectId: 1, title: `Docs for Alpha` }, + }) + issues.utils.commit() + + // Should maintain ascending order: Bug, Docs, Feature + expect([...(collection.get(1) as any).issues.toArray]).toEqual([ + { id: 10, title: `Bug in Alpha` }, + { id: 12, title: `Docs for Alpha` }, + { id: 11, title: `Feature for Alpha` }, + ]) + }) + }) + + describe(`ordered child queries with limit`, () => { + it(`limits child collection to N items per parent`, async () => { + const collection = createLiveQueryCollection((q) => + q.from({ p: projects }).select(({ p }) => ({ + id: p.id, + name: p.name, + issues: q + .from({ i: issues }) + .where(({ i }) => eq(i.projectId, p.id)) + .orderBy(({ i }) => i.title, `asc`) + .limit(1) + .select(({ i }) => ({ + id: i.id, + title: i.title, + })), + })), + ) + + await collection.preload() + + // Alpha has 2 issues; limit(1) with asc title should keep only "Bug in Alpha" + const alpha = collection.get(1) as any + expect([...alpha.issues.toArray]).toEqual([ + { id: 10, title: `Bug in Alpha` }, + ]) + + // Beta has 1 issue; limit(1) keeps it + const beta = collection.get(2) as any + expect([...beta.issues.toArray]).toEqual([ + { id: 20, title: `Bug in Beta` }, + ]) + + // Gamma has 0 issues; limit(1) still empty + const gamma = collection.get(3) as any + expect([...gamma.issues.toArray]).toEqual([]) + }) + + it(`inserting a child that displaces an existing one respects the limit`, async () => { + const collection = createLiveQueryCollection((q) => + q.from({ p: projects }).select(({ p }) => ({ + id: p.id, + name: p.name, + issues: q + .from({ i: issues }) + .where(({ i }) => eq(i.projectId, p.id)) + .orderBy(({ i }) => i.title, `asc`) + .limit(1) + .select(({ i }) => ({ + id: i.id, + title: i.title, + })), + })), + ) + + await collection.preload() + + // Alpha should have exactly 1 issue (limit 1): "Bug in Alpha" + const alphaIssues = [...(collection.get(1) as any).issues.toArray] + expect(alphaIssues).toHaveLength(1) + expect(alphaIssues).toEqual([{ id: 10, title: `Bug in Alpha` }]) + + // Insert an issue that comes before "Bug" alphabetically + issues.utils.begin() + issues.utils.write({ + type: `insert`, + value: { id: 12, projectId: 1, title: `Alpha priority issue` }, + }) + issues.utils.commit() + + // The new issue should displace "Bug in Alpha" since it sorts first + expect([...(collection.get(1) as any).issues.toArray]).toEqual([ + { id: 12, title: `Alpha priority issue` }, + ]) + + // Beta should still have its 1 issue (limit is per-parent) + expect([...(collection.get(2) as any).issues.toArray]).toEqual([ + { id: 20, title: `Bug in Beta` }, + ]) + }) + }) + + describe(`shared correlation key`, () => { + // Multiple parents share the same correlationKey value. + // e.g., two teams in the same department — both should see the same department members. + type Team = { id: number; name: string; departmentId: number } + type Member = { id: number; departmentId: number; name: string } + + const sampleTeams: Array = [ + { id: 1, name: `Frontend`, departmentId: 100 }, + { id: 2, name: `Backend`, departmentId: 100 }, + { id: 3, name: `Marketing`, departmentId: 200 }, + ] + + const sampleMembers: Array = [ + { id: 10, departmentId: 100, name: `Alice` }, + { id: 11, departmentId: 100, name: `Bob` }, + { id: 20, departmentId: 200, name: `Charlie` }, + ] + + function createTeamsCollection() { + return createCollection( + mockSyncCollectionOptions({ + id: `includes-teams`, + getKey: (t) => t.id, + initialData: sampleTeams, + }), + ) + } + + function createMembersCollection() { + return createCollection( + mockSyncCollectionOptions({ + id: `includes-members`, + getKey: (m) => m.id, + initialData: sampleMembers, + }), + ) + } + + it(`multiple parents with the same correlationKey each get the shared children`, async () => { + const teams = createTeamsCollection() + const members = createMembersCollection() + + const collection = createLiveQueryCollection((q) => + q.from({ t: teams }).select(({ t }) => ({ + id: t.id, + name: t.name, + departmentId: t.departmentId, + members: q + .from({ m: members }) + .where(({ m }) => eq(m.departmentId, t.departmentId)) + .select(({ m }) => ({ + id: m.id, + name: m.name, + })), + })), + ) + + await collection.preload() + + // Both Frontend and Backend teams share departmentId 100 + expect(toTree(collection)).toEqual([ + { + id: 1, + name: `Frontend`, + departmentId: 100, + members: [ + { id: 10, name: `Alice` }, + { id: 11, name: `Bob` }, + ], + }, + { + id: 2, + name: `Backend`, + departmentId: 100, + members: [ + { id: 10, name: `Alice` }, + { id: 11, name: `Bob` }, + ], + }, + { + id: 3, + name: `Marketing`, + departmentId: 200, + members: [{ id: 20, name: `Charlie` }], + }, + ]) + }) + + it(`adding a child updates all parents that share the correlation key`, async () => { + const teams = createTeamsCollection() + const members = createMembersCollection() + + const collection = createLiveQueryCollection((q) => + q.from({ t: teams }).select(({ t }) => ({ + id: t.id, + name: t.name, + departmentId: t.departmentId, + members: q + .from({ m: members }) + .where(({ m }) => eq(m.departmentId, t.departmentId)) + .select(({ m }) => ({ + id: m.id, + name: m.name, + })), + })), + ) + + await collection.preload() + + // Add a new member to department 100 + members.utils.begin() + members.utils.write({ + type: `insert`, + value: { id: 12, departmentId: 100, name: `Dave` }, + }) + members.utils.commit() + + // Both Frontend and Backend should see the new member + expect(childItems((collection.get(1) as any).members)).toEqual([ + { id: 10, name: `Alice` }, + { id: 11, name: `Bob` }, + { id: 12, name: `Dave` }, + ]) + expect(childItems((collection.get(2) as any).members)).toEqual([ + { id: 10, name: `Alice` }, + { id: 11, name: `Bob` }, + { id: 12, name: `Dave` }, + ]) + + // Marketing unaffected + expect(childItems((collection.get(3) as any).members)).toEqual([ + { id: 20, name: `Charlie` }, + ]) + }) + + it(`correlation field does not need to be in the parent select`, async () => { + const teams = createTeamsCollection() + const members = createMembersCollection() + + // departmentId is used for correlation but NOT selected in the parent output + const collection = createLiveQueryCollection((q) => + q.from({ t: teams }).select(({ t }) => ({ + id: t.id, + name: t.name, + members: q + .from({ m: members }) + .where(({ m }) => eq(m.departmentId, t.departmentId)) + .select(({ m }) => ({ + id: m.id, + name: m.name, + })), + })), + ) + + await collection.preload() + + expect(toTree(collection)).toEqual([ + { + id: 1, + name: `Frontend`, + members: [ + { id: 10, name: `Alice` }, + { id: 11, name: `Bob` }, + ], + }, + { + id: 2, + name: `Backend`, + members: [ + { id: 10, name: `Alice` }, + { id: 11, name: `Bob` }, + ], + }, + { + id: 3, + name: `Marketing`, + members: [{ id: 20, name: `Charlie` }], + }, + ]) + }) + }) + + describe(`nested includes`, () => { + function buildNestedQuery() { + return createLiveQueryCollection((q) => + q.from({ p: projects }).select(({ p }) => ({ + id: p.id, + name: p.name, + issues: q + .from({ i: issues }) + .where(({ i }) => eq(i.projectId, p.id)) + .select(({ i }) => ({ + id: i.id, + title: i.title, + comments: q + .from({ c: comments }) + .where(({ c }) => eq(c.issueId, i.id)) + .select(({ c }) => ({ + id: c.id, + body: c.body, + })), + })), + })), + ) + } + + it(`supports two levels of includes`, async () => { + const collection = buildNestedQuery() + await collection.preload() + + expect(toTree(collection)).toEqual([ + { + id: 1, + name: `Alpha`, + issues: [ + { + id: 10, + title: `Bug in Alpha`, + comments: [ + { id: 100, body: `Looks bad` }, + { id: 101, body: `Fixed it` }, + ], + }, + { + id: 11, + title: `Feature for Alpha`, + comments: [], + }, + ], + }, + { + id: 2, + name: `Beta`, + issues: [ + { + id: 20, + title: `Bug in Beta`, + comments: [{ id: 200, body: `Same bug` }], + }, + ], + }, + { + id: 3, + name: `Gamma`, + issues: [], + }, + ]) + }) + + it(`adding a grandchild (comment) updates the nested child collection`, async () => { + const collection = buildNestedQuery() + await collection.preload() + + // Issue 11 (Feature for Alpha) has no comments initially + const alpha = collection.get(1) as any + const issue11 = alpha.issues.get(11) + expect(childItems(issue11.comments)).toEqual([]) + + // Add a comment to issue 11 — no issue or project changes + comments.utils.begin() + comments.utils.write({ + type: `insert`, + value: { id: 110, issueId: 11, body: `Great feature` }, + }) + comments.utils.commit() + + const issue11After = (collection.get(1) as any).issues.get(11) + expect(childItems(issue11After.comments)).toEqual([ + { id: 110, body: `Great feature` }, + ]) + }) + + it(`removing a grandchild (comment) updates the nested child collection`, async () => { + const collection = buildNestedQuery() + await collection.preload() + + // Issue 10 (Bug in Alpha) has 2 comments + const issue10 = (collection.get(1) as any).issues.get(10) + expect(childItems(issue10.comments)).toHaveLength(2) + + // Remove one comment + comments.utils.begin() + comments.utils.write({ + type: `delete`, + value: sampleComments.find((c) => c.id === 100)!, + }) + comments.utils.commit() + + const issue10After = (collection.get(1) as any).issues.get(10) + expect(childItems(issue10After.comments)).toEqual([ + { id: 101, body: `Fixed it` }, + ]) + }) + }) +})