diff --git a/.changeset/reactive-agent-timeline.md b/.changeset/reactive-agent-timeline.md new file mode 100644 index 0000000000..f76770abe5 --- /dev/null +++ b/.changeset/reactive-agent-timeline.md @@ -0,0 +1,6 @@ +--- +'@electric-ax/agents-runtime': patch +'@electric-ax/agents-server-ui': patch +--- + +Add a fine-grained reactive entity timeline query and migrate the agents UI to use it. Timeline rows are maintained by TanStack DB using multi-source queries and live child collections, so streamed agent responses update incrementally without rematerializing the whole chat timeline. diff --git a/ENTITY_TIMELINE_QUERY_PLAN.md b/ENTITY_TIMELINE_QUERY_PLAN.md new file mode 100644 index 0000000000..e9153ddb5d --- /dev/null +++ b/ENTITY_TIMELINE_QUERY_PLAN.md @@ -0,0 +1,683 @@ +# Electric Agents `createEntityTimelineQuery` plan + +## Goal + +Add a row-oriented, fine-grained live query API for Electric Agents timelines: + +```ts +createEntityTimelineQuery(db) +``` + +This query should produce an ordered timeline collection of individual events +rather than one aggregate session object. It is intended for hot UI paths such +as the built-in app chat log, observe UI, and user-built timeline UIs. + +Keep `createEntityIncludesQuery` as the snapshot/convenience API. It is useful +when consumers want the whole session shape at once, but it rematerializes the +session-level data structure on every streaming chunk. + +Also add a public React hook for app and userland UIs: + +```ts +useEntityTimeline(db, opts?) +``` + +The hook should be exported for users to build their own timeline UIs. It should +wrap `createEntityTimelineQuery` and return the maintained timeline rows plus +useful derived state such as `generationActive`, pending inbox messages, and any +other small status values needed by the built-in UI. + +## Core direction + +Use TanStack DB's multi-source `from` for the outer timeline query and keep the +outer query simple: + +```ts +return q + .from({ + inbox, + run, + wake, + manifest, + contextInserted, + contextRemoved, + }) + .orderBy(({ inbox, run, wake, manifest, contextInserted, contextRemoved }) => + coalesce( + inbox.order, + run.order, + wake.order, + manifest.order, + contextInserted.order, + contextRemoved.order + ) + ) +``` + +Do not use `caseWhen` in the outer `select`. Without an outer `select`, the +query result keeps the discriminated/exclusive union produced by multi-source +`from`: + +```ts +type EntityTimelineQueryRow = + | { inbox: EntityTimelineInboxRow; run?: undefined; wake?: undefined; ... } + | { run: EntityTimelineRunRow; inbox?: undefined; wake?: undefined; ... } + | { wake: EntityTimelineWakeRow; inbox?: undefined; run?: undefined; ... } + | ... +``` + +Any event-type projection should happen in the source subquery for that event +type. The outer query should only combine and order already-shaped event rows. + +Use `caseWhen` only where it is materially useful, such as branch-dependent +includes/joins after a union source or conditional nested projection. It should +not be required for the top-level timeline union. + +## Query and hook options + +Add options to both `createEntityTimelineQuery` and `useEntityTimeline` so +callers can choose how inbox messages participate in the timeline: + +```ts +type EntityTimelineInboxMode = + | `processed` // default: processed + optimistic local pending rows inline + | `none` // no inbox messages in the timeline + | `all` // include processed, pending, paused, steer, and cancelled inbox rows +``` + +Default to `processed`. This matches the built-in chat UX: processed user +messages appear chronologically in the timeline, and optimistic local pending +rows created through TanStack DB mutation APIs also appear in the same timeline +path while they are unsynced. Non-optimistic queued rows can still remain +separately editable in the composer/drawer. + +Use `none` for UIs that want a pure event/run stream without user messages. +Use `all` for debugging/audit UIs where pending/cancelled/steer rows should be +visible inline exactly where they happened. + +`useEntityTimeline` should always be free to run a separate pending-inbox query +when the UI needs editable queued messages, regardless of the main timeline +inbox mode. + +To avoid duplicate rendering, the hook should separate timeline rendering from +editable pending state: + +- `rows` follows `inboxMode`. +- `pendingInbox` is returned for composer/drawer editing state. +- In the default `processed` mode, `rows` includes synced processed inbox rows + plus local optimistic pending inbox rows created through DB mutation APIs. +- When `inboxMode: 'all'`, all pending inbox messages may appear in `rows`; + callers that also render `pendingInbox` controls must key by `row.inbox.$key` + and avoid drawing a second message bubble for the same pending row. +- Optimistic pending inbox inserts should go into the inbox collection with a + pending `_timeline_order`, so they appear through the same `rows` path. The + separate `pendingInbox` result is metadata/editing state for the same row, not + a second rendering source. + +## Why this exists + +The current UI path is: + +```text +createEntityIncludesQuery + -> normalizeEntityTimelineData + -> buildTimelineEntries + -> EntityTimeline +``` + +`createEntityIncludesQuery` returns one aggregate row with nested arrays: + +- `runs` +- `inbox` +- `wakes` +- `entities` +- nested `texts`, `toolCalls`, `steps`, `errors` +- text strings built from `concat(toArray(textDeltas...))` + +During streaming, every new text delta changes the aggregate row. The render +pipeline has memoization to avoid repainting settled rows, but the session data +structure is still rebuilt on each chunk. The new API should let TanStack DB +maintain the timeline incrementally at row and nested-collection granularity. + +## UI use cases to support + +The built-in app UI currently needs: + +- A stable ordered row list for virtualized rendering and find/search. +- Rows for processed user messages, agent runs/responses, wakes, manifests, + context insertion/removal events, and possibly top-level errors/lifecycle + events later. +- Agent run rows with ordered nested content: + - text segments; + - tool calls; + - run errors; + - steps, even if the current UI does not render them prominently. +- Text segment rows with ordered text chunks/deltas, so streaming updates only + the active text's nested collection rather than the whole timeline. +- Manifest rows for the app timeline, currently queried separately and merged in + `useEntityTimeline`. +- Related entity data/status for manifest badges and "open entity" actions. +- Pending inbox messages separately for queued/editable messages in + `MessageInput` and `EntityContextDrawer`. +- A cheap way to derive whether generation is active. + +UI-only behavior should stay outside the query: + +- Markdown rendering and render cache management. +- Tool argument parsing for display. +- Tool result stringification for display/copy. +- The first-message `isInitial` flag. +- `responseTimestamp`, which depends on the previous user message. +- Optimistic inline queued message projection. + +## Proposed source subqueries + +Each source subquery should project rows into a runtime-level timeline event +shape with a common base: + +```ts +type EntityTimelineOrder = string + +type EntityTimelineRowBase = { + order: EntityTimelineOrder +} +``` + +Do not project synthetic timeline identifiers in each source subquery. TanStack +DB adds `$key` virtual props to live query rows, and multi-source `from` prefixes +the outer row key with the active source alias. A no-select multi-source row +therefore has enough identity and discrimination already: + +```ts +{ + $key: `wake:123`, + wake: { + $key: 123, + // projected wake fields... + }, +} +``` + +Consumers should use the outer row's `$key` as the stable timeline row key and +narrow by the active source alias (`row.wake`, `row.run`, `row.inbox`, etc.). +The source row's own `$key` remains available inside the active alias if callers +need the original collection key. + +This matches the TanStack DB branch implementation: union branches are wrapped +under their source alias, no-select union queries return the namespaced row, and +the union stream key is prefixed with the source alias. + +This key rule also applies to selected source subqueries and nested collections: +the selected row should retain its `$key` virtual prop. Do not add `sourceKey` +fields for run, inbox, text, tool call, or chunk rows unless a future API proves +the virtual key is insufficient. + +Do not add a separate `kind` field just to discriminate timeline rows. The +active source alias is the discriminant. In UI code, replace old +`section.kind` switches with small alias-narrowing helpers: + +```ts +function isInboxRow(row: EntityTimelineQueryRow): row is InboxTimelineRow { + return row.inbox !== undefined +} + +function isRunRow(row: EntityTimelineQueryRow): row is RunTimelineRow { + return row.run !== undefined +} +``` + +For exhaustive handling, use a helper that branches over every known alias and +assigns the remainder to `never`. This keeps type safety without duplicating the +source alias as another string field that can drift from the query shape. + +### Inbox source + +Filter to processed messages by default. The exact filter depends on +`opts.inboxMode`: + +```ts +const inbox = q + .from({ inbox: db.collections.inbox }) + .where(({ inbox }) => + opts.inboxMode === `processed` + ? or( + eq(coalesce(inbox.status, `processed`), `processed`), + isOptimisticLocalRow(inbox) + ) + : opts.inboxMode === `all` + ? true + : false + ) + .select(({ inbox }) => ({ + order: inbox._timeline_order, + from: coalesce(inbox.from, `unknown`), + payload: inbox.payload, + timestamp: coalesce(inbox.timestamp, EPOCH_ISO), + mode: coalesce(inbox.mode, `immediate`), + status: coalesce(inbox.status, `processed`), + position: inbox.position, + processed_at: inbox.processed_at, + cancelled_at: inbox.cancelled_at, + })) +``` + +Pending inbox should remain a separate query for the composer/drawer because it +has different sorting and editing behavior. + +### Run source + +The run source is the most important for fine-grained streaming behavior: + +```ts +const run = q.from({ run: db.collections.runs }).select(({ run }) => ({ + order: run._timeline_order, + status: run.status, + finish_reason: run.finish_reason, + items: runItemsInclude(run.key), + errors: runErrorsInclude(run.key), + steps: runStepsInclude(run.key), +})) +``` + +`items` should be an ordered nested collection over texts and tool calls. Use +the same no-outer-select multi-source pattern here too, so item rows are +discriminated by active alias and keyed by DB: + +```ts +q.from({ + text, + toolCall, +}) + .where(({ text, toolCall }) => + eq(coalesce(text.run_id, toolCall.run_id), run.key) + ) + .orderBy(({ text, toolCall }) => coalesce(text.order, toolCall.order)) +``` + +The result shape should be: + +```ts +type EntityTimelineRunItem = + | { $key: string; text: EntityTimelineTextItem; toolCall?: undefined } + | { $key: string; text?: undefined; toolCall: EntityTimelineToolCallItem } +``` + +### Nested includes rendering contract + +Do not wrap these nested includes in `toArray` in `createEntityTimelineQuery`. +The point of the new API is that `items`, `chunks`, `errors`, and `steps` remain +child live collections maintained by TanStack DB. + +Parent timeline rows should pass child collections down to row components: + +```tsx +function TimelineRunRow({ run }: { run: EntityTimelineRunRow }) { + return +} +``` + +Child components then subscribe to the child collection where they render it: + +```tsx +function AgentResponse({ + items, +}: { + items: Collection +}) { + const { data: runItems = [] } = useLiveQuery(items) + return runItems.map((item) => + item.text ? ( + + ) : ( + + ) + ) +} + +function TextItem({ text }: { text: EntityTimelineTextItem }) { + const { data: chunks = [] } = useLiveQuery(text.chunks) + // Markdown streaming strategy TBD; do not eagerly concatenate in the query. +} +``` + +This is the key performance boundary. A text delta should update the text row's +`chunks` child collection and the subscribed text component, not rematerialize +the whole run, whole timeline, or all previous messages. + +Text item rows should include ordered chunks/deltas rather than an eagerly +concatenated text string: + +```ts +type EntityTimelineTextItem = { + run_id: string + order: EntityTimelineOrder + status: `streaming` | `completed` + chunks: Collection +} + +type EntityTimelineTextChunk = { + text_id: string + run_id: string + order: EntityTimelineOrder + delta: string +} +``` + +Do not decide the final text materialization strategy in this plan. First review +how the streaming Markdown parser should consume chunks so the UI can avoid +unnecessary string rebuilding. + +Tool call item rows should preserve the current display fields: + +```ts +type EntityTimelineToolCallItem = { + run_id: string + order: EntityTimelineOrder + tool_name: string + status: `started` | `args_complete` | `executing` | `completed` | `failed` + args?: unknown + result?: unknown + error?: string +} +``` + +### Wake source + +Project the current wake display payload: + +```ts +const wake = q.from({ wake: db.collections.wakes }).select(({ wake }) => ({ + order: wake._timeline_order, + payload: { + type: `wake` as const, + timestamp: wake.timestamp, + source: wake.source, + timeout: wake.timeout, + changes: wake.changes, + finished_child: wake.finished_child, + other_children: wake.other_children, + }, +})) +``` + +### Manifest source + +The app UI currently queries manifests separately and merges them into the +timeline. The new query should include them directly: + +```ts +const manifest = q + .from({ manifest: db.collections.manifests }) + .select(({ manifest }) => ({ + order: manifest._timeline_order, + manifest, + })) +``` + +Entity status enrichment can be handled in one of two ways: + +1. Keep the app UI's separate status query against the global entity registry. +2. Add a runtime-level related-entity projection for child/source manifests. + +Prefer option 1 initially to keep `createEntityTimelineQuery` focused on the +entity stream itself. + +### Context sources + +Include context history rows so the query is comprehensive for userland +timeline UIs, even if the built-in chat UI chooses not to display them yet: + +```ts +const contextInserted = q + .from({ contextInserted: db.collections.contextInserted }) + .select(({ contextInserted }) => ({ + order: contextInserted._timeline_order, + id: contextInserted.id, + name: contextInserted.name, + attrs: contextInserted.attrs, + content: contextInserted.content, + timestamp: contextInserted.timestamp, + })) +``` + +`contextRemoved` should mirror this shape in a `contextRemoved` source branch. + +### Steps, errors, and lifecycle-like rows + +`steps` are model-generation step lifecycle rows emitted by the outbound bridge: + +- `onStepStart()` inserts a step with `status: 'started'`, `step_number`, + `run_id`, and optional `model_provider`/`model_id`. +- `onStepEnd()` updates that step to `status: 'completed'` with + `finish_reason` and optional `duration_ms`. + +They are not user messages or tool calls. They describe LLM/model execution +attempts within a run. Today they are used as run metadata and for ordering +anchoring, but the built-in chat UI does not render them as visible rows. + +First pass: keep steps as nested run metadata. Include them in the run row so +observability UIs can render model/step metadata, but do not make them top-level +timeline rows unless we decide they are user-visible inline events. A later +option can expose step rows inline for debugging/audit views. + +Apply the same rule to top-level errors without `run_id`, entity lifecycle rows, +and signal rows: include them in `createEntityTimelineQuery` only when they are +events a user would reasonably expect to see at the point they happened. If a row +is primarily current entity state, keep it out of the event timeline and expose +it through state/status APIs instead. + +## Ordering + +Use a stable first-event order injected by `createEntityStreamDB` at stream +ingest time. + +Do not use the existing `__electricRowOffsets` map for live timeline ordering. +That map is currently updated on every change event, so it represents the last +offset that affected a row. The timeline needs "when this event first happened", +not "when this row was last updated". Run rows, step rows, text rows, and +tool-call rows are often updated after they are inserted; using a last-update +offset would move old timeline items forward incorrectly. + +Decision: + +- Add an internal row field, `_timeline_order`, to the built-in event row + types/schemas that participate in timelines. +- In `createEntityStreamDB`'s `onBeforeBatch`, derive a stable order token for + each incoming change event from the Electric/Durable Streams offset and the + item index within the batch. +- Store the first order token seen for each `(collection, row key)` in a map. +- Before StreamDB applies each insert/update/upsert change, inject that first + order token into `item.value._timeline_order`. +- Preserve the original first order token on later updates. +- Use `_timeline_order` as the primary order expression in + `createEntityTimelineQuery`. + +`_timeline_order` should be a lexically sortable string. Use an +offset-plus-index token rather than raw offset alone. Some stream batches can +contain multiple events, and not every event is guaranteed to carry a unique +per-item offset. A token such as `${offset}:${itemIndex.padStart(...)}` gives a +stable total order while still preserving the underlying stream order. + +Because `_timeline_order` is an actual query field, it must be accepted by the +built-in row schemas/types. Do not rely on mutating event values if validation or +row parsing would strip the field before TanStack DB sees it. + +Reset/replay behavior: + +- On stream reset, clear the first-order map. +- During replay, rebuild `_timeline_order` from replayed events. +- If the first event observed for a row is an update rather than an insert, use + that first observed update's order. It is still stable for the local + materialization. + +Optimistic local rows: + +- Pending optimistic rows that do not yet have a stream offset should receive a + high pending order token so they sort after persisted rows. +- Pending order tokens should preserve local insertion order, e.g. + `pending:${counter.padStart(...)}`. +- Optimistic rows should render through the same timeline query/component path as + synced rows. +- Optimistic rows should be applied through TanStack DB mutation APIs so the + collection sees them as normal local rows, including their pending + `_timeline_order`. +- When the persisted stream event arrives, the server-backed row should replace + the optimistic row and use its real `_timeline_order` from the writeback. + +This keeps ordering local to StreamDB ingestion and avoids modifying every write +site (`EntityManager`, `outbound-bridge`, context writes, wake writes, etc.) to +precompute an order value they do not know yet. + +For run rows, use `run._timeline_order` directly. Because `_timeline_order` is +assigned from the first event observed for the run row and preserved on later +updates, completion updates no longer move the run later in the timeline. This +removes the need for the current imperative run/text re-anchoring workaround. + +## Runtime exports + +Add new exports alongside the existing aggregate API: + +```ts +export { createEntityIncludesQuery, createEntityTimelineQuery } +export { useEntityTimeline } + +export type { + EntityTimelineQueryRow, + EntityTimelineInboxRow, + EntityTimelineRunRow, + EntityTimelineRunItem, + EntityTimelineTextItem, + EntityTimelineTextChunk, + EntityTimelineToolCallItem, + EntityTimelineWakeRow, + EntityTimelineManifestRow, +} +``` + +Optionally add a convenience collection factory later: + +```ts +createEntityTimelineCollection(db) +``` + +The query function should come first so users can compose it inside their own +queries. + +## Built-in UI migration + +Migrate hot paths to the row-oriented query: + +1. `packages/agents-server-ui/src/hooks/useEntityTimeline.ts` + - Replace `createEntityIncludesQuery` with the exported + `useEntityTimeline`/`createEntityTimelineQuery` path. + - Remove the separate manifest query/merge once manifests are included. + - Keep the pending inbox query separate inside the hook and return + `pendingInbox`. + - Return `generationActive` from the hook as derived state, preferably from a + small dedicated live query over active runs rather than by scanning the + full timeline on every change. + - Keep global entity status enrichment separate initially. + - When matching processed/pending inbox messages, use `row.inbox.$key` as the + raw inbox key and `row.$key` only as the timeline row key. +2. `packages/agents-server-ui/src/components/EntityTimeline.tsx` + - Accept the new row union, or adapt it to the current `TimelineEntry` + shape as an intermediate step. + - Use each row's `$key` virtual prop as the React/virtualizer key. + - Pass nested child collections such as `run.items` to child row components + instead of rebuilding `section.items` arrays. +3. `packages/agents-server-ui/src/components/AgentResponse.tsx` + - Subscribe to nested run items with `useLiveQuery(run.items)`. + - Update text rendering after reviewing the streaming Markdown parser; avoid + eager timeline-query concatenation. + - Keep markdown caching and tool display parsing in the component layer. +4. `packages/electric-ax/src/observe-ui.tsx` + - Move from aggregate includes to timeline rows. +5. `examples/deep-survey/src/ui/components/ChatSidebar.tsx` + - Move from aggregate includes to timeline rows if it remains a streaming + UI example. + +Keep `useChat` on `createEntityIncludesQuery` initially as a stable convenience +hook for aggregate chat data. `useEntityTimeline` is the new row-oriented hook +for timeline UIs. + +## Tests + +Add runtime tests for the new query: + +- Multi-source timeline returns inbox, run, wake, manifest, and context rows in + order. +- Rows from different source collections with the same raw key do not collide; + use the multi-source `$key` virtual prop as the public row key. +- Insert/update/delete in each source updates only the relevant branch rows. +- A text delta insertion updates the nested text chunks include for the active + run item. +- Parent timeline rows expose child collections, not arrays, for nested + includes. +- Child UI components subscribe to child collections with `useLiveQuery`. +- A tool call update updates the matching nested run item. +- Non-optimistic pending inbox rows are excluded from the main timeline by + default. +- Optimistic pending rows created through DB mutation APIs are included in the + main timeline by default. +- `opts.inboxMode` can exclude inbox rows or include all inbox statuses. +- Manifests are present in the timeline without a separate merge. +- Run rows keep their first-event `_timeline_order` when completed, so completion + updates do not move them later in the timeline. +- `_timeline_order` is derived from the first stream offset/item index for a row + and does not change when that row is updated. +- Multiple events in the same stream batch still receive deterministic relative + ordering. +- Stream reset/replay clears and rebuilds `_timeline_order` deterministically. +- Rows first observed through an update still get a stable first-observed + `_timeline_order`. +- Optimistic rows sort after persisted rows until replaced by server-backed rows + with real `_timeline_order`. +- Optimistic rows preserve local insertion order and render through the same row + path as synced rows. + +Add UI-level tests or focused hook tests: + +- `useEntityTimeline` no longer rebuilds the manifest merge path. +- Streaming text updates keep prior top-level timeline row identities stable + where TanStack DB exposes stable maintained rows. +- Optimistic inbox behavior renders through the same `rows` path in `ChatView`. +- Pending inbox editing state still works in `ChatView` without double-rendering + the same optimistic row. +- `generationActive` is returned by `useEntityTimeline` without requiring + timeline consumers to scan every row. + +## Deferred decisions and audit notes + +- Do not add a convenience `text` string to text rows in the first pass. Leave + text materialization to consumers until we have reviewed how the streaming + Markdown parser should consume chunks optimally. +- Validate the `_timeline_order` injection point against `@durable-streams/state` + internals. If `onBeforeBatch` cannot safely mutate incoming event values before + validation and collection application, add first-event-order support in + StreamDB itself rather than pushing ordering logic into every writer. +- Leave `normalizeEntityTimelineData` and related aggregate helpers in place for + now. After the row-oriented timeline migration is complete, audit which + aggregate helpers are still used and remove unused compatibility code then. + +## Implementation phases + +1. Define row types, hook return types, and source subquery helpers in + `agents-runtime`. +2. Add `_timeline_order` row support: + - first prove the injection point by testing that values added in + `onBeforeBatch` survive validation and collection application; + - extend built-in event row schemas/types; + - inject first-offset-plus-index order tokens in `createEntityStreamDB`; + - preserve the first token across updates. +3. Implement `createEntityTimelineQuery` with multi-source `from` and no outer + `select`. +4. Add `useEntityTimeline` returning rows plus derived state + (`generationActive`, pending inbox, etc.). +5. Add run nested includes for ordered text/tool-call items, errors, and nested + step metadata. +6. Add text chunk includes and remove eager text concatenation from the new API. +7. Add parity tests against key `createEntityIncludesQuery` behaviors. +8. Migrate `agents-server-ui` behind the same public `useEntityTimeline` hook. +9. Migrate observe/example streaming UIs. +10. Document when to use `createEntityIncludesQuery` versus + `createEntityTimelineQuery`. diff --git a/packages/agents-runtime/src/client.ts b/packages/agents-runtime/src/client.ts index 49b797e2ba..0f96793f6c 100644 --- a/packages/agents-runtime/src/client.ts +++ b/packages/agents-runtime/src/client.ts @@ -2,8 +2,10 @@ export { createEntityStreamDB } from './entity-stream-db' export { createAgentsClient } from './agents-client' export { compareTimelineOrders, + createPendingTimelineOrder, createEntityErrorsQuery, createEntityIncludesQuery, + createEntityTimelineQuery, getEntityState, normalizeEntityTimelineData, normalizeTimelineEntities, @@ -30,8 +32,16 @@ export type { export type { EntityTimelineContentItem, EntityTimelineData, + EntityTimelineInboxMode, + EntityTimelineQueryOptions, + EntityTimelineQueryRow, + EntityTimelineRunRow, + EntityTimelineRunItem, EntityTimelineSection, EntityTimelineState, + EntityTimelineTextChunk, + EntityTimelineTextItem, + EntityTimelineToolCallItem, IncludesEntity, IncludesInboxMessage, IncludesRun, diff --git a/packages/agents-runtime/src/entity-schema.ts b/packages/agents-runtime/src/entity-schema.ts index 3f7d62cfb6..4ce00d68f2 100644 --- a/packages/agents-runtime/src/entity-schema.ts +++ b/packages/agents-runtime/src/entity-schema.ts @@ -43,6 +43,7 @@ type SequencedPersistedRow = Omit< > & { key: string _seq?: number + _timeline_order?: string } type Schema = z.ZodType type ChildEntityStatusValue = `spawning` | `running` | `idle` | `stopped` @@ -269,6 +270,10 @@ function createJsonObjectSchema(): Schema> { > } +const timelineOrderField = { + _timeline_order: z.string().optional(), +} + function createChildEntityStatusSchema(): Schema { return z.enum([`spawning`, `running`, `idle`, `stopped`]) } @@ -319,6 +324,7 @@ function createWakeConfigSchema(): Schema { function createRunSchema(): Schema { return z.object({ key: z.string().optional(), + ...timelineOrderField, status: z.enum([`started`, `completed`, `failed`]), finish_reason: z.string().optional(), }) @@ -327,6 +333,7 @@ function createRunSchema(): Schema { function createStepSchema(): Schema { return z.object({ key: z.string().optional(), + ...timelineOrderField, run_id: z.string().optional(), step_number: z.number().int(), status: z.enum([`started`, `completed`]), @@ -340,6 +347,7 @@ function createStepSchema(): Schema { function createTextSchema(): Schema { return z.object({ key: z.string().optional(), + ...timelineOrderField, run_id: z.string().optional(), status: z.enum([`streaming`, `completed`]), }) @@ -348,6 +356,7 @@ function createTextSchema(): Schema { function createTextDeltaSchema(): Schema { return z.object({ key: z.string().optional(), + ...timelineOrderField, text_id: z.string(), run_id: z.string(), delta: z.string(), @@ -357,6 +366,7 @@ function createTextDeltaSchema(): Schema { function createToolCallSchema(): Schema { return z.object({ key: z.string().optional(), + ...timelineOrderField, run_id: z.string().optional(), tool_call_id: z.string().optional(), tool_name: z.string(), @@ -377,6 +387,7 @@ function createToolCallSchema(): Schema { function createReasoningSchema(): Schema { return z.object({ key: z.string().optional(), + ...timelineOrderField, status: z.enum([`streaming`, `completed`]), }) } @@ -384,6 +395,7 @@ function createReasoningSchema(): Schema { function createErrorEventSchema(): Schema { return z.object({ key: z.string().optional(), + ...timelineOrderField, error_code: z.string(), message: z.string(), run_id: z.string().optional(), @@ -395,6 +407,7 @@ function createErrorEventSchema(): Schema { function createMessageReceivedSchema(): Schema { return z.object({ key: z.string().optional(), + ...timelineOrderField, from: z.string().optional(), payload: z.unknown().optional(), timestamp: z.string().optional(), @@ -410,6 +423,7 @@ function createMessageReceivedSchema(): Schema { function createWakeSchema(): Schema { return z.object({ key: z.string().optional(), + ...timelineOrderField, timestamp: z.string(), source: z.string(), timeout: z.boolean(), @@ -422,6 +436,7 @@ function createWakeSchema(): Schema { function createEntityCreatedSchema(): Schema { return z.object({ key: z.string().optional(), + ...timelineOrderField, entity_type: z.string(), timestamp: z.string(), args: createJsonObjectSchema(), @@ -432,6 +447,7 @@ function createEntityCreatedSchema(): Schema { function createEntityStoppedSchema(): Schema { return z.object({ key: z.string().optional(), + ...timelineOrderField, timestamp: z.string(), reason: z.string().optional(), }) @@ -440,6 +456,7 @@ function createEntityStoppedSchema(): Schema { function createChildStatusSchema(): Schema { return z.object({ key: z.string().optional(), + ...timelineOrderField, entity_url: z.string(), entity_type: z.string(), status: createChildEntityStatusSchema(), @@ -449,12 +466,14 @@ function createChildStatusSchema(): Schema { function createTagEntrySchema(): Schema { return z.object({ key: z.string().optional(), + ...timelineOrderField, value: z.string(), }) } function createContextInsertedSchema(): Schema { return z.object({ key: z.string().optional(), + ...timelineOrderField, id: z.string(), name: z.string(), attrs: z.record(z.string(), z.union([z.string(), z.number(), z.boolean()])), @@ -466,6 +485,7 @@ function createContextInsertedSchema(): Schema { function createContextRemovedSchema(): Schema { return z.object({ key: z.string().optional(), + ...timelineOrderField, id: z.string(), name: z.string(), timestamp: z.string(), @@ -483,6 +503,7 @@ function createManifestSchema(): Schema< return z.union([ z.object({ key: z.string().optional(), + ...timelineOrderField, kind: z.literal(`child`), id: z.string(), entity_type: z.string(), @@ -492,6 +513,7 @@ function createManifestSchema(): Schema< }), z.object({ key: z.string().optional(), + ...timelineOrderField, kind: z.literal(`source`), sourceType: z.string(), sourceRef: z.string(), @@ -500,6 +522,7 @@ function createManifestSchema(): Schema< }), z.object({ key: z.string().optional(), + ...timelineOrderField, kind: z.literal(`shared-state`), id: z.string(), mode: z.enum([`create`, `connect`]), @@ -514,6 +537,7 @@ function createManifestSchema(): Schema< }), z.object({ key: z.string().optional(), + ...timelineOrderField, kind: z.literal(`effect`), id: z.string(), function_ref: z.string(), @@ -521,6 +545,7 @@ function createManifestSchema(): Schema< }), z.object({ key: z.string().optional(), + ...timelineOrderField, kind: z.literal(`context`), id: z.string(), name: z.string(), @@ -533,6 +558,7 @@ function createManifestSchema(): Schema< }), z.object({ key: z.string().optional(), + ...timelineOrderField, kind: z.literal(`schedule`), id: z.string(), scheduleType: z.literal(`cron`), @@ -543,6 +569,7 @@ function createManifestSchema(): Schema< }), z.object({ key: z.string().optional(), + ...timelineOrderField, kind: z.literal(`schedule`), id: z.string(), scheduleType: z.literal(`future_send`), diff --git a/packages/agents-runtime/src/entity-stream-db.ts b/packages/agents-runtime/src/entity-stream-db.ts index 7e47642180..f737181cee 100644 --- a/packages/agents-runtime/src/entity-stream-db.ts +++ b/packages/agents-runtime/src/entity-stream-db.ts @@ -46,6 +46,7 @@ type EntityCollectionMeta = { __electricSourceDb?: EntityStreamDBWithActions __electricSourceId?: string __electricRowOffsets?: Map + __electricTimelineOrders?: Map } type EntityCollections = { @@ -98,6 +99,14 @@ type EntityStreamDBOptions = { ) const WRITE_TXID_TIMEOUT_MS = 20_000 +const TIMELINE_OFFSET_WIDTH = 24 +const TIMELINE_BATCH_INDEX_WIDTH = 8 + +function formatStreamTimelineOrder(offset: string, index: number): string { + return `stream:${offset.padStart(TIMELINE_OFFSET_WIDTH, `0`)}:${index + .toString() + .padStart(TIMELINE_BATCH_INDEX_WIDTH, `0`)}` +} /** * Create a StreamDB connected to a Electric Agents entity stream. @@ -137,9 +146,14 @@ export function createEntityStreamDB( } const collectionNameByEventType = new Map() const rowOffsetsByCollection = new Map>() + const timelineOrdersByCollection = new Map< + string, + Map + >() for (const [name, def] of Object.entries(mergedCollections)) { collectionNameByEventType.set(def.type, name) rowOffsetsByCollection.set(name, new Map()) + timelineOrdersByCollection.set(name, new Map()) } // Build a reverse map from TanStack DB collection id to schema key const collIdToSchemaKey: Record = {} @@ -167,6 +181,7 @@ export function createEntityStreamDB( const cleanRow = (row: Record): Record => { const clone = { ...row } delete clone._seq + delete clone._timeline_order return clone } @@ -270,22 +285,36 @@ export function createEntityStreamDB( onBeforeBatch: (batch) => { opts?.onBeforeBatch?.(batch) replayBatchOffset.current = batch.offset - for (const item of batch.items) { + batch.items.forEach((item, itemIndex) => { if (isControlEvent(item)) { if (item.headers.control === `reset`) { for (const offsets of rowOffsetsByCollection.values()) { offsets.clear() } + for (const orders of timelineOrdersByCollection.values()) { + orders.clear() + } } - continue + return } - if (!isChangeEvent(item)) continue + if (!isChangeEvent(item)) return const collectionName = collectionNameByEventType.get(item.type) - if (!collectionName) continue - rowOffsetsByCollection - .get(collectionName) - ?.set(item.key, item.headers.offset ?? batch.offset) - } + if (!collectionName) return + const offset = item.headers.offset ?? batch.offset + rowOffsetsByCollection.get(collectionName)?.set(item.key, offset) + + if (item.headers.operation === `delete`) return + if (typeof item.value !== `object` || item.value === null) return + + const orders = timelineOrdersByCollection.get(collectionName) + if (!orders) return + let order = orders.get(item.key) + if (!order) { + order = formatStreamTimelineOrder(offset, itemIndex) + orders.set(item.key, order) + } + ;(item.value as Record)._timeline_order = order + }) }, onBatch: (batch) => { opts?.onBatch?.(batch) @@ -634,6 +663,20 @@ export function createEntityStreamDB( ) } const primaryKey = mergedCollections[collectionName]!.primaryKey + if ( + event.headers.operation !== `delete` && + typeof event.value === `object` && + event.value !== null + ) { + const orders = timelineOrdersByCollection.get(collectionName) + const offset = + event.headers.offset ?? + `local:${Date.now().toString().padStart(13, `0`)}` + const order = + orders?.get(event.key) ?? formatStreamTimelineOrder(offset, 0) + orders?.set(event.key, order) + ;(event.value as Record)._timeline_order = order + } const transaction = createWriteTransaction({ debugOrigin: `apply-event:${event.type}:${event.headers.operation}`, }) @@ -699,6 +742,8 @@ export function createEntityStreamDB( replayDb.collections )) { collection.__electricRowOffsets = rowOffsetsByCollection.get(collectionName) + collection.__electricTimelineOrders = + timelineOrdersByCollection.get(collectionName) } return db as EntityStreamDBWithActions diff --git a/packages/agents-runtime/src/entity-timeline.ts b/packages/agents-runtime/src/entity-timeline.ts index 9d4d208b78..46fea8fbc1 100644 --- a/packages/agents-runtime/src/entity-timeline.ts +++ b/packages/agents-runtime/src/entity-timeline.ts @@ -9,11 +9,20 @@ import { or, toArray, } from '@durable-streams/state' -import type { InitialQueryBuilder, QueryBuilder } from '@tanstack/db' +import * as TanStackDB from '@tanstack/db' +import type { + Collection, + InitialQueryBuilder, + QueryBuilder, +} from '@tanstack/db' import type { EntityStreamDB } from './entity-stream-db' import type { ChildStatusEntry, MessageReceived } from './entity-schema' import type { ManifestEntry, Wake, WakeMessage } from './types' +const { caseWhen } = TanStackDB as typeof TanStackDB & { + caseWhen: (condition: unknown, value: T) => T +} + export type EntityTimelineState = | `pending` | `queued` @@ -160,6 +169,112 @@ export interface EntityTimelineData { entities: Array } +export type EntityTimelineInboxMode = `processed` | `all` + +export interface EntityTimelineQueryOptions { + inboxMode?: EntityTimelineInboxMode +} + +export interface EntityTimelineTextChunk { + key: string + text_id: string + run_id: string + order: TimelineOrder + delta: string +} + +export interface EntityTimelineTextItem { + key: string + run_id?: string + order: TimelineOrder + status: `streaming` | `completed` + content: string +} + +export interface EntityTimelineToolCallItem { + key: string + run_id?: string + order: TimelineOrder + tool_call_id?: string + tool_name: string + status: `started` | `args_complete` | `executing` | `completed` | `failed` + args?: unknown + result?: unknown + error?: string +} + +export type EntityTimelineRunItem = + | { + $key: string + text: EntityTimelineTextItem + toolCall?: undefined + } + | { + $key: string + text?: undefined + toolCall: EntityTimelineToolCallItem + } + +export interface EntityTimelineStepItem { + key: string + run_id?: string + order: TimelineOrder + step_number: number + status: `started` | `completed` + model_id?: string + duration_ms?: number +} + +export interface EntityTimelineErrorItem { + key: string + run_id?: string + error_code: string + message: string +} + +export interface EntityTimelineRunRow { + key: string + order: TimelineOrder + status: `started` | `completed` | `failed` + finish_reason?: string + items: Collection + steps: Collection + errors: Collection +} + +export type EntityTimelineInboxRow = IncludesInboxMessage +export type EntityTimelineWakeRow = IncludesWakeMessage + +export type EntityTimelineQueryRow = + | { + $key: string + inbox: EntityTimelineInboxRow + run?: undefined + wake?: undefined + manifest?: undefined + } + | { + $key: string + inbox?: undefined + run: EntityTimelineRunRow + wake?: undefined + manifest?: undefined + } + | { + $key: string + inbox?: undefined + run?: undefined + wake: EntityTimelineWakeRow + manifest?: undefined + } + | { + $key: string + inbox?: undefined + run?: undefined + wake?: undefined + manifest: ManifestEntry + } + function normalizeTimelineRun(run: IncludesRun): IncludesRun { const texts = run.texts .map((text) => { @@ -319,6 +434,15 @@ function readInlineSeq(row: object): number | undefined { return typeof seq === `number` ? seq : undefined } +function readTimelineOrder(row: object): string | undefined { + const order = Reflect.get(row, `_timeline_order`) + return typeof order === `string` ? order : undefined +} + +export function createPendingTimelineOrder(index: number): string { + return `~pending:${index.toString().padStart(12, `0`)}` +} + function toSeqOrderToken(seq: number): string { return `seq:${seq.toString().padStart(12, `0`)}` } @@ -360,6 +484,11 @@ function readRequiredOrderToken( row: TRow, index: number ): string { + const timelineOrder = readTimelineOrder(row) + if (timelineOrder) { + return timelineOrder + } + const offset = collection.__electricRowOffsets?.get(row.key) if (offset) { return `offset:${offset}` @@ -380,6 +509,11 @@ function readOptionalOrderToken( }, row: TRow ): string | undefined { + const timelineOrder = readTimelineOrder(row) + if (timelineOrder) { + return timelineOrder + } + const offset = collection.__electricRowOffsets?.get(row.key) if (offset) { return `offset:${offset}` @@ -991,6 +1125,178 @@ const getEntityWakesCollection = cachedCollectionFactory((db: EntityStreamDB) => }) ) +type EntityTimelineQueryBuilder = (q: InitialQueryBuilder) => QueryBuilder + +/** + * Builds a live timeline query for an entity stream. + * + * The returned query is a multi-source timeline ordered by each row's + * `_timeline_order`. Each result row has TanStack DB's virtual `$key` plus one + * populated source property: + * + * - `{ inbox }` for user inbox messages. + * - `{ run }` for agent runs. + * - `{ wake }` for wake events. + * - `{ manifest }` for manifest entries. + * + * Run rows include live child collections rather than materialized arrays: + * `run.items`, `run.steps`, and `run.errors`. Pass those child collections to + * `useLiveQuery` (or another live-query consumer) in child renderers to receive + * fine-grained updates while text chunks stream in. Text run items expose their + * concatenated streamed content as `item.text.content`. + */ +export function createEntityTimelineQuery( + db: EntityStreamDB, + opts: EntityTimelineQueryOptions = {} +): EntityTimelineQueryBuilder { + return (q: InitialQueryBuilder) => buildEntityTimelineQuery(q, db, opts) +} + +function buildEntityTimelineQuery( + q: InitialQueryBuilder, + db: EntityStreamDB, + opts: EntityTimelineQueryOptions +): QueryBuilder { + const inboxMode = opts.inboxMode ?? `processed` + + let inbox = q.from({ inbox: db.collections.inbox }) + if (inboxMode === `processed`) { + inbox = inbox.where(({ inbox }) => + or( + eq(coalesce(inbox.status, `processed`), `processed`), + eq(inbox.$synced, false) + ) + ) + } + + const inboxSource = inbox.select(({ inbox }) => ({ + order: coalesce(inbox._timeline_order, `~`), + key: inbox.key, + from: coalesce(inbox.from, `unknown`), + payload: inbox.payload, + timestamp: coalesce(inbox.timestamp, ``), + mode: coalesce(inbox.mode, `immediate`), + status: coalesce(inbox.status, `processed`), + position: inbox.position, + processed_at: inbox.processed_at, + cancelled_at: inbox.cancelled_at, + })) + + const wakeSource = q + .from({ wake: db.collections.wakes }) + .select(({ wake }) => ({ + key: wake.key, + order: coalesce(wake._timeline_order, `~`), + payload: { + type: `wake` as const, + timestamp: wake.timestamp, + source: wake.source, + timeout: wake.timeout, + changes: wake.changes, + finished_child: wake.finished_child, + other_children: wake.other_children, + }, + })) + + const runItemsSource = q + .from({ + text: db.collections.texts, + toolCall: db.collections.toolCalls, + }) + .select(({ text, toolCall }) => ({ + order: coalesce(text._timeline_order, toolCall._timeline_order, `~`), + run_id: coalesce(text.run_id, toolCall.run_id, ``), + text: caseWhen(text.key, { + key: text.key, + run_id: text.run_id, + order: coalesce(text._timeline_order, `~`), + status: text.status, + }), + textContent: concat( + toArray( + q + .from({ chunk: db.collections.textDeltas }) + .where(({ chunk }) => eq(chunk.text_id, text.key)) + .orderBy(({ chunk }) => coalesce(chunk._timeline_order, `~`)) + .select(({ chunk }) => chunk.delta) + ) + ), + toolCall: caseWhen(toolCall.key, { + key: toolCall.key, + run_id: toolCall.run_id, + order: coalesce(toolCall._timeline_order, `~`), + tool_call_id: toolCall.tool_call_id, + tool_name: toolCall.tool_name, + status: toolCall.status, + args: toolCall.args, + result: toolCall.result, + error: toolCall.error, + }), + })) + + const runSource = q.from({ run: db.collections.runs }).select(({ run }) => ({ + key: run.key, + order: coalesce(run._timeline_order, `~`), + status: run.status, + finish_reason: run.finish_reason, + items: q + .from({ item: runItemsSource }) + .where(({ item }) => eq(item.run_id, run.key)) + .orderBy(({ item }) => item.order) + .select(({ item }) => ({ + text: caseWhen(item.text.key, { + key: item.text.key, + run_id: item.text.run_id, + order: item.text.order, + status: item.text.status, + content: item.textContent, + }), + toolCall: item.toolCall, + })), + steps: q + .from({ step: db.collections.steps }) + .where(({ step }) => eq(step.run_id, run.key)) + .orderBy(({ step }) => step.step_number) + .orderBy(({ step }) => coalesce(step._timeline_order, `~`)) + .select(({ step }) => ({ + key: step.key, + run_id: step.run_id, + order: coalesce(step._timeline_order, `~`), + step_number: step.step_number, + status: step.status, + model_id: step.model_id, + duration_ms: step.duration_ms, + })), + errors: q + .from({ error: db.collections.errors }) + .where(({ error }) => eq(error.run_id, run.key)) + .orderBy(({ error }) => error.key) + .select(({ error }) => ({ + key: error.key, + run_id: error.run_id, + error_code: error.error_code, + message: error.message, + })), + })) + + return q + .from({ + inbox: inboxSource, + run: runSource, + wake: wakeSource, + manifest: db.collections.manifests, + }) + .orderBy(({ inbox, run, wake, manifest }) => + coalesce( + inbox.order, + run.order, + wake.order, + manifest._timeline_order, + `~` + ) + ) +} + type EntityQueryBuilder = (q: InitialQueryBuilder) => QueryBuilder export function createEntityIncludesQuery( diff --git a/packages/agents-runtime/src/index.ts b/packages/agents-runtime/src/index.ts index 14ed10070d..1626996968 100644 --- a/packages/agents-runtime/src/index.ts +++ b/packages/agents-runtime/src/index.ts @@ -115,7 +115,9 @@ export { export type { EntityMembershipRow, EntityTags, TagOperation } from './tags' export { createEntityIncludesQuery, + createEntityTimelineQuery, createEntityErrorsQuery, + createPendingTimelineOrder, getEntityState, normalizeEntityTimelineData, normalizeTimelineEntities, @@ -124,6 +126,14 @@ export { export type { EntityTimelineData, EntityTimelineContentItem, + EntityTimelineInboxMode, + EntityTimelineQueryOptions, + EntityTimelineQueryRow, + EntityTimelineRunRow, + EntityTimelineRunItem, + EntityTimelineTextChunk, + EntityTimelineTextItem, + EntityTimelineToolCallItem, IncludesEntity, EntityTimelineSection, EntityTimelineState, diff --git a/packages/agents-runtime/tsdown.config.ts b/packages/agents-runtime/tsdown.config.ts index 3b88224702..f106db7088 100644 --- a/packages/agents-runtime/tsdown.config.ts +++ b/packages/agents-runtime/tsdown.config.ts @@ -3,6 +3,7 @@ import type { Options } from 'tsdown' const config: Options = { entry: [`src/index.ts`, `src/react.ts`, `src/tools.ts`, `src/client.ts`], format: [`esm`, `cjs`], + external: [/^@tanstack\//, /^@durable-streams\//], dts: true, clean: true, } diff --git a/packages/agents-server-ui/src/components/AgentResponse.tsx b/packages/agents-server-ui/src/components/AgentResponse.tsx index 6624e1b016..dae207b0a1 100644 --- a/packages/agents-server-ui/src/components/AgentResponse.tsx +++ b/packages/agents-server-ui/src/components/AgentResponse.tsx @@ -7,6 +7,7 @@ import { useRef, useState, } from 'react' +import { useLiveQuery } from '@tanstack/react-db' import { Streamdown } from 'streamdown' import { getCachedMarkdownRender, @@ -27,6 +28,9 @@ import { ThinkingIndicator } from './ThinkingIndicator' import styles from './AgentResponse.module.css' import type { EntityTimelineContentItem, + EntityTimelineRunRow, + EntityTimelineTextItem, + EntityTimelineToolCallItem, EntityTimelineSection, } from '@electric-ax/agents-runtime/client' @@ -37,6 +41,16 @@ type AgentResponseSection = Extract< const SHIKI_SETTLE_MS = 80 +function compareTimelineOrderValues( + left: string | number, + right: string | number +): number { + if (typeof left === `number` && typeof right === `number`) { + return left - right + } + return String(left).localeCompare(String(right)) +} + const MarkdownSegment = memo(function MarkdownSegment({ text, contentHash, @@ -51,12 +65,12 @@ const MarkdownSegment = memo(function MarkdownSegment({ canCache: boolean }): React.ReactElement { const wrapperRef = useRef(null) + const cachedHtmlHashRef = useRef(null) // Tracks the content hash that the currently-displayed `cachedHtml` // belongs to, so we can distinguish "the underlying text changed and our // cached HTML is now stale" from "only the column width changed and our // cached HTML is still semantically correct, just laid out for a // different width". - const cachedHtmlHashRef = useRef(null) const [cachedHtml, setCachedHtmlState] = useState(() => { if (!canCache || !isMarkdownRenderCacheReady() || renderWidth <= 0) return null @@ -230,6 +244,119 @@ function toolItemToCopyText(item: EntityTimelineContentItem): string { return parts.join(`\n`) } +function liveToolCallToContentItem( + item: EntityTimelineToolCallItem +): Extract { + return { + kind: `tool_call`, + toolCallId: item.tool_call_id ?? item.key, + toolName: item.tool_name, + args: + item.args && typeof item.args === `object` + ? (item.args as Record) + : {}, + status: item.status, + result: typeof item.result === `string` ? item.result : undefined, + isError: Boolean(item.error), + } +} + +const LiveTextItem = memo(function LiveTextItem({ + item, + isStreaming, + renderWidth, +}: { + item: EntityTimelineTextItem + isStreaming: boolean + renderWidth: number +}): React.ReactElement { + return ( + + ) +}) + +export const AgentResponseLive = memo(function AgentResponseLive({ + run, + isStreaming, + timestamp, + renderWidth = 0, +}: { + run: EntityTimelineRunRow + isStreaming: boolean + timestamp?: number | null + renderWidth?: number +}): React.ReactElement { + const { data: items = [] } = useLiveQuery( + (q) => (run.items ? q.from({ item: run.items }) : undefined), + [run.items] + ) + const sortedItems = useMemo( + () => + [...items].sort((a, b) => + compareTimelineOrderValues( + a.text?.order ?? a.toolCall?.order ?? `~`, + b.text?.order ?? b.toolCall?.order ?? `~` + ) + ), + [items] + ) + const done = run.status !== `started` + const lastItem = sortedItems[sortedItems.length - 1] + const lastTextHasContent = lastItem?.text !== undefined + const showThinking = isStreaming && !done && !lastTextHasContent + const showTimestamp = timestamp != null && !isStreaming + const hasLeadingMeta = showThinking || done + + return ( + + {sortedItems.map((item, i) => { + if (item.text) { + return ( + + ) + } + + return ( + + ) + })} + + + {showThinking && } + {done && ( + + ✓ done + + )} + {showTimestamp && ( + <> + {hasLeadingMeta && ( + + · + + )} + + + )} + + + ) +}) + export const AgentResponse = memo(function AgentResponse({ section, isStreaming, diff --git a/packages/agents-server-ui/src/components/EntityTimeline.tsx b/packages/agents-server-ui/src/components/EntityTimeline.tsx index 1905b95984..9074ce62f5 100644 --- a/packages/agents-server-ui/src/components/EntityTimeline.tsx +++ b/packages/agents-server-ui/src/components/EntityTimeline.tsx @@ -31,7 +31,7 @@ import { useElectricAgents } from '../lib/ElectricAgentsProvider' import { warmMarkdownRenderCache } from '../lib/markdownRenderCache' import { Icon, IconButton, ScrollArea, Stack, Text, Tooltip } from '../ui' import { UserMessage } from './UserMessage' -import { AgentResponse } from './AgentResponse' +import { AgentResponseLive } from './AgentResponse' import { InlineEventCard } from './InlineEventCard' import { InlineStatusBadge } from './InlineStatusBadge' import { @@ -44,12 +44,28 @@ import { } from '../lib/formatTime' import styles from './EntityTimeline.module.css' import type { + EntityTimelineSection, + EntityTimelineQueryRow, IncludesEntity, Manifest, } from '@electric-ax/agents-runtime/client' -import type { TimelineEntry } from '../lib/timelineEntries' import type { PaneFindAdapter, PaneFindMatch } from '../hooks/usePaneFind' +type RenderTimelineRow = EntityTimelineQueryRow +type WakeSection = Extract + +function renderRowKey(row: RenderTimelineRow): string { + return row.$key +} + +function readInboxText(payload: unknown): string { + if (payload && typeof payload === `object`) { + const text = (payload as { text?: unknown }).text + if (typeof text === `string`) return text + } + return typeof payload === `string` ? payload : `` +} + /** * Width-aware row-height estimate used as the initial size hint for the * virtualizer (before the real DOM has been measured). Producing an estimate @@ -65,7 +81,7 @@ import type { PaneFindAdapter, PaneFindMatch } from '../hooks/usePaneFind' * width and multiply by the body line height. */ function estimateRowHeight( - row: TimelineEntry | undefined, + row: RenderTimelineRow | undefined, contentWidth: number ): number { if (!row) return 120 @@ -76,26 +92,17 @@ function estimateRowHeight( const charsPerLine = Math.max(40, Math.floor(usableWidth / 7)) const lineHeight = 22 // 14px font * ~1.55 leading - if (row.section.kind === `user_message`) { - const lines = Math.max(1, Math.ceil(row.section.text.length / charsPerLine)) - // bubble padding (24) + meta row (~24) + content + if (row.inbox) { + const lines = Math.max( + 1, + Math.ceil(readInboxText(row.inbox.payload).length / charsPerLine) + ) return Math.max(64, 48 + lines * lineHeight) + timelineRowGap(row) } - if (row.section.kind === `wake`) { + if (row.wake || row.manifest) { return 76 + timelineRowGap(row) } - if (row.section.kind === `manifest`) { - return 76 + timelineRowGap(row) - } - - const textLength = row.section.items.reduce((total: number, item) => { - if (item.kind === `text`) return total + item.text.length - // Tool calls render as a compact block; assume ~3 lines. - return total + charsPerLine * 3 - }, 0) - const lines = Math.max(2, Math.ceil(textLength / charsPerLine)) - // status row (~24) + content + a little breathing room - return Math.max(120, 32 + lines * lineHeight) + timelineRowGap(row) + return 120 + timelineRowGap(row) } const BOTTOM_PIN_THRESHOLD = 8 @@ -104,10 +111,8 @@ const ROW_GAP = 24 const MANIFEST_ROW_GAP = 10 const ROW_SETTLE_MS = 500 -function timelineRowGap(row: TimelineEntry): number { - return row.section.kind === `manifest` || row.section.kind === `wake` - ? MANIFEST_ROW_GAP - : ROW_GAP +function timelineRowGap(row: RenderTimelineRow): number { + return row.manifest || row.wake ? MANIFEST_ROW_GAP : ROW_GAP } type TimelinePaneFindMatch = PaneFindMatch & { @@ -116,42 +121,27 @@ type TimelinePaneFindMatch = PaneFindMatch & { rowOccurrence: number } -function timelineRowSearchText(row: TimelineEntry): string { - const { section } = row - if (section.kind === `user_message`) return section.text - if (section.kind === `wake`) return wakeSectionText(section) - if (section.kind === `manifest`) return manifestSearchText(section.manifest) - - return section.items - .map((item) => { - if (item.kind === `text`) return item.text - const parts = [ - item.toolName, - JSON.stringify(item.args, null, 2), - item.result ?? ``, - ] - return parts.filter((part) => part.trim().length > 0).join(`\n`) +function timelineRowSearchText(row: RenderTimelineRow): string { + if (row.inbox) return readInboxText(row.inbox.payload) + if (row.wake) { + return wakeSectionText({ + kind: `wake`, + payload: row.wake.payload, + timestamp: Date.parse(row.wake.payload.timestamp), }) - .filter((part) => part.trim().length > 0) - .join(`\n\n`) + } + if (row.manifest) return manifestSearchText(row.manifest) + return `` } -function timelineRowLabel(row: TimelineEntry): string { - switch (row.section.kind) { - case `user_message`: - return `User message` - case `wake`: - return `Wake` - case `manifest`: - return `Manifest item` - case `agent_response`: - return `Agent response` - } +function timelineRowLabel(row: RenderTimelineRow): string { + if (row.inbox) return `User message` + if (row.wake) return `Wake` + if (row.manifest) return `Manifest item` + return `Agent response` } -function wakeReason( - section: Extract -): string { +function wakeReason(section: WakeSection): string { const { payload } = section if (payload.timeout) return `timeout` if (payload.finished_child) { @@ -166,9 +156,7 @@ function wakeReason( return payload.source } -function wakeSectionText( - section: Extract -): string { +function wakeSectionText(section: WakeSection): string { return [ `woke`, wakeReason(section), @@ -180,7 +168,7 @@ function wakeSectionText( function WakeTimelineRow({ section, }: { - section: Extract + section: WakeSection }): React.ReactElement { const reason = wakeReason(section) const details = wakeDetails(section) @@ -211,7 +199,7 @@ function WakeTimelineRow({ } function wakeDetails( - section: Extract + section: WakeSection ): Array<{ label: string; value: string }> { const { payload } = section const details = [ @@ -263,7 +251,7 @@ function wakeDetails( } function wakeChildOutput( - section: Extract + section: WakeSection ): { label: string; value: string } | null { const child = section.payload.finished_child if (!child) return null @@ -600,19 +588,8 @@ function entityUrlsFromKey(key: string): Array { return key.length === 0 ? [] : key.split(`\0`) } -// `section` and `responseTimestamp` are pulled out of the parent -// `EntityTimelineEntry` so React.memo's shallow compare can hit on -// the *section* identity. `buildTimelineEntries` returns a fresh -// `entries` array (and fresh entry objects) on every chunk during -// streaming, but the runtime caches finished agent sections in a -// WeakMap keyed by the underlying run row — so unchanged rows -// receive the identical `section` reference each render. With the -// previous `row` prop, that hit was masked by the always-new wrapper -// object; splitting the props lets memo skip every settled row and -// only re-render the streaming row + the row that just settled. const TimelineRow = memo(function TimelineRow({ - section, - responseTimestamp, + row, entityStopped, isStreaming, renderWidth, @@ -620,8 +597,7 @@ const TimelineRow = memo(function TimelineRow({ tileId, entityStatusByUrl, }: { - section: TimelineEntry[`section`] - responseTimestamp: TimelineEntry[`responseTimestamp`] + row: RenderTimelineRow entityStopped: boolean isStreaming: boolean renderWidth: number @@ -629,21 +605,42 @@ const TimelineRow = memo(function TimelineRow({ tileId: string | null entityStatusByUrl: Map }): React.ReactElement { - if (section.kind === `user_message`) { - return + if (row.inbox) { + const timestamp = Date.parse(row.inbox.timestamp) + return ( + + ) } - if (section.kind === `wake`) { - return + + if (row.wake) { + return ( + + ) } - if (section.kind === `manifest`) { + + if (row.manifest) { return ( @@ -651,17 +648,16 @@ const TimelineRow = memo(function TimelineRow({ } return ( - ) }) export function EntityTimeline({ - entries, + rows, loading, error, entityStopped, @@ -671,7 +667,7 @@ export function EntityTimeline({ entities = [], scrollToBottomSignal = 0, }: { - entries: Array + rows: Array loading: boolean error: string | null entityStopped: boolean @@ -681,7 +677,6 @@ export function EntityTimeline({ entities?: Array scrollToBottomSignal?: number }): React.ReactElement { - const rows = useMemo(() => entries, [entries]) const { entitiesCollection } = useElectricAgents() const referencedEntityUrlKey = useMemo( () => stableEntityUrlKey(entities.map((entity) => entity.url)), @@ -732,22 +727,23 @@ export function EntityTimeline({ const settledKeysRef = useRef(new Set()) const settleCheckTimerRef = useRef | null>(null) const handledScrollSignalRef = useRef(scrollToBottomSignal) + const previousStreamingAgentKeyRef = useRef(null) const textColumnWidth = Math.max(0, contentWidth - CHAT_SURFACE_GUTTER) - const firstMessage = rows.find( - ( - row - ): row is TimelineEntry & { - section: Extract - } => row.section.kind === `user_message` - ) - const spawnTime = firstMessage?.section.timestamp ?? null + const spawnTime = useMemo(() => { + for (const row of rows) { + if (!row.inbox) continue + const timestamp = Date.parse(row.inbox.timestamp) + return Number.isFinite(timestamp) ? timestamp : null + } + return null + }, [rows]) const lastStreamingAgentKey = useMemo(() => { for (let index = rows.length - 1; index >= 0; index--) { const row = rows[index] - if (row.section.kind === `agent_response`) { - return row.section.done ? null : row.key + if (row.run) { + return row.run.status === `started` ? row.$key : null } } return null @@ -822,9 +818,10 @@ export function EntityTimeline({ count: rows.length, getScrollElement: () => viewport, estimateSize: (index) => - cachedSizeMapRef.current.get(rows[index]?.key ?? ``) ?? - estimateRowHeight(rows[index], textColumnWidth), - getItemKey: (index) => rows[index]?.key ?? index, + cachedSizeMapRef.current.get( + rows[index] ? renderRowKey(rows[index]!) : `` + ) ?? estimateRowHeight(rows[index], textColumnWidth), + getItemKey: (index) => (rows[index] ? renderRowKey(rows[index]!) : index), gap: 0, overscan: 6, measureElement: measureRowElement, @@ -845,12 +842,13 @@ export function EntityTimeline({ if (!query.trim()) return matches rows.forEach((row, rowIndex) => { + const rowKey = renderRowKey(row) const text = timelineRowSearchText(row) const starts = getTextMatchStarts(text, query) starts.forEach((start, rowOccurrence) => { matches.push({ - id: `${row.key}:${rowOccurrence}`, - rowKey: row.key, + id: `${rowKey}:${rowOccurrence}`, + rowKey, rowIndex, rowOccurrence, label: timelineRowLabel(row), @@ -884,6 +882,17 @@ export function EntityTimeline({ rowVirtualizer.shouldAdjustScrollPositionOnItemSizeChange = () => false }, [rowVirtualizer]) + const scrollToTimelineEnd = useCallback(() => { + if (!viewport || rows.length === 0) return + rowVirtualizer.scrollToIndex(rows.length - 1, { align: `end` }) + + // The stopped/status footer sits outside the virtual list, so make sure the + // physical scroll container is also flush with its full content height. + requestAnimationFrame(() => { + viewport.scrollTop = viewport.scrollHeight + }) + }, [rowVirtualizer, rows.length, viewport]) + const scrollAreaRef = useCallback((node: HTMLDivElement | null) => { setViewport(node) }, []) @@ -1041,11 +1050,46 @@ export function EntityTimeline({ if (!isNearBottom.current) return const frame = requestAnimationFrame(() => { - rowVirtualizer.scrollToIndex(rows.length - 1, { align: `end` }) + scrollToTimelineEnd() + }) + + return () => cancelAnimationFrame(frame) + }, [rows, scrollToTimelineEnd, viewport]) + + useLayoutEffect(() => { + if (!contentElement || !viewport) return + + let frame: ReturnType | null = null + const pinToBottom = () => { + if (!isNearBottom.current) return + if (frame !== null) cancelAnimationFrame(frame) + frame = requestAnimationFrame(() => { + frame = null + scrollToTimelineEnd() + }) + } + + const observer = new ResizeObserver(pinToBottom) + observer.observe(contentElement) + return () => { + observer.disconnect() + if (frame !== null) cancelAnimationFrame(frame) + } + }, [contentElement, scrollToTimelineEnd, viewport]) + + useLayoutEffect(() => { + const previousStreamingAgentKey = previousStreamingAgentKeyRef.current + previousStreamingAgentKeyRef.current = lastStreamingAgentKey + if (!previousStreamingAgentKey || lastStreamingAgentKey) return + + isNearBottom.current = true + setShowJumpToBottom(false) + const frame = requestAnimationFrame(() => { + scrollToTimelineEnd() }) return () => cancelAnimationFrame(frame) - }, [rowVirtualizer, rows, viewport]) + }, [lastStreamingAgentKey, scrollToTimelineEnd]) useLayoutEffect(() => { if (handledScrollSignalRef.current === scrollToBottomSignal) return @@ -1055,11 +1099,11 @@ export function EntityTimeline({ if (!viewport || rows.length === 0) return const frame = requestAnimationFrame(() => { - rowVirtualizer.scrollToIndex(rows.length - 1, { align: `end` }) + scrollToTimelineEnd() }) return () => cancelAnimationFrame(frame) - }, [rowVirtualizer, rows.length, scrollToBottomSignal, viewport]) + }, [rows.length, scrollToBottomSignal, scrollToTimelineEnd, viewport]) useEffect( () => () => { @@ -1074,9 +1118,9 @@ export function EntityTimeline({ if (rows.length > 0) { isNearBottom.current = true setShowJumpToBottom(false) - rowVirtualizer.scrollToIndex(rows.length - 1, { align: `end` }) + scrollToTimelineEnd() } - }, [rowVirtualizer, rows.length]) + }, [rows.length, scrollToTimelineEnd]) if (loading) { return ( @@ -1152,6 +1196,7 @@ export function EntityTimeline({ > {rowVirtualizer.getVirtualItems().map((virtualRow) => { const row = rows[virtualRow.index] + const rowKey = renderRowKey(row) // Stable row key. The previous implementation appended // `:${contentWidth}` to force remount on every column-width @@ -1167,8 +1212,8 @@ export function EntityTimeline({ key={virtualRow.key} ref={rowVirtualizer.measureElement} data-index={virtualRow.index} - data-item-key={row.key} - data-pane-find-row-key={row.key} + data-item-key={rowKey} + data-pane-find-row-key={rowKey} className={styles.virtualRow} style={{ transform: `translateY(${virtualRow.start}px)`, @@ -1176,10 +1221,9 @@ export function EntityTimeline({ }} > - >(() => new Map()) - const inlineTimeoutsRef = useRef( - new Map>() - ) - const processedInboxKeys = useMemo( + const optimisticInlineInboxKeys = useMemo( () => new Set( - entries - .filter((entry) => entry.section.kind === `user_message`) - .map((entry) => entry.key.replace(/^inbox:/, ``)) + timelineRows + .filter((row) => row.inbox?.status === `pending`) + .map((row) => row.inbox!.key) ), - [entries] - ) - const pendingInboxByKey = useMemo( - () => new Map(pendingInbox.map((message) => [message.key, message])), - [pendingInbox] + [timelineRows] ) - const projectedPendingMessage = useMemo(() => { - for (const [key, message] of inlineQueuedMessages) { - if (processedInboxKeys.has(key)) continue - return pendingInboxByKey.get(key) ?? message - } - return null - }, [inlineQueuedMessages, pendingInboxByKey, processedInboxKeys]) const visiblePendingInbox = useMemo( () => - projectedPendingMessage - ? pendingInbox.filter( - (message) => message.key !== projectedPendingMessage.key - ) - : pendingInbox, - [pendingInbox, projectedPendingMessage] - ) - const visibleEntries = useMemo>(() => { - if (!projectedPendingMessage) return entries - const timestamp = Date.parse(projectedPendingMessage.timestamp) - const hasUserMessage = entries.some( - (entry) => entry.section.kind === `user_message` - ) - return [ - ...entries, - { - key: `pending-inbox:${projectedPendingMessage.key}`, - order: Number.MAX_SAFE_INTEGER, - responseTimestamp: null, - section: { - kind: `user_message`, - from: projectedPendingMessage.from ?? `user`, - text: readTextPayload(projectedPendingMessage.payload), - timestamp: Number.isFinite(timestamp) ? timestamp : Date.now(), - isInitial: !hasUserMessage, - }, - }, - ] - }, [entries, projectedPendingMessage]) - - const rememberInlineQueuedMessage = useCallback( - (message: OptimisticInboxMessage) => { - setInlineQueuedMessages((current) => { - const next = new Map(current) - next.set(message.key, message) - return next - }) - const existingTimeout = inlineTimeoutsRef.current.get(message.key) - if (existingTimeout) clearTimeout(existingTimeout) - const timeout = setTimeout(() => { - inlineTimeoutsRef.current.delete(message.key) - setInlineQueuedMessages((current) => { - if (!current.has(message.key)) return current - const next = new Map(current) - next.delete(message.key) - return next - }) - }, INLINE_QUEUED_TIMEOUT_MS) - inlineTimeoutsRef.current.set(message.key, timeout) - }, - [] + pendingInbox.filter( + (message) => !optimisticInlineInboxKeys.has(message.key) + ), + [optimisticInlineInboxKeys, pendingInbox] ) - - useEffect(() => { - setInlineQueuedMessages((current) => { - let next: Map | null = null - for (const key of current.keys()) { - if (!processedInboxKeys.has(key)) continue - next ??= new Map(current) - next.delete(key) - const timeout = inlineTimeoutsRef.current.get(key) - if (timeout) clearTimeout(timeout) - inlineTimeoutsRef.current.delete(key) - } - return next ?? current - }) - }, [processedInboxKeys]) - - useEffect( - () => () => { - for (const timeout of inlineTimeoutsRef.current.values()) { - clearTimeout(timeout) - } - inlineTimeoutsRef.current.clear() - }, - [] + const inlinePendingInbox = + !entityStopped && !generationActive ? visiblePendingInbox[0] : undefined + const timelineRowsWithInlinePending = useMemo>( + () => + inlinePendingInbox + ? [ + ...timelineRows, + { + $key: `inbox:${inlinePendingInbox.key}`, + inbox: inlinePendingInbox, + } as EntityTimelineQueryRow, + ] + : timelineRows, + [inlinePendingInbox, timelineRows] ) + const drawerPendingInbox = inlinePendingInbox + ? visiblePendingInbox.slice(1) + : visiblePendingInbox // If the timeline subscription errors out for an entity that isn't // currently spawning (so the failure isn't transient), bounce back to @@ -189,7 +114,7 @@ function GenericChatBody({ return ( <> ( - pendingInbox: EntityTimelineData[`inbox`] + timelineRows: Array + pendingInbox: Array entities: Array generationActive: boolean db: EntityStreamDBWithActions | null @@ -76,7 +74,10 @@ export function useEntityTimeline( }, [baseUrl, entityUrl]) const { data: timelineRows = [] } = useLiveQuery( - (q) => (db ? createEntityIncludesQuery(db)(q) : undefined), + (q) => { + if (!db) return undefined + return createEntityTimelineQuery(db)(q) + }, [db] ) const { data: manifests = [] } = useLiveQuery( @@ -94,62 +95,14 @@ export function useEntityTimeline( ? q .from({ inbox: db.collections.inbox }) .where(({ inbox }) => eq(inbox.status, `pending`)) - .orderBy(({ inbox }) => inbox._seq, `asc`) + .orderBy(({ inbox }) => coalesce(inbox._timeline_order, `~`), `asc`) + .orderBy(({ inbox }) => + coalesce(inbox._seq, Number.MAX_SAFE_INTEGER) + ) : undefined, [db] ) - const timelineData = useMemo( - () => - normalizeEntityTimelineData( - (timelineRows as Array)[0] ?? { - runs: [], - inbox: [], - wakes: [], - contextInserted: [], - contextRemoved: [], - entities: [], - } - ), - [timelineRows] - ) - - const entries = useMemo(() => { - const baseEntries = buildTimelineEntries( - timelineData.runs, - timelineData.inbox, - timelineData.wakes - ) - const orderByKey = new Map() - for (const run of timelineData.runs) { - orderByKey.set(`run:${run.key}`, run.order) - } - for (const msg of timelineData.inbox) { - orderByKey.set(`inbox:${msg.key}`, msg.order) - } - for (const wake of timelineData.wakes) { - orderByKey.set(`wake:${wake.key}`, wake.order) - } - - const merged: Array<{ order: string | number; entry: TimelineEntry }> = [ - ...baseEntries.map((entry) => ({ - order: orderByKey.get(entry.key) ?? Number.MAX_SAFE_INTEGER, - entry, - })), - ...(manifests as Array).map((manifest) => ({ - order: manifest._seq ?? Number.MAX_SAFE_INTEGER, - entry: { - key: `manifest:${manifest.key}`, - order: manifest._seq ?? Number.MAX_SAFE_INTEGER, - responseTimestamp: null, - section: { kind: `manifest` as const, manifest }, - }, - })), - ] - - return merged - .sort((left, right) => compareTimelineOrders(left.order, right.order)) - .map(({ entry }) => entry) - }, [manifests, timelineData.runs, timelineData.inbox, timelineData.wakes]) + const typedTimelineRows = timelineRows as Array const pendingInbox = useMemo( () => @@ -157,7 +110,7 @@ export function useEntityTimeline( .map( (msg): IncludesInboxMessage => ({ key: msg.key, - order: msg._seq ?? Number.MAX_SAFE_INTEGER, + order: msg._timeline_order ?? msg._seq ?? Number.MAX_SAFE_INTEGER, from: msg.from, payload: msg.payload, timestamp: msg.timestamp, @@ -191,14 +144,44 @@ export function useEntityTimeline( [pendingInboxRows] ) const generationActive = useMemo( - () => timelineData.runs.some((run) => run.status === `started`), - [timelineData.runs] + () => typedTimelineRows.some((row) => row.run?.status === `started`), + [typedTimelineRows] + ) + const entities = useMemo( + () => + normalizeTimelineEntities( + (manifests as Array) + .filter( + (manifest) => + manifest.kind === `child` || manifest.kind === `source` + ) + .map( + (manifest): IncludesEntity => ({ + key: + manifest.kind === `child` + ? manifest.entity_url + : manifest.sourceRef, + kind: manifest.kind, + id: manifest.kind === `child` ? manifest.id : manifest.sourceRef, + url: + manifest.kind === `child` + ? manifest.entity_url + : manifest.sourceRef, + type: + manifest.kind === `child` ? manifest.entity_type : undefined, + observed: + manifest.kind === `source` || Boolean(manifest.observed), + wake: manifest.wake, + }) + ) + ), + [manifests] ) return { - entries, + timelineRows: typedTimelineRows, pendingInbox, - entities: timelineData.entities, + entities, generationActive, db, loading, diff --git a/packages/agents-server-ui/src/lib/sendMessage.ts b/packages/agents-server-ui/src/lib/sendMessage.ts index 018d7152fe..3690c87cda 100644 --- a/packages/agents-server-ui/src/lib/sendMessage.ts +++ b/packages/agents-server-ui/src/lib/sendMessage.ts @@ -1,5 +1,6 @@ import { createOptimisticAction } from '@tanstack/db' import { generateKeyBetween } from 'fractional-indexing' +import { createPendingTimelineOrder } from '@electric-ax/agents-runtime/client' import { getActivePrincipal, getConfiguredServerHeaders, @@ -9,16 +10,15 @@ import { entityApiUrl } from './entity-api' import { loadCloudAuthState } from './server-connection' import type { EntityStreamDBWithActions } from '@electric-ax/agents-runtime/client' -// Timeline queries sort inbox messages by `_seq`. Pending local rows do not -// have a server sequence yet, so put them after streamed rows until the real -// event with the same key arrives. -const OPTIMISTIC_INBOX_SEQ_START = Number.MAX_SAFE_INTEGER - 1_000_000 +// Pending local rows do not have a server stream offset yet, so put them after +// streamed rows until the real event with the same key arrives. +const OPTIMISTIC_INBOX_ORDER_START = Number.MAX_SAFE_INTEGER - 1_000_000 -let optimisticInboxSeq = OPTIMISTIC_INBOX_SEQ_START +let optimisticInboxOrderIndex = OPTIMISTIC_INBOX_ORDER_START export type OptimisticInboxMessage = { key: string - _seq: number + _timeline_order: string from: string payload: { text: string } timestamp: string @@ -32,7 +32,7 @@ type SendMessageInput = { text: string mode: `immediate` | `queued` | `paused` | `steer` key: string - seq: number + pendingOrderIndex: number position?: string } @@ -48,16 +48,16 @@ type InboxMessageKeyInput = { key: string } -function createOptimisticInboxKey(seq: number): string { - return `optimistic-${Date.now()}-${seq}` +function createOptimisticInboxKey(pendingOrderIndex: number): string { + return `optimistic-${Date.now()}-${pendingOrderIndex}` } -function nextOptimisticInboxSeq(): number { - optimisticInboxSeq += 1 - if (optimisticInboxSeq >= Number.MAX_SAFE_INTEGER) { - optimisticInboxSeq = OPTIMISTIC_INBOX_SEQ_START +function nextOptimisticInboxOrderIndex(): number { + optimisticInboxOrderIndex += 1 + if (optimisticInboxOrderIndex >= Number.MAX_SAFE_INTEGER) { + optimisticInboxOrderIndex = OPTIMISTIC_INBOX_ORDER_START } - return optimisticInboxSeq + return optimisticInboxOrderIndex } const QUEUE_POSITION_TIMESTAMP_WIDTH = 16 @@ -190,11 +190,11 @@ export function createSendMessageAction({ onOptimisticMessage?: (message: OptimisticInboxMessage) => void }) { const action = createOptimisticAction({ - onMutate: ({ text, mode, key, seq, position }) => { + onMutate: ({ text, mode, key, pendingOrderIndex, position }) => { const now = new Date().toISOString() const message: OptimisticInboxMessage = { key, - _seq: seq, + _timeline_order: createPendingTimelineOrder(pendingOrderIndex), from, payload: { text }, timestamp: now, @@ -239,7 +239,7 @@ export function createSendMessageAction({ mode?: `immediate` | `queued` | `paused` | `steer` position?: string }) => { - const seq = nextOptimisticInboxSeq() + const pendingOrderIndex = nextOptimisticInboxOrderIndex() const effectivePosition = position ?? (mode === `queued` || mode === `paused` @@ -248,8 +248,8 @@ export function createSendMessageAction({ return action({ text, mode, - key: createOptimisticInboxKey(seq), - seq, + key: createOptimisticInboxKey(pendingOrderIndex), + pendingOrderIndex, position: effectivePosition, }) } diff --git a/packages/agents-server-ui/src/main.tsx b/packages/agents-server-ui/src/main.tsx index dc494a5c75..a64d9001bc 100644 --- a/packages/agents-server-ui/src/main.tsx +++ b/packages/agents-server-ui/src/main.tsx @@ -28,12 +28,31 @@ import { App } from './App' // to non-ngrok hosts. Covers the durable-streams client's internal fetches // too, since it calls through the global fetch. const originalFetch = window.fetch.bind(window) +const shouldSkipNgrokWarning = (input: RequestInfo | URL): boolean => { + const url = + input instanceof Request + ? input.url + : input instanceof URL + ? input.href + : input + try { + return new URL(url, window.location.href).hostname.endsWith( + `.ngrok-free.app` + ) + } catch { + return false + } +} + window.fetch = ( input: RequestInfo | URL, init?: RequestInit ): Promise => { const headers = new Headers(init?.headers ?? {}) - if (!headers.has(`ngrok-skip-browser-warning`)) { + if ( + shouldSkipNgrokWarning(input) && + !headers.has(`ngrok-skip-browser-warning`) + ) { headers.set(`ngrok-skip-browser-warning`, `true`) } return originalFetch(input, { ...init, headers }) diff --git a/packages/agents-server-ui/vite.config.ts b/packages/agents-server-ui/vite.config.ts index f4635c0248..0287834746 100644 --- a/packages/agents-server-ui/vite.config.ts +++ b/packages/agents-server-ui/vite.config.ts @@ -51,9 +51,10 @@ export default defineConfig(({ command, mode }) => { `jsx-dev-runtime.js` ), }, - dedupe: [`react`, `react-dom`], + dedupe: [`react`, `react-dom`, `@tanstack/db`], }, optimizeDeps: { + exclude: [`@durable-streams/state`, `@tanstack/db`, `@tanstack/react-db`], include: [ `react`, `react-dom`,