Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions .changeset/expose-shape-stream.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
---
'@tanstack/electric-db-collection': minor
---

feat(electric-db-collection): expose underlying ShapeStream via shapeStream getter

Added a `shapeStream` getter to `ElectricCollectionUtils` that allows users to access the underlying `ShapeStream` instance from an electric collection. This enables access to ShapeStream properties like the shape handle.

```typescript
const stream = collection.utils.shapeStream
if (stream) {
console.log(stream.shapeHandle)
}
```
19 changes: 19 additions & 0 deletions packages/electric-db-collection/src/electric.ts
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,10 @@ export interface ElectricCollectionUtils<
> extends UtilsRecord {
awaitTxId: AwaitTxIdFn
awaitMatch: AwaitMatchFn<T>
/**
* The underlying ShapeStream instance, or undefined if not yet connected
*/
readonly shapeStream: ShapeStream<T> | undefined
}

/**
Expand Down Expand Up @@ -476,6 +480,9 @@ export function electricCollectionOptions<T extends Row<unknown>>(
// Buffer messages since last up-to-date to handle race conditions
const currentBatchMessages = new Store<Array<Message<any>>>([])

// Store reference to the ShapeStream for accessing the shape handle
const streamRef = new Store<ShapeStream<T> | null>(null)

/**
* Helper function to remove multiple matches from the pendingMatches store
*/
Expand Down Expand Up @@ -517,6 +524,7 @@ export function electricCollectionOptions<T extends Row<unknown>>(
resolveMatchedPendingMatches,
collectionId: config.id,
testHooks: config[ELECTRIC_TEST_HOOKS],
streamRef,
})

/**
Expand Down Expand Up @@ -758,6 +766,9 @@ export function electricCollectionOptions<T extends Row<unknown>>(
utils: {
awaitTxId,
awaitMatch,
get shapeStream() {
return streamRef.state ?? undefined
},
},
}
}
Expand Down Expand Up @@ -788,6 +799,7 @@ function createElectricSync<T extends Row<unknown>>(
resolveMatchedPendingMatches: () => void
collectionId?: string
testHooks?: ElectricTestHooks
streamRef: Store<ShapeStream<T> | null>
},
): SyncConfig<T> {
const {
Expand All @@ -800,6 +812,7 @@ function createElectricSync<T extends Row<unknown>>(
resolveMatchedPendingMatches,
collectionId,
testHooks,
streamRef,
} = options
const MAX_BATCH_MESSAGES = 1000 // Safety limit for message buffer

Expand Down Expand Up @@ -907,6 +920,10 @@ function createElectricSync<T extends Row<unknown>>(
return
},
})

// Store the stream reference for external access via getShapeStream utility
streamRef.setState(() => stream)

let transactionStarted = false
const newTxids = new Set<Txid>()
const newSnapshots: Array<PostgresSnapshot> = []
Expand Down Expand Up @@ -1152,6 +1169,8 @@ function createElectricSync<T extends Row<unknown>>(
abortController.abort()
// Reset deduplication tracking so collection can load fresh data if restarted
loadSubsetDedupe?.reset()
// Clear the stream reference
streamRef.setState(() => null)
},
}
},
Expand Down
4 changes: 4 additions & 0 deletions packages/electric-db-collection/tests/electric.test-d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ describe(`Electric collection type resolution tests`, () => {
// Verify the specific properties that define ElectricCollectionUtils exist and are functions
expectTypeOf(todosCollection.utils.awaitTxId).toBeFunction
expectTypeOf(todosCollection.utils.awaitMatch).toBeFunction
// shapeStream is a getter property, not a function
expectTypeOf(todosCollection.utils.shapeStream).toMatchTypeOf<
object | undefined
>()
})

it(`should properly type the onInsert, onUpdate, and onDelete handlers`, () => {
Expand Down
Loading