From 570f0b7915edd06396d8fc32466cb688dc0e1338 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Sun, 17 May 2026 16:14:02 +0100 Subject: [PATCH 01/10] plan: add entity timeline query design Co-authored-by: Cursor --- ENTITY_TIMELINE_QUERY_PLAN.md | 480 ++++++++++++++++++++++++++++++++++ 1 file changed, 480 insertions(+) create mode 100644 ENTITY_TIMELINE_QUERY_PLAN.md diff --git a/ENTITY_TIMELINE_QUERY_PLAN.md b/ENTITY_TIMELINE_QUERY_PLAN.md new file mode 100644 index 0000000000..8aeb0a024c --- /dev/null +++ b/ENTITY_TIMELINE_QUERY_PLAN.md @@ -0,0 +1,480 @@ +# Electric Agents `createEntityTimelineQuery` plan + +## Goal + +Add a row-oriented, fine-grained live query API for Electric Agents timelines: + +```ts +createEntityTimelineQuery(db) +``` + +This query should produce an ordered timeline collection of individual events +rather than one aggregate session object. It is intended for hot UI paths such +as the built-in app chat log, observe UI, and user-built timeline UIs. + +Keep `createEntityIncludesQuery` as the snapshot/convenience API. It is useful +when consumers want the whole session shape at once, but it rematerializes the +session-level data structure on every streaming chunk. + +## Core direction + +Use TanStack DB's multi-source `from` for the outer timeline query and keep the +outer query simple: + +```ts +return q + .from({ + inbox, + run, + wake, + manifest, + contextInserted, + contextRemoved, + }) + .orderBy(({ inbox, run, wake, manifest, contextInserted, contextRemoved }) => + coalesce( + inbox.order, + run.order, + wake.order, + manifest.order, + contextInserted.order, + contextRemoved.order + ) + ) +``` + +Do not use `caseWhen` in the outer `select`. Without an outer `select`, the +query result keeps the discriminated/exclusive union produced by multi-source +`from`: + +```ts +type EntityTimelineQueryRow = + | { inbox: EntityTimelineInboxRow; run?: undefined; wake?: undefined; ... } + | { run: EntityTimelineRunRow; inbox?: undefined; wake?: undefined; ... } + | { wake: EntityTimelineWakeRow; inbox?: undefined; run?: undefined; ... } + | ... +``` + +Any event-type projection should happen in the source subquery for that event +type. The outer query should only combine and order already-shaped event rows. + +Use `caseWhen` only where it is materially useful, such as branch-dependent +includes/joins after a union source or conditional nested projection. It should +not be required for the top-level timeline union. + +## Why this exists + +The current UI path is: + +```text +createEntityIncludesQuery + -> normalizeEntityTimelineData + -> buildTimelineEntries + -> EntityTimeline +``` + +`createEntityIncludesQuery` returns one aggregate row with nested arrays: + +- `runs` +- `inbox` +- `wakes` +- `entities` +- nested `texts`, `toolCalls`, `steps`, `errors` +- text strings built from `concat(toArray(textDeltas...))` + +During streaming, every new text delta changes the aggregate row. The render +pipeline has memoization to avoid repainting settled rows, but the session data +structure is still rebuilt on each chunk. The new API should let TanStack DB +maintain the timeline incrementally at row and nested-collection granularity. + +## UI use cases to support + +The built-in app UI currently needs: + +- A stable ordered row list for virtualized rendering and find/search. +- Rows for processed user messages, agent runs/responses, wakes, manifests, + context insertion/removal events, and possibly top-level errors/lifecycle + events later. +- Agent run rows with ordered nested content: + - text segments; + - tool calls; + - run errors; + - steps, even if the current UI does not render them prominently. +- Text segment rows with ordered text chunks/deltas, so streaming updates only + the active text's nested collection rather than the whole timeline. +- Manifest rows for the app timeline, currently queried separately and merged in + `useEntityTimeline`. +- Related entity data/status for manifest badges and "open entity" actions. +- Pending inbox messages separately for queued/editable messages in + `MessageInput` and `EntityContextDrawer`. +- A cheap way to derive whether generation is active. + +UI-only behavior should stay outside the query: + +- Markdown rendering and render cache management. +- Tool argument parsing for display. +- Tool result stringification for display/copy. +- The first-message `isInitial` flag. +- `responseTimestamp`, which depends on the previous user message. +- Optimistic inline queued message projection. + +## Proposed source subqueries + +Each source subquery should project rows into a runtime-level timeline event +shape with a common base: + +```ts +type EntityTimelineOrder = string | number + +type EntityTimelineRowBase = { + order: EntityTimelineOrder +} +``` + +Do not project synthetic timeline identifiers in each source subquery. TanStack +DB adds `$key` virtual props to live query rows, and multi-source `from` prefixes +the outer row key with the active source alias. A no-select multi-source row +therefore has enough identity and discrimination already: + +```ts +{ + $key: `wake:123`, + wake: { + $key: 123, + // projected wake fields... + }, +} +``` + +Consumers should use the outer row's `$key` as the stable timeline row key and +narrow by the active source alias (`row.wake`, `row.run`, `row.inbox`, etc.). +The source row's own `$key` remains available inside the active alias if callers +need the original collection key. + +This matches the TanStack DB branch implementation: union branches are wrapped +under their source alias, no-select union queries return the namespaced row, and +the union stream key is prefixed with the source alias. + +Do not add a separate `kind` field just to discriminate timeline rows. The +active source alias is the discriminant. In UI code, replace old +`section.kind` switches with small alias-narrowing helpers: + +```ts +function isInboxRow(row: EntityTimelineQueryRow): row is InboxTimelineRow { + return row.inbox !== undefined +} + +function isRunRow(row: EntityTimelineQueryRow): row is RunTimelineRow { + return row.run !== undefined +} +``` + +For exhaustive handling, use a helper that branches over every known alias and +assigns the remainder to `never`. This keeps type safety without duplicating the +source alias as another string field that can drift from the query shape. + +### Inbox source + +Filter to processed messages by default: + +```ts +const inbox = q + .from({ inbox: db.collections.inbox }) + .where(({ inbox }) => eq(coalesce(inbox.status, `processed`), `processed`)) + .select(({ inbox }) => ({ + order: coalesce(inbox._seq, -1), + from: coalesce(inbox.from, `unknown`), + payload: inbox.payload, + timestamp: coalesce(inbox.timestamp, EPOCH_ISO), + mode: coalesce(inbox.mode, `immediate`), + status: coalesce(inbox.status, `processed`), + position: inbox.position, + processed_at: inbox.processed_at, + cancelled_at: inbox.cancelled_at, + })) +``` + +Pending inbox should remain a separate query for the composer/drawer because it +has different sorting and editing behavior. + +### Run source + +The run source is the most important for fine-grained streaming behavior: + +```ts +const run = q.from({ run: db.collections.runs }).select(({ run }) => ({ + order: coalesce(run._seq, -1), + status: run.status, + finish_reason: run.finish_reason, + items: runItemsInclude(run.key), + errors: runErrorsInclude(run.key), + steps: runStepsInclude(run.key), +})) +``` + +`items` should be an ordered nested collection over texts and tool calls. Use +the same no-outer-select multi-source pattern here too, so item rows are +discriminated by active alias and keyed by DB: + +```ts +q.from({ + text, + toolCall, +}) + .where(({ text, toolCall }) => + eq(coalesce(text.run_id, toolCall.run_id), run.key) + ) + .orderBy(({ text, toolCall }) => coalesce(text.order, toolCall.order)) +``` + +The result shape should be: + +```ts +type EntityTimelineRunItem = + | { $key: string; text: EntityTimelineTextItem; toolCall?: undefined } + | { $key: string; text?: undefined; toolCall: EntityTimelineToolCallItem } +``` + +Text item rows should include ordered chunks/deltas rather than an eagerly +concatenated text string: + +```ts +type EntityTimelineTextItem = { + run_id: string + order: EntityTimelineOrder + status: `streaming` | `completed` + chunks: Collection +} + +type EntityTimelineTextChunk = { + text_id: string + run_id: string + order: EntityTimelineOrder + delta: string +} +``` + +Do not decide the final text materialization strategy in this plan. First review +how the streaming Markdown parser should consume chunks so the UI can avoid +unnecessary string rebuilding. + +Tool call item rows should preserve the current display fields: + +```ts +type EntityTimelineToolCallItem = { + run_id: string + order: EntityTimelineOrder + tool_name: string + status: `started` | `args_complete` | `executing` | `completed` | `failed` + args?: unknown + result?: unknown + error?: string +} +``` + +### Wake source + +Project the current wake display payload: + +```ts +const wake = q.from({ wake: db.collections.wakes }).select(({ wake }) => ({ + order: coalesce(wake._seq, -1), + payload: { + type: `wake` as const, + timestamp: wake.timestamp, + source: wake.source, + timeout: wake.timeout, + changes: wake.changes, + finished_child: wake.finished_child, + other_children: wake.other_children, + }, +})) +``` + +### Manifest source + +The app UI currently queries manifests separately and merges them into the +timeline. The new query should include them directly: + +```ts +const manifest = q + .from({ manifest: db.collections.manifests }) + .select(({ manifest }) => ({ + order: coalesce(manifest._seq, -1), + manifest, + })) +``` + +Entity status enrichment can be handled in one of two ways: + +1. Keep the app UI's separate status query against the global entity registry. +2. Add a runtime-level related-entity projection for child/source manifests. + +Prefer option 1 initially to keep `createEntityTimelineQuery` focused on the +entity stream itself. + +### Context sources + +Include context history rows so the query is comprehensive for userland +timeline UIs, even if the built-in chat UI chooses not to display them yet: + +```ts +const contextInserted = q + .from({ contextInserted: db.collections.contextInserted }) + .select(({ contextInserted }) => ({ + order: coalesce(contextInserted._seq, -1), + id: contextInserted.id, + name: contextInserted.name, + attrs: contextInserted.attrs, + content: contextInserted.content, + timestamp: contextInserted.timestamp, + })) +``` + +`contextRemoved` should mirror this shape in a `contextRemoved` source branch. + +### Steps, errors, and lifecycle-like rows + +Include rows chronologically when they are events that make sense inline in a +timeline. For steps, that means including them if they represent meaningful +execution milestones rather than merely run state metadata. They can be top-level +timeline rows, nested run items, or both depending on the display contract, but +they should not be omitted solely because the current built-in UI does not render +them today. + +Apply the same rule to top-level errors without `run_id`, entity lifecycle rows, +and signal rows: include them in `createEntityTimelineQuery` only when they are +events a user would reasonably expect to see at the point they happened. If a row +is primarily current entity state, keep it out of the event timeline and expose +it through state/status APIs instead. + +## Ordering + +The first implementation can keep the same practical order source as the live +query currently uses: `coalesce(_seq, -1)`. + +Longer term, the synchronous snapshot path has stronger offset-based ordering +via `__electricRowOffsets`. If live query rows can expose equivalent ordering +metadata, move the shared ordering logic into reusable helpers so the snapshot +and live timeline paths agree exactly. Before doing this, verify what the +available live-query offset means; it is only suitable for ordering if it is the +last Electric offset that affected the row. + +For run rows, preserve the existing behavior where a run is anchored to the +earliest child event rather than a later run-row update. This may require a +derived `runOrder` subquery/field that is the minimum of: + +- the run row order; +- text item order; +- first text delta order; +- tool call order; +- step order. + +If that is too much for the first pass, document the gap and keep tests around +the old aggregate query until the row-oriented query reaches parity. + +## Runtime exports + +Add new exports alongside the existing aggregate API: + +```ts +export { createEntityIncludesQuery, createEntityTimelineQuery } + +export type { + EntityTimelineQueryRow, + EntityTimelineInboxRow, + EntityTimelineRunRow, + EntityTimelineRunItem, + EntityTimelineTextItem, + EntityTimelineTextChunk, + EntityTimelineToolCallItem, + EntityTimelineWakeRow, + EntityTimelineManifestRow, +} +``` + +Optionally add a convenience collection factory later: + +```ts +createEntityTimelineCollection(db) +``` + +The query function should come first so users can compose it inside their own +queries. + +## Built-in UI migration + +Migrate hot paths to the row-oriented query: + +1. `packages/agents-server-ui/src/hooks/useEntityTimeline.ts` + - Replace `createEntityIncludesQuery` with `createEntityTimelineQuery`. + - Remove the separate manifest query/merge once manifests are included. + - Keep the pending inbox query separate. + - Keep global entity status enrichment separate initially. +2. `packages/agents-server-ui/src/components/EntityTimeline.tsx` + - Accept the new row union, or adapt it to the current `TimelineEntry` + shape as an intermediate step. + - Use each row's `$key` virtual prop as the React/virtualizer key. + - Prefer rendering run rows from nested `items` rather than from a rebuilt + `section.items` array. +3. `packages/agents-server-ui/src/components/AgentResponse.tsx` + - Render nested run items. + - Update text rendering after reviewing the streaming Markdown parser; avoid + eager timeline-query concatenation. + - Keep markdown caching and tool display parsing in the component layer. +4. `packages/electric-ax/src/observe-ui.tsx` + - Move from aggregate includes to timeline rows. +5. `examples/deep-survey/src/ui/components/ChatSidebar.tsx` + - Move from aggregate includes to timeline rows if it remains a streaming + UI example. + +Keep `useChat` on `createEntityIncludesQuery` initially as a stable convenience +hook. A separate row-oriented hook can be added later. + +## Tests + +Add runtime tests for the new query: + +- Multi-source timeline returns inbox, run, wake, manifest, and context rows in + order. +- Rows from different source collections with the same raw key do not collide; + use the multi-source `$key` virtual prop as the public row key. +- Insert/update/delete in each source updates only the relevant branch rows. +- A text delta insertion updates the nested text chunks include for the active + run item. +- A tool call update updates the matching nested run item. +- Pending inbox rows are excluded from the main timeline by default. +- Manifests are present in the timeline without a separate merge. +- Run rows preserve current ordering semantics, including the "run anchored to + earliest child event" behavior. + +Add UI-level tests or focused hook tests: + +- `useEntityTimeline` no longer rebuilds the manifest merge path. +- Streaming text updates keep prior top-level timeline row identities stable + where TanStack DB exposes stable maintained rows. +- Pending/optimistic inbox behavior still works in `ChatView`. + +## Deferred decisions and audit notes + +- Do not add a convenience `text` string to text rows in the first pass. Leave + text materialization to consumers until we have reviewed how the streaming + Markdown parser should consume chunks optimally. +- Verify whether live query rows expose an Electric offset suitable for ordering. + It must be the last offset that affected the row to replace `_seq` safely. +- Leave `normalizeEntityTimelineData` and related aggregate helpers in place for + now. After the row-oriented timeline migration is complete, audit which + aggregate helpers are still used and remove unused compatibility code then. + +## Implementation phases + +1. Define row types and source subquery helpers in `agents-runtime`. +2. Implement `createEntityTimelineQuery` with multi-source `from` and no outer + `select`. +3. Add run nested includes for ordered text/tool-call items, errors, and steps. +4. Add text chunk includes and remove eager text concatenation from the new API. +5. Add parity tests against key `createEntityIncludesQuery` behaviors. +6. Migrate `agents-server-ui` behind the same public `useEntityTimeline` hook. +7. Migrate observe/example streaming UIs. +8. Document when to use `createEntityIncludesQuery` versus + `createEntityTimelineQuery`. From 15a532a4310f2498058832e7ceaf218026669f50 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Sun, 17 May 2026 16:25:03 +0100 Subject: [PATCH 02/10] plan: refine entity timeline query migration Co-authored-by: Cursor --- ENTITY_TIMELINE_QUERY_PLAN.md | 194 ++++++++++++++++++++++++++++------ 1 file changed, 162 insertions(+), 32 deletions(-) diff --git a/ENTITY_TIMELINE_QUERY_PLAN.md b/ENTITY_TIMELINE_QUERY_PLAN.md index 8aeb0a024c..24b747314e 100644 --- a/ENTITY_TIMELINE_QUERY_PLAN.md +++ b/ENTITY_TIMELINE_QUERY_PLAN.md @@ -16,6 +16,17 @@ Keep `createEntityIncludesQuery` as the snapshot/convenience API. It is useful when consumers want the whole session shape at once, but it rematerializes the session-level data structure on every streaming chunk. +Also add a public React hook for app and userland UIs: + +```ts +useEntityTimeline(db, opts?) +``` + +The hook should be exported for users to build their own timeline UIs. It should +wrap `createEntityTimelineQuery` and return the maintained timeline rows plus +useful derived state such as `generationActive`, pending inbox messages, and any +other small status values needed by the built-in UI. + ## Core direction Use TanStack DB's multi-source `from` for the outer timeline query and keep the @@ -62,6 +73,31 @@ Use `caseWhen` only where it is materially useful, such as branch-dependent includes/joins after a union source or conditional nested projection. It should not be required for the top-level timeline union. +## Query and hook options + +Add options to both `createEntityTimelineQuery` and `useEntityTimeline` so +callers can choose how inbox messages participate in the timeline: + +```ts +type EntityTimelineInboxMode = + | `processed` // default: processed messages inline, pending returned separately + | `none` // no inbox messages in the timeline + | `all` // include processed, pending, paused, steer, and cancelled inbox rows +``` + +Default to `processed`. This matches the built-in chat UX: processed user +messages appear chronologically in the timeline, while pending/queued messages +remain separately editable in the composer/drawer and can be projected +optimistically at the bottom of the UI. + +Use `none` for UIs that want a pure event/run stream without user messages. +Use `all` for debugging/audit UIs where pending/cancelled/steer rows should be +visible inline exactly where they happened. + +`useEntityTimeline` should always be free to run a separate pending-inbox query +when the UI needs editable queued messages, regardless of the main timeline +inbox mode. + ## Why this exists The current UI path is: @@ -155,6 +191,11 @@ This matches the TanStack DB branch implementation: union branches are wrapped under their source alias, no-select union queries return the namespaced row, and the union stream key is prefixed with the source alias. +This key rule also applies to selected source subqueries and nested collections: +the selected row should retain its `$key` virtual prop. Do not add `sourceKey` +fields for run, inbox, text, tool call, or chunk rows unless a future API proves +the virtual key is insufficient. + Do not add a separate `kind` field just to discriminate timeline rows. The active source alias is the discriminant. In UI code, replace old `section.kind` switches with small alias-narrowing helpers: @@ -175,7 +216,8 @@ source alias as another string field that can drift from the query shape. ### Inbox source -Filter to processed messages by default: +Filter to processed messages by default. The exact filter depends on +`opts.inboxMode`: ```ts const inbox = q @@ -235,6 +277,48 @@ type EntityTimelineRunItem = | { $key: string; text?: undefined; toolCall: EntityTimelineToolCallItem } ``` +### Nested includes rendering contract + +Do not wrap these nested includes in `toArray` in `createEntityTimelineQuery`. +The point of the new API is that `items`, `chunks`, `errors`, and `steps` remain +child live collections maintained by TanStack DB. + +Parent timeline rows should pass child collections down to row components: + +```tsx +function TimelineRunRow({ run }: { run: EntityTimelineRunRow }) { + return +} +``` + +Child components then subscribe to the child collection where they render it: + +```tsx +function AgentResponse({ + items, +}: { + items: Collection +}) { + const { data: runItems = [] } = useLiveQuery(items) + return runItems.map((item) => + item.text ? ( + + ) : ( + + ) + ) +} + +function TextItem({ text }: { text: EntityTimelineTextItem }) { + const { data: chunks = [] } = useLiveQuery(text.chunks) + // Markdown streaming strategy TBD; do not eagerly concatenate in the query. +} +``` + +This is the key performance boundary. A text delta should update the text row's +`chunks` child collection and the subscribed text component, not rematerialize +the whole run, whole timeline, or all previous messages. + Text item rows should include ordered chunks/deltas rather than an eagerly concatenated text string: @@ -335,12 +419,21 @@ const contextInserted = q ### Steps, errors, and lifecycle-like rows -Include rows chronologically when they are events that make sense inline in a -timeline. For steps, that means including them if they represent meaningful -execution milestones rather than merely run state metadata. They can be top-level -timeline rows, nested run items, or both depending on the display contract, but -they should not be omitted solely because the current built-in UI does not render -them today. +`steps` are model-generation step lifecycle rows emitted by the outbound bridge: + +- `onStepStart()` inserts a step with `status: 'started'`, `step_number`, + `run_id`, and optional `model_provider`/`model_id`. +- `onStepEnd()` updates that step to `status: 'completed'` with + `finish_reason` and optional `duration_ms`. + +They are not user messages or tool calls. They describe LLM/model execution +attempts within a run. Today they are used as run metadata and for ordering +anchoring, but the built-in chat UI does not render them as visible rows. + +First pass: keep steps as nested run metadata. Include them in the run row so +observability UIs can render model/step metadata, but do not make them top-level +timeline rows unless we decide they are user-visible inline events. A later +option can expose step rows inline for debugging/audit views. Apply the same rule to top-level errors without `run_id`, entity lifecycle rows, and signal rows: include them in `createEntityTimelineQuery` only when they are @@ -350,15 +443,27 @@ it through state/status APIs instead. ## Ordering -The first implementation can keep the same practical order source as the live -query currently uses: `coalesce(_seq, -1)`. +Prefer an explicit stable event-order value written during stream writes. The +timeline needs "when this event first happened", not "when this row was last +updated". Run rows, step rows, text rows, and tool-call rows are often updated +after they are inserted; using the last update offset would move old timeline +items forward incorrectly. -Longer term, the synchronous snapshot path has stronger offset-based ordering -via `__electricRowOffsets`. If live query rows can expose equivalent ordering -metadata, move the shared ordering logic into reusable helpers so the snapshot -and live timeline paths agree exactly. Before doing this, verify what the -available live-query offset means; it is only suitable for ordering if it is the -last Electric offset that affected the row. +Best target design: + +- add or reuse a cross-collection `timeline_order`/event-order value on built-in + event rows; +- assign it when the event row is first written; +- preserve it on later updates; +- use it as the primary order for top-level rows and nested run items. + +If Electric exposes the insert offset or stable first offset for each row in live +queries, that can replace an explicit event-order field. If the available offset +is the last offset that affected the row, do not use it for timeline ordering. + +`_seq` can remain a temporary compatibility source while the new ordering field +or first-offset support is implemented, but the plan should not treat `_seq` as +the final design. For run rows, preserve the existing behavior where a run is anchored to the earliest child event rather than a later run-row update. This may require a @@ -373,12 +478,18 @@ derived `runOrder` subquery/field that is the minimum of: If that is too much for the first pass, document the gap and keep tests around the old aggregate query until the row-oriented query reaches parity. +If we introduce a stable event-order field, this becomes simpler: the run's +display order should be the minimum event order across the run row, text rows, +text chunks, tool calls, and steps. That keeps a streaming run anchored near the +message that caused it even when the run row is completed later. + ## Runtime exports Add new exports alongside the existing aggregate API: ```ts export { createEntityIncludesQuery, createEntityTimelineQuery } +export { useEntityTimeline } export type { EntityTimelineQueryRow, @@ -407,18 +518,25 @@ queries. Migrate hot paths to the row-oriented query: 1. `packages/agents-server-ui/src/hooks/useEntityTimeline.ts` - - Replace `createEntityIncludesQuery` with `createEntityTimelineQuery`. + - Replace `createEntityIncludesQuery` with the exported + `useEntityTimeline`/`createEntityTimelineQuery` path. - Remove the separate manifest query/merge once manifests are included. - - Keep the pending inbox query separate. + - Keep the pending inbox query separate inside the hook and return + `pendingInbox`. + - Return `generationActive` from the hook as derived state, preferably from a + small dedicated live query over active runs rather than by scanning the + full timeline on every change. - Keep global entity status enrichment separate initially. + - When matching processed/pending inbox messages, use `row.inbox.$key` as the + raw inbox key and `row.$key` only as the timeline row key. 2. `packages/agents-server-ui/src/components/EntityTimeline.tsx` - Accept the new row union, or adapt it to the current `TimelineEntry` shape as an intermediate step. - Use each row's `$key` virtual prop as the React/virtualizer key. - - Prefer rendering run rows from nested `items` rather than from a rebuilt - `section.items` array. + - Pass nested child collections such as `run.items` to child row components + instead of rebuilding `section.items` arrays. 3. `packages/agents-server-ui/src/components/AgentResponse.tsx` - - Render nested run items. + - Subscribe to nested run items with `useLiveQuery(run.items)`. - Update text rendering after reviewing the streaming Markdown parser; avoid eager timeline-query concatenation. - Keep markdown caching and tool display parsing in the component layer. @@ -429,7 +547,8 @@ Migrate hot paths to the row-oriented query: UI example. Keep `useChat` on `createEntityIncludesQuery` initially as a stable convenience -hook. A separate row-oriented hook can be added later. +hook for aggregate chat data. `useEntityTimeline` is the new row-oriented hook +for timeline UIs. ## Tests @@ -442,8 +561,12 @@ Add runtime tests for the new query: - Insert/update/delete in each source updates only the relevant branch rows. - A text delta insertion updates the nested text chunks include for the active run item. +- Parent timeline rows expose child collections, not arrays, for nested + includes. +- Child UI components subscribe to child collections with `useLiveQuery`. - A tool call update updates the matching nested run item. - Pending inbox rows are excluded from the main timeline by default. +- `opts.inboxMode` can exclude inbox rows or include all inbox statuses. - Manifests are present in the timeline without a separate merge. - Run rows preserve current ordering semantics, including the "run anchored to earliest child event" behavior. @@ -454,27 +577,34 @@ Add UI-level tests or focused hook tests: - Streaming text updates keep prior top-level timeline row identities stable where TanStack DB exposes stable maintained rows. - Pending/optimistic inbox behavior still works in `ChatView`. +- `generationActive` is returned by `useEntityTimeline` without requiring + timeline consumers to scan every row. ## Deferred decisions and audit notes - Do not add a convenience `text` string to text rows in the first pass. Leave text materialization to consumers until we have reviewed how the streaming Markdown parser should consume chunks optimally. -- Verify whether live query rows expose an Electric offset suitable for ordering. - It must be the last offset that affected the row to replace `_seq` safely. +- Verify whether live query rows expose an Electric first/insert offset suitable + for ordering. A last-updated offset is not suitable for timeline order. - Leave `normalizeEntityTimelineData` and related aggregate helpers in place for now. After the row-oriented timeline migration is complete, audit which aggregate helpers are still used and remove unused compatibility code then. ## Implementation phases -1. Define row types and source subquery helpers in `agents-runtime`. -2. Implement `createEntityTimelineQuery` with multi-source `from` and no outer +1. Define row types, hook return types, and source subquery helpers in + `agents-runtime`. +2. Decide and implement the stable event-order source. +3. Implement `createEntityTimelineQuery` with multi-source `from` and no outer `select`. -3. Add run nested includes for ordered text/tool-call items, errors, and steps. -4. Add text chunk includes and remove eager text concatenation from the new API. -5. Add parity tests against key `createEntityIncludesQuery` behaviors. -6. Migrate `agents-server-ui` behind the same public `useEntityTimeline` hook. -7. Migrate observe/example streaming UIs. -8. Document when to use `createEntityIncludesQuery` versus - `createEntityTimelineQuery`. +4. Add `useEntityTimeline` returning rows plus derived state + (`generationActive`, pending inbox, etc.). +5. Add run nested includes for ordered text/tool-call items, errors, and nested + step metadata. +6. Add text chunk includes and remove eager text concatenation from the new API. +7. Add parity tests against key `createEntityIncludesQuery` behaviors. +8. Migrate `agents-server-ui` behind the same public `useEntityTimeline` hook. +9. Migrate observe/example streaming UIs. +10. Document when to use `createEntityIncludesQuery` versus + `createEntityTimelineQuery`. From a057acca0fe36fbdb55738fb069f196e8ad70c92 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Sun, 17 May 2026 16:55:36 +0100 Subject: [PATCH 03/10] plan: clarify entity timeline optimistic ordering Co-authored-by: Cursor --- ENTITY_TIMELINE_QUERY_PLAN.md | 187 +++++++++++++++++++++++----------- 1 file changed, 130 insertions(+), 57 deletions(-) diff --git a/ENTITY_TIMELINE_QUERY_PLAN.md b/ENTITY_TIMELINE_QUERY_PLAN.md index 24b747314e..e9153ddb5d 100644 --- a/ENTITY_TIMELINE_QUERY_PLAN.md +++ b/ENTITY_TIMELINE_QUERY_PLAN.md @@ -80,15 +80,16 @@ callers can choose how inbox messages participate in the timeline: ```ts type EntityTimelineInboxMode = - | `processed` // default: processed messages inline, pending returned separately + | `processed` // default: processed + optimistic local pending rows inline | `none` // no inbox messages in the timeline | `all` // include processed, pending, paused, steer, and cancelled inbox rows ``` Default to `processed`. This matches the built-in chat UX: processed user -messages appear chronologically in the timeline, while pending/queued messages -remain separately editable in the composer/drawer and can be projected -optimistically at the bottom of the UI. +messages appear chronologically in the timeline, and optimistic local pending +rows created through TanStack DB mutation APIs also appear in the same timeline +path while they are unsynced. Non-optimistic queued rows can still remain +separately editable in the composer/drawer. Use `none` for UIs that want a pure event/run stream without user messages. Use `all` for debugging/audit UIs where pending/cancelled/steer rows should be @@ -98,6 +99,21 @@ visible inline exactly where they happened. when the UI needs editable queued messages, regardless of the main timeline inbox mode. +To avoid duplicate rendering, the hook should separate timeline rendering from +editable pending state: + +- `rows` follows `inboxMode`. +- `pendingInbox` is returned for composer/drawer editing state. +- In the default `processed` mode, `rows` includes synced processed inbox rows + plus local optimistic pending inbox rows created through DB mutation APIs. +- When `inboxMode: 'all'`, all pending inbox messages may appear in `rows`; + callers that also render `pendingInbox` controls must key by `row.inbox.$key` + and avoid drawing a second message bubble for the same pending row. +- Optimistic pending inbox inserts should go into the inbox collection with a + pending `_timeline_order`, so they appear through the same `rows` path. The + separate `pendingInbox` result is metadata/editing state for the same row, not + a second rendering source. + ## Why this exists The current UI path is: @@ -160,7 +176,7 @@ Each source subquery should project rows into a runtime-level timeline event shape with a common base: ```ts -type EntityTimelineOrder = string | number +type EntityTimelineOrder = string type EntityTimelineRowBase = { order: EntityTimelineOrder @@ -222,9 +238,18 @@ Filter to processed messages by default. The exact filter depends on ```ts const inbox = q .from({ inbox: db.collections.inbox }) - .where(({ inbox }) => eq(coalesce(inbox.status, `processed`), `processed`)) + .where(({ inbox }) => + opts.inboxMode === `processed` + ? or( + eq(coalesce(inbox.status, `processed`), `processed`), + isOptimisticLocalRow(inbox) + ) + : opts.inboxMode === `all` + ? true + : false + ) .select(({ inbox }) => ({ - order: coalesce(inbox._seq, -1), + order: inbox._timeline_order, from: coalesce(inbox.from, `unknown`), payload: inbox.payload, timestamp: coalesce(inbox.timestamp, EPOCH_ISO), @@ -245,7 +270,7 @@ The run source is the most important for fine-grained streaming behavior: ```ts const run = q.from({ run: db.collections.runs }).select(({ run }) => ({ - order: coalesce(run._seq, -1), + order: run._timeline_order, status: run.status, finish_reason: run.finish_reason, items: runItemsInclude(run.key), @@ -362,7 +387,7 @@ Project the current wake display payload: ```ts const wake = q.from({ wake: db.collections.wakes }).select(({ wake }) => ({ - order: coalesce(wake._seq, -1), + order: wake._timeline_order, payload: { type: `wake` as const, timestamp: wake.timestamp, @@ -384,7 +409,7 @@ timeline. The new query should include them directly: const manifest = q .from({ manifest: db.collections.manifests }) .select(({ manifest }) => ({ - order: coalesce(manifest._seq, -1), + order: manifest._timeline_order, manifest, })) ``` @@ -406,7 +431,7 @@ timeline UIs, even if the built-in chat UI chooses not to display them yet: const contextInserted = q .from({ contextInserted: db.collections.contextInserted }) .select(({ contextInserted }) => ({ - order: coalesce(contextInserted._seq, -1), + order: contextInserted._timeline_order, id: contextInserted.id, name: contextInserted.name, attrs: contextInserted.attrs, @@ -443,45 +468,70 @@ it through state/status APIs instead. ## Ordering -Prefer an explicit stable event-order value written during stream writes. The -timeline needs "when this event first happened", not "when this row was last -updated". Run rows, step rows, text rows, and tool-call rows are often updated -after they are inserted; using the last update offset would move old timeline -items forward incorrectly. - -Best target design: - -- add or reuse a cross-collection `timeline_order`/event-order value on built-in - event rows; -- assign it when the event row is first written; -- preserve it on later updates; -- use it as the primary order for top-level rows and nested run items. - -If Electric exposes the insert offset or stable first offset for each row in live -queries, that can replace an explicit event-order field. If the available offset -is the last offset that affected the row, do not use it for timeline ordering. - -`_seq` can remain a temporary compatibility source while the new ordering field -or first-offset support is implemented, but the plan should not treat `_seq` as -the final design. - -For run rows, preserve the existing behavior where a run is anchored to the -earliest child event rather than a later run-row update. This may require a -derived `runOrder` subquery/field that is the minimum of: - -- the run row order; -- text item order; -- first text delta order; -- tool call order; -- step order. - -If that is too much for the first pass, document the gap and keep tests around -the old aggregate query until the row-oriented query reaches parity. - -If we introduce a stable event-order field, this becomes simpler: the run's -display order should be the minimum event order across the run row, text rows, -text chunks, tool calls, and steps. That keeps a streaming run anchored near the -message that caused it even when the run row is completed later. +Use a stable first-event order injected by `createEntityStreamDB` at stream +ingest time. + +Do not use the existing `__electricRowOffsets` map for live timeline ordering. +That map is currently updated on every change event, so it represents the last +offset that affected a row. The timeline needs "when this event first happened", +not "when this row was last updated". Run rows, step rows, text rows, and +tool-call rows are often updated after they are inserted; using a last-update +offset would move old timeline items forward incorrectly. + +Decision: + +- Add an internal row field, `_timeline_order`, to the built-in event row + types/schemas that participate in timelines. +- In `createEntityStreamDB`'s `onBeforeBatch`, derive a stable order token for + each incoming change event from the Electric/Durable Streams offset and the + item index within the batch. +- Store the first order token seen for each `(collection, row key)` in a map. +- Before StreamDB applies each insert/update/upsert change, inject that first + order token into `item.value._timeline_order`. +- Preserve the original first order token on later updates. +- Use `_timeline_order` as the primary order expression in + `createEntityTimelineQuery`. + +`_timeline_order` should be a lexically sortable string. Use an +offset-plus-index token rather than raw offset alone. Some stream batches can +contain multiple events, and not every event is guaranteed to carry a unique +per-item offset. A token such as `${offset}:${itemIndex.padStart(...)}` gives a +stable total order while still preserving the underlying stream order. + +Because `_timeline_order` is an actual query field, it must be accepted by the +built-in row schemas/types. Do not rely on mutating event values if validation or +row parsing would strip the field before TanStack DB sees it. + +Reset/replay behavior: + +- On stream reset, clear the first-order map. +- During replay, rebuild `_timeline_order` from replayed events. +- If the first event observed for a row is an update rather than an insert, use + that first observed update's order. It is still stable for the local + materialization. + +Optimistic local rows: + +- Pending optimistic rows that do not yet have a stream offset should receive a + high pending order token so they sort after persisted rows. +- Pending order tokens should preserve local insertion order, e.g. + `pending:${counter.padStart(...)}`. +- Optimistic rows should render through the same timeline query/component path as + synced rows. +- Optimistic rows should be applied through TanStack DB mutation APIs so the + collection sees them as normal local rows, including their pending + `_timeline_order`. +- When the persisted stream event arrives, the server-backed row should replace + the optimistic row and use its real `_timeline_order` from the writeback. + +This keeps ordering local to StreamDB ingestion and avoids modifying every write +site (`EntityManager`, `outbound-bridge`, context writes, wake writes, etc.) to +precompute an order value they do not know yet. + +For run rows, use `run._timeline_order` directly. Because `_timeline_order` is +assigned from the first event observed for the run row and preserved on later +updates, completion updates no longer move the run later in the timeline. This +removes the need for the current imperative run/text re-anchoring workaround. ## Runtime exports @@ -565,18 +615,34 @@ Add runtime tests for the new query: includes. - Child UI components subscribe to child collections with `useLiveQuery`. - A tool call update updates the matching nested run item. -- Pending inbox rows are excluded from the main timeline by default. +- Non-optimistic pending inbox rows are excluded from the main timeline by + default. +- Optimistic pending rows created through DB mutation APIs are included in the + main timeline by default. - `opts.inboxMode` can exclude inbox rows or include all inbox statuses. - Manifests are present in the timeline without a separate merge. -- Run rows preserve current ordering semantics, including the "run anchored to - earliest child event" behavior. +- Run rows keep their first-event `_timeline_order` when completed, so completion + updates do not move them later in the timeline. +- `_timeline_order` is derived from the first stream offset/item index for a row + and does not change when that row is updated. +- Multiple events in the same stream batch still receive deterministic relative + ordering. +- Stream reset/replay clears and rebuilds `_timeline_order` deterministically. +- Rows first observed through an update still get a stable first-observed + `_timeline_order`. +- Optimistic rows sort after persisted rows until replaced by server-backed rows + with real `_timeline_order`. +- Optimistic rows preserve local insertion order and render through the same row + path as synced rows. Add UI-level tests or focused hook tests: - `useEntityTimeline` no longer rebuilds the manifest merge path. - Streaming text updates keep prior top-level timeline row identities stable where TanStack DB exposes stable maintained rows. -- Pending/optimistic inbox behavior still works in `ChatView`. +- Optimistic inbox behavior renders through the same `rows` path in `ChatView`. +- Pending inbox editing state still works in `ChatView` without double-rendering + the same optimistic row. - `generationActive` is returned by `useEntityTimeline` without requiring timeline consumers to scan every row. @@ -585,8 +651,10 @@ Add UI-level tests or focused hook tests: - Do not add a convenience `text` string to text rows in the first pass. Leave text materialization to consumers until we have reviewed how the streaming Markdown parser should consume chunks optimally. -- Verify whether live query rows expose an Electric first/insert offset suitable - for ordering. A last-updated offset is not suitable for timeline order. +- Validate the `_timeline_order` injection point against `@durable-streams/state` + internals. If `onBeforeBatch` cannot safely mutate incoming event values before + validation and collection application, add first-event-order support in + StreamDB itself rather than pushing ordering logic into every writer. - Leave `normalizeEntityTimelineData` and related aggregate helpers in place for now. After the row-oriented timeline migration is complete, audit which aggregate helpers are still used and remove unused compatibility code then. @@ -595,7 +663,12 @@ Add UI-level tests or focused hook tests: 1. Define row types, hook return types, and source subquery helpers in `agents-runtime`. -2. Decide and implement the stable event-order source. +2. Add `_timeline_order` row support: + - first prove the injection point by testing that values added in + `onBeforeBatch` survive validation and collection application; + - extend built-in event row schemas/types; + - inject first-offset-plus-index order tokens in `createEntityStreamDB`; + - preserve the first token across updates. 3. Implement `createEntityTimelineQuery` with multi-source `from` and no outer `select`. 4. Add `useEntityTimeline` returning rows plus derived state From ae6ac86bdbff5e149f846184a427e3ae79b5b0dd Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Mon, 18 May 2026 13:39:40 +0100 Subject: [PATCH 04/10] feat(agents): add reactive entity timeline query Use a fine-grained timeline query for the agents UI so streamed run items update through TanStack DB instead of rematerializing the whole chat timeline. Co-authored-by: Cursor --- packages/agents-runtime/src/client.ts | 10 + packages/agents-runtime/src/entity-schema.ts | 29 ++ .../agents-runtime/src/entity-stream-db.ts | 62 +++- .../agents-runtime/src/entity-timeline.ts | 290 +++++++++++++++++- packages/agents-runtime/src/index.ts | 10 + packages/agents-runtime/tsdown.config.ts | 1 + .../src/components/AgentResponse.tsx | 129 +++++++- .../src/components/EntityTimeline.tsx | 213 +++++++------ .../src/components/views/ChatView.tsx | 122 +------- .../src/hooks/useEntityTimeline.ts | 110 +++---- .../agents-server-ui/src/lib/sendMessage.ts | 5 + packages/agents-server-ui/src/main.tsx | 21 +- packages/agents-server-ui/vite.config.ts | 3 +- 13 files changed, 711 insertions(+), 294 deletions(-) diff --git a/packages/agents-runtime/src/client.ts b/packages/agents-runtime/src/client.ts index 49b797e2ba..0f96793f6c 100644 --- a/packages/agents-runtime/src/client.ts +++ b/packages/agents-runtime/src/client.ts @@ -2,8 +2,10 @@ export { createEntityStreamDB } from './entity-stream-db' export { createAgentsClient } from './agents-client' export { compareTimelineOrders, + createPendingTimelineOrder, createEntityErrorsQuery, createEntityIncludesQuery, + createEntityTimelineQuery, getEntityState, normalizeEntityTimelineData, normalizeTimelineEntities, @@ -30,8 +32,16 @@ export type { export type { EntityTimelineContentItem, EntityTimelineData, + EntityTimelineInboxMode, + EntityTimelineQueryOptions, + EntityTimelineQueryRow, + EntityTimelineRunRow, + EntityTimelineRunItem, EntityTimelineSection, EntityTimelineState, + EntityTimelineTextChunk, + EntityTimelineTextItem, + EntityTimelineToolCallItem, IncludesEntity, IncludesInboxMessage, IncludesRun, diff --git a/packages/agents-runtime/src/entity-schema.ts b/packages/agents-runtime/src/entity-schema.ts index 3f7d62cfb6..7a18eb6c20 100644 --- a/packages/agents-runtime/src/entity-schema.ts +++ b/packages/agents-runtime/src/entity-schema.ts @@ -43,6 +43,8 @@ type SequencedPersistedRow = Omit< > & { key: string _seq?: number + _timeline_order?: string + _optimistic?: boolean } type Schema = z.ZodType type ChildEntityStatusValue = `spawning` | `running` | `idle` | `stopped` @@ -269,6 +271,11 @@ function createJsonObjectSchema(): Schema> { > } +const timelineOrderField = { + _timeline_order: z.string().optional(), + _optimistic: z.boolean().optional(), +} + function createChildEntityStatusSchema(): Schema { return z.enum([`spawning`, `running`, `idle`, `stopped`]) } @@ -319,6 +326,7 @@ function createWakeConfigSchema(): Schema { function createRunSchema(): Schema { return z.object({ key: z.string().optional(), + ...timelineOrderField, status: z.enum([`started`, `completed`, `failed`]), finish_reason: z.string().optional(), }) @@ -327,6 +335,7 @@ function createRunSchema(): Schema { function createStepSchema(): Schema { return z.object({ key: z.string().optional(), + ...timelineOrderField, run_id: z.string().optional(), step_number: z.number().int(), status: z.enum([`started`, `completed`]), @@ -340,6 +349,7 @@ function createStepSchema(): Schema { function createTextSchema(): Schema { return z.object({ key: z.string().optional(), + ...timelineOrderField, run_id: z.string().optional(), status: z.enum([`streaming`, `completed`]), }) @@ -348,6 +358,7 @@ function createTextSchema(): Schema { function createTextDeltaSchema(): Schema { return z.object({ key: z.string().optional(), + ...timelineOrderField, text_id: z.string(), run_id: z.string(), delta: z.string(), @@ -357,6 +368,7 @@ function createTextDeltaSchema(): Schema { function createToolCallSchema(): Schema { return z.object({ key: z.string().optional(), + ...timelineOrderField, run_id: z.string().optional(), tool_call_id: z.string().optional(), tool_name: z.string(), @@ -377,6 +389,7 @@ function createToolCallSchema(): Schema { function createReasoningSchema(): Schema { return z.object({ key: z.string().optional(), + ...timelineOrderField, status: z.enum([`streaming`, `completed`]), }) } @@ -384,6 +397,7 @@ function createReasoningSchema(): Schema { function createErrorEventSchema(): Schema { return z.object({ key: z.string().optional(), + ...timelineOrderField, error_code: z.string(), message: z.string(), run_id: z.string().optional(), @@ -395,6 +409,7 @@ function createErrorEventSchema(): Schema { function createMessageReceivedSchema(): Schema { return z.object({ key: z.string().optional(), + ...timelineOrderField, from: z.string().optional(), payload: z.unknown().optional(), timestamp: z.string().optional(), @@ -410,6 +425,7 @@ function createMessageReceivedSchema(): Schema { function createWakeSchema(): Schema { return z.object({ key: z.string().optional(), + ...timelineOrderField, timestamp: z.string(), source: z.string(), timeout: z.boolean(), @@ -422,6 +438,7 @@ function createWakeSchema(): Schema { function createEntityCreatedSchema(): Schema { return z.object({ key: z.string().optional(), + ...timelineOrderField, entity_type: z.string(), timestamp: z.string(), args: createJsonObjectSchema(), @@ -432,6 +449,7 @@ function createEntityCreatedSchema(): Schema { function createEntityStoppedSchema(): Schema { return z.object({ key: z.string().optional(), + ...timelineOrderField, timestamp: z.string(), reason: z.string().optional(), }) @@ -440,6 +458,7 @@ function createEntityStoppedSchema(): Schema { function createChildStatusSchema(): Schema { return z.object({ key: z.string().optional(), + ...timelineOrderField, entity_url: z.string(), entity_type: z.string(), status: createChildEntityStatusSchema(), @@ -449,12 +468,14 @@ function createChildStatusSchema(): Schema { function createTagEntrySchema(): Schema { return z.object({ key: z.string().optional(), + ...timelineOrderField, value: z.string(), }) } function createContextInsertedSchema(): Schema { return z.object({ key: z.string().optional(), + ...timelineOrderField, id: z.string(), name: z.string(), attrs: z.record(z.string(), z.union([z.string(), z.number(), z.boolean()])), @@ -466,6 +487,7 @@ function createContextInsertedSchema(): Schema { function createContextRemovedSchema(): Schema { return z.object({ key: z.string().optional(), + ...timelineOrderField, id: z.string(), name: z.string(), timestamp: z.string(), @@ -483,6 +505,7 @@ function createManifestSchema(): Schema< return z.union([ z.object({ key: z.string().optional(), + ...timelineOrderField, kind: z.literal(`child`), id: z.string(), entity_type: z.string(), @@ -492,6 +515,7 @@ function createManifestSchema(): Schema< }), z.object({ key: z.string().optional(), + ...timelineOrderField, kind: z.literal(`source`), sourceType: z.string(), sourceRef: z.string(), @@ -500,6 +524,7 @@ function createManifestSchema(): Schema< }), z.object({ key: z.string().optional(), + ...timelineOrderField, kind: z.literal(`shared-state`), id: z.string(), mode: z.enum([`create`, `connect`]), @@ -514,6 +539,7 @@ function createManifestSchema(): Schema< }), z.object({ key: z.string().optional(), + ...timelineOrderField, kind: z.literal(`effect`), id: z.string(), function_ref: z.string(), @@ -521,6 +547,7 @@ function createManifestSchema(): Schema< }), z.object({ key: z.string().optional(), + ...timelineOrderField, kind: z.literal(`context`), id: z.string(), name: z.string(), @@ -533,6 +560,7 @@ function createManifestSchema(): Schema< }), z.object({ key: z.string().optional(), + ...timelineOrderField, kind: z.literal(`schedule`), id: z.string(), scheduleType: z.literal(`cron`), @@ -543,6 +571,7 @@ function createManifestSchema(): Schema< }), z.object({ key: z.string().optional(), + ...timelineOrderField, kind: z.literal(`schedule`), id: z.string(), scheduleType: z.literal(`future_send`), diff --git a/packages/agents-runtime/src/entity-stream-db.ts b/packages/agents-runtime/src/entity-stream-db.ts index 7e47642180..0eb681de8d 100644 --- a/packages/agents-runtime/src/entity-stream-db.ts +++ b/packages/agents-runtime/src/entity-stream-db.ts @@ -46,6 +46,7 @@ type EntityCollectionMeta = { __electricSourceDb?: EntityStreamDBWithActions __electricSourceId?: string __electricRowOffsets?: Map + __electricTimelineOrders?: Map } type EntityCollections = { @@ -98,6 +99,14 @@ type EntityStreamDBOptions = { ) const WRITE_TXID_TIMEOUT_MS = 20_000 +const TIMELINE_OFFSET_WIDTH = 24 +const TIMELINE_BATCH_INDEX_WIDTH = 8 + +function formatStreamTimelineOrder(offset: string, index: number): string { + return `stream:${offset.padStart(TIMELINE_OFFSET_WIDTH, `0`)}:${index + .toString() + .padStart(TIMELINE_BATCH_INDEX_WIDTH, `0`)}` +} /** * Create a StreamDB connected to a Electric Agents entity stream. @@ -137,9 +146,14 @@ export function createEntityStreamDB( } const collectionNameByEventType = new Map() const rowOffsetsByCollection = new Map>() + const timelineOrdersByCollection = new Map< + string, + Map + >() for (const [name, def] of Object.entries(mergedCollections)) { collectionNameByEventType.set(def.type, name) rowOffsetsByCollection.set(name, new Map()) + timelineOrdersByCollection.set(name, new Map()) } // Build a reverse map from TanStack DB collection id to schema key const collIdToSchemaKey: Record = {} @@ -167,6 +181,8 @@ export function createEntityStreamDB( const cleanRow = (row: Record): Record => { const clone = { ...row } delete clone._seq + delete clone._timeline_order + delete clone._optimistic return clone } @@ -270,22 +286,36 @@ export function createEntityStreamDB( onBeforeBatch: (batch) => { opts?.onBeforeBatch?.(batch) replayBatchOffset.current = batch.offset - for (const item of batch.items) { + batch.items.forEach((item, itemIndex) => { if (isControlEvent(item)) { if (item.headers.control === `reset`) { for (const offsets of rowOffsetsByCollection.values()) { offsets.clear() } + for (const orders of timelineOrdersByCollection.values()) { + orders.clear() + } } - continue + return } - if (!isChangeEvent(item)) continue + if (!isChangeEvent(item)) return const collectionName = collectionNameByEventType.get(item.type) - if (!collectionName) continue - rowOffsetsByCollection - .get(collectionName) - ?.set(item.key, item.headers.offset ?? batch.offset) - } + if (!collectionName) return + const offset = item.headers.offset ?? batch.offset + rowOffsetsByCollection.get(collectionName)?.set(item.key, offset) + + if (item.headers.operation === `delete`) return + if (typeof item.value !== `object` || item.value === null) return + + const orders = timelineOrdersByCollection.get(collectionName) + if (!orders) return + let order = orders.get(item.key) + if (!order) { + order = formatStreamTimelineOrder(offset, itemIndex) + orders.set(item.key, order) + } + ;(item.value as Record)._timeline_order = order + }) }, onBatch: (batch) => { opts?.onBatch?.(batch) @@ -634,6 +664,20 @@ export function createEntityStreamDB( ) } const primaryKey = mergedCollections[collectionName]!.primaryKey + if ( + event.headers.operation !== `delete` && + typeof event.value === `object` && + event.value !== null + ) { + const orders = timelineOrdersByCollection.get(collectionName) + const offset = + event.headers.offset ?? + `local:${Date.now().toString().padStart(13, `0`)}` + const order = + orders?.get(event.key) ?? formatStreamTimelineOrder(offset, 0) + orders?.set(event.key, order) + ;(event.value as Record)._timeline_order = order + } const transaction = createWriteTransaction({ debugOrigin: `apply-event:${event.type}:${event.headers.operation}`, }) @@ -699,6 +743,8 @@ export function createEntityStreamDB( replayDb.collections )) { collection.__electricRowOffsets = rowOffsetsByCollection.get(collectionName) + collection.__electricTimelineOrders = + timelineOrdersByCollection.get(collectionName) } return db as EntityStreamDBWithActions diff --git a/packages/agents-runtime/src/entity-timeline.ts b/packages/agents-runtime/src/entity-timeline.ts index 9d4d208b78..e71b05b196 100644 --- a/packages/agents-runtime/src/entity-timeline.ts +++ b/packages/agents-runtime/src/entity-timeline.ts @@ -9,11 +9,20 @@ import { or, toArray, } from '@durable-streams/state' -import type { InitialQueryBuilder, QueryBuilder } from '@tanstack/db' +import * as TanStackDB from '@tanstack/db' +import type { + Collection, + InitialQueryBuilder, + QueryBuilder, +} from '@tanstack/db' import type { EntityStreamDB } from './entity-stream-db' import type { ChildStatusEntry, MessageReceived } from './entity-schema' import type { ManifestEntry, Wake, WakeMessage } from './types' +const { caseWhen } = TanStackDB as typeof TanStackDB & { + caseWhen: (condition: unknown, value: T) => T +} + export type EntityTimelineState = | `pending` | `queued` @@ -160,6 +169,112 @@ export interface EntityTimelineData { entities: Array } +export type EntityTimelineInboxMode = `processed` | `all` + +export interface EntityTimelineQueryOptions { + inboxMode?: EntityTimelineInboxMode +} + +export interface EntityTimelineTextChunk { + key: string + text_id: string + run_id: string + order: TimelineOrder + delta: string +} + +export interface EntityTimelineTextItem { + key: string + run_id?: string + order: TimelineOrder + status: `streaming` | `completed` + content: string +} + +export interface EntityTimelineToolCallItem { + key: string + run_id?: string + order: TimelineOrder + tool_call_id?: string + tool_name: string + status: `started` | `args_complete` | `executing` | `completed` | `failed` + args?: unknown + result?: unknown + error?: string +} + +export type EntityTimelineRunItem = + | { + $key: string + text: EntityTimelineTextItem + toolCall?: undefined + } + | { + $key: string + text?: undefined + toolCall: EntityTimelineToolCallItem + } + +export interface EntityTimelineStepItem { + key: string + run_id?: string + order: TimelineOrder + step_number: number + status: `started` | `completed` + model_id?: string + duration_ms?: number +} + +export interface EntityTimelineErrorItem { + key: string + run_id?: string + error_code: string + message: string +} + +export interface EntityTimelineRunRow { + key: string + order: TimelineOrder + status: `started` | `completed` | `failed` + finish_reason?: string + items: Collection + steps: Collection + errors: Collection +} + +export type EntityTimelineInboxRow = IncludesInboxMessage +export type EntityTimelineWakeRow = IncludesWakeMessage + +export type EntityTimelineQueryRow = + | { + $key: string + inbox: EntityTimelineInboxRow + run?: undefined + wake?: undefined + manifest?: undefined + } + | { + $key: string + inbox?: undefined + run: EntityTimelineRunRow + wake?: undefined + manifest?: undefined + } + | { + $key: string + inbox?: undefined + run?: undefined + wake: EntityTimelineWakeRow + manifest?: undefined + } + | { + $key: string + inbox?: undefined + run?: undefined + wake?: undefined + manifest: ManifestEntry + } + function normalizeTimelineRun(run: IncludesRun): IncludesRun { const texts = run.texts .map((text) => { @@ -319,6 +434,15 @@ function readInlineSeq(row: object): number | undefined { return typeof seq === `number` ? seq : undefined } +function readTimelineOrder(row: object): string | undefined { + const order = Reflect.get(row, `_timeline_order`) + return typeof order === `string` ? order : undefined +} + +export function createPendingTimelineOrder(index: number): string { + return `~pending:${index.toString().padStart(12, `0`)}` +} + function toSeqOrderToken(seq: number): string { return `seq:${seq.toString().padStart(12, `0`)}` } @@ -360,6 +484,11 @@ function readRequiredOrderToken( row: TRow, index: number ): string { + const timelineOrder = readTimelineOrder(row) + if (timelineOrder) { + return timelineOrder + } + const offset = collection.__electricRowOffsets?.get(row.key) if (offset) { return `offset:${offset}` @@ -380,6 +509,11 @@ function readOptionalOrderToken( }, row: TRow ): string | undefined { + const timelineOrder = readTimelineOrder(row) + if (timelineOrder) { + return timelineOrder + } + const offset = collection.__electricRowOffsets?.get(row.key) if (offset) { return `offset:${offset}` @@ -991,6 +1125,160 @@ const getEntityWakesCollection = cachedCollectionFactory((db: EntityStreamDB) => }) ) +type EntityTimelineQueryBuilder = (q: InitialQueryBuilder) => QueryBuilder + +export function createEntityTimelineQuery( + db: EntityStreamDB, + opts: EntityTimelineQueryOptions = {} +): EntityTimelineQueryBuilder { + return (q: InitialQueryBuilder) => buildEntityTimelineQuery(q, db, opts) +} + +function buildEntityTimelineQuery( + q: InitialQueryBuilder, + db: EntityStreamDB, + opts: EntityTimelineQueryOptions +): QueryBuilder { + const inboxMode = opts.inboxMode ?? `processed` + + let inbox = q.from({ inbox: db.collections.inbox }) + if (inboxMode === `processed`) { + inbox = inbox.where(({ inbox }) => + or( + eq(coalesce(inbox.status, `processed`), `processed`), + eq(coalesce(inbox._optimistic, false), true) + ) + ) + } + + const inboxSource = inbox.select(({ inbox }) => ({ + order: coalesce(inbox._timeline_order, `~`), + key: inbox.key, + from: coalesce(inbox.from, `unknown`), + payload: inbox.payload, + timestamp: coalesce(inbox.timestamp, ``), + mode: coalesce(inbox.mode, `immediate`), + status: coalesce(inbox.status, `processed`), + position: inbox.position, + processed_at: inbox.processed_at, + cancelled_at: inbox.cancelled_at, + })) + + const wakeSource = q + .from({ wake: db.collections.wakes }) + .select(({ wake }) => ({ + key: wake.key, + order: coalesce(wake._timeline_order, `~`), + payload: { + type: `wake` as const, + timestamp: wake.timestamp, + source: wake.source, + timeout: wake.timeout, + changes: wake.changes, + finished_child: wake.finished_child, + other_children: wake.other_children, + }, + })) + + const runItemsSource = q + .from({ + text: db.collections.texts, + toolCall: db.collections.toolCalls, + }) + .select(({ text, toolCall }) => ({ + order: coalesce(text._timeline_order, toolCall._timeline_order, `~`), + run_id: coalesce(text.run_id, toolCall.run_id, ``), + text: caseWhen(text.key, { + key: text.key, + run_id: text.run_id, + order: coalesce(text._timeline_order, `~`), + status: text.status, + }), + textContent: concat( + toArray( + q + .from({ chunk: db.collections.textDeltas }) + .where(({ chunk }) => eq(chunk.text_id, text.key)) + .orderBy(({ chunk }) => coalesce(chunk._timeline_order, `~`)) + .select(({ chunk }) => chunk.delta) + ) + ), + toolCall: caseWhen(toolCall.key, { + key: toolCall.key, + run_id: toolCall.run_id, + order: coalesce(toolCall._timeline_order, `~`), + tool_call_id: toolCall.tool_call_id, + tool_name: toolCall.tool_name, + status: toolCall.status, + args: toolCall.args, + result: toolCall.result, + error: toolCall.error, + }), + })) + + const runSource = q.from({ run: db.collections.runs }).select(({ run }) => ({ + key: run.key, + order: coalesce(run._timeline_order, `~`), + status: run.status, + finish_reason: run.finish_reason, + items: q + .from({ item: runItemsSource }) + .where(({ item }) => eq(item.run_id, run.key)) + .orderBy(({ item }) => item.order) + .select(({ item }) => ({ + text: caseWhen(item.text.key, { + key: item.text.key, + run_id: item.text.run_id, + order: item.text.order, + status: item.text.status, + content: item.textContent, + }), + toolCall: item.toolCall, + })), + steps: q + .from({ step: db.collections.steps }) + .where(({ step }) => eq(step.run_id, run.key)) + .orderBy(({ step }) => step.step_number) + .orderBy(({ step }) => coalesce(step._timeline_order, `~`)) + .select(({ step }) => ({ + key: step.key, + run_id: step.run_id, + order: coalesce(step._timeline_order, `~`), + step_number: step.step_number, + status: step.status, + model_id: step.model_id, + duration_ms: step.duration_ms, + })), + errors: q + .from({ error: db.collections.errors }) + .where(({ error }) => eq(error.run_id, run.key)) + .orderBy(({ error }) => error.key) + .select(({ error }) => ({ + key: error.key, + run_id: error.run_id, + error_code: error.error_code, + message: error.message, + })), + })) + + return q + .from({ + inbox: inboxSource, + run: runSource, + wake: wakeSource, + manifest: db.collections.manifests, + }) + .orderBy(({ inbox, run, wake, manifest }) => + coalesce( + inbox.order, + run.order, + wake.order, + manifest._timeline_order, + `~` + ) + ) +} + type EntityQueryBuilder = (q: InitialQueryBuilder) => QueryBuilder export function createEntityIncludesQuery( diff --git a/packages/agents-runtime/src/index.ts b/packages/agents-runtime/src/index.ts index 14ed10070d..1626996968 100644 --- a/packages/agents-runtime/src/index.ts +++ b/packages/agents-runtime/src/index.ts @@ -115,7 +115,9 @@ export { export type { EntityMembershipRow, EntityTags, TagOperation } from './tags' export { createEntityIncludesQuery, + createEntityTimelineQuery, createEntityErrorsQuery, + createPendingTimelineOrder, getEntityState, normalizeEntityTimelineData, normalizeTimelineEntities, @@ -124,6 +126,14 @@ export { export type { EntityTimelineData, EntityTimelineContentItem, + EntityTimelineInboxMode, + EntityTimelineQueryOptions, + EntityTimelineQueryRow, + EntityTimelineRunRow, + EntityTimelineRunItem, + EntityTimelineTextChunk, + EntityTimelineTextItem, + EntityTimelineToolCallItem, IncludesEntity, EntityTimelineSection, EntityTimelineState, diff --git a/packages/agents-runtime/tsdown.config.ts b/packages/agents-runtime/tsdown.config.ts index 3b88224702..f106db7088 100644 --- a/packages/agents-runtime/tsdown.config.ts +++ b/packages/agents-runtime/tsdown.config.ts @@ -3,6 +3,7 @@ import type { Options } from 'tsdown' const config: Options = { entry: [`src/index.ts`, `src/react.ts`, `src/tools.ts`, `src/client.ts`], format: [`esm`, `cjs`], + external: [/^@tanstack\//, /^@durable-streams\//], dts: true, clean: true, } diff --git a/packages/agents-server-ui/src/components/AgentResponse.tsx b/packages/agents-server-ui/src/components/AgentResponse.tsx index 6624e1b016..dae207b0a1 100644 --- a/packages/agents-server-ui/src/components/AgentResponse.tsx +++ b/packages/agents-server-ui/src/components/AgentResponse.tsx @@ -7,6 +7,7 @@ import { useRef, useState, } from 'react' +import { useLiveQuery } from '@tanstack/react-db' import { Streamdown } from 'streamdown' import { getCachedMarkdownRender, @@ -27,6 +28,9 @@ import { ThinkingIndicator } from './ThinkingIndicator' import styles from './AgentResponse.module.css' import type { EntityTimelineContentItem, + EntityTimelineRunRow, + EntityTimelineTextItem, + EntityTimelineToolCallItem, EntityTimelineSection, } from '@electric-ax/agents-runtime/client' @@ -37,6 +41,16 @@ type AgentResponseSection = Extract< const SHIKI_SETTLE_MS = 80 +function compareTimelineOrderValues( + left: string | number, + right: string | number +): number { + if (typeof left === `number` && typeof right === `number`) { + return left - right + } + return String(left).localeCompare(String(right)) +} + const MarkdownSegment = memo(function MarkdownSegment({ text, contentHash, @@ -51,12 +65,12 @@ const MarkdownSegment = memo(function MarkdownSegment({ canCache: boolean }): React.ReactElement { const wrapperRef = useRef(null) + const cachedHtmlHashRef = useRef(null) // Tracks the content hash that the currently-displayed `cachedHtml` // belongs to, so we can distinguish "the underlying text changed and our // cached HTML is now stale" from "only the column width changed and our // cached HTML is still semantically correct, just laid out for a // different width". - const cachedHtmlHashRef = useRef(null) const [cachedHtml, setCachedHtmlState] = useState(() => { if (!canCache || !isMarkdownRenderCacheReady() || renderWidth <= 0) return null @@ -230,6 +244,119 @@ function toolItemToCopyText(item: EntityTimelineContentItem): string { return parts.join(`\n`) } +function liveToolCallToContentItem( + item: EntityTimelineToolCallItem +): Extract { + return { + kind: `tool_call`, + toolCallId: item.tool_call_id ?? item.key, + toolName: item.tool_name, + args: + item.args && typeof item.args === `object` + ? (item.args as Record) + : {}, + status: item.status, + result: typeof item.result === `string` ? item.result : undefined, + isError: Boolean(item.error), + } +} + +const LiveTextItem = memo(function LiveTextItem({ + item, + isStreaming, + renderWidth, +}: { + item: EntityTimelineTextItem + isStreaming: boolean + renderWidth: number +}): React.ReactElement { + return ( + + ) +}) + +export const AgentResponseLive = memo(function AgentResponseLive({ + run, + isStreaming, + timestamp, + renderWidth = 0, +}: { + run: EntityTimelineRunRow + isStreaming: boolean + timestamp?: number | null + renderWidth?: number +}): React.ReactElement { + const { data: items = [] } = useLiveQuery( + (q) => (run.items ? q.from({ item: run.items }) : undefined), + [run.items] + ) + const sortedItems = useMemo( + () => + [...items].sort((a, b) => + compareTimelineOrderValues( + a.text?.order ?? a.toolCall?.order ?? `~`, + b.text?.order ?? b.toolCall?.order ?? `~` + ) + ), + [items] + ) + const done = run.status !== `started` + const lastItem = sortedItems[sortedItems.length - 1] + const lastTextHasContent = lastItem?.text !== undefined + const showThinking = isStreaming && !done && !lastTextHasContent + const showTimestamp = timestamp != null && !isStreaming + const hasLeadingMeta = showThinking || done + + return ( + + {sortedItems.map((item, i) => { + if (item.text) { + return ( + + ) + } + + return ( + + ) + })} + + + {showThinking && } + {done && ( + + ✓ done + + )} + {showTimestamp && ( + <> + {hasLeadingMeta && ( + + · + + )} + + + )} + + + ) +}) + export const AgentResponse = memo(function AgentResponse({ section, isStreaming, diff --git a/packages/agents-server-ui/src/components/EntityTimeline.tsx b/packages/agents-server-ui/src/components/EntityTimeline.tsx index 1905b95984..f349c6a3fa 100644 --- a/packages/agents-server-ui/src/components/EntityTimeline.tsx +++ b/packages/agents-server-ui/src/components/EntityTimeline.tsx @@ -31,7 +31,7 @@ import { useElectricAgents } from '../lib/ElectricAgentsProvider' import { warmMarkdownRenderCache } from '../lib/markdownRenderCache' import { Icon, IconButton, ScrollArea, Stack, Text, Tooltip } from '../ui' import { UserMessage } from './UserMessage' -import { AgentResponse } from './AgentResponse' +import { AgentResponseLive } from './AgentResponse' import { InlineEventCard } from './InlineEventCard' import { InlineStatusBadge } from './InlineStatusBadge' import { @@ -44,12 +44,28 @@ import { } from '../lib/formatTime' import styles from './EntityTimeline.module.css' import type { + EntityTimelineSection, + EntityTimelineQueryRow, IncludesEntity, Manifest, } from '@electric-ax/agents-runtime/client' -import type { TimelineEntry } from '../lib/timelineEntries' import type { PaneFindAdapter, PaneFindMatch } from '../hooks/usePaneFind' +type RenderTimelineRow = EntityTimelineQueryRow +type WakeSection = Extract + +function renderRowKey(row: RenderTimelineRow): string { + return row.$key +} + +function readInboxText(payload: unknown): string { + if (payload && typeof payload === `object`) { + const text = (payload as { text?: unknown }).text + if (typeof text === `string`) return text + } + return typeof payload === `string` ? payload : `` +} + /** * Width-aware row-height estimate used as the initial size hint for the * virtualizer (before the real DOM has been measured). Producing an estimate @@ -65,7 +81,7 @@ import type { PaneFindAdapter, PaneFindMatch } from '../hooks/usePaneFind' * width and multiply by the body line height. */ function estimateRowHeight( - row: TimelineEntry | undefined, + row: RenderTimelineRow | undefined, contentWidth: number ): number { if (!row) return 120 @@ -76,26 +92,17 @@ function estimateRowHeight( const charsPerLine = Math.max(40, Math.floor(usableWidth / 7)) const lineHeight = 22 // 14px font * ~1.55 leading - if (row.section.kind === `user_message`) { - const lines = Math.max(1, Math.ceil(row.section.text.length / charsPerLine)) - // bubble padding (24) + meta row (~24) + content + if (row.inbox) { + const lines = Math.max( + 1, + Math.ceil(readInboxText(row.inbox.payload).length / charsPerLine) + ) return Math.max(64, 48 + lines * lineHeight) + timelineRowGap(row) } - if (row.section.kind === `wake`) { + if (row.wake || row.manifest) { return 76 + timelineRowGap(row) } - if (row.section.kind === `manifest`) { - return 76 + timelineRowGap(row) - } - - const textLength = row.section.items.reduce((total: number, item) => { - if (item.kind === `text`) return total + item.text.length - // Tool calls render as a compact block; assume ~3 lines. - return total + charsPerLine * 3 - }, 0) - const lines = Math.max(2, Math.ceil(textLength / charsPerLine)) - // status row (~24) + content + a little breathing room - return Math.max(120, 32 + lines * lineHeight) + timelineRowGap(row) + return 120 + timelineRowGap(row) } const BOTTOM_PIN_THRESHOLD = 8 @@ -104,10 +111,8 @@ const ROW_GAP = 24 const MANIFEST_ROW_GAP = 10 const ROW_SETTLE_MS = 500 -function timelineRowGap(row: TimelineEntry): number { - return row.section.kind === `manifest` || row.section.kind === `wake` - ? MANIFEST_ROW_GAP - : ROW_GAP +function timelineRowGap(row: RenderTimelineRow): number { + return row.manifest || row.wake ? MANIFEST_ROW_GAP : ROW_GAP } type TimelinePaneFindMatch = PaneFindMatch & { @@ -116,42 +121,27 @@ type TimelinePaneFindMatch = PaneFindMatch & { rowOccurrence: number } -function timelineRowSearchText(row: TimelineEntry): string { - const { section } = row - if (section.kind === `user_message`) return section.text - if (section.kind === `wake`) return wakeSectionText(section) - if (section.kind === `manifest`) return manifestSearchText(section.manifest) - - return section.items - .map((item) => { - if (item.kind === `text`) return item.text - const parts = [ - item.toolName, - JSON.stringify(item.args, null, 2), - item.result ?? ``, - ] - return parts.filter((part) => part.trim().length > 0).join(`\n`) +function timelineRowSearchText(row: RenderTimelineRow): string { + if (row.inbox) return readInboxText(row.inbox.payload) + if (row.wake) { + return wakeSectionText({ + kind: `wake`, + payload: row.wake.payload, + timestamp: Date.parse(row.wake.payload.timestamp), }) - .filter((part) => part.trim().length > 0) - .join(`\n\n`) + } + if (row.manifest) return manifestSearchText(row.manifest) + return `` } -function timelineRowLabel(row: TimelineEntry): string { - switch (row.section.kind) { - case `user_message`: - return `User message` - case `wake`: - return `Wake` - case `manifest`: - return `Manifest item` - case `agent_response`: - return `Agent response` - } +function timelineRowLabel(row: RenderTimelineRow): string { + if (row.inbox) return `User message` + if (row.wake) return `Wake` + if (row.manifest) return `Manifest item` + return `Agent response` } -function wakeReason( - section: Extract -): string { +function wakeReason(section: WakeSection): string { const { payload } = section if (payload.timeout) return `timeout` if (payload.finished_child) { @@ -166,9 +156,7 @@ function wakeReason( return payload.source } -function wakeSectionText( - section: Extract -): string { +function wakeSectionText(section: WakeSection): string { return [ `woke`, wakeReason(section), @@ -180,7 +168,7 @@ function wakeSectionText( function WakeTimelineRow({ section, }: { - section: Extract + section: WakeSection }): React.ReactElement { const reason = wakeReason(section) const details = wakeDetails(section) @@ -211,7 +199,7 @@ function WakeTimelineRow({ } function wakeDetails( - section: Extract + section: WakeSection ): Array<{ label: string; value: string }> { const { payload } = section const details = [ @@ -263,7 +251,7 @@ function wakeDetails( } function wakeChildOutput( - section: Extract + section: WakeSection ): { label: string; value: string } | null { const child = section.payload.finished_child if (!child) return null @@ -600,19 +588,8 @@ function entityUrlsFromKey(key: string): Array { return key.length === 0 ? [] : key.split(`\0`) } -// `section` and `responseTimestamp` are pulled out of the parent -// `EntityTimelineEntry` so React.memo's shallow compare can hit on -// the *section* identity. `buildTimelineEntries` returns a fresh -// `entries` array (and fresh entry objects) on every chunk during -// streaming, but the runtime caches finished agent sections in a -// WeakMap keyed by the underlying run row — so unchanged rows -// receive the identical `section` reference each render. With the -// previous `row` prop, that hit was masked by the always-new wrapper -// object; splitting the props lets memo skip every settled row and -// only re-render the streaming row + the row that just settled. const TimelineRow = memo(function TimelineRow({ - section, - responseTimestamp, + row, entityStopped, isStreaming, renderWidth, @@ -620,8 +597,7 @@ const TimelineRow = memo(function TimelineRow({ tileId, entityStatusByUrl, }: { - section: TimelineEntry[`section`] - responseTimestamp: TimelineEntry[`responseTimestamp`] + row: RenderTimelineRow entityStopped: boolean isStreaming: boolean renderWidth: number @@ -629,21 +605,42 @@ const TimelineRow = memo(function TimelineRow({ tileId: string | null entityStatusByUrl: Map }): React.ReactElement { - if (section.kind === `user_message`) { - return + if (row.inbox) { + const timestamp = Date.parse(row.inbox.timestamp) + return ( + + ) } - if (section.kind === `wake`) { - return + + if (row.wake) { + return ( + + ) } - if (section.kind === `manifest`) { + + if (row.manifest) { return ( @@ -651,17 +648,16 @@ const TimelineRow = memo(function TimelineRow({ } return ( - ) }) export function EntityTimeline({ - entries, + rows, loading, error, entityStopped, @@ -671,7 +667,7 @@ export function EntityTimeline({ entities = [], scrollToBottomSignal = 0, }: { - entries: Array + rows: Array loading: boolean error: string | null entityStopped: boolean @@ -681,7 +677,6 @@ export function EntityTimeline({ entities?: Array scrollToBottomSignal?: number }): React.ReactElement { - const rows = useMemo(() => entries, [entries]) const { entitiesCollection } = useElectricAgents() const referencedEntityUrlKey = useMemo( () => stableEntityUrlKey(entities.map((entity) => entity.url)), @@ -734,20 +729,20 @@ export function EntityTimeline({ const handledScrollSignalRef = useRef(scrollToBottomSignal) const textColumnWidth = Math.max(0, contentWidth - CHAT_SURFACE_GUTTER) - const firstMessage = rows.find( - ( - row - ): row is TimelineEntry & { - section: Extract - } => row.section.kind === `user_message` - ) - const spawnTime = firstMessage?.section.timestamp ?? null + const spawnTime = useMemo(() => { + for (const row of rows) { + if (!row.inbox) continue + const timestamp = Date.parse(row.inbox.timestamp) + return Number.isFinite(timestamp) ? timestamp : null + } + return null + }, [rows]) const lastStreamingAgentKey = useMemo(() => { for (let index = rows.length - 1; index >= 0; index--) { const row = rows[index] - if (row.section.kind === `agent_response`) { - return row.section.done ? null : row.key + if (row.run) { + return row.run.status === `started` ? row.$key : null } } return null @@ -822,9 +817,10 @@ export function EntityTimeline({ count: rows.length, getScrollElement: () => viewport, estimateSize: (index) => - cachedSizeMapRef.current.get(rows[index]?.key ?? ``) ?? - estimateRowHeight(rows[index], textColumnWidth), - getItemKey: (index) => rows[index]?.key ?? index, + cachedSizeMapRef.current.get( + rows[index] ? renderRowKey(rows[index]!) : `` + ) ?? estimateRowHeight(rows[index], textColumnWidth), + getItemKey: (index) => (rows[index] ? renderRowKey(rows[index]!) : index), gap: 0, overscan: 6, measureElement: measureRowElement, @@ -845,12 +841,13 @@ export function EntityTimeline({ if (!query.trim()) return matches rows.forEach((row, rowIndex) => { + const rowKey = renderRowKey(row) const text = timelineRowSearchText(row) const starts = getTextMatchStarts(text, query) starts.forEach((start, rowOccurrence) => { matches.push({ - id: `${row.key}:${rowOccurrence}`, - rowKey: row.key, + id: `${rowKey}:${rowOccurrence}`, + rowKey, rowIndex, rowOccurrence, label: timelineRowLabel(row), @@ -1152,6 +1149,7 @@ export function EntityTimeline({ > {rowVirtualizer.getVirtualItems().map((virtualRow) => { const row = rows[virtualRow.index] + const rowKey = renderRowKey(row) // Stable row key. The previous implementation appended // `:${contentWidth}` to force remount on every column-width @@ -1167,8 +1165,8 @@ export function EntityTimeline({ key={virtualRow.key} ref={rowVirtualizer.measureElement} data-index={virtualRow.index} - data-item-key={row.key} - data-pane-find-row-key={row.key} + data-item-key={rowKey} + data-pane-find-row-key={rowKey} className={styles.virtualRow} style={{ transform: `translateY(${virtualRow.start}px)`, @@ -1176,10 +1174,9 @@ export function EntityTimeline({ }} > - >(() => new Map()) - const inlineTimeoutsRef = useRef( - new Map>() - ) - const processedInboxKeys = useMemo( + const optimisticInlineInboxKeys = useMemo( () => new Set( - entries - .filter((entry) => entry.section.kind === `user_message`) - .map((entry) => entry.key.replace(/^inbox:/, ``)) + timelineRows + .filter((row) => row.inbox?.status === `pending`) + .map((row) => row.inbox!.key) ), - [entries] - ) - const pendingInboxByKey = useMemo( - () => new Map(pendingInbox.map((message) => [message.key, message])), - [pendingInbox] + [timelineRows] ) - const projectedPendingMessage = useMemo(() => { - for (const [key, message] of inlineQueuedMessages) { - if (processedInboxKeys.has(key)) continue - return pendingInboxByKey.get(key) ?? message - } - return null - }, [inlineQueuedMessages, pendingInboxByKey, processedInboxKeys]) const visiblePendingInbox = useMemo( () => - projectedPendingMessage - ? pendingInbox.filter( - (message) => message.key !== projectedPendingMessage.key - ) - : pendingInbox, - [pendingInbox, projectedPendingMessage] - ) - const visibleEntries = useMemo>(() => { - if (!projectedPendingMessage) return entries - const timestamp = Date.parse(projectedPendingMessage.timestamp) - const hasUserMessage = entries.some( - (entry) => entry.section.kind === `user_message` - ) - return [ - ...entries, - { - key: `pending-inbox:${projectedPendingMessage.key}`, - order: Number.MAX_SAFE_INTEGER, - responseTimestamp: null, - section: { - kind: `user_message`, - from: projectedPendingMessage.from ?? `user`, - text: readTextPayload(projectedPendingMessage.payload), - timestamp: Number.isFinite(timestamp) ? timestamp : Date.now(), - isInitial: !hasUserMessage, - }, - }, - ] - }, [entries, projectedPendingMessage]) - - const rememberInlineQueuedMessage = useCallback( - (message: OptimisticInboxMessage) => { - setInlineQueuedMessages((current) => { - const next = new Map(current) - next.set(message.key, message) - return next - }) - const existingTimeout = inlineTimeoutsRef.current.get(message.key) - if (existingTimeout) clearTimeout(existingTimeout) - const timeout = setTimeout(() => { - inlineTimeoutsRef.current.delete(message.key) - setInlineQueuedMessages((current) => { - if (!current.has(message.key)) return current - const next = new Map(current) - next.delete(message.key) - return next - }) - }, INLINE_QUEUED_TIMEOUT_MS) - inlineTimeoutsRef.current.set(message.key, timeout) - }, - [] - ) - - useEffect(() => { - setInlineQueuedMessages((current) => { - let next: Map | null = null - for (const key of current.keys()) { - if (!processedInboxKeys.has(key)) continue - next ??= new Map(current) - next.delete(key) - const timeout = inlineTimeoutsRef.current.get(key) - if (timeout) clearTimeout(timeout) - inlineTimeoutsRef.current.delete(key) - } - return next ?? current - }) - }, [processedInboxKeys]) - - useEffect( - () => () => { - for (const timeout of inlineTimeoutsRef.current.values()) { - clearTimeout(timeout) - } - inlineTimeoutsRef.current.clear() - }, - [] + pendingInbox.filter( + (message) => !optimisticInlineInboxKeys.has(message.key) + ), + [optimisticInlineInboxKeys, pendingInbox] ) // If the timeline subscription errors out for an entity that isn't @@ -189,7 +95,7 @@ function GenericChatBody({ return ( <> ( - pendingInbox: EntityTimelineData[`inbox`] + timelineRows: Array + pendingInbox: Array entities: Array generationActive: boolean db: EntityStreamDBWithActions | null @@ -76,7 +74,10 @@ export function useEntityTimeline( }, [baseUrl, entityUrl]) const { data: timelineRows = [] } = useLiveQuery( - (q) => (db ? createEntityIncludesQuery(db)(q) : undefined), + (q) => { + if (!db) return undefined + return createEntityTimelineQuery(db)(q) + }, [db] ) const { data: manifests = [] } = useLiveQuery( @@ -98,58 +99,7 @@ export function useEntityTimeline( : undefined, [db] ) - const timelineData = useMemo( - () => - normalizeEntityTimelineData( - (timelineRows as Array)[0] ?? { - runs: [], - inbox: [], - wakes: [], - contextInserted: [], - contextRemoved: [], - entities: [], - } - ), - [timelineRows] - ) - - const entries = useMemo(() => { - const baseEntries = buildTimelineEntries( - timelineData.runs, - timelineData.inbox, - timelineData.wakes - ) - const orderByKey = new Map() - for (const run of timelineData.runs) { - orderByKey.set(`run:${run.key}`, run.order) - } - for (const msg of timelineData.inbox) { - orderByKey.set(`inbox:${msg.key}`, msg.order) - } - for (const wake of timelineData.wakes) { - orderByKey.set(`wake:${wake.key}`, wake.order) - } - - const merged: Array<{ order: string | number; entry: TimelineEntry }> = [ - ...baseEntries.map((entry) => ({ - order: orderByKey.get(entry.key) ?? Number.MAX_SAFE_INTEGER, - entry, - })), - ...(manifests as Array).map((manifest) => ({ - order: manifest._seq ?? Number.MAX_SAFE_INTEGER, - entry: { - key: `manifest:${manifest.key}`, - order: manifest._seq ?? Number.MAX_SAFE_INTEGER, - responseTimestamp: null, - section: { kind: `manifest` as const, manifest }, - }, - })), - ] - - return merged - .sort((left, right) => compareTimelineOrders(left.order, right.order)) - .map(({ entry }) => entry) - }, [manifests, timelineData.runs, timelineData.inbox, timelineData.wakes]) + const typedTimelineRows = timelineRows as Array const pendingInbox = useMemo( () => @@ -157,7 +107,7 @@ export function useEntityTimeline( .map( (msg): IncludesInboxMessage => ({ key: msg.key, - order: msg._seq ?? Number.MAX_SAFE_INTEGER, + order: msg._timeline_order ?? msg._seq ?? Number.MAX_SAFE_INTEGER, from: msg.from, payload: msg.payload, timestamp: msg.timestamp, @@ -191,14 +141,44 @@ export function useEntityTimeline( [pendingInboxRows] ) const generationActive = useMemo( - () => timelineData.runs.some((run) => run.status === `started`), - [timelineData.runs] + () => typedTimelineRows.some((row) => row.run?.status === `started`), + [typedTimelineRows] + ) + const entities = useMemo( + () => + normalizeTimelineEntities( + (manifests as Array) + .filter( + (manifest) => + manifest.kind === `child` || manifest.kind === `source` + ) + .map( + (manifest): IncludesEntity => ({ + key: + manifest.kind === `child` + ? manifest.entity_url + : manifest.sourceRef, + kind: manifest.kind, + id: manifest.kind === `child` ? manifest.id : manifest.sourceRef, + url: + manifest.kind === `child` + ? manifest.entity_url + : manifest.sourceRef, + type: + manifest.kind === `child` ? manifest.entity_type : undefined, + observed: + manifest.kind === `source` || Boolean(manifest.observed), + wake: manifest.wake, + }) + ) + ), + [manifests] ) return { - entries, + timelineRows: typedTimelineRows, pendingInbox, - entities: timelineData.entities, + entities, generationActive, db, loading, diff --git a/packages/agents-server-ui/src/lib/sendMessage.ts b/packages/agents-server-ui/src/lib/sendMessage.ts index 018d7152fe..3f1384ecd5 100644 --- a/packages/agents-server-ui/src/lib/sendMessage.ts +++ b/packages/agents-server-ui/src/lib/sendMessage.ts @@ -1,5 +1,6 @@ import { createOptimisticAction } from '@tanstack/db' import { generateKeyBetween } from 'fractional-indexing' +import { createPendingTimelineOrder } from '@electric-ax/agents-runtime/client' import { getActivePrincipal, getConfiguredServerHeaders, @@ -19,6 +20,8 @@ let optimisticInboxSeq = OPTIMISTIC_INBOX_SEQ_START export type OptimisticInboxMessage = { key: string _seq: number + _timeline_order: string + _optimistic: true from: string payload: { text: string } timestamp: string @@ -195,6 +198,8 @@ export function createSendMessageAction({ const message: OptimisticInboxMessage = { key, _seq: seq, + _timeline_order: createPendingTimelineOrder(seq), + _optimistic: true, from, payload: { text }, timestamp: now, diff --git a/packages/agents-server-ui/src/main.tsx b/packages/agents-server-ui/src/main.tsx index dc494a5c75..a64d9001bc 100644 --- a/packages/agents-server-ui/src/main.tsx +++ b/packages/agents-server-ui/src/main.tsx @@ -28,12 +28,31 @@ import { App } from './App' // to non-ngrok hosts. Covers the durable-streams client's internal fetches // too, since it calls through the global fetch. const originalFetch = window.fetch.bind(window) +const shouldSkipNgrokWarning = (input: RequestInfo | URL): boolean => { + const url = + input instanceof Request + ? input.url + : input instanceof URL + ? input.href + : input + try { + return new URL(url, window.location.href).hostname.endsWith( + `.ngrok-free.app` + ) + } catch { + return false + } +} + window.fetch = ( input: RequestInfo | URL, init?: RequestInit ): Promise => { const headers = new Headers(init?.headers ?? {}) - if (!headers.has(`ngrok-skip-browser-warning`)) { + if ( + shouldSkipNgrokWarning(input) && + !headers.has(`ngrok-skip-browser-warning`) + ) { headers.set(`ngrok-skip-browser-warning`, `true`) } return originalFetch(input, { ...init, headers }) diff --git a/packages/agents-server-ui/vite.config.ts b/packages/agents-server-ui/vite.config.ts index f4635c0248..0287834746 100644 --- a/packages/agents-server-ui/vite.config.ts +++ b/packages/agents-server-ui/vite.config.ts @@ -51,9 +51,10 @@ export default defineConfig(({ command, mode }) => { `jsx-dev-runtime.js` ), }, - dedupe: [`react`, `react-dom`], + dedupe: [`react`, `react-dom`, `@tanstack/db`], }, optimizeDeps: { + exclude: [`@durable-streams/state`, `@tanstack/db`, `@tanstack/react-db`], include: [ `react`, `react-dom`, From fc59d2c9ec452378a2a811f22fdbd1a19c7f48d8 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Mon, 18 May 2026 13:43:07 +0100 Subject: [PATCH 05/10] fix(agents): use synced state for optimistic timeline rows Rely on TanStack DB's virtual $synced prop instead of carrying a custom _optimistic field through inbox rows. Co-authored-by: Cursor --- packages/agents-runtime/src/entity-schema.ts | 2 -- packages/agents-runtime/src/entity-stream-db.ts | 1 - packages/agents-runtime/src/entity-timeline.ts | 2 +- packages/agents-server-ui/src/lib/sendMessage.ts | 2 -- 4 files changed, 1 insertion(+), 6 deletions(-) diff --git a/packages/agents-runtime/src/entity-schema.ts b/packages/agents-runtime/src/entity-schema.ts index 7a18eb6c20..4ce00d68f2 100644 --- a/packages/agents-runtime/src/entity-schema.ts +++ b/packages/agents-runtime/src/entity-schema.ts @@ -44,7 +44,6 @@ type SequencedPersistedRow = Omit< key: string _seq?: number _timeline_order?: string - _optimistic?: boolean } type Schema = z.ZodType type ChildEntityStatusValue = `spawning` | `running` | `idle` | `stopped` @@ -273,7 +272,6 @@ function createJsonObjectSchema(): Schema> { const timelineOrderField = { _timeline_order: z.string().optional(), - _optimistic: z.boolean().optional(), } function createChildEntityStatusSchema(): Schema { diff --git a/packages/agents-runtime/src/entity-stream-db.ts b/packages/agents-runtime/src/entity-stream-db.ts index 0eb681de8d..f737181cee 100644 --- a/packages/agents-runtime/src/entity-stream-db.ts +++ b/packages/agents-runtime/src/entity-stream-db.ts @@ -182,7 +182,6 @@ export function createEntityStreamDB( const clone = { ...row } delete clone._seq delete clone._timeline_order - delete clone._optimistic return clone } diff --git a/packages/agents-runtime/src/entity-timeline.ts b/packages/agents-runtime/src/entity-timeline.ts index e71b05b196..23bf5e9115 100644 --- a/packages/agents-runtime/src/entity-timeline.ts +++ b/packages/agents-runtime/src/entity-timeline.ts @@ -1146,7 +1146,7 @@ function buildEntityTimelineQuery( inbox = inbox.where(({ inbox }) => or( eq(coalesce(inbox.status, `processed`), `processed`), - eq(coalesce(inbox._optimistic, false), true) + eq(inbox.$synced, false) ) ) } diff --git a/packages/agents-server-ui/src/lib/sendMessage.ts b/packages/agents-server-ui/src/lib/sendMessage.ts index 3f1384ecd5..28c7caa9c9 100644 --- a/packages/agents-server-ui/src/lib/sendMessage.ts +++ b/packages/agents-server-ui/src/lib/sendMessage.ts @@ -21,7 +21,6 @@ export type OptimisticInboxMessage = { key: string _seq: number _timeline_order: string - _optimistic: true from: string payload: { text: string } timestamp: string @@ -199,7 +198,6 @@ export function createSendMessageAction({ key, _seq: seq, _timeline_order: createPendingTimelineOrder(seq), - _optimistic: true, from, payload: { text }, timestamp: now, From 7aeedcc026b573f202614555eb6982d3a7d662a4 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Mon, 18 May 2026 13:56:20 +0100 Subject: [PATCH 06/10] fix(agents-ui): avoid seq on optimistic inbox rows Use the timeline order token for pending local inbox rows so optimistic timeline ordering no longer depends on the legacy _seq field. Co-authored-by: Cursor --- .../src/hooks/useEntityTimeline.ts | 7 +++- .../agents-server-ui/src/lib/sendMessage.ts | 37 +++++++++---------- 2 files changed, 22 insertions(+), 22 deletions(-) diff --git a/packages/agents-server-ui/src/hooks/useEntityTimeline.ts b/packages/agents-server-ui/src/hooks/useEntityTimeline.ts index 5aab599d2e..c0657b8d6a 100644 --- a/packages/agents-server-ui/src/hooks/useEntityTimeline.ts +++ b/packages/agents-server-ui/src/hooks/useEntityTimeline.ts @@ -5,7 +5,7 @@ import { createEntityTimelineQuery, normalizeTimelineEntities, } from '@electric-ax/agents-runtime/client' -import { eq } from '@tanstack/db' +import { coalesce, eq } from '@tanstack/db' import { connectEntityStream } from '../lib/entity-connection' import type { EntityStreamDBWithActions, @@ -95,7 +95,10 @@ export function useEntityTimeline( ? q .from({ inbox: db.collections.inbox }) .where(({ inbox }) => eq(inbox.status, `pending`)) - .orderBy(({ inbox }) => inbox._seq, `asc`) + .orderBy(({ inbox }) => coalesce(inbox._timeline_order, `~`), `asc`) + .orderBy(({ inbox }) => + coalesce(inbox._seq, Number.MAX_SAFE_INTEGER) + ) : undefined, [db] ) diff --git a/packages/agents-server-ui/src/lib/sendMessage.ts b/packages/agents-server-ui/src/lib/sendMessage.ts index 28c7caa9c9..3690c87cda 100644 --- a/packages/agents-server-ui/src/lib/sendMessage.ts +++ b/packages/agents-server-ui/src/lib/sendMessage.ts @@ -10,16 +10,14 @@ import { entityApiUrl } from './entity-api' import { loadCloudAuthState } from './server-connection' import type { EntityStreamDBWithActions } from '@electric-ax/agents-runtime/client' -// Timeline queries sort inbox messages by `_seq`. Pending local rows do not -// have a server sequence yet, so put them after streamed rows until the real -// event with the same key arrives. -const OPTIMISTIC_INBOX_SEQ_START = Number.MAX_SAFE_INTEGER - 1_000_000 +// Pending local rows do not have a server stream offset yet, so put them after +// streamed rows until the real event with the same key arrives. +const OPTIMISTIC_INBOX_ORDER_START = Number.MAX_SAFE_INTEGER - 1_000_000 -let optimisticInboxSeq = OPTIMISTIC_INBOX_SEQ_START +let optimisticInboxOrderIndex = OPTIMISTIC_INBOX_ORDER_START export type OptimisticInboxMessage = { key: string - _seq: number _timeline_order: string from: string payload: { text: string } @@ -34,7 +32,7 @@ type SendMessageInput = { text: string mode: `immediate` | `queued` | `paused` | `steer` key: string - seq: number + pendingOrderIndex: number position?: string } @@ -50,16 +48,16 @@ type InboxMessageKeyInput = { key: string } -function createOptimisticInboxKey(seq: number): string { - return `optimistic-${Date.now()}-${seq}` +function createOptimisticInboxKey(pendingOrderIndex: number): string { + return `optimistic-${Date.now()}-${pendingOrderIndex}` } -function nextOptimisticInboxSeq(): number { - optimisticInboxSeq += 1 - if (optimisticInboxSeq >= Number.MAX_SAFE_INTEGER) { - optimisticInboxSeq = OPTIMISTIC_INBOX_SEQ_START +function nextOptimisticInboxOrderIndex(): number { + optimisticInboxOrderIndex += 1 + if (optimisticInboxOrderIndex >= Number.MAX_SAFE_INTEGER) { + optimisticInboxOrderIndex = OPTIMISTIC_INBOX_ORDER_START } - return optimisticInboxSeq + return optimisticInboxOrderIndex } const QUEUE_POSITION_TIMESTAMP_WIDTH = 16 @@ -192,12 +190,11 @@ export function createSendMessageAction({ onOptimisticMessage?: (message: OptimisticInboxMessage) => void }) { const action = createOptimisticAction({ - onMutate: ({ text, mode, key, seq, position }) => { + onMutate: ({ text, mode, key, pendingOrderIndex, position }) => { const now = new Date().toISOString() const message: OptimisticInboxMessage = { key, - _seq: seq, - _timeline_order: createPendingTimelineOrder(seq), + _timeline_order: createPendingTimelineOrder(pendingOrderIndex), from, payload: { text }, timestamp: now, @@ -242,7 +239,7 @@ export function createSendMessageAction({ mode?: `immediate` | `queued` | `paused` | `steer` position?: string }) => { - const seq = nextOptimisticInboxSeq() + const pendingOrderIndex = nextOptimisticInboxOrderIndex() const effectivePosition = position ?? (mode === `queued` || mode === `paused` @@ -251,8 +248,8 @@ export function createSendMessageAction({ return action({ text, mode, - key: createOptimisticInboxKey(seq), - seq, + key: createOptimisticInboxKey(pendingOrderIndex), + pendingOrderIndex, position: effectivePosition, }) } From f5accdb3f292a71c013795983cc2739693cecd08 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Mon, 18 May 2026 14:02:24 +0100 Subject: [PATCH 07/10] fix(agents-ui): keep first queued message inline Bridge the first pending queued message into the timeline while there is no active run so it does not briefly appear in the pending drawer. Co-authored-by: Cursor --- .../src/components/views/ChatView.tsx | 23 +++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/packages/agents-server-ui/src/components/views/ChatView.tsx b/packages/agents-server-ui/src/components/views/ChatView.tsx index 4c4eadc8fd..2b2af5dac4 100644 --- a/packages/agents-server-ui/src/components/views/ChatView.tsx +++ b/packages/agents-server-ui/src/components/views/ChatView.tsx @@ -5,6 +5,7 @@ import { EntityTimeline } from '../EntityTimeline' import { MessageInput } from '../MessageInput' import { EntityContextDrawer } from '../EntityContextDrawer' import type { ViewProps } from '../../lib/workspace/viewRegistry' +import type { EntityTimelineQueryRow } from '@electric-ax/agents-runtime/client' /** * The default view: chat / timeline + message composer. @@ -81,6 +82,24 @@ function GenericChatBody({ ), [optimisticInlineInboxKeys, pendingInbox] ) + const inlinePendingInbox = + !entityStopped && !generationActive ? visiblePendingInbox[0] : undefined + const timelineRowsWithInlinePending = useMemo>( + () => + inlinePendingInbox + ? [ + ...timelineRows, + { + $key: `inbox:${inlinePendingInbox.key}`, + inbox: inlinePendingInbox, + } as EntityTimelineQueryRow, + ] + : timelineRows, + [inlinePendingInbox, timelineRows] + ) + const drawerPendingInbox = inlinePendingInbox + ? visiblePendingInbox.slice(1) + : visiblePendingInbox // If the timeline subscription errors out for an entity that isn't // currently spawning (so the failure isn't transient), bounce back to @@ -95,7 +114,7 @@ function GenericChatBody({ return ( <> Date: Mon, 18 May 2026 14:05:30 +0100 Subject: [PATCH 08/10] docs(agents): describe entity timeline query shape Document the multi-source row structure and live child collections returned by createEntityTimelineQuery. Co-authored-by: Cursor --- packages/agents-runtime/src/entity-timeline.ts | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/packages/agents-runtime/src/entity-timeline.ts b/packages/agents-runtime/src/entity-timeline.ts index 23bf5e9115..46fea8fbc1 100644 --- a/packages/agents-runtime/src/entity-timeline.ts +++ b/packages/agents-runtime/src/entity-timeline.ts @@ -1127,6 +1127,24 @@ const getEntityWakesCollection = cachedCollectionFactory((db: EntityStreamDB) => type EntityTimelineQueryBuilder = (q: InitialQueryBuilder) => QueryBuilder +/** + * Builds a live timeline query for an entity stream. + * + * The returned query is a multi-source timeline ordered by each row's + * `_timeline_order`. Each result row has TanStack DB's virtual `$key` plus one + * populated source property: + * + * - `{ inbox }` for user inbox messages. + * - `{ run }` for agent runs. + * - `{ wake }` for wake events. + * - `{ manifest }` for manifest entries. + * + * Run rows include live child collections rather than materialized arrays: + * `run.items`, `run.steps`, and `run.errors`. Pass those child collections to + * `useLiveQuery` (or another live-query consumer) in child renderers to receive + * fine-grained updates while text chunks stream in. Text run items expose their + * concatenated streamed content as `item.text.content`. + */ export function createEntityTimelineQuery( db: EntityStreamDB, opts: EntityTimelineQueryOptions = {} From b7d582094ca3d84a41d59bc94eecd4cccf414ed1 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Mon, 18 May 2026 14:12:01 +0100 Subject: [PATCH 09/10] fix(agents-ui): keep timeline pinned while streaming Pin the chat timeline on content resize while near the bottom and force a final bottom scroll when a streaming run completes. Co-authored-by: Cursor --- .../src/components/EntityTimeline.tsx | 59 +++++++++++++++++-- 1 file changed, 53 insertions(+), 6 deletions(-) diff --git a/packages/agents-server-ui/src/components/EntityTimeline.tsx b/packages/agents-server-ui/src/components/EntityTimeline.tsx index f349c6a3fa..9074ce62f5 100644 --- a/packages/agents-server-ui/src/components/EntityTimeline.tsx +++ b/packages/agents-server-ui/src/components/EntityTimeline.tsx @@ -727,6 +727,7 @@ export function EntityTimeline({ const settledKeysRef = useRef(new Set()) const settleCheckTimerRef = useRef | null>(null) const handledScrollSignalRef = useRef(scrollToBottomSignal) + const previousStreamingAgentKeyRef = useRef(null) const textColumnWidth = Math.max(0, contentWidth - CHAT_SURFACE_GUTTER) const spawnTime = useMemo(() => { @@ -881,6 +882,17 @@ export function EntityTimeline({ rowVirtualizer.shouldAdjustScrollPositionOnItemSizeChange = () => false }, [rowVirtualizer]) + const scrollToTimelineEnd = useCallback(() => { + if (!viewport || rows.length === 0) return + rowVirtualizer.scrollToIndex(rows.length - 1, { align: `end` }) + + // The stopped/status footer sits outside the virtual list, so make sure the + // physical scroll container is also flush with its full content height. + requestAnimationFrame(() => { + viewport.scrollTop = viewport.scrollHeight + }) + }, [rowVirtualizer, rows.length, viewport]) + const scrollAreaRef = useCallback((node: HTMLDivElement | null) => { setViewport(node) }, []) @@ -1038,11 +1050,46 @@ export function EntityTimeline({ if (!isNearBottom.current) return const frame = requestAnimationFrame(() => { - rowVirtualizer.scrollToIndex(rows.length - 1, { align: `end` }) + scrollToTimelineEnd() + }) + + return () => cancelAnimationFrame(frame) + }, [rows, scrollToTimelineEnd, viewport]) + + useLayoutEffect(() => { + if (!contentElement || !viewport) return + + let frame: ReturnType | null = null + const pinToBottom = () => { + if (!isNearBottom.current) return + if (frame !== null) cancelAnimationFrame(frame) + frame = requestAnimationFrame(() => { + frame = null + scrollToTimelineEnd() + }) + } + + const observer = new ResizeObserver(pinToBottom) + observer.observe(contentElement) + return () => { + observer.disconnect() + if (frame !== null) cancelAnimationFrame(frame) + } + }, [contentElement, scrollToTimelineEnd, viewport]) + + useLayoutEffect(() => { + const previousStreamingAgentKey = previousStreamingAgentKeyRef.current + previousStreamingAgentKeyRef.current = lastStreamingAgentKey + if (!previousStreamingAgentKey || lastStreamingAgentKey) return + + isNearBottom.current = true + setShowJumpToBottom(false) + const frame = requestAnimationFrame(() => { + scrollToTimelineEnd() }) return () => cancelAnimationFrame(frame) - }, [rowVirtualizer, rows, viewport]) + }, [lastStreamingAgentKey, scrollToTimelineEnd]) useLayoutEffect(() => { if (handledScrollSignalRef.current === scrollToBottomSignal) return @@ -1052,11 +1099,11 @@ export function EntityTimeline({ if (!viewport || rows.length === 0) return const frame = requestAnimationFrame(() => { - rowVirtualizer.scrollToIndex(rows.length - 1, { align: `end` }) + scrollToTimelineEnd() }) return () => cancelAnimationFrame(frame) - }, [rowVirtualizer, rows.length, scrollToBottomSignal, viewport]) + }, [rows.length, scrollToBottomSignal, scrollToTimelineEnd, viewport]) useEffect( () => () => { @@ -1071,9 +1118,9 @@ export function EntityTimeline({ if (rows.length > 0) { isNearBottom.current = true setShowJumpToBottom(false) - rowVirtualizer.scrollToIndex(rows.length - 1, { align: `end` }) + scrollToTimelineEnd() } - }, [rowVirtualizer, rows.length]) + }, [rows.length, scrollToTimelineEnd]) if (loading) { return ( From 065f4fce83ee81f10a5593f5025b2f55df841a37 Mon Sep 17 00:00:00 2001 From: Sam Willis Date: Mon, 18 May 2026 14:35:40 +0100 Subject: [PATCH 10/10] changeset: add reactive agent timeline release note Mark the agents runtime and server UI for a patch release because the timeline query now uses fine-grained TanStack DB reactivity. Co-authored-by: Cursor --- .changeset/reactive-agent-timeline.md | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .changeset/reactive-agent-timeline.md diff --git a/.changeset/reactive-agent-timeline.md b/.changeset/reactive-agent-timeline.md new file mode 100644 index 0000000000..f76770abe5 --- /dev/null +++ b/.changeset/reactive-agent-timeline.md @@ -0,0 +1,6 @@ +--- +'@electric-ax/agents-runtime': patch +'@electric-ax/agents-server-ui': patch +--- + +Add a fine-grained reactive entity timeline query and migrate the agents UI to use it. Timeline rows are maintained by TanStack DB using multi-source queries and live child collections, so streamed agent responses update incrementally without rematerializing the whole chat timeline.