Skip to content
68 changes: 68 additions & 0 deletions .changeset/auto-register-operators.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
---
"@tanstack/db": patch
---

Refactor operators and aggregates to embed their evaluators directly in IR nodes for true tree-shaking support and custom extensibility.

Each operator and aggregate now bundles its builder function and evaluator factory in a single file. The factory is embedded directly in the `Func` or `Aggregate` IR node, eliminating the need for a global registry. This enables:

- **True tree-shaking**: Only operators/aggregates you import are included in your bundle
- **No global registry**: No side-effect imports needed; each node is self-contained
- **Custom operators**: Create custom operators by building `Func` nodes with a factory
- **Custom aggregates**: Create custom aggregates by building `Aggregate` nodes with a config

**Custom Operator Example:**

```typescript
import {
Func,
type EvaluatorFactory,
type CompiledExpression,
} from "@tanstack/db"
import { toExpression } from "@tanstack/db/query"

// Evaluator factory for the custom `between` operator. It receives the
// compiled argument evaluators in [value, min, max] order and returns a
// per-row predicate that checks the inclusive range min <= value <= max.
const betweenFactory: EvaluatorFactory = (compiledArgs, _isSingleRow) => {
  const evalValue = compiledArgs[0]
  const evalMin = compiledArgs[1]
  const evalMax = compiledArgs[2]
  return (data) => {
    const current = evalValue!(data)
    // Lower bound first; the upper bound is only evaluated when it holds.
    if (!(current >= evalMin!(data))) return false
    return current <= evalMax!(data)
  }
}

// Builder for the custom operator: converts the three operands to
// expressions (value, then min, then max) and wraps them in a `Func` node
// that carries `betweenFactory` as its evaluator factory.
function between(value: any, min: any, max: any) {
  const operands = [value, min, max].map((operand) => toExpression(operand))
  return new Func("between", operands, betweenFactory)
}
```

**Custom Aggregate Example:**

```typescript
import {
Aggregate,
type AggregateConfig,
type ValueExtractor,
} from "@tanstack/db"
import { toExpression } from "@tanstack/db/query"

// Config for the custom `product` aggregate: the factory maps each row
// through the supplied extractor and reduces a group by multiplying the
// extracted values together, counting each value `multiplicity` times.
const productConfig: AggregateConfig = {
  factory: (valueExtractor: ValueExtractor) => ({
    preMap: valueExtractor,
    reduce: (values) => {
      // 1 is the multiplicative identity, so an empty group yields 1.
      let running = 1
      for (const [factor, times] of values) {
        for (let remaining = times; remaining > 0; remaining--) {
          running *= factor
        }
      }
      return running
    },
  }),
  valueTransform: "numeric",
}

// Builder for the custom aggregate: wraps the expression form of `arg` in an
// `Aggregate` node named "product" that carries `productConfig`.
function product<T>(arg: T): Aggregate<number> {
  const operands = [toExpression(arg)]
  return new Aggregate("product", operands, productConfig)
}
```
18 changes: 16 additions & 2 deletions docs/guides/live-queries.md
Original file line number Diff line number Diff line change
Expand Up @@ -1075,7 +1075,7 @@ const userStats = createCollection(liveQueryCollectionOptions({
Use various aggregate functions to summarize your data:

```ts
import { count, sum, avg, min, max } from '@tanstack/db'
import { count, sum, avg, min, max, minStr, maxStr, collect } from '@tanstack/db'

const orderStats = createCollection(liveQueryCollectionOptions({
query: (q) =>
Expand Down Expand Up @@ -1817,12 +1817,26 @@ avg(order.amount)
```

#### `min(value)`, `max(value)`
Find minimum and maximum values:
Find minimum and maximum values (coerces to numbers):
```ts
min(user.salary)
max(order.amount)
```

#### `minStr(value)`, `maxStr(value)`
Find the minimum and maximum using string comparison. Unlike `min`/`max`, which coerce values to numbers, these compare the values as strings:
```ts
minStr(event.createdAt) // Works correctly with ISO 8601 timestamps
maxStr(item.code) // Lexicographic comparison for any string
```

#### `collect(value)`
Collect all values in a group into an array:
```ts
collect(order.id) // Array of all order IDs in group
collect(order.amount) // Array of all amounts
```

### Function Composition

Functions can be composed and chained:
Expand Down
27 changes: 27 additions & 0 deletions packages/db-ivm/src/operators/groupBy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,32 @@ export function mode<T>(
}
}

/**
 * Creates a collect aggregate that accumulates every extracted value of a
 * group into a single array (comparable to SQL's array_agg).
 *
 * `preMap` wraps each extracted value in a one-element array; `reduce`
 * flattens those arrays, emitting each value once per unit of multiplicity.
 * Entries whose multiplicity is zero or negative contribute nothing.
 *
 * @param valueExtractor Function to extract a value from each data entry;
 *   defaults to returning the entry itself
 */
export function collect<T, V = T>(
  valueExtractor: (value: T) => V = (v) => v as unknown as V
): AggregateFunction<T, Array<V>, Array<V>> {
  const preMap = (data: T): Array<V> => [valueExtractor(data)]

  const reduce = (values: Array<[Array<V>, number]>): Array<V> => {
    const collected: Array<V> = []
    for (const [extracted, multiplicity] of values) {
      for (const item of extracted) {
        // Repeat the value 'multiplicity' times for correct IVM semantics
        let remaining = multiplicity
        while (remaining > 0) {
          collected.push(item)
          remaining--
        }
      }
    }
    return collected
  }

  // Deliberately no postMap: the reduced array is the final result
  return { preMap, reduce }
}

export const groupByOperators = {
sum,
count,
Expand All @@ -374,4 +400,5 @@ export const groupByOperators = {
max,
median,
mode,
collect,
}
74 changes: 74 additions & 0 deletions packages/db-ivm/tests/operators/groupBy.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { D2 } from "../../src/d2.js"
import { MultiSet } from "../../src/multiset.js"
import {
avg,
collect,
count,
groupBy,
max,
Expand Down Expand Up @@ -662,6 +663,79 @@ describe(`Operators`, () => {
expect(latestMessage.getInner()).toEqual(expectedResult)
})

// Exercises the `collect` aggregate through groupBy: an initial batch, then
// an insertion into an existing group, then a removal from that group.
test(`with collect aggregate`, () => {
const graph = new D2()
const input = graph.newInput<{
category: string
item: string
}>()
// Holds the most recent message delivered to the output operator
let latestMessage: any = null

// Group rows by category and collect each group's item names into `items`
input.pipe(
groupBy((data) => ({ category: data.category }), {
items: collect((data) => data.item),
}),
output((message) => {
latestMessage = message
})
)

graph.finalize()

// Initial batch: two rows for category A, one for category B
input.sendData(
new MultiSet([
[{ category: `A`, item: `apple` }, 1],
[{ category: `A`, item: `avocado` }, 1],
[{ category: `B`, item: `banana` }, 1],
])
)
graph.run()

expect(latestMessage).not.toBeNull()

// One entry per group is expected after the first run
const result = latestMessage.getInner()
expect(result).toHaveLength(2)

// Entries are read as [[key, aggregates], weight]; the key is matched against
// the JSON string of the group-by object
const categoryA = result.find(
([key]: any) => key[0] === `{"category":"A"}`
)
expect(categoryA).toBeDefined()
expect(categoryA[0][1].items).toEqual([`apple`, `avocado`])

const categoryB = result.find(
([key]: any) => key[0] === `{"category":"B"}`
)
expect(categoryB).toBeDefined()
expect(categoryB[0][1].items).toEqual([`banana`])

// Add another item to category A
input.sendData(new MultiSet([[{ category: `A`, item: `apricot` }, 1]]))
graph.run()

const updatedResult = latestMessage.getInner()
// IVM returns deltas: -1 for old state, +1 for new state
// Only the +1 entry (the new state of group A) is inspected here
const updatedCategoryA = updatedResult.find(
([[key], weight]: any) => key === `{"category":"A"}` && weight === 1
)
expect(updatedCategoryA).toBeDefined()
expect(updatedCategoryA[0][1].items).toEqual([
`apple`,
`avocado`,
`apricot`,
])

// Remove an item from category A
input.sendData(new MultiSet([[{ category: `A`, item: `apple` }, -1]]))
graph.run()

// Again pick the +1 entry; the removed item must no longer be collected
const afterRemoval = latestMessage.getInner()
const categoryAAfterRemoval = afterRemoval.find(
([[key], weight]: any) => key === `{"category":"A"}` && weight === 1
)
expect(categoryAAfterRemoval).toBeDefined()
expect(categoryAAfterRemoval[0][1].items).toEqual([`avocado`, `apricot`])
})

test(`complete group removal with sum aggregate`, () => {
const graph = new D2()
const input = graph.newInput<{
Expand Down
25 changes: 25 additions & 0 deletions packages/db/src/query/builder/aggregates/avg.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { groupByOperators } from "@tanstack/db-ivm"
import { Aggregate } from "../../ir.js"
import { toExpression } from "../ref-proxy.js"
import type { AggregateReturnType, ExpressionLike } from "../operators/types.js"

// ============================================================
// CONFIG
// ============================================================

// Module-level config passed to every `avg` node: pairs the db-ivm `avg`
// group-by operator (the factory) with the `numeric` value transform.
// `as const` keeps `valueTransform` as a literal type instead of `string`.
const avgConfig = {
factory: groupByOperators.avg,
valueTransform: `numeric` as const,
}

// ============================================================
// BUILDER FUNCTION
// ============================================================

/**
 * Builds the `avg` aggregate IR node for an expression.
 *
 * @param arg Expression-like value; it is converted with `toExpression` and
 *   becomes the node's single argument
 * @returns An `Aggregate` named `avg` carrying `avgConfig`, typed as
 *   `AggregateReturnType<T>`
 */
export function avg<T extends ExpressionLike>(arg: T): AggregateReturnType<T> {
  const operands = [toExpression(arg)]
  return new Aggregate(`avg`, operands, avgConfig) as AggregateReturnType<T>
}
39 changes: 39 additions & 0 deletions packages/db/src/query/builder/aggregates/collect.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import { groupByOperators } from "@tanstack/db-ivm"
import { Aggregate } from "../../ir.js"
import { toExpression } from "../ref-proxy.js"
import type { ExpressionLike, ExtractType } from "../operators/types.js"

// ============================================================
// CONFIG
// ============================================================

// Module-level config passed to every `collect` node: pairs the db-ivm
// `collect` group-by operator (the factory) with the `raw` value transform.
// `as const` keeps `valueTransform` as a literal type instead of `string`.
const collectConfig = {
factory: groupByOperators.collect,
valueTransform: `raw` as const,
}

// ============================================================
// BUILDER FUNCTION
// ============================================================

/**
 * Gathers every value of a group into an array, comparable to SQL's
 * array_agg.
 *
 * @param arg Expression-like value to collect for each row of the group
 * @returns An `Aggregate` named `collect` carrying `collectConfig`
 *
 * @example
 * ```typescript
 * // Collect all posts for each user
 * query
 *   .from({ posts: postsCollection })
 *   .groupBy(({ posts }) => posts.userId)
 *   .select(({ posts }) => ({
 *     userId: posts.userId,
 *     allPosts: collect(posts),
 *   }))
 * ```
 */
export function collect<T extends ExpressionLike>(
  arg: T
): Aggregate<Array<ExtractType<T>>> {
  const operands = [toExpression(arg)]
  return new Aggregate(`collect`, operands, collectConfig)
}
21 changes: 21 additions & 0 deletions packages/db/src/query/builder/aggregates/count.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { groupByOperators } from "@tanstack/db-ivm"
import { Aggregate } from "../../ir.js"
import { toExpression } from "../ref-proxy.js"
import type { ExpressionLike } from "../operators/types.js"

// ============================================================
// CONFIG
// ============================================================

// Module-level config passed to every `count` node: pairs the db-ivm `count`
// group-by operator (the factory) with the `raw` value transform.
// `as const` keeps `valueTransform` as a literal type instead of `string`.
const countConfig = {
factory: groupByOperators.count,
valueTransform: `raw` as const,
}

// ============================================================
// BUILDER FUNCTION
// ============================================================

/**
 * Builds the `count` aggregate IR node for an expression.
 *
 * @param arg Expression-like value; it is converted with `toExpression` and
 *   becomes the node's single argument
 * @returns An `Aggregate<number>` named `count` carrying `countConfig`
 */
export function count(arg: ExpressionLike): Aggregate<number> {
  const operands = [toExpression(arg)]
  return new Aggregate(`count`, operands, countConfig)
}
11 changes: 11 additions & 0 deletions packages/db/src/query/builder/aggregates/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Re-export all aggregates
// Each aggregate embeds its evaluator config in the IR node it builds, so
// importing from here has no registration side effects

export { sum } from "./sum.js"
export { count } from "./count.js"
export { avg } from "./avg.js"
export { min } from "./min.js"
export { max } from "./max.js"
export { collect } from "./collect.js"
export { minStr } from "./minStr.js"
export { maxStr } from "./maxStr.js"
25 changes: 25 additions & 0 deletions packages/db/src/query/builder/aggregates/max.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { groupByOperators } from "@tanstack/db-ivm"
import { Aggregate } from "../../ir.js"
import { toExpression } from "../ref-proxy.js"
import type { AggregateReturnType, ExpressionLike } from "../operators/types.js"

// ============================================================
// CONFIG
// ============================================================

// Module-level config passed to every `max` node: pairs the db-ivm `max`
// group-by operator (the factory) with the `numericOrDate` value transform.
// `as const` keeps `valueTransform` as a literal type instead of `string`.
const maxConfig = {
factory: groupByOperators.max,
valueTransform: `numericOrDate` as const,
}

// ============================================================
// BUILDER FUNCTION
// ============================================================

/**
 * Builds the `max` aggregate IR node for an expression.
 *
 * @param arg Expression-like value; it is converted with `toExpression` and
 *   becomes the node's single argument
 * @returns An `Aggregate` named `max` carrying `maxConfig`, typed as
 *   `AggregateReturnType<T>`
 */
export function max<T extends ExpressionLike>(arg: T): AggregateReturnType<T> {
  const operands = [toExpression(arg)]
  return new Aggregate(`max`, operands, maxConfig) as AggregateReturnType<T>
}
Loading