diff --git a/src/content/docs/agents/communication-channels/chat/chat-agents.mdx b/src/content/docs/agents/communication-channels/chat/chat-agents.mdx index 215ff783b65..b7b7be84537 100644 --- a/src/content/docs/agents/communication-channels/chat/chat-agents.mdx +++ b/src/content/docs/agents/communication-channels/chat/chat-agents.mdx @@ -869,17 +869,18 @@ function Chat() { ### Options -| Option | Type | Default | Description | -| ----------------------------- | --------------------------------------------- | -------- | ------------------------------------------------------------------------------------------------------------------------ | -| `agent` | `ReturnType` | Required | Agent connection from `useAgent` | -| `onToolCall` | `({ toolCall, addToolOutput }) => void` | — | Handle client-side tool execution | -| `tools` | `Record` | — | Advanced: dynamically register client-executed tools from the browser | -| `autoContinueAfterToolResult` | `boolean` | `true` | Auto-continue conversation after client tool results and approvals | -| `resume` | `boolean` | `true` | Enable automatic stream resumption on reconnect | -| `cancelOnClientAbort` | `boolean` | `false` | Cancel the server turn when generic client stream abort or cleanup occurs | -| `body` | `object \| () => object` | — | Custom data sent with every request | -| `prepareSendMessagesRequest` | `(options) => { body?, headers? }` | — | Advanced per-request customization | -| `getInitialMessages` | `(options) => Promise` or `null` | — | Custom initial message loader. Set to `null` to skip the HTTP fetch entirely (useful when providing `messages` directly) | +| Option | Type | Default | Description | +| ----------------------------- | --------------------------------------------- | -------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| `agent` | `ReturnType` | Required | Agent connection from `useAgent` | +| `onToolCall` | `({ toolCall, addToolOutput }) => void` | — | Handle client-side tool execution | +| `tools` | `Record` | — | Advanced: dynamically register client-executed tools from the browser | +| `autoContinueAfterToolResult` | `boolean` | `true` | Auto-continue conversation after client tool results and approvals | +| `resume` | `boolean` | `true` | Enable automatic stream resumption on reconnect | +| `cancelOnClientAbort` | `boolean` | `false` | Cancel the server turn when generic client stream abort or cleanup occurs | +| `body` | `object \| () => object` | — | Custom data sent with every request | +| `prepareSendMessagesRequest` | `(options) => { body?, headers? }` | — | Advanced per-request customization | +| `getInitialMessages` | `(options) => Promise` or `null` | — | Custom initial message loader. Set to `null` to skip the HTTP fetch entirely (useful when providing `messages` directly) | +| `syncMessagesToServer` | `boolean` | `true` | When `true`, `setMessages` pushes the transcript to the server. Set to `false` for hosts with server-authoritative transcript storage so `setMessages` updates the local view only | ### Return values diff --git a/src/content/docs/agents/harnesses/think/actions.mdx b/src/content/docs/agents/harnesses/think/actions.mdx new file mode 100644 index 00000000000..0a044678be7 --- /dev/null +++ b/src/content/docs/agents/harnesses/think/actions.mdx @@ -0,0 +1,273 @@ +--- +title: Actions +pcx_content_type: how-to +description: Server-side Think tools with idempotency, human approvals, authorization, and reply attachments. +sidebar: + order: 4.5 +products: + - agents +--- + +import { TypeScriptExample } from "~/components"; + +:::note[Experimental] +The Actions API surface may evolve before Think graduates out of experimental. +::: + +Actions are server-side tools with batteries included. Where a plain AI SDK `tool()` is just a description, a schema, and an `execute` function, an `action()` adds the things that are tedious and dangerous to get right by hand for a tool that has real side effects: + +- **Idempotency** — a durable ledger replays a settled result by a stable key instead of re-running the side effect on a recovery retry. +- **Approvals** — gate a call behind a human, either inline (the turn waits) or durably (the turn parks and resumes later, even from a dashboard with no live socket). +- **Authorization** — declare the permissions a call requires and grant them per-turn. +- **Reply attachments** — record advisory delivery metadata (a drafted email, a card, a voice note) without changing what the model sees. + +Actions compile into Think tools, so the model calls them exactly like any other tool. Return them from `getActions()`; Think merges them into the tool set alongside `getTools()`, workspace tools, extensions, and MCP tools. + +## Define an action + +Use the `action()` descriptor factory and return a map of actions from `getActions()`. The map key is the tool name the model sees (unless you set `name`). The `execute` input type is inferred from `inputSchema`: + + + +```ts +import { Think, action } from "@cloudflare/think"; +import { z } from "zod"; + +export class Support extends Think { + getActions() { + return { + refundOrder: action({ + description: "Refund a customer order.", + inputSchema: z.object({ + orderId: z.string(), + amountCents: z.number().int().positive(), + }), + execute: async ({ orderId, amountCents }, ctx) => { + const result = await refund(orderId, amountCents); + return { refundId: result.id, status: result.status }; + }, + }), + }; + } +} +``` + + + +The `execute` callback receives the validated input and an `ActionContext`: + +```ts +type ActionContext = { + agent: Think; + env: Cloudflare.Env; + requestId: string; + toolCallId: string; + messages: ReadonlyArray; + signal: AbortSignal; // aborts on turn cancel or after `timeoutMs` + attachReply(attachment: ReplyAttachment): void; +}; +``` + +The action output is normalized to JSON and truncated before it is shown to the model (long outputs are capped). Anything thrown from `execute` becomes a structured `{ error: { name, message } }` tool result rather than crashing the turn. Each action has a default timeout of 30 seconds; override it per action with `timeoutMs`. + +### Actions versus plain tools + +A plain `tool()` from `getTools()` still works and is the right choice for a read-only or trivial tool. Reach for `action()` when a tool has side effects you must not run twice, needs human approval, or needs declarative authorization — the ledger, approval descriptors, default timeout, and structured error mapping only apply to actions. + +## Idempotency and the action ledger + +When an action declares an `idempotencyKey`, Think records the settled result in a durable ledger keyed by `action::`. If the same key is seen again — on a recovery retry, a reconnect, or a duplicate inbound event — Think returns the stored result **without** re-running `execute`, so the side effect happens at most once on the happy path. + + + +```ts +const chargeInvoice = action({ + description: "Charge an invoice.", + inputSchema: z.object({ invoiceId: z.string() }), + // Use a stable domain identifier — never a timestamp, request id, or random value. + idempotencyKey: ({ input }) => `invoice:${input.invoiceId}`, + execute: async ({ invoiceId }) => charge(invoiceId), +}); +``` + + + +`idempotencyKey` is a string, or a function `({ input, ctx }) => string`. Choose a key that survives recovery retries — an order id, an inbound event id — and not a value that changes per attempt. An action with no `idempotencyKey` falls back to a per-`toolCallId` key, which only deduplicates within the same tool call, not across retries. + +### Pending rows and the retry lease + +A ledger row is written as `pending` before `execute` runs and flipped to `settled` on success (a thrown or timed-out `execute` deletes the row so the call can be retried cleanly). If the isolate dies mid-execute, the row is left `pending`. By default such a stale row is reclaimed and the action re-run once the row is older than `actionLedgerPendingRetryLeaseMs` (default 5 minutes) — **but only for actions that declare an explicit `idempotencyKey`**, since that key is your assertion that re-running the keyed side effect is safe. A fresh pending row (or one without an explicit key) instead returns an `ActionPendingError` result so the model does not blindly retry an unknown state. Set `actionLedgerPendingRetryLeaseMs = false` to disable reclaim entirely and always surface `ActionPendingError` for a stale row. + +## Approvals + +Gate an action behind a human with `approval`. There are two mechanisms, selected by `kind`. + +### Approval-gated (the turn waits) + +The default when you set `approval` without a `kind`. The action compiles to a tool with the AI SDK `needsApproval` flag: the stream pauses with an `approval-requested` part, the client approves or rejects, and the turn continues inline. `execute` runs only after approval. + + + +```ts +const deleteAccount = action({ + description: "Permanently delete a user account.", + inputSchema: z.object({ userId: z.string() }), + approval: true, // or ({ input }) => input.userId !== currentUser + approvalSummary: "Delete an account", + approvalRisk: "high", + execute: async ({ userId }) => deleteAccount(userId), +}); +``` + + + +`approval` is a boolean or a predicate `({ input, ctx }) => boolean`, so you can require approval only for risky inputs. `approvalSummary` and `approvalRisk` (`"low" | "medium" | "high"`) populate the approval descriptor your UI renders. + +### Durable-pause (the turn parks and resumes later) + +Set `kind: "durable-pause"` when approval may take minutes or days and you do not want to hold a connection open. The action parks into a durable store and the turn ends; `execute` does not run yet. Resume later — from anywhere, including a dashboard with no live WebSocket — with `approveExecution()` or `rejectExecution()`: + + + +```ts +const deploy = action({ + description: "Deploy to production.", + inputSchema: z.object({ ref: z.string() }), + kind: "durable-pause", + approvalSummary: "Deploy to production", + approvalRisk: "high", + permissions: ["deploy:run"], + execute: async ({ ref }) => deploy(ref), +}); +``` + + + + + +```ts +// List everything waiting on a human (cold-load reconciliation): +const pending = await agent.pendingApprovals(); +// [{ executionId, source: "action" | "codemode", descriptor }] + +// Approve or reject by execution id (idempotent — a second call is a no-op): +await agent.approveExecution(executionId); +await agent.rejectExecution(executionId, "Not this release"); +``` + + + +`approveExecution()` runs `execute` once and auto-continues the turn even if no client is connected; `rejectExecution()` resolves the action without running it. `pendingApprovals()` merges parked actions and paused [Codemode](/agents/tools/codemode/) executions, so a single approval UI can drive both. (`durable-pause` requires an `approval` policy — an action that would never park is rejected at definition time.) + +Both approval-gated and durable-pause parts carry a stable `ActionApprovalDescriptor` (`{ requestId, toolCallId, action, summary, input, permissions, risk, kind }`) so your UI has everything it needs to render the prompt. + +## Authorization + +Declare the permissions an action requires with `permissions`, then grant them per turn. By default every turn is fully authorized, so authorization is opt-in. + + + +```ts +const refundOrder = action({ + description: "Refund a customer order.", + inputSchema: z.object({ orderId: z.string() }), + permissions: ["billing:refund"], // or ({ input }) => [...] + execute: async ({ orderId }) => refund(orderId), +}); +``` + + + +Override `authorizeTurn()` to decide, once per turn, which permissions are granted. Returning a list narrows the grant; any action requiring a permission outside the set is denied with a structured `ActionAuthorizationError` (the model never calls `execute`): + + + +```ts +export class Support extends Think { + override authorizeTurn(ctx: TurnContext): ActionAuthorizationDecision { + const role = (ctx.body as { role?: string })?.role; + if (role === "admin") return true; // full grant (the default) + return { allowed: true, grantedPermissions: ["billing:read"] }; + } +} +``` + + + +`authorizeTurn()` returns `true` (full grant), `false` (deny all), or `{ allowed, reason?, grantedPermissions? }`. For per-call logic, override `authorizeAction(ctx)` instead — it receives the action name, kind, input, and required and granted permissions. + +## Reply attachments + +An action can record advisory delivery metadata for the turn — a drafted email, a card, a voice note — with `ctx.attachReply()`. Attachments never change the tool output the model sees; they ride alongside the response for your delivery layer to render. + + + +```ts +const draftReply = action({ + description: "Draft an email reply.", + inputSchema: z.object({ to: z.string(), subject: z.string() }), + execute: async ({ to, subject }, ctx) => { + ctx.attachReply({ type: "email_draft", to: [to], subject }); + return { drafted: true }; + }, +}); +``` + + + +Read the attachments after the turn from the `onChatResponse()` hook, or from the `replyAttachments(requestId?)` getter: + + + +```ts +export class Support extends Think { + override async onChatResponse(result: ChatResponseResult) { + for (const attachment of result.attachments ?? []) { + // attachment.type === "email_draft" | "card" | "voice_note" | custom + } + } +} +``` + + + +Attachments are JSON-normalized and deep-copied on read, capped per turn, and discarded if the `execute` that recorded them fails. A ledger replay does not re-fire attachments (the side effect already happened), and `attachReply()` is a no-op when called from a `permissions`, `approval`, or `idempotencyKey` callback — record attachments from `execute`. + +A built-in `ReplyAttachment` covers `email_draft`, `card`, and `voice_note`; any `{ type: string; ... }` shape is allowed for custom delivery. Override [`renderAttachment()`](/agents/harnesses/think/channels/#deliver-out-of-band) to turn an attachment into a channel notice. + +## Reference + +### `action(config)` + +| Field | Type | Required | Default | Description | +| ----------------- | ------------------------------------------------------------ | -------- | ------------- | --------------------------------------------------------------------------------------- | +| `description` | `string` | Yes | — | Tool description shown to the model. | +| `inputSchema` | `FlexibleSchema` (Zod or AI SDK `jsonSchema`) | Yes | — | Validates and types the `execute` input. | +| `execute` | `(input, ctx) => Output \| Promise` | Yes | — | The action body. Receives validated input and an `ActionContext`. | +| `name` | `string` | No | map key | Overrides the tool name. | +| `idempotencyKey` | `string \| ({ input, ctx }) => string` | No | per tool call | Stable key for ledger replay. Use a domain identifier. | +| `permissions` | `readonly string[] \| ({ input, ctx }) => readonly string[]` | No | none | Permissions this call requires (see Authorization). | +| `approval` | `boolean \| ({ input, ctx }) => boolean` | No | none | Gate the call behind a human. | +| `approvalSummary` | `string` | No | `description` | Human-readable summary in the approval descriptor. | +| `approvalRisk` | `"low" \| "medium" \| "high"` | No | — | Risk hint in the approval descriptor. | +| `kind` | `"server" \| "approval-gated" \| "durable-pause"` | No | inferred | `approval-gated` when `approval` is set, else `server`; set `durable-pause` explicitly. | +| `timeoutMs` | `number` | No | `30000` | Per-action execution timeout (also drives `ctx.signal`). | + +### Hooks and methods on the agent + +| Member | Description | +| --------------------------------------- | ---------------------------------------------------------------------------------- | +| `getActions()` | Return the action descriptors to compile into tools. | +| `authorizeTurn(ctx)` | Decide granted permissions once per turn. Defaults to full grant. | +| `authorizeAction(ctx)` | Decide authorization per action call. Defaults to checking `authorizeTurn` grants. | +| `pendingApprovals(executionId?)` | List parked actions and paused Codemode executions awaiting approval. | +| `approveExecution(executionId)` | Approve a parked execution; runs `execute` and auto-continues the turn. | +| `rejectExecution(executionId, reason?)` | Reject a parked execution without running it. | +| `replyAttachments(requestId?)` | Read the advisory attachments recorded during a turn. | +| `actionLedgerPendingRetryLeaseMs` | Stale-pending reclaim window (default `300000`; `false` to disable). | + +## Related + +- [Tools](/agents/harnesses/think/tools/) — workspace tools, code execution, and extensions. +- [Human in the loop](/agents/concepts/agentic-patterns/human-in-the-loop/) — the approval flow end to end. +- [Channels](/agents/harnesses/think/channels/) — deliver attachments and out-of-band notices. diff --git a/src/content/docs/agents/harnesses/think/channels.mdx b/src/content/docs/agents/harnesses/think/channels.mdx new file mode 100644 index 00000000000..5667b653a6f --- /dev/null +++ b/src/content/docs/agents/harnesses/think/channels.mdx @@ -0,0 +1,186 @@ +--- +title: Channels +pcx_content_type: how-to +description: Apply per-channel policy, select a channel on a turn, and deliver out-of-band notices across web, messenger, voice, and custom surfaces. +sidebar: + order: 7.5 +products: + - agents +--- + +import { TypeScriptExample } from "~/components"; + +:::note[Experimental] +The Channels API surface may evolve before Think graduates out of experimental. +::: + +A channel is a surface a Think agent talks over: the browser WebSocket, a messenger webhook (Telegram, Slack, and so on), voice, or your own custom transport. Channels generalize [messengers](/agents/harnesses/think/messengers/) into one vocabulary so you can apply per-channel policy (a different system prompt, a narrowed tool set, a step cap) and deliver out-of-band notices, regardless of the surface a turn arrived on. + +Every Think agent always has an implicit `web` channel (the WebSocket chat your browser clients use). You declare additional channels — and override the `web` policy — with `configureChannels()`. Messengers returned from `getMessengers()` are automatically absorbed as `messenger` channels, so existing messenger apps keep working unchanged. + +## Configure channels + +Override `configureChannels()` to return a map of channel id to `ChannelDefinition`. The id is how you select the channel on a turn: + + + +```ts +import { Think, messengerChannel } from "@cloudflare/think"; +import { telegram } from "@chat-adapter/telegram"; + +export class Assistant extends Think { + configureChannels() { + return { + // Override policy for the built-in web channel. + web: { + kind: "web", + ingress: { transport: "websocket" }, + instructions: "You are chatting in a web app. Use markdown freely.", + }, + // A voice channel with tighter limits. + voice: { + kind: "voice", + ingress: { transport: "voice" }, + instructions: "Keep replies short and speakable. No markdown.", + maxTurns: 3, + }, + // A messenger channel (Chat SDK webhook). + telegram: messengerChannel( + telegram({ + /* adapter config */ + }), + ), + }; + } +} +``` + + + +A `ChannelDefinition` has these fields: + +| Field | Type | Description | +| -------------- | ------------------------------------------------------------------- | ----------------------------------------------------------------------------------- | +| `kind` | `"web" \| "messenger" \| "voice" \| "custom"` | The surface category. | +| `ingress` | `{ transport: "websocket" \| "voice" }` or a webhook messenger spec | How turns arrive. `messengerChannel()` builds the webhook form for you. | +| `instructions` | `string \| (ctx: ChannelContext) => string \| Promise` | Prepended to the system prompt for turns on this channel. | +| `tools` | `(all: ToolSet) => ToolSet` | Narrow the assembled tool set for this channel (filter only — it cannot add tools). | +| `maxTurns` | `number` | Per-channel cap on model steps for a turn. | +| `capabilities` | `ChannelCapabilities` | Surface capabilities (streaming, message editing). Defaulted for `web`. | +| `conversation` | messenger conversation mode or resolver | Messenger thread routing (see [Messengers](/agents/harnesses/think/messengers/)). | +| `delivery` | channel delivery policy | Messenger delivery policy. | + +Use the `defineChannels()` helper for type inference, and `messengerChannel()` to wrap a Chat SDK adapter definition as a `kind: "messenger"` channel. + +### Channel kinds + +| Kind | Ingress | Notes | +| ----------- | --------------------------------- | --------------------------------------------------------------------------------------------- | +| `web` | `{ transport: "websocket" }` | Always present. Declare it in `configureChannels()` only to set policy; you cannot remove it. | +| `messenger` | webhook (`messengerChannel(...)`) | Fed into the messenger runtime. Equivalent to a `getMessengers()` entry. | +| `voice` | `{ transport: "voice" }` | Applies policy and turn context; out-of-band delivery is not yet wired. | +| `custom` | app-defined | For your own transport. Same delivery limitations as `voice` today. | + +## Per-channel policy + +Channel policy is applied as an **overridable default** before [`beforeTurn`](/agents/harnesses/think/lifecycle-hooks/) runs, so a `beforeTurn` override still wins: + +- `instructions` is prepended to the base system prompt for the turn. +- `tools` filters the assembled tool set (it can only remove tools — the `getTools()` seam adds them). +- `maxTurns` caps model steps: `beforeTurn`'s `maxSteps` wins, then the channel `maxTurns`, then the instance `maxSteps` default. + +## Select a channel on a turn + +Pass `channel` to [`runTurn()`](/agents/harnesses/think/#runturn) (or `chat()`) to run a turn on a specific channel. The channel id is stamped onto the user message, so a continued or recovered turn re-resolves the same channel and re-applies its policy: + + + +```ts +export class Assistant extends Think { + async speak() { + await this.runTurn({ input: "Read this out loud", channel: "voice" }); + } +} +``` + + + +Inside a turn, the active channel is available as `this.activeChannel` (a `ChannelContext` with `channelId`, `kind`, and messenger details when relevant). A turn with no `channel` runs without a channel context and applies no channel policy. + +## Deliver out of band + +`deliverNotice()` sends a message to a channel **without** starting a model turn. Use it for status updates ("your import finished") or to surface an action's [reply attachment](/agents/harnesses/think/actions/#reply-attachments) — it does not run inference, does not enter the turn queue, and is therefore safe to call from inside a tool's `execute`: + + + +```ts +export class Assistant extends Think { + async notify() { + await this.deliverNotice("Your export is ready to download."); + + await this.deliverNotice("Background research finished.", { + informModel: true, // also record it in the transcript so the next turn knows + }); + } +} +``` + + + +```ts +type DeliverNoticeOptions = { + channel?: string; // defaults to the active turn's channel, else "web" + informModel?: boolean; // also write to the model-visible transcript (default false) + kind?: "final" | "interim" | "notice" | "command"; // wire tag (default "notice") + thread?: string; // required for out-of-turn delivery to a multi-thread messenger +}; +``` + +Behavior depends on the target channel: + +- **`web`** — the notice is always appended to the transcript (that is its only render path). `informModel` then only controls the phrasing. +- **`messenger`** — the notice is posted to the provider. Out of turn, pass `thread` to target a conversation. With `informModel: true`, it is also written to the transcript. +- **`voice` / `custom`** — out-of-turn delivery throws, because these surfaces have no delivery target yet. + +Override `renderAttachment(attachment)` to turn an action reply attachment into a notice; Think calls it at the end of a turn and delivers the rendered text as a trailing `interim` notice. Return `undefined` to skip an attachment type. + +## Relationship to messengers + +`configureChannels()` wraps `getMessengers()` — it does not replace it. Each `getMessengers()` entry becomes a `kind: "messenger"` channel, and everything in the [Messengers](/agents/harnesses/think/messengers/) guide (Telegram setup, webhook routing, conversation targets, delivery and recovery) continues to apply. A channel id in `configureChannels()` that collides with a `getMessengers()` id is an error. Keep using `getMessengers()` for messenger-only apps; reach for `configureChannels()` when you also want `web`/`voice`/`custom` policy or out-of-band notices. + +## Observability + +Channel activity is reported on the `channel` observability channel: + + + +```ts +import { subscribe } from "agents/observability"; + +const unsubscribe = subscribe("channel", (event) => { + // event.type is one of: + // "channel:resolved" — a turn resolved a registered channel + // "channel:delivered" — a turn's final reply was delivered + // "notice:delivered" — deliverNotice() succeeded + // "notice:failed" — deliverNotice() threw +}); +``` + + + +## Reference + +| Member | Description | +| ------------------------------- | --------------------------------------------------------------------------- | +| `configureChannels()` | Return the channel map. Defaults to `{}` (the implicit `web` channel only). | +| `deliverNotice(text, options?)` | Send an out-of-band message to a channel with no model turn. | +| `activeChannel` | The `ChannelContext` for the in-flight turn, or `undefined`. | +| `renderAttachment(attachment)` | Map a reply attachment to channel notice text (or `undefined` to skip). | +| `defineChannels(channels)` | Identity helper for channel-map type inference. | +| `messengerChannel(definition)` | Wrap a Chat SDK adapter as a `kind: "messenger"` channel. | + +## Related + +- [Messengers](/agents/harnesses/think/messengers/) — Chat SDK webhook setup and delivery in depth. +- [Actions](/agents/harnesses/think/actions/) — record reply attachments for `renderAttachment()`. +- [Voice](/agents/communication-channels/voice/) — real-time speech surfaces. diff --git a/src/content/docs/agents/harnesses/think/index.mdx b/src/content/docs/agents/harnesses/think/index.mdx index 701f56467fe..dae797e8032 100644 --- a/src/content/docs/agents/harnesses/think/index.mdx +++ b/src/content/docs/agents/harnesses/think/index.mdx @@ -168,7 +168,74 @@ Both Think and [`AIChatAgent`](/agents/communication-channels/chat/chat-agents/) ## Choose a turn API -Think has several ways to start or continue a turn. Choose based on who starts the work and what the caller needs back. +Think has several ways to start or continue a turn. They all funnel through one public entry point — `runTurn(options)` — and the older methods remain as convenience shortcuts. + +### runTurn() + +:::note[Experimental] +`runTurn()` is stable in shape, but may evolve before Think graduates out of experimental. +::: + +`runTurn()` is the unified turn-admission API. One method, three modes, selected by `options.mode`: + +| Mode | Use when | Returns | Shortcut for | +| ------------------ | ------------------------------------------------------------ | ------------------------------- | ------------------ | +| `"wait"` (default) | The caller can block until the model response is finished | `Promise` | `saveMessages()` | +| `"submit"` | The caller needs fast, durable acceptance and a later status | `Promise` | `submitMessages()` | +| `"stream"` | The caller wants the response streamed to a callback (RPC) | `Promise` | `chat()` | + +The `input` accepts a string, a `UIMessage`, an array of messages, or — in `wait` and `stream` modes — a function `(current) => UIMessage[]` evaluated at admission. (`submit` does not accept function input.) + + + +```ts +export class Assistant extends Think { + async examples(inboundEventId: string) { + // wait — block for the result + const result = await this.runTurn({ input: "Summarize the latest thread" }); + if (result.status === "completed") { + // result.message is the assistant message; result.continuation is false + } + + // submit — durable acceptance, check status later + const submission = await this.runTurn({ + mode: "submit", + input: "Process this webhook", + idempotencyKey: inboundEventId, // dedupe; safe to retry + }); + // submission.accepted is true on first accept; submission.status is "pending" + + // stream — drive a callback (the same surface as chat()) + await this.runTurn({ + mode: "stream", + input: "Stream me", + callback: { + onStart({ requestId }) {}, + onEvent(json) {}, // UIMessageChunk JSON + onDone() {}, + onError(error) {}, + }, + }); + + // continuation — continue the last assistant turn instead of sending input + await this.runTurn({ continuation: true }); + } +} +``` + + + +Key behaviors: + +- **Blocking modes cannot nest.** Calling `wait`/`stream`/`continuation` (or the equivalent shortcut) from _inside_ an active turn — for example, from a tool's `execute` — throws, because it would deadlock the turn queue. From inside a turn, use `runTurn({ mode: "submit" })` (durable, runs after the current turn frees the queue) or [`addMessages()`](#add-messages-without-a-turn) (transcript only, no inference). +- **`submit` is idempotent.** Pass `submissionId` and/or `idempotencyKey`; re-submitting a known key returns the existing record with `accepted: false` instead of starting a second turn. See [Programmatic submissions](/agents/harnesses/think/programmatic-submissions/). +- **Recovery-safe.** When `chatRecovery` is enabled, the `wait`, `stream`, and drained `submit` paths all run inference inside a recovery fiber, so an interrupted turn resumes after eviction. + +`runTurn` is exported alongside its option and result types: `RunTurnOptions`, `RunTurnWait`, `RunTurnSubmit`, `RunTurnStream`, `TurnInputMessages`, and `TurnResult`. + +### Pick a shortcut + +The table below maps each scenario to the most direct call. Each shortcut has an unchanged signature; reach for them when you want the narrower surface, or use `runTurn()` when you want one mental model. | Use case | API | | -------------------------------------------------------------- | ----------------------------------------------- | @@ -180,11 +247,41 @@ Think has several ways to start or continue a turn. Choose based on who starts t | A parent delegates work to a retained child agent | `agentTool()` or `runAgentTool()` | | Surround a turn with idempotent app-owned side effects | `startFiber()` | | Coordinate multi-step durable orchestration | Workflows | -| Add context or messages without starting a model turn | `persistMessages()` | +| Add context or messages without starting a model turn | `addMessages()` | | Advanced subclass or recovery code continues an assistant turn | `continueLastTurn()` | Use `saveMessages()` when the caller owns the trigger and can wait for the turn to finish. Use [`submitMessages()`](/agents/harnesses/think/programmatic-submissions/) when timeout ambiguity would make retries unsafe. +### Add messages without a turn + +Use `addMessages()` to write to the transcript **without** starting a model turn — for importing prior history or injecting background context the next turn should see: + + + +```ts +export class Assistant extends Think { + async importContext() { + await this.addMessages([ + { + id: crypto.randomUUID(), + role: "user", + parts: [{ type: "text", text: "Imported context" }], + }, + ]); + } +} +``` + + + +`addMessages()` appends (or upserts) into the Session tree: + +- It does **not** run inference and does **not** enter the turn queue, so it is safe to call from inside a tool's `execute` without deadlocking. +- Array entries are appended **linearly** (each attaches under the previous one), so imported history stays a single path. By default the first message attaches to the latest committed leaf; pass `parentId` to attach elsewhere, or `null` for a root message. +- Appends are **idempotent by message id**. Pass `{ mode: "upsert" }` to update an existing message in place instead. + +The supported pattern is "add context, then run a turn": call `addMessages()`, then `runTurn()`. + Use `chat()` for low-level parent-to-child streaming when your code owns forwarding, cancellation, and replay policy. Use [Agents as tools](/agents/runtime/execution/agent-tools/) when a parent model or workflow delegates to a child agent and you want retained child runs, event replay, abort bridging, and UI drill-in. Use [`startFiber()`](/agents/runtime/execution/durable-execution/#startfiber) outside Think when the durable unit is an application job around a turn: accepting a webhook once, restoring a serialized channel or thread target, posting a visible reply, or recording app-level recovery policy. Think submissions own conversation admission and turn serialization; managed fibers own external job acceptance, idempotent side effects, and application recovery. @@ -207,6 +304,16 @@ Use [`startFiber()`](/agents/runtime/execution/durable-execution/#startfiber) ou href="/agents/harnesses/think/tools/" description="Workspace tools, code execution, browser tools, and extensions." /> + + + +```ts +export class Importer extends Think { + async startImport(input: ImportInput) { + const { runId } = await this.runAgentTool(ImportAgent, { + input, + detached: { onFinish: "onImportDone", maxBudgetMs: 60 * 60 * 1000 }, + }); + return runId; + } + + // Fires once, even if the Durable Object was evicted and rehydrated while the + // child ran. Referenced by METHOD NAME (like schedule()) — never a closure, + // which cannot survive eviction. + async onImportDone(run: AgentToolRunInfo, result: AgentToolLifecycleResult) { + switch (result.status) { + case "completed": + await this.markImportReady(run.runId, result.summary); + break; + case "error": + await this.markImportFailed(run.runId, result.error); + break; + case "interrupted": + // reason "budget-exceeded" ⇒ the run hit its maxBudgetMs ceiling. + // interrupted is soft: a child that finishes anyway re-fires this + // hook with "completed", so make the handler idempotent. + break; + } + } +} +``` + + + +Key behaviors: + +- **Durable completion.** Delivery survives eviction and deploys: a warm fast path delivers with low latency while the isolate is alive, and a self-scheduling reconcile backbone finalizes anything the fast path missed. Delivery is exactly-once on the happy path; under a crash it is at-least-once, so `onFinish` handlers must be idempotent. +- **Give-up vs. finish are independent.** A budget give-up is delivered as `status: "interrupted"`, `reason: "budget-exceeded"`. Because `interrupted` is soft, a child that completes after the give-up still re-fires `onFinish` with the real result — a premature give-up never hides a late completion. +- **Bounded.** Every detached run has an absolute `maxBudgetMs` ceiling (per-run, or the `detachedMaxBudgetMs` static option; default 24h). On expiry the parent gives up watching and tears the child down so an abandoned run cannot hold a `maxConcurrentAgentTools` slot forever. +- **No inherited signal.** A detached run must outlive the spawning turn, so it does **not** inherit `options.signal`. Cancel it explicitly: + + + +```ts +await this.cancelAgentTool(runId); // idempotent; delivers onFinish "aborted" +``` + + + +### Notify the chat on completion (Think / AIChatAgent) + +On a chat agent (`@cloudflare/think` or `AIChatAgent`) you usually want the model to _react_ to a finished background run. Instead of wiring `onFinish` by hand, pass `notify: true` — when the run finishes the agent injects a message into the chat (idempotent per run + status, so an exactly-once finish never duplicates) and the model takes its next turn with the result in context: + + + +```ts +await this.runAgentTool(ResearchAgent, { input, detached: { notify: true } }); +``` + + + +If your app routes or hides synthetic messages by `metadata.source`, pass your own source: + + + +```ts +await this.runAgentTool(ResearchAgent, { + input, + detached: { notify: { source: "research-background" } }, +}); +``` + + + +Override `formatDetachedCompletion(run, result)` to customize the injected text, or return an empty string to suppress the notification for a given outcome. An explicit `onFinish` takes precedence over `notify`. + +### The `inspectAgentToolRun` contract + +A child's `inspectAgentToolRun(runId)` returns the run's current status snapshot, or `null`. **`null` does not mean "failed"** — it means the child has no record of that run _yet_. This is normal immediately after dispatch (the child may still be persisting its first row) and is also what a freshly-rehydrated child returns before it has lazily reconciled a stale `running` row. Callers — and the framework's own reconcile backbone — treat `null` as "not terminal, keep watching within budget", never as a terminal failure. Only a non-`null` inspection with a terminal `status` (`completed` / `error` / `aborted`) finalizes a run. + +## Report progress and milestones + +A sub-agent running as an agent tool — awaited or detached — can report mid-run progress so a parent can render a live status line, meter the run server-side, or react to a named checkpoint before the run finishes. Call `reportProgress()` from inside the child (for example, from a tool's `execute`): + + + +```ts +export class ImportAgent extends Think { + getTools() { + return { + ingest: tool({ + inputSchema: z.object({ url: z.string() }), + execute: async ({ url }) => { + // Ephemeral progress: drives a generic bar / phase / status line. + await this.reportProgress({ + fraction: 0.6, + phase: "ingesting", + message: "Ingested 40k/80k rows", + }); + // ... + }, + }), + }; + } +} +``` + + + +`reportProgress()` is available on chat agents (`@cloudflare/think` and `AIChatAgent`). It is a no-op with a development warning on the base `Agent` class and when called outside an active agent-tool run, so the same child code is safe to run standalone. The framework resolves the active run from the current turn — you never thread a run ID. + +```ts +reportProgress( + progress: { + fraction?: number; // 0..1 — drives a progress bar + message?: string; // human-readable status line + phase?: string; // coarse phase label, e.g. "ingesting" + milestone?: string; // present ⇒ a durable milestone (see below) + data?: T; // app-specific payload; live-only unless persisted + }, + options?: { persist?: boolean }, +): Promise; +``` + +Ephemeral signals ride the child's own turn stream as a transient `data-agent-progress` part, so they re-broadcast to the parent's connected clients and surface on `AgentToolRunState.progress` through `useAgentToolEvents()` — a background-runs tray can render a live bar, phase, and status line without drilling in. Bursts are coalesced (latest-wins; a `fraction >= 1` frame always flushes). The `data` field is live-only unless you pass `{ persist: true }`. + +### Observe progress on the parent + +Override `onProgress()` to meter, steer, or surface progress server-side. It fires best-effort whenever a child progress signal is forwarded through the parent, for both awaited and detached runs: + + + +```ts +export class Assistant extends Think { + override async onProgress( + run: AgentToolRunInfo, + progress: AgentToolProgressSnapshot, + ) { + if (progress.milestone) { + // A durable milestone landed — branch on it. + } + console.log(run.runId, progress.phase, progress.fraction); + } +} +``` + + + +`onProgress()` is not durable: after eviction a detached run's latest snapshot is reconstructed from `inspectAgentToolRun().progress` on reconcile rather than re-firing the hook. The latest snapshot is also persisted on the child run row, so a rehydrated parent can answer "where is this run" without having tailed the live stream. + +### Durable milestones + +Naming a `milestone` promotes a signal from the ephemeral tier to a **durable** one — there is still only one emit method: + + + +```ts +await this.reportProgress({ + milestone: "sources-gathered", + data: { sources: 2 }, +}); +``` + + + +A milestone is persisted as one row on the child with a monotonic per-run `sequence`, and rides the stream as a **persisted** `data-agent-milestone` part (unlike transient progress). It therefore survives eviction, replays on drill-in, and is surfaced — deduped by `sequence` — on `AgentToolRunState.milestones` and `inspectAgentToolRun().milestones`. `onProgress()` fires for milestones too, with `progress.milestone` set, so a consumer can branch on milestone versus ephemeral progress. + +### Notify the chat on a milestone (Think / AIChatAgent) + +For a detached run on a chat agent, `detached: { onMilestones }` surfaces a chat message when a configured milestone lands, _before_ the run finishes. Each `(runId, name)` fires at most once — whether observed live or reconciled after eviction — so the deterministic ID collapses warm and cold delivery to at-most-once: + + + +```ts +// "narrate" (default): inject a synthetic assistant status line — no model turn. +await this.runAgentTool(Researcher, { + input, + detached: { onMilestones: ["sources-gathered"] }, +}); + +// "react": post a user-role turn so the model responds (steer, start dependent +// work). Costs a model turn. +await this.runAgentTool(Researcher, { + input, + detached: { onMilestones: { names: ["needs-approval"], mode: "react" } }, +}); +``` + + + +Override `formatDetachedMilestone(run, milestone)` to customize the wording, or return an empty string to suppress a given milestone. Synthetic narrate messages carry `metadata.source`, so clients can render them as an agent event rather than a human turn. + +### Resetting no-progress budget for detached runs + +Once a detached child has reported at least one signal, the reconcile backbone gives up if the run then goes silent for `detachedNoProgressBudgetMs` (default 1 hour; per-run override via `detached: { noProgressBudgetMs }`). This surfaces as `status: "interrupted"`, `reason: "no-progress"`. A child that never reports is bounded only by the absolute `detachedMaxBudgetMs` ceiling — a run is never given up on merely for being slow. Set `noProgressBudgetMs` to `0` or `Infinity` to disable the resetting window for a run. + ## Render child timelines in React -`useAgentToolEvents()` is a headless hook. It subscribes to the existing parent connection, deduplicates replay/live races, applies child `UIMessageChunk` bodies to message parts, and groups sibling runs by parent tool call ID. +`useAgentToolEvents()` is a headless hook. It subscribes to the existing parent connection, deduplicates replay/live races, applies child `UIMessageChunk` bodies to message parts, and groups sibling runs by parent tool call ID. Each run state carries `progress` and `milestones`, so a background-runs tray can render a live bar, phase, and milestone chips without drilling in. diff --git a/src/content/docs/agents/runtime/execution/run-workflows.mdx b/src/content/docs/agents/runtime/execution/run-workflows.mdx index 93fc3e33c18..2af199440fc 100644 --- a/src/content/docs/agents/runtime/execution/run-workflows.mdx +++ b/src/content/docs/agents/runtime/execution/run-workflows.mdx @@ -150,12 +150,12 @@ Base class for Workflows that integrate with Agents. ### Properties -| Property | Type | Description | -| -------------- | ------ | ------------------------------------ | -| `agent` | Stub | Typed stub for calling Agent methods | -| `instanceId` | string | The workflow instance ID | -| `workflowName` | string | The workflow binding name | -| `env` | Env | Environment bindings | +| Property | Type | Description | +| -------------- | ------ | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| `agent` | Stub | Typed stub for calling Agent methods. For workflows started from a sub-agent, this is an RPC-only stub back to the originating facet; use sub-agent routing for HTTP or WebSocket `fetch()` traffic | +| `instanceId` | string | The workflow instance ID | +| `workflowName` | string | The workflow binding name | +| `env` | Env | Environment bindings | ### Instance methods (non-durable) @@ -238,13 +238,13 @@ Start a workflow instance and track it in the Agent database. **Parameters:** -| Parameter | Type | Description | -| ---------------------- | ------ | ----------------------------------------------------- | -| `workflowName` | string | Workflow binding name from `env` | -| `params` | object | Parameters to pass to the workflow | -| `options.id` | string | Custom workflow ID (auto-generated if not provided) | -| `options.metadata` | object | Metadata stored for querying (not passed to workflow) | -| `options.agentBinding` | string | Agent binding name (auto-detected if not provided) | +| Parameter | Type | Description | +| ---------------------- | ------ | --------------------------------------------------------------------------------------------------------------------- | +| `workflowName` | string | Workflow binding name from `env` | +| `params` | object | Parameters to pass to the workflow | +| `options.id` | string | Custom workflow ID (auto-generated if not provided) | +| `options.metadata` | object | Metadata stored for querying (not passed to workflow) | +| `options.agentBinding` | string | Agent binding name (auto-detected if not provided). When called from a sub-agent, this is the root Agent binding name | **Returns:** `Promise` - Workflow instance ID @@ -262,6 +262,54 @@ const instanceId = await this.runWorkflow( +#### Starting workflows from sub-agents + +Sub-agents can call `this.runWorkflow()` directly. The workflow is tracked in the originating sub-agent's SQLite database, and `this.agent` inside `AgentWorkflow` routes back to that same sub-agent for RPC calls, callbacks, state updates, and broadcasts. + +Parent agents do not automatically list or control workflows that a sub-agent starts. `SubAgentStub` only exposes user-defined methods, not inherited `Agent` methods such as `approveWorkflow()` or `getWorkflow()`. To control a child-started workflow from the parent, define small wrapper methods on the child and call those wrappers through the sub-agent stub. + + + +```ts +export class ParentAgent extends Agent { + async startChildWorkflow(childName: string, task: string) { + const child = await this.subAgent(ChildAgent, childName); + return child.startWorkflow(task); + } + + async approveChildWorkflow(childName: string, workflowId: string) { + const child = await this.subAgent(ChildAgent, childName); + return child.approveChildWorkflow(workflowId); + } +} + +export class ChildAgent extends Agent { + async startWorkflow(task: string) { + return this.runWorkflow("CHILD_WORKFLOW", { task }); + } + + async approveChildWorkflow(workflowId: string) { + return this.approveWorkflow(workflowId); + } + + async getChildWorkflow(workflowId: string) { + return this.getWorkflow(workflowId); + } +} +``` + + + +For sub-agent origins, `AgentWorkflow.agent` is an RPC-only stub. Use it to call Agent methods, but use `routeSubAgentRequest()` or the `/agents/{parent}/{name}/sub/{child}/{name}` URL shape for external HTTP or WebSocket routing instead of `this.agent.fetch()`. + +#### Routing constraints + +Because the originating identity is persisted durably in the workflow params and replayed on every callback, a few constraints apply to all workflows (sub-agent and top-level alike): + +- **Callbacks resolve the Agent by name.** The runtime re-resolves the originating Agent with `getAgentByName(...)`. If you addressed the Agent by a raw Durable Object ID (`idFromString` / `get(id)`) instead of by name, callbacks land on a different instance. Start workflows from name-addressed Agents. +- **Class names must survive bundling.** The originating path is keyed by `constructor.name`. Configure your bundler to preserve class names (esbuild `keepNames: true`) so progress, completion, and `this.agent` RPC can be routed back to the right facet. +- **`agentBinding` is the root binding.** When you pass `options.agentBinding` from a sub-agent, use the **root** Agent's Durable Object binding name, not a child binding. + ### sendWorkflowEvent(workflowName, instanceId, event) Send an event to a running workflow. @@ -542,7 +590,13 @@ class MyAgent extends Agent { ## Workflow tracking -Workflows started with `runWorkflow()` are automatically tracked in the Agent's internal database. You can query, filter, and manage workflows using the methods described above (`getWorkflow()`, `getWorkflows()`, `deleteWorkflow()`, etc.). +Workflows started with `runWorkflow()` are automatically tracked in the originating Agent's internal database. You can query, filter, and manage workflows using the methods described above (`getWorkflow()`, `getWorkflows()`, `deleteWorkflow()`, etc.). + +:::note[Sub-agent scoping] + +`getWorkflows()` and `getWorkflowById()` only see workflows tracked in **this** Agent's storage. If a sub-agent starts the workflow, the row lives in that sub-agent's own `cf_agents_workflows` table, not in the parent's. To build a combined view, expose a wrapper method on each child (for example, `listMyWorkflows()`) and aggregate the results across your sub-agents yourself. + +::: ### Status values diff --git a/src/content/docs/agents/runtime/execution/sub-agents.mdx b/src/content/docs/agents/runtime/execution/sub-agents.mdx index 9dd53b2f787..3a3fa9bb20f 100644 --- a/src/content/docs/agents/runtime/execution/sub-agents.mdx +++ b/src/content/docs/agents/runtime/execution/sub-agents.mdx @@ -506,6 +506,14 @@ The older synchronous `getSchedule()` and `getSchedules()` APIs throw inside sub Calling `this.destroy()` inside a sub-agent delegates cleanup to the parent. The parent cancels that sub-agent's schedules, removes recovery metadata for the sub-agent and its descendants, removes the registry entry, and asks the runtime to wipe the child storage. Treat `this.destroy()` as fire-and-forget because deleting the sub-agent can abort its isolate before the method returns cleanly. +### Workflows from sub-agents + +Sub-agents can also start [Workflows](/agents/runtime/execution/run-workflows/) with `this.runWorkflow()`. Workflow tracking is local to the sub-agent's SQLite database, and `AgentWorkflow.agent` routes RPC, callbacks, state updates, and broadcasts back to the originating sub-agent. Parent agents do not automatically list or control child-started workflows. + +Because `SubAgentStub` only exposes user-defined child methods, add child wrapper methods for controls such as `getWorkflow()`, `approveWorkflow()`, or `terminateWorkflow()`, then call those wrappers through `await this.subAgent(Child, name)`. If you pass `runWorkflow(..., { agentBinding })` from a sub-agent, use the root Agent binding name, not a child binding name. + +For sub-agent workflow origins, `AgentWorkflow.agent` is RPC-only. Use it to call Agent methods, but use `routeSubAgentRequest()` or the nested `/agents/{parent}/{name}/sub/{child}/{name}` URL shape for external HTTP or WebSocket routing instead of `this.agent.fetch()`. + ## Example +## When context is lost + +The agent context only propagates along the call tree of the original invocation. Code reached outside that call tree starts with an empty context, so `getCurrentAgent()` returns an object whose fields are `undefined`. Common cases include: + +- a host callback invoked through RPC from a Worker Loader child isolate, such as sandboxed Codemode execution; +- a service binding or Durable Object RPC entrypoint; +- a queue consumer or another entrypoint that retains an agent reference. + +Route the callback through a public method on the agent. Custom methods are wrapped automatically, so calling `agent.someMethod()` re-enters that agent's context: + + + +```ts +import { RpcTarget } from "cloudflare:workers"; + +class HostCallbackBridge extends RpcTarget { + agent: MyMcpAgent; + + constructor(agent: MyMcpAgent) { + super(); + this.agent = agent; + } + + // Invoked through RPC from a Worker Loader child isolate. There is no context + // ancestry. Calling a public agent method restores it automatically. + async invoke() { + return this.agent.handleSandboxCallback(); + } +} + +export class MyMcpAgent extends McpAgent { + async handleSandboxCallback() { + const { agent } = getCurrentAgent(); + // `agent` is available again. + } +} +``` + + + +Context restored this way has `connection`, `request`, and `email` unset. It is not tied to live client I/O. + +Server-initiated MCP requests (`elicitInput`, `createMessage`, and `listRoots`) on `McpAgent` do not require this indirection because the MCP transport retains its owning agent. + ## API reference ### `getCurrentAgent()`