Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions js/ai/src/tool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,11 @@ function interruptTool(registry?: Registry) {
if (registry) {
assertUnstable(registry, 'beta', 'Tool interrupts are a beta feature.');
}
if (metadata) {
setCustomMetadataAttributes({
interrupt: JSON.stringify(metadata),
});
}
throw new ToolInterruptError(metadata);
};
}
Expand All @@ -490,6 +495,15 @@ export function tool<I extends z.ZodTypeAny, O extends z.ZodTypeAny>(
return config.multipart ? multipartTool(config, fn) : basicTool(config, fn);
}

function recordResumedMetadata(runOptions: any) {
const optionsMetadata = runOptions.metadata;
if (optionsMetadata?.resumed) {
setCustomMetadataAttributes({
resumed: JSON.stringify(optionsMetadata.resumed),
});
}
}

function basicTool<I extends z.ZodTypeAny, O extends z.ZodTypeAny>(
config: ToolConfig<I, O>,
fn?: ToolFn<I, O>
Expand All @@ -501,6 +515,7 @@ function basicTool<I extends z.ZodTypeAny, O extends z.ZodTypeAny>(
metadata: { ...(config.metadata || {}), type: 'tool', dynamic: true },
},
(i, runOptions) => {
recordResumedMetadata(runOptions);
const interrupt = interruptTool(runOptions.registry);
if (fn) {
return fn(i, {
Expand Down Expand Up @@ -548,6 +563,7 @@ function multipartTool<I extends z.ZodTypeAny, O extends z.ZodTypeAny>(
},
},
(i, runOptions) => {
recordResumedMetadata(runOptions);
const interrupt = interruptTool(runOptions.registry);
if (fn) {
return fn(i, {
Expand Down
136 changes: 136 additions & 0 deletions js/testapps/flow-simple-ai/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1285,3 +1285,139 @@ ai.defineFlow('abort-signal', async (_, { sendChunk }) => {
abortSignal: signal,
});
});

const transferMoney = ai.defineTool(
{
name: 'transferMoney',
description: 'Transfers money between accounts.',
inputSchema: z.object({
toAccountId: z
.string()
.describe('the account id of the transfer destination'),
amount: z.number().describe('the amount in integer cents (100 = $1.00)'),
}),
outputSchema: z.object({
status: z.string().describe('the outcome of the transfer'),
message: z.string().optional(),
}),
},
async (input, opts) => {
const { interrupt, resumed, context } = opts;
const resumedStatus = (resumed as Record<string, any>)?.status;
// if the user rejected the transaction
if (resumedStatus === 'REJECTED') {
return {
status: 'REJECTED',
message: 'The user rejected the transaction.',
};
}
// trigger an interrupt to confirm if amount > $100
if (resumedStatus !== 'APPROVED' && input.amount > 10000) {
interrupt({
message: 'Please confirm sending an amount > $100.',
});
}
// complete the transaction if not interrupted
return {
status: 'COMPLETED',
message: `Transferred $${input.amount / 100} to ${input.toAccountId}`,
};
}
);

export const transferFlow = ai.defineFlow(
{
name: 'transferFlowWithRestart',
inputSchema: z.string(),
outputSchema: z.string(),
},
async (prompt) => {
let response = await ai.generate({
model: googleAI.model('gemini-flash-latest'),
tools: [transferMoney],
prompt: prompt,
});

while (response.interrupts.length) {
console.log('Interrupted, resuming with approval in 5 seconds...');
await new Promise((resolve) => setTimeout(resolve, 5000));
const confirmations = [];
// multiple interrupts can be called at once, so we handle them all
for (const interrupt of response.interrupts) {
confirmations.push(
// use the 'restart' method on our tool to provide `resumed` metadata
transferMoney.restart(
interrupt,
// send the tool request input to the user to respond. assume that this
// returns `{status: "APPROVED"}` or `{status: "REJECTED"}`
{ status: 'APPROVED' }
)
);
}

console.log('Resuming');
response = await ai.generate({
model: googleAI.model('gemini-flash-latest'),
tools: [transferMoney],
messages: response.messages,
resume: {
restart: confirmations,
},
});
}
// no more interrupts, we can see the final response
return response.text;
}
);

const askQuestion = ai.defineInterrupt({
name: 'askQuestion',
description: 'use this to ask the user a clarifying question',
inputSchema: z.object({
choices: z.array(z.string()).describe('the choices to display to the user'),
allowOther: z.boolean().optional().describe('when true, allow write-ins'),
}),
outputSchema: z.string(),
});

export const transferFlowManual = ai.defineFlow(
{
name: 'transferFlowManual',
outputSchema: z.string(),
},
async () => {
const response = await ai.generate({
prompt: 'Ask me a movie trivia question.',
tools: [askQuestion],
model: googleAI.model('gemini-2.5-pro'),
});

const answers = [];
if (response.interrupts.length > 0) {
console.log('Interrupted, resuming with approval in 5 seconds...');
await new Promise((resolve) => setTimeout(resolve, 5000));
// multiple interrupts can be called at once, so we handle them all
for (const question of response.interrupts) {
answers.push(
// use the `respond` method on our tool to populate answers
askQuestion.respond(
question,
// mock response
'The answer is C'
)
);
}
}

const finalResponse = await ai.generate({
tools: [askQuestion],
model: googleAI.model('gemini-2.5-pro'),
messages: response.messages,
resume: {
respond: answers,
},
});

return finalResponse.text;
}
Comment thread
ssbushi marked this conversation as resolved.
);
Loading