Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .changeset/fix-uint8array-join-keys.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
---
"@tanstack/db-ivm": patch
"@tanstack/db": patch
---

Fix joining collections by small Uint8Array keys to use content-based comparison instead of reference equality.

Previously, when joining collections using Uint8Array keys (like ULIDs or UUIDs), the Index class would compare keys by reference rather than by content. This caused joins to fail when the same byte array data existed in separate instances.

This fix introduces shared Uint8Array normalization utilities that convert small Uint8Arrays (≤128 bytes) to string representations for content-based equality checking. The normalization logic is now shared between `db-ivm` and `db` packages, eliminating code duplication.

Fixes #896
1 change: 1 addition & 0 deletions packages/db-ivm/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ export * from "./d2.js"
export * from "./multiset.js"
export * from "./operators/index.js"
export * from "./types.js"
export * from "./utils/uint8array.js"
79 changes: 52 additions & 27 deletions packages/db-ivm/src/indexes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

import { MultiSet } from "./multiset.js"
import { hash } from "./hashing/index.js"
import { normalizeValue as normalizeKey } from "./utils/uint8array.js"
import type { Hash } from "./hashing/index.js"

// We use a symbol to represent the absence of a prefix, unprefixed values a stored
Expand Down Expand Up @@ -151,6 +152,7 @@ export class Index<TKey, TValue, TPrefix = any> {
*/
#inner: IndexMap<TKey, TValue, TPrefix>
#consolidatedMultiplicity: Map<TKey, number> = new Map() // sum of multiplicities per key
#originalKeys: Map<TKey, TKey> = new Map() // map from normalized keys to original keys
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little nervous of the overhead of adding this additional map and the lookups. We are making joins slower for all cases in order to support an edge case.


constructor() {
this.#inner = new Map()
Expand Down Expand Up @@ -200,7 +202,7 @@ export class Index<TKey, TValue, TPrefix = any> {
* @returns True if the index has the key, false otherwise.
*/
has(key: TKey): boolean {
return this.#inner.has(key)
return this.#inner.has(normalizeKey(key) as TKey)
}

/**
Expand All @@ -209,7 +211,9 @@ export class Index<TKey, TValue, TPrefix = any> {
* @returns True if the key has non-zero consolidated multiplicity, false otherwise.
*/
hasPresence(key: TKey): boolean {
return (this.#consolidatedMultiplicity.get(key) || 0) !== 0
return (
(this.#consolidatedMultiplicity.get(normalizeKey(key) as TKey) || 0) !== 0
)
}

/**
Expand All @@ -218,15 +222,18 @@ export class Index<TKey, TValue, TPrefix = any> {
* @returns The consolidated multiplicity for the key.
*/
getConsolidatedMultiplicity(key: TKey): number {
return this.#consolidatedMultiplicity.get(key) || 0
return this.#consolidatedMultiplicity.get(normalizeKey(key) as TKey) || 0
}

/**
* Get all keys that have presence (non-zero consolidated multiplicity).
* @returns An iterator of keys with non-zero consolidated multiplicity.
*/
getPresenceKeys(): Iterable<TKey> {
return this.#consolidatedMultiplicity.keys()
*getPresenceKeys(): Iterable<TKey> {
for (const normalizedKey of this.#consolidatedMultiplicity.keys()) {
const originalKey = this.#originalKeys.get(normalizedKey) || normalizedKey
yield originalKey
}
}

/**
Expand All @@ -244,7 +251,7 @@ export class Index<TKey, TValue, TPrefix = any> {
* @returns An iterator of value tuples [value, multiplicity].
*/
*getIterator(key: TKey): Iterable<[TValue, number]> {
const mapOrSingleValue = this.#inner.get(key)
const mapOrSingleValue = this.#inner.get(normalizeKey(key) as TKey)
if (isSingleValue(mapOrSingleValue)) {
yield mapOrSingleValue
} else if (mapOrSingleValue === undefined) {
Expand Down Expand Up @@ -273,9 +280,10 @@ export class Index<TKey, TValue, TPrefix = any> {
* @returns An iterable of all key-value pairs (and their multiplicities) in the index.
*/
*entries(): Iterable<[TKey, [TValue, number]]> {
for (const key of this.#inner.keys()) {
for (const valueTuple of this.getIterator(key)) {
yield [key, valueTuple]
for (const normalizedKey of this.#inner.keys()) {
const originalKey = this.#originalKeys.get(normalizedKey) || normalizedKey
for (const valueTuple of this.getIterator(normalizedKey)) {
yield [originalKey, valueTuple]
}
}
}
Expand All @@ -287,8 +295,9 @@ export class Index<TKey, TValue, TPrefix = any> {
* @returns An iterator of all *keys* in the index and their corresponding value iterator.
*/
*entriesIterators(): Iterable<[TKey, Iterable<[TValue, number]>]> {
for (const key of this.#inner.keys()) {
yield [key, this.getIterator(key)]
for (const normalizedKey of this.#inner.keys()) {
const originalKey = this.#originalKeys.get(normalizedKey) || normalizedKey
yield [originalKey, this.getIterator(normalizedKey)]
}
}

Expand All @@ -302,27 +311,40 @@ export class Index<TKey, TValue, TPrefix = any> {
// If the multiplicity is 0, do nothing
if (multiplicity === 0) return

// Normalize the key for Map operations
const normalizedKey = normalizeKey(key) as TKey

// Store the original key if this is a new key
if (!this.#originalKeys.has(normalizedKey)) {
this.#originalKeys.set(normalizedKey, key)
}

// Update consolidated multiplicity tracking
const newConsolidatedMultiplicity =
(this.#consolidatedMultiplicity.get(key) || 0) + multiplicity
(this.#consolidatedMultiplicity.get(normalizedKey) || 0) + multiplicity
if (newConsolidatedMultiplicity === 0) {
this.#consolidatedMultiplicity.delete(key)
this.#consolidatedMultiplicity.delete(normalizedKey)
// Also clean up the original key mapping when the key is completely removed
this.#originalKeys.delete(normalizedKey)
} else {
this.#consolidatedMultiplicity.set(key, newConsolidatedMultiplicity)
this.#consolidatedMultiplicity.set(
normalizedKey,
newConsolidatedMultiplicity
)
}

const mapOrSingleValue = this.#inner.get(key)
const mapOrSingleValue = this.#inner.get(normalizedKey)

if (mapOrSingleValue === undefined) {
// First value for this key
this.#inner.set(key, valueTuple)
this.#inner.set(normalizedKey, valueTuple)
return
}

if (isSingleValue(mapOrSingleValue)) {
// Handle transition from single value to map
this.#handleSingleValueTransition(
key,
normalizedKey,
mapOrSingleValue,
value,
multiplicity
Expand All @@ -338,28 +360,31 @@ export class Index<TKey, TValue, TPrefix = any> {
const prefixMap = new PrefixMap<TValue, TPrefix>()
prefixMap.set(NO_PREFIX, mapOrSingleValue)
prefixMap.set(prefix, valueTuple)
this.#inner.set(key, prefixMap)
this.#inner.set(normalizedKey, prefixMap)
} else {
// Add to existing ValueMap
const isEmpty = mapOrSingleValue.addValue(value, multiplicity)
if (isEmpty) {
this.#inner.delete(key)
this.#inner.delete(normalizedKey)
this.#originalKeys.delete(normalizedKey)
}
}
} else {
// Handle existing PrefixMap
const isEmpty = mapOrSingleValue.addValue(value, multiplicity)
if (isEmpty) {
this.#inner.delete(key)
this.#inner.delete(normalizedKey)
this.#originalKeys.delete(normalizedKey)
}
}
}

/**
* Handle the transition from a single value to either a ValueMap or PrefixMap
* Note: The key parameter should already be normalized before calling this method
*/
#handleSingleValueTransition(
key: TKey,
normalizedKey: TKey,
currentSingleValue: SingleValue<TValue>,
newValue: TValue,
multiplicity: number
Expand All @@ -370,9 +395,9 @@ export class Index<TKey, TValue, TPrefix = any> {
if (currentValue === newValue) {
const newMultiplicity = currentMultiplicity + multiplicity
if (newMultiplicity === 0) {
this.#inner.delete(key)
this.#inner.delete(normalizedKey)
} else {
this.#inner.set(key, [newValue, newMultiplicity])
this.#inner.set(normalizedKey, [newValue, newMultiplicity])
}
return
}
Expand All @@ -388,9 +413,9 @@ export class Index<TKey, TValue, TPrefix = any> {
) {
const newMultiplicity = currentMultiplicity + multiplicity
if (newMultiplicity === 0) {
this.#inner.delete(key)
this.#inner.delete(normalizedKey)
} else {
this.#inner.set(key, [newValue, newMultiplicity])
this.#inner.set(normalizedKey, [newValue, newMultiplicity])
}
return
}
Expand All @@ -401,7 +426,7 @@ export class Index<TKey, TValue, TPrefix = any> {
const valueMap = new ValueMap<TValue>()
valueMap.set(hash(currentValue), currentSingleValue)
valueMap.set(hash(newValue), [newValue, multiplicity])
this.#inner.set(key, valueMap)
this.#inner.set(normalizedKey, valueMap)
} else {
// At least one has a prefix, use PrefixMap
const prefixMap = new PrefixMap<TValue, TPrefix>()
Expand All @@ -418,7 +443,7 @@ export class Index<TKey, TValue, TPrefix = any> {
prefixMap.set(newPrefix, [newValue, multiplicity])
}

this.#inner.set(key, prefixMap)
this.#inner.set(normalizedKey, prefixMap)
}
}

Expand Down
52 changes: 52 additions & 0 deletions packages/db-ivm/src/utils/uint8array.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/**
* Threshold for normalizing Uint8Arrays to string representations.
* Arrays larger than this will use reference equality to avoid memory overhead.
* 128 bytes is enough for common ID formats (ULIDs are 16 bytes, UUIDs are 16 bytes)
* while avoiding excessive string allocation for large binary data.
*/
export const UINT8ARRAY_NORMALIZE_THRESHOLD = 128

/**
* Check if a value is a Uint8Array or Buffer
*/
export function isUint8Array(value: unknown): value is Uint8Array {
return (
(typeof Buffer !== `undefined` && value instanceof Buffer) ||
value instanceof Uint8Array
)
}

/**
* Normalize a Uint8Array to a string representation for content-based comparison.
* This enables Uint8Arrays with the same byte content to be treated as equal,
* even if they are different object instances.
*
* @param value - The Uint8Array or Buffer to normalize
* @returns A string representation of the byte array
*/
export function normalizeUint8Array(value: Uint8Array): string {
// Convert to a string representation that can be used as a Map key
// Use a special prefix to avoid collisions with user strings
return `__u8__${Array.from(value).join(`,`)}`
}

/**
* Normalize a value for Map key or comparison usage.
* Converts small Uint8Arrays/Buffers to string representations for content-based equality.
* This enables proper comparison and Map key usage for binary data like ULIDs.
*
* @param value - The value to normalize
* @returns The normalized value (string for small Uint8Arrays, original value otherwise)
*/
export function normalizeValue<T>(value: T): T | string {
if (isUint8Array(value)) {
// Only normalize small arrays to avoid memory overhead for large binary data
if (value.byteLength <= UINT8ARRAY_NORMALIZE_THRESHOLD) {
return normalizeUint8Array(value)
}
// For large arrays, fall back to reference equality
// Users working with large binary data should use a derived key if needed
}

return value
}
68 changes: 68 additions & 0 deletions packages/db-ivm/tests/operators/join.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
describe(`Operators`, () => {
describe(`Join operation`, () => {
testJoin()
testUint8ArrayKeyJoin()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we decide to keep this implementation, can we incorporate this into the main testJoin function rather than adding as a new one with just this single test?

})
})

Expand Down Expand Up @@ -342,3 +343,70 @@ function testJoin() {
assertKeyedResults(`join batch processing`, batchResult, expectedResults, 6)
})
}

function testUint8ArrayKeyJoin() {
test(`join with Uint8Array keys compared by content`, () => {
const graph = new D2()
const inputA = graph.newInput<[Uint8Array, string]>()
const inputB = graph.newInput<[Uint8Array, string]>()
const results: Array<[Uint8Array, [string, string]]> = []

inputA.pipe(
join(inputB),
output((message) => {
for (const [item] of message.getInner()) {
results.push(item)
}
})
)

graph.finalize()

// Create separate Uint8Array instances with the same content
const key1A = new Uint8Array([1, 2, 3, 4])
const key1B = new Uint8Array([1, 2, 3, 4])
const key2A = new Uint8Array([5, 6, 7, 8])
const key2B = new Uint8Array([5, 6, 7, 8])

// Verify that the arrays are different objects
expect(key1A).not.toBe(key1B)
expect(key2A).not.toBe(key2B)

// Send data with different Uint8Array instances but same content
inputA.sendData(
new MultiSet([
[[key1A, `a`], 1],
[[key2A, `b`], 1],
])
)

inputB.sendData(
new MultiSet([
[[key1B, `x`], 1],
[[key2B, `y`], 1],
])
)

graph.run()

// Should join successfully based on content, not reference
expect(results).toHaveLength(2)

// Verify the joined data is correct
const sortedResults = results.sort((a, b) => {
const [keyA] = a
const [keyB] = b
return keyA[0]! - keyB[0]!
})

const [key1, [val1A, val1B]] = sortedResults[0]!
expect(Array.from(key1)).toEqual([1, 2, 3, 4])
expect(val1A).toBe(`a`)
expect(val1B).toBe(`x`)

const [key2, [val2A, val2B]] = sortedResults[1]!
expect(Array.from(key2)).toEqual([5, 6, 7, 8])
expect(val2A).toBe(`b`)
expect(val2B).toBe(`y`)
})
}
Loading
Loading