5 changes: 5 additions & 0 deletions .changeset/polite-shirts-shop.md
@@ -0,0 +1,5 @@
---
"@cloudflare/ai-chat": patch
---

Add queues to AIChatAgent to support simultaneous input messages
304 changes: 154 additions & 150 deletions packages/ai-chat/src/index.ts
@@ -227,12 +227,14 @@ export class AIChatAgent<
private _approvalPersistedMessageId: string | null = null;

/**
* Promise that resolves when the current stream completes.
* Used to wait for message persistence before continuing after tool results.
* Queue for serializing chat requests so that only one
* onChatMessage → _reply cycle is in-flight at a time.
* Prevents races when multiple messages arrive before the first
* response completes.
* @internal
*/
private _streamCompletionPromise: Promise<void> | null = null;
private _streamCompletionResolve: (() => void) | null = null;
private _chatRequestQueue: Array<() => Promise<void>> = [];
private _chatRequestInFlight = false;

/**
* Set of connection IDs that are pending stream resume.
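
For context on the race the new queue closes, here is a minimal standalone sketch (illustrative names only, not library code) of how two un-serialized replies can interleave, so the second runs against state that does not yet include the first reply's result:

```ts
// Hypothetical illustration of the race: two overlapping handlers, each
// fired without awaiting, can interleave their streaming and persistence.
async function reply(id: string, log: string[]): Promise<void> {
  log.push(`${id}: start stream`);
  await new Promise((resolve) => setTimeout(resolve, 10)); // simulate streaming
  log.push(`${id}: persist messages`);
}

const log: string[] = [];
// Two client messages arriving back to back:
void reply("msg-1", log);
void reply("msg-2", log);
// A possible order: "msg-1: start stream", "msg-2: start stream",
// "msg-1: persist messages", "msg-2: persist messages". msg-2 began
// before msg-1's result was persisted.
```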
@@ -425,50 +427,62 @@ export class AIChatAgent<
const chatMessageId = data.id;
const abortSignal = this._getAbortSignal(chatMessageId);

return this._tryCatchChat(async () => {
// Wrap in agentContext.run() to propagate connection context to onChatMessage
// This ensures getCurrentAgent() returns the connection inside tool execute functions
return agentContext.run(
{ agent: this, connection, request: undefined, email: undefined },
async () => {
const response = await this.onChatMessage(
async (_finishResult) => {
// User-provided hook. Cleanup is now handled by _reply,
// so this is optional for the user to pass to streamText.
},
{
abortSignal,
clientTools,
body: this._lastBody
}
);

if (response) {
await this._reply(data.id, response, [connection.id], {
chatMessageId
});
} else {
console.warn(
`[AIChatAgent] onChatMessage returned no response for chatMessageId: ${chatMessageId}`
);
this._broadcastChatMessage(
{
body: "No response was generated by the agent.",
done: true,
id: data.id,
type: MessageType.CF_AGENT_USE_CHAT_RESPONSE
// Enqueue the LLM call so concurrent messages are
// processed one at a time. The handler returns
// immediately; the queue drains via ctx.waitUntil.
this._enqueueChatRequest(async () => {
await this._tryCatchChat(async () => {
// Wrap in agentContext.run() to propagate connection context to onChatMessage
// This ensures getCurrentAgent() returns the connection inside tool execute functions
return agentContext.run(
{
agent: this,
connection,
request: undefined,
email: undefined
},
async () => {
const response = await this.onChatMessage(
async (_finishResult) => {
// User-provided hook. Cleanup is now handled by _reply,
// so this is optional for the user to pass to streamText.
},
[connection.id]
{
abortSignal,
clientTools,
body: this._lastBody
}
);

if (response) {
await this._reply(data.id, response, [connection.id], {
chatMessageId
});
} else {
console.warn(
`[AIChatAgent] onChatMessage returned no response for chatMessageId: ${chatMessageId}`
);
this._broadcastChatMessage(
{
body: "No response was generated by the agent.",
done: true,
id: data.id,
type: MessageType.CF_AGENT_USE_CHAT_RESPONSE
},
[connection.id]
);
}
}
}
);
);
});
});
return;
}

// Handle clear chat
if (data.type === MessageType.CF_AGENT_CHAT_CLEAR) {
this._destroyAbortControllers();
this._chatRequestQueue.length = 0;
this.sql`delete from cf_ai_chat_agent_messages`;
this._resumableStream.clearAll();
this._pendingResumeConnections.clear();
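
The handler shape introduced above, enqueue and then return, reduces to a sketch like this (hypothetical parameter names; in the real code the drain is kept alive via ctx.waitUntil):

```ts
// Sketch: the WebSocket message handler enqueues the LLM call and returns
// immediately; the queue runs the thunk later, behind any earlier requests.
function onChatRequest(
  enqueue: (task: () => Promise<void>) => void,
  callModel: () => Promise<string>,
  broadcast: (body: string) => void
): void {
  enqueue(async () => {
    const replyText = await callModel(); // serialized with earlier requests
    broadcast(replyText);
  });
  // No await here: the handler never blocks on the model call.
}
```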
@@ -542,97 +556,14 @@ export class AIChatAgent<
// This mimics server-executed tool behavior where the LLM
// automatically continues after seeing tool results
if (applied && autoContinue) {
// Wait for the original stream to complete and message to be persisted
// before calling onChatMessage, so this.messages includes the tool result
const waitForStream = async () => {
if (this._streamCompletionPromise) {
await this._streamCompletionPromise;
} else {
// TODO: The completion promise can be null if the stream finished
// before the tool result arrived (race between stream end and tool
// apply). The 500ms fallback is a pragmatic workaround — consider
// a more deterministic signal (e.g. always setting the promise).
await new Promise((resolve) => setTimeout(resolve, 500));
}
};

waitForStream()
.then(() => {
const continuationId = nanoid();
const abortSignal = this._getAbortSignal(continuationId);

return this._tryCatchChat(async () => {
return agentContext.run(
{
agent: this,
connection,
request: undefined,
email: undefined
},
async () => {
const response = await this.onChatMessage(
async (_finishResult) => {
// User-provided hook. Cleanup handled by _reply.
},
{
abortSignal,
clientTools: clientTools ?? this._lastClientTools,
body: this._lastBody
}
);

if (response) {
// Pass continuation flag to merge parts into last assistant message
// Note: We pass an empty excludeBroadcastIds array because the sender
// NEEDS to receive the continuation stream. Unlike regular chat requests
// where aiFetch handles the response, tool continuations have no listener
// waiting - the client relies on the broadcast.
await this._reply(
continuationId,
response,
[], // Don't exclude sender - they need the continuation
{
continuation: true,
chatMessageId: continuationId
}
);
}
}
);
});
})
.catch((error) => {
console.error(
"[AIChatAgent] Tool continuation failed:",
error
);
});
}
}
);
return;
}

// Handle client-side tool approval response
if (data.type === MessageType.CF_AGENT_TOOL_APPROVAL) {
const { toolCallId, approved, autoContinue } = data;
this._applyToolApproval(toolCallId, approved).then((applied) => {
// Only auto-continue if approved AND client requested it
if (applied && approved && autoContinue) {
const waitForStream = async () => {
if (this._streamCompletionPromise) {
await this._streamCompletionPromise;
} else {
await new Promise((resolve) => setTimeout(resolve, 500));
}
};

waitForStream()
.then(() => {
// Enqueue the continuation — the queue guarantees the
// current stream finishes (and messages are persisted)
// before this callback runs.
this._enqueueChatRequest(async () => {
const continuationId = nanoid();
const abortSignal = this._getAbortSignal(continuationId);

return this._tryCatchChat(async () => {
await this._tryCatchChat(async () => {
return agentContext.run(
{
agent: this,
@@ -642,30 +573,80 @@ export class AIChatAgent<
},
async () => {
const response = await this.onChatMessage(
async (_finishResult) => {},
async (_finishResult) => {
// User-provided hook. Cleanup handled by _reply.
},
{
abortSignal,
clientTools: this._lastClientTools,
clientTools: clientTools ?? this._lastClientTools,
body: this._lastBody
}
);

if (response) {
await this._reply(continuationId, response, [], {
continuation: true,
chatMessageId: continuationId
});
// Pass continuation flag to merge parts into last assistant message
// Note: We pass an empty excludeBroadcastIds array because the sender
// NEEDS to receive the continuation stream. Unlike regular chat requests
// where aiFetch handles the response, tool continuations have no listener
// waiting - the client relies on the broadcast.
await this._reply(
continuationId,
response,
[], // Don't exclude sender - they need the continuation
{
continuation: true,
chatMessageId: continuationId
}
);
}
}
);
});
})
.catch((error) => {
console.error(
"[AIChatAgent] Tool approval continuation failed:",
error
);
});
}
}
);
return;
}

// Handle client-side tool approval response
if (data.type === MessageType.CF_AGENT_TOOL_APPROVAL) {
const { toolCallId, approved, autoContinue } = data;
this._applyToolApproval(toolCallId, approved).then((applied) => {
// Only auto-continue if approved AND client requested it
if (applied && approved && autoContinue) {
this._enqueueChatRequest(async () => {
const continuationId = nanoid();
const abortSignal = this._getAbortSignal(continuationId);

await this._tryCatchChat(async () => {
return agentContext.run(
{
agent: this,
connection,
request: undefined,
email: undefined
},
async () => {
const response = await this.onChatMessage(
async (_finishResult) => {},
{
abortSignal,
clientTools: this._lastClientTools,
body: this._lastBody
}
);

if (response) {
await this._reply(continuationId, response, [], {
continuation: true,
chatMessageId: continuationId
});
}
}
);
});
});
}
});
return;
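
The "current stream finishes before this callback runs" guarantee is plain FIFO serialization; an equivalent minimal formulation uses a promise chain (a sketch, not the library's code):

```ts
// Each task starts only after the previous tail settles, so a continuation
// enqueued mid-stream always runs after the stream's persistence step.
let tail: Promise<void> = Promise.resolve();

function serialize(task: () => Promise<void>): Promise<void> {
  tail = tail.then(task, task); // run the next task even if the previous failed
  return tail;
}

serialize(async () => {
  // stream the original response, then persist messages
});
serialize(async () => {
  // tool-approval continuation: sees the persisted messages
});
```

The PR's array-based queue is behaviorally equivalent, but an explicit array also lets pending requests be dropped wholesale (`this._chatRequestQueue.length = 0`) on chat clear and destroy, which a bare promise chain does not.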
@@ -745,6 +726,39 @@ export class AIChatAgent<
this._resumableStream.markError(streamId);
}

/**
* Enqueue a chat request for serialized execution.
* If nothing is in-flight the request runs immediately;
* otherwise it waits until every earlier request's
* onChatMessage → _reply cycle has completed.
* @internal
*/
private _enqueueChatRequest(execute: () => Promise<void>) {
this._chatRequestQueue.push(execute);
if (!this._chatRequestInFlight) {
this._drainChatRequestQueue();
}
}

/**
* Process queued chat requests one at a time.
* Errors in individual requests are logged but do not
* prevent subsequent requests from executing.
* @internal
*/
private async _drainChatRequestQueue() {
this._chatRequestInFlight = true;
while (this._chatRequestQueue.length > 0) {
const next = this._chatRequestQueue.shift()!;
try {
await next();
} catch (error) {
console.error("[AIChatAgent] Queued chat request failed:", error);
}
}
this._chatRequestInFlight = false;
}
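
Taken together, the two methods are a standard serial-queue pattern; a standalone, runnable distillation (illustrative names, not the library's API) that also demonstrates the per-task error isolation documented above:

```ts
type Task = () => Promise<void>;

class SerialQueue {
  private tasks: Task[] = [];
  private inFlight = false;

  enqueue(task: Task): void {
    this.tasks.push(task);
    if (!this.inFlight) void this.drain();
  }

  private async drain(): Promise<void> {
    this.inFlight = true;
    while (this.tasks.length > 0) {
      const next = this.tasks.shift()!;
      try {
        await next(); // the next task starts only after this one settles
      } catch (error) {
        console.error("task failed:", error); // later tasks still run
      }
    }
    this.inFlight = false;
  }
}

const q = new SerialQueue();
q.enqueue(async () => console.log("first"));
q.enqueue(async () => {
  throw new Error("model call failed");
});
q.enqueue(async () => console.log("third, still runs"));
// Output: "first", then the logged error, then "third, still runs".
```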

/**
* Restore _lastBody and _lastClientTools from SQLite.
* Called in the constructor so these values survive DO hibernation.
@@ -1859,10 +1873,6 @@ export class AIChatAgent<
};
// Track the streaming message so tool results can be applied before persistence
this._streamingMessage = message;
// Set up completion promise for tool continuation to wait on
this._streamCompletionPromise = new Promise((resolve) => {
this._streamCompletionResolve = resolve;
});

// Determine response format based on content-type
const contentType = response.headers.get("content-type") || "";
@@ -1912,19 +1922,12 @@ export class AIChatAgent<
} finally {
reader.releaseLock();

// Always clear the streaming message reference and resolve completion
// promise, even on error. Without this, tool continuations waiting on
// _streamCompletionPromise would hang forever after a stream error.
// Always clear the streaming message reference, even on error.
this._streamingMessage = null;
// Capture and clear early-persist tracking. The persistence block
// after the finally uses the local to update in place.
earlyPersistedId = this._approvalPersistedMessageId;
this._approvalPersistedMessageId = null;
if (this._streamCompletionResolve) {
this._streamCompletionResolve();
this._streamCompletionResolve = null;
this._streamCompletionPromise = null;
}

// Framework-level cleanup: always remove abort controller.
// Only emit observability on success (not on error path).
@@ -2045,6 +2048,7 @@
*/
async destroy() {
this._destroyAbortControllers();
this._chatRequestQueue.length = 0;
this._resumableStream.destroy();
await super.destroy();
}