Skip to content

Commit 516c21e

Browse files
committed
feat(chat): add chat.inject() for background context injection and chat.defer improvements
- chat.inject(): queue model messages from background work for injection at the next prepareStep boundary, or before the next turn's run()
- Deferred work from onTurnComplete no longer blocks waiting for the next message
- Background queue persists across turns (not reset) so deferred work from onTurnComplete can inject into the next turn
- Reference app: self-review pattern using generateObject + chat.inject()
- Hide transient data-turn-status and data-background-context-injected parts in the UI
1 parent 518dc5d commit 516c21e

File tree

3 files changed

+129
-4
lines changed

3 files changed

+129
-4
lines changed

packages/trigger-sdk/src/v3/ai.ts

Lines changed: 70 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -500,6 +500,13 @@ const stopInput = streams.input<{ stop: true; message?: string }>({ id: CHAT_STO
500500
*/
501501
const chatDeferKey = locals.create<Set<Promise<unknown>>>("chat.defer");
502502

503+
/**
 * Background context queue. Messages added via `chat.inject()` are drained at
 * the next `prepareStep` boundary (or immediately before the next turn's
 * `run()`) and appended to the model messages.
 *
 * NOTE: deliberately NOT reset at turn start — deferred work from a previous
 * turn's `onTurnComplete` must be able to inject into the next turn.
 * @internal
 */
const chatBackgroundQueueKey = locals.create<ModelMessage[]>("chat.backgroundQueue");
509+
503510
/**
504511
* Run-scoped pipe counter. Stored in locals so concurrent runs in the
505512
* same worker don't share state.
@@ -1458,11 +1465,11 @@ function toStreamTextOptions(options?: ToStreamTextOptionsOptions): Record<strin
14581465
const telemetry = prompt.toAISDKTelemetry(options?.telemetry);
14591466
Object.assign(result, telemetry);
14601467

1461-
// Auto-inject prepareStep when compaction or pendingMessages is configured.
1468+
// Auto-inject prepareStep for compaction, pending messages, and background context injection.
14621469
const taskCompaction = locals.get(chatTaskCompactionKey);
14631470
const taskPendingMessages = locals.get(chatPendingMessagesKey);
14641471

1465-
if (taskCompaction || taskPendingMessages) {
1472+
{
14661473
result.prepareStep = async ({ messages, steps }: { messages: ModelMessage[]; steps: CompactionStep[] }) => {
14671474
let resultMessages: ModelMessage[] | undefined;
14681475

@@ -1501,6 +1508,13 @@ function toStreamTextOptions(options?: ToStreamTextOptionsOptions): Record<strin
15011508
}
15021509
}
15031510

1511+
// 3. Background context injection
1512+
const bgQueue = locals.get(chatBackgroundQueueKey);
1513+
if (bgQueue && bgQueue.length > 0) {
1514+
const injected = bgQueue.splice(0); // drain
1515+
resultMessages = [...(resultMessages ?? messages), ...injected];
1516+
}
1517+
15041518
return resultMessages ? { messages: resultMessages } : undefined;
15051519
};
15061520
}
@@ -2324,6 +2338,9 @@ function chatTask<
23242338
locals.set(chatDeferKey, new Set());
23252339
locals.set(chatCompactionStateKey, undefined);
23262340
locals.set(chatSteeringQueueKey, []);
2341+
// NOTE: chatBackgroundQueueKey is NOT reset here — messages injected
2342+
// by deferred work from the previous turn's onTurnComplete need to
2343+
// survive into the next turn. The queue is drained before run().
23272344
locals.set(chatInjectedMessageIdsKey, new Set());
23282345

23292346
// Store chat context for auto-detection by ai.tool subtasks
@@ -2537,6 +2554,12 @@ function chatTask<
25372554
let runResult: unknown;
25382555

25392556
try {
2557+
// Drain any messages injected by background work (e.g. self-review from previous turn)
2558+
const bgQueue = locals.get(chatBackgroundQueueKey);
2559+
if (bgQueue && bgQueue.length > 0) {
2560+
accumulatedMessages.push(...bgQueue.splice(0));
2561+
}
2562+
25402563
runResult = await userRun({
25412564
...restWire,
25422565
messages: await applyPrepareMessages(accumulatedMessages, "run"),
@@ -2926,6 +2949,12 @@ function chatTask<
29262949
);
29272950
}
29282951

2952+
// NOTE: We intentionally do NOT await deferred work from onTurnComplete here.
2953+
// Promises deferred in onTurnComplete (e.g. background self-review via
2954+
// chat.defer + chat.inject) run during the idle wait. If they complete
2955+
// before the next message, their injected context is picked up in prepareStep.
2956+
// The pre-onBeforeTurnComplete drain handles promises from onTurnStart/run().
2957+
29292958
// If messages arrived during streaming (without pendingMessages config),
29302959
// use the first one immediately as the next turn.
29312960
if (pendingMessages.length > 0) {
@@ -3154,6 +3183,43 @@ function chatDefer(promise: Promise<unknown>): void {
31543183
}
31553184
}
31563185

3186+
// ---------------------------------------------------------------------------
3187+
// Background context injection
3188+
// ---------------------------------------------------------------------------
3189+
3190+
/**
3191+
* Queue model messages for injection at the next `prepareStep` boundary.
3192+
*
3193+
* Use this to inject context from background work into the agent's conversation.
3194+
* Messages are appended to the model messages before the next LLM inference call.
3195+
*
3196+
* Combine with `chat.defer()` to run background analysis and inject results:
3197+
*
3198+
* @example
3199+
* ```ts
3200+
* onTurnComplete: async ({ messages }) => {
3201+
* chat.defer((async () => {
3202+
* const review = await generateObject({
3203+
* model: openai("gpt-4o-mini"),
3204+
* messages: [...messages, { role: "user", content: "Review the last response." }],
3205+
* schema: z.object({ suggestions: z.array(z.string()) }),
3206+
* });
3207+
* if (review.object.suggestions.length > 0) {
3208+
* chat.inject([{
3209+
* role: "system",
3210+
* content: `Improvements for next response:\n${review.object.suggestions.join("\n")}`,
3211+
* }]);
3212+
* }
3213+
* })());
3214+
* },
3215+
* ```
3216+
*/
3217+
function injectBackgroundContext(messages: ModelMessage[]): void {
3218+
const queue = locals.get(chatBackgroundQueueKey) ?? [];
3219+
queue.push(...messages);
3220+
locals.set(chatBackgroundQueueKey, queue);
3221+
}
3222+
31573223
// ---------------------------------------------------------------------------
31583224
// Aborted message cleanup
31593225
// ---------------------------------------------------------------------------
@@ -4154,6 +4220,8 @@ export const chat = {
41544220
cleanupAbortedParts,
41554221
/** Register background work that runs in parallel with streaming. See {@link chatDefer}. */
41564222
defer: chatDefer,
4223+
/** Queue model messages for injection at the next `prepareStep` boundary. See {@link injectBackgroundContext}. */
4224+
inject: injectBackgroundContext,
41574225
/** Typed chat output stream for writing custom chunks or piping from subtasks. */
41584226
stream: chatStream,
41594227
/** Pre-built input stream for receiving messages from the transport. */

references/ai-chat/src/components/chat.tsx

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -476,6 +476,14 @@ export function Chat({
476476
);
477477
}
478478

479+
// Transient status parts — hide from rendered output
480+
if (
481+
part.type === "data-turn-status" ||
482+
part.type === "data-background-context-injected"
483+
) {
484+
return null;
485+
}
486+
479487
if (part.type === "data-research-progress") {
480488
return <ResearchProgress key={i} part={part} />;
481489
}

references/ai-chat/src/trigger/chat.ts

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { ai, chat, type ChatTaskWirePayload } from "@trigger.dev/sdk/ai";
22
import { logger, schemaTask, task, prompts } from "@trigger.dev/sdk";
3-
import { streamText, generateText, tool, dynamicTool, stepCountIs, generateId, createProviderRegistry } from "ai";
3+
import { streamText, generateText, generateObject, tool, dynamicTool, stepCountIs, generateId, createProviderRegistry } from "ai";
44
import type { LanguageModel, LanguageModelUsage, Tool as AITool, UIMessage } from "ai";
55
import { openai } from "@ai-sdk/openai";
66
import { anthropic } from "@ai-sdk/anthropic";
@@ -56,6 +56,20 @@ When the user asks you to research a topic, use the deep research tool with rele
5656
- Keep responses under a few paragraphs unless the user asks for more.`,
5757
});
5858

59+
// Prompt for the background self-review pass: a cheap model critiques the
// assistant's previous response so the critique can be injected into the next
// turn via chat.inject() (see the chat.defer call in onTurnComplete below).
const selfReviewPrompt = prompts.define({
  id: "ai-chat-self-review",
  model: "openai:gpt-4o-mini",
  content: `You are a conversation quality reviewer. Analyze the assistant's most recent response and provide structured feedback.

Focus on:
- Whether the response actually answered the user's question
- Missed opportunities to use tools or provide more detail
- Tone mismatches (too formal, too casual, etc.)
- Factual claims that should have been verified with tools

Be concise. Only flag issues worth fixing — don't nitpick.`,
});
72+
5973
const MODELS: Record<string, () => LanguageModel> = {
6074
"gpt-4o-mini": () => openai("gpt-4o-mini"),
6175
"gpt-4o": () => openai("gpt-4o"),
@@ -437,7 +451,7 @@ export const aiChat = chat.task({
437451
// Deferred — runs in parallel with streaming, awaited before onTurnComplete.
438452
chat.defer(prisma.chat.update({ where: { id: chatId }, data: { messages: uiMessages as any } }));
439453
},
440-
onTurnComplete: async ({ chatId, uiMessages, runId, chatAccessToken, lastEventId }) => {
454+
onTurnComplete: async ({ chatId, uiMessages, messages, runId, chatAccessToken, lastEventId }) => {
441455
// Persist final messages + assistant response + stream position
442456
await prisma.chat.update({
443457
where: { id: chatId },
@@ -459,6 +473,41 @@ export const aiChat = chat.task({
459473
},
460474
});
461475
}
476+
477+
// Background self-review — a cheap model critiques the response and injects
478+
// coaching into the conversation before the next user message arrives.
479+
chat.defer((async () => {
480+
const resolved = await selfReviewPrompt.resolve({});
481+
482+
const review = await generateObject({
483+
model: registry.languageModel(resolved.model ?? "openai:gpt-4o-mini"),
484+
...resolved.toAISDKTelemetry(),
485+
system: resolved.text,
486+
prompt: `Here is the conversation to review:\n\n${messages.filter((m) => m.role === "user" || m.role === "assistant").map((m) => `${m.role}: ${typeof m.content === "string" ? m.content : Array.isArray(m.content) ? m.content.filter((p: any) => p.type === "text").map((p: any) => p.text).join("") : ""}`).join("\n\n")}`,
487+
schema: z.object({
488+
needsImprovement: z.boolean().describe("Whether the response needs improvement"),
489+
suggestions: z.array(z.string()).describe("Specific actionable suggestions for the next response"),
490+
missedTools: z.array(z.string()).describe("Tool names the assistant should have used but didn't"),
491+
}),
492+
});
493+
494+
const parts = [];
495+
if (review.object.suggestions.length > 0) {
496+
parts.push(`Suggestions:\n${review.object.suggestions.map((s) => `- ${s}`).join("\n")}`);
497+
}
498+
if (review.object.missedTools.length > 0) {
499+
parts.push(`Consider using: ${review.object.missedTools.join(", ")}`);
500+
}
501+
502+
chat.inject([
503+
{
504+
role: "system" as const,
505+
content: review.object.needsImprovement
506+
? `[Self-review of your previous response]\n\n${parts.join("\n\n")}\n\nApply these improvements naturally in your next response.`
507+
: `[Self-review of your previous response]\n\nYour previous response was good. No changes needed.`,
508+
},
509+
]);
510+
})());
462511
},
463512
run: async ({ messages, clientData, stopSignal }) => {
464513
// Track usage

0 commit comments

Comments
 (0)