Skip to content

Commit 518dc5d

Browse files
committed
feat(ai): add triggerAndSubscribe method and use it in ai.tool
Replace triggerAndWait with triggerAndSubscribe in ai.tool to fix: - Parallel tool calls (no more preventMultipleWaits errors) - Stop signal while suspended (parent stays alive, child gets cancelled) New task.triggerAndSubscribe() method: trigger + subscribeToRun in a single span, with abort signal support and cancelOnAbort option. Convert deepResearch to a schemaTask + ai.tool in the reference app. refs TRI-7986
1 parent a33f294 commit 518dc5d

File tree

7 files changed

+544
-10
lines changed

7 files changed

+544
-10
lines changed

packages/core/src/v3/types/tasks.ts

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -618,6 +618,30 @@ export interface Task<TIdentifier extends string, TInput = void, TOutput = any>
618618
requestOptions?: TriggerApiRequestOptions
619619
) => TaskRunPromise<TIdentifier, TOutput>;
620620

621+
/**
622+
* Trigger a task and subscribe to its updates via realtime. Unlike `triggerAndWait`,
623+
* this does NOT suspend the parent run — the parent stays alive and polls for updates.
624+
* This enables parallel tool calls and proper abort signal handling.
625+
*
626+
* @param payload
627+
* @param options - Options for the task run, including an optional `signal` to cancel the subscription and child run
628+
* @returns TaskRunPromise
629+
* @example
630+
* ```
631+
* const result = await task.triggerAndSubscribe({ foo: "bar" }, { signal: abortSignal });
632+
*
633+
* if (result.ok) {
634+
* console.log(result.output);
635+
* } else {
636+
* console.error(result.error);
637+
* }
638+
* ```
639+
*/
640+
triggerAndSubscribe: (
641+
payload: TInput,
642+
options?: TriggerAndSubscribeOptions,
643+
) => TaskRunPromise<TIdentifier, TOutput>;
644+
621645
/**
622646
* Batch trigger multiple task runs with the given payloads, and wait for the results. Returns the results of the task runs.
623647
* @param items - Array, AsyncIterable, or ReadableStream of batch items
@@ -966,6 +990,16 @@ export type TriggerOptions = {
966990
};
967991

968992
export type TriggerAndWaitOptions = Omit<TriggerOptions, "version">;
993+
994+
export type TriggerAndSubscribeOptions = Omit<TriggerOptions, "version"> & {
995+
/** An AbortSignal to cancel the subscription. When fired, the subscription closes and the promise rejects. */
996+
signal?: AbortSignal;
997+
/**
998+
* Whether to cancel the child run when the abort signal fires.
999+
* @default true
1000+
*/
1001+
cancelOnAbort?: boolean;
1002+
};
9691003
export type BatchTriggerOptions = {
9701004
/**
9711005
* If no idempotencyKey is set on an individual item in the batch, it will use this key on each item + the array index.

packages/trigger-sdk/src/v3/ai.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,11 +147,12 @@ function toolFromTask<
147147
}
148148

149149
return await task
150-
.triggerAndWait(input as inferSchemaIn<TTaskSchema>, {
150+
.triggerAndSubscribe(input as inferSchemaIn<TTaskSchema>, {
151151
metadata: {
152152
[METADATA_KEY]: toolMeta as any,
153153
},
154154
tags: options?.toolCallId ? [`toolCallId:${options.toolCallId}`] : undefined,
155+
signal: options?.abortSignal,
155156
})
156157
.unwrap();
157158
},

packages/trigger-sdk/src/v3/runs.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,14 @@ export type SubscribeToRunOptions = {
358358
* ```
359359
*/
360360
skipColumns?: RealtimeRunSkipColumns;
361+
362+
/**
363+
* An AbortSignal to cancel the subscription.
364+
*
365+
* When the signal is aborted, the underlying SSE connection is closed
366+
* and the async iterator completes.
367+
*/
368+
signal?: AbortSignal;
361369
};
362370

363371
/**
@@ -403,6 +411,7 @@ function subscribeToRun<TRunId extends AnyRunHandle | AnyTask | string>(
403411
closeOnComplete:
404412
typeof options?.stopOnCompletion === "boolean" ? options.stopOnCompletion : true,
405413
skipColumns: options?.skipColumns,
414+
signal: options?.signal,
406415
});
407416
}
408417

packages/trigger-sdk/src/v3/shared.ts

Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ import type {
9090
TaskWithToolOptions,
9191
ToolTask,
9292
ToolTaskParameters,
93+
TriggerAndSubscribeOptions,
9394
TriggerAndWaitOptions,
9495
TriggerApiRequestOptions,
9596
TriggerOptions,
@@ -214,6 +215,26 @@ export function createTask<
214215
});
215216
}, params.id);
216217
},
218+
triggerAndSubscribe: (payload, options) => {
219+
return new TaskRunPromise<TIdentifier, TOutput>((resolve, reject) => {
220+
triggerAndSubscribe_internal<TIdentifier, TInput, TOutput>(
221+
"triggerAndSubscribe()",
222+
params.id,
223+
payload,
224+
undefined,
225+
{
226+
queue: params.queue?.name,
227+
...options,
228+
}
229+
)
230+
.then((result) => {
231+
resolve(result);
232+
})
233+
.catch((error) => {
234+
reject(error);
235+
});
236+
}, params.id);
237+
},
217238
batchTriggerAndWait: async (items, options) => {
218239
return await batchTriggerAndWait_internal<TIdentifier, TInput, TOutput>(
219240
"batchTriggerAndWait()",
@@ -345,6 +366,26 @@ export function createSchemaTask<
345366
});
346367
}, params.id);
347368
},
369+
triggerAndSubscribe: (payload, options) => {
370+
return new TaskRunPromise<TIdentifier, TOutput>((resolve, reject) => {
371+
triggerAndSubscribe_internal<TIdentifier, inferSchemaIn<TSchema>, TOutput>(
372+
"triggerAndSubscribe()",
373+
params.id,
374+
payload,
375+
parsePayload,
376+
{
377+
queue: params.queue?.name,
378+
...options,
379+
}
380+
)
381+
.then((result) => {
382+
resolve(result);
383+
})
384+
.catch((error) => {
385+
reject(error);
386+
});
387+
}, params.id);
388+
},
348389
batchTriggerAndWait: async (items, options) => {
349390
return await batchTriggerAndWait_internal<TIdentifier, inferSchemaIn<TSchema>, TOutput>(
350391
"batchTriggerAndWait()",
@@ -463,6 +504,49 @@ export function triggerAndWait<TTask extends AnyTask>(
463504
}, id);
464505
}
465506

507+
/**
508+
* Trigger a task and subscribe to its updates via realtime. Unlike `triggerAndWait`,
509+
* this does NOT suspend the parent run — the parent stays alive and subscribes to updates.
510+
* This enables parallel execution and proper abort signal handling.
511+
*
512+
* @param id - The id of the task to trigger
513+
* @param payload
514+
* @param options - Options for the task run, including an optional `signal` to cancel the subscription and child run
515+
* @returns TaskRunPromise
516+
* @example
517+
* ```ts
518+
* import { tasks } from "@trigger.dev/sdk/v3";
519+
* const result = await tasks.triggerAndSubscribe("my-task", { foo: "bar" });
520+
*
521+
* if (result.ok) {
522+
* console.log(result.output);
523+
* } else {
524+
* console.error(result.error);
525+
* }
526+
* ```
527+
*/
528+
export function triggerAndSubscribe<TTask extends AnyTask>(
529+
id: TaskIdentifier<TTask>,
530+
payload: TaskPayload<TTask>,
531+
options?: TriggerAndSubscribeOptions
532+
): TaskRunPromise<TaskIdentifier<TTask>, TaskOutput<TTask>> {
533+
return new TaskRunPromise<TaskIdentifier<TTask>, TaskOutput<TTask>>((resolve, reject) => {
534+
triggerAndSubscribe_internal<TaskIdentifier<TTask>, TaskPayload<TTask>, TaskOutput<TTask>>(
535+
"tasks.triggerAndSubscribe()",
536+
id,
537+
payload,
538+
undefined,
539+
options
540+
)
541+
.then((result) => {
542+
resolve(result);
543+
})
544+
.catch((error) => {
545+
reject(error);
546+
});
547+
}, id);
548+
}
549+
466550
/**
467551
* Batch trigger multiple task runs with the given payloads, and wait for the results. Returns the results of the task runs.
468552
* @param id - The id of the task to trigger
@@ -2439,6 +2523,128 @@ async function triggerAndWait_internal<TIdentifier extends string, TPayload, TOu
24392523
);
24402524
}
24412525

2526+
async function triggerAndSubscribe_internal<TIdentifier extends string, TPayload, TOutput>(
2527+
name: string,
2528+
id: TIdentifier,
2529+
payload: TPayload,
2530+
parsePayload?: SchemaParseFn<TPayload>,
2531+
options?: TriggerAndSubscribeOptions
2532+
): Promise<TaskRunResult<TIdentifier, TOutput>> {
2533+
const ctx = taskContext.ctx;
2534+
2535+
if (!ctx) {
2536+
throw new Error("triggerAndSubscribe can only be used from inside a task.run()");
2537+
}
2538+
2539+
const apiClient = apiClientManager.clientOrThrow();
2540+
2541+
const parsedPayload = parsePayload ? await parsePayload(payload) : payload;
2542+
const payloadPacket = await stringifyIO(parsedPayload);
2543+
2544+
const processedIdempotencyKey = await makeIdempotencyKey(options?.idempotencyKey);
2545+
const idempotencyKeyOptions = processedIdempotencyKey
2546+
? getIdempotencyKeyOptions(processedIdempotencyKey)
2547+
: undefined;
2548+
2549+
return await tracer.startActiveSpan(
2550+
name,
2551+
async (span) => {
2552+
const response = await apiClient.triggerTask(
2553+
id,
2554+
{
2555+
payload: payloadPacket.data,
2556+
options: {
2557+
lockToVersion: taskContext.worker?.version,
2558+
queue: options?.queue ? { name: options.queue } : undefined,
2559+
concurrencyKey: options?.concurrencyKey,
2560+
test: taskContext.ctx?.run.isTest,
2561+
payloadType: payloadPacket.dataType,
2562+
delay: options?.delay,
2563+
ttl: options?.ttl,
2564+
tags: options?.tags,
2565+
maxAttempts: options?.maxAttempts,
2566+
metadata: options?.metadata,
2567+
maxDuration: options?.maxDuration,
2568+
parentRunId: ctx.run.id,
2569+
// NOTE: no resumeParentOnCompletion — parent stays alive and subscribes
2570+
idempotencyKey: processedIdempotencyKey?.toString(),
2571+
idempotencyKeyTTL: options?.idempotencyKeyTTL,
2572+
idempotencyKeyOptions,
2573+
machine: options?.machine,
2574+
priority: options?.priority,
2575+
region: options?.region,
2576+
debounce: options?.debounce,
2577+
},
2578+
},
2579+
{}
2580+
);
2581+
2582+
// Set attributes after trigger so the dashboard can link to the child run
2583+
span.setAttribute("messaging.message.id", response.id);
2584+
span.setAttribute("runId", response.id);
2585+
span.setAttribute(SemanticInternalAttributes.ENTITY_TYPE, "run");
2586+
span.setAttribute(SemanticInternalAttributes.ENTITY_ID, response.id);
2587+
2588+
// Optionally cancel the child run when the abort signal fires (default: true)
2589+
const cancelOnAbort = options?.cancelOnAbort !== false;
2590+
if (options?.signal && cancelOnAbort) {
2591+
const onAbort = () => {
2592+
apiClient.cancelRun(response.id).catch(() => {});
2593+
};
2594+
if (options.signal.aborted) {
2595+
await apiClient.cancelRun(response.id).catch(() => {});
2596+
throw new Error("Aborted");
2597+
}
2598+
options.signal.addEventListener("abort", onAbort, { once: true });
2599+
}
2600+
2601+
for await (const run of apiClient.subscribeToRun(response.id, {
2602+
closeOnComplete: true,
2603+
signal: options?.signal,
2604+
skipColumns: ["payload"],
2605+
})) {
2606+
if (run.isSuccess) {
2607+
// run.output from subscribeToRun is already deserialized
2608+
return {
2609+
ok: true as const,
2610+
id: response.id,
2611+
taskIdentifier: id as TIdentifier,
2612+
output: run.output as TOutput,
2613+
};
2614+
}
2615+
if (run.isFailed || run.isCancelled) {
2616+
const error = new Error(run.error?.message ?? `Task ${id} failed (${run.status})`);
2617+
if (run.error?.name) error.name = run.error.name;
2618+
2619+
return {
2620+
ok: false as const,
2621+
id: response.id,
2622+
taskIdentifier: id as TIdentifier,
2623+
error,
2624+
};
2625+
}
2626+
}
2627+
2628+
throw new Error(`Task ${id}: subscription ended without completion`);
2629+
},
2630+
{
2631+
kind: SpanKind.PRODUCER,
2632+
attributes: {
2633+
[SemanticInternalAttributes.STYLE_ICON]: "trigger",
2634+
...accessoryAttributes({
2635+
items: [
2636+
{
2637+
text: id,
2638+
variant: "normal",
2639+
},
2640+
],
2641+
style: "codepath",
2642+
}),
2643+
},
2644+
}
2645+
);
2646+
}
2647+
24422648
async function batchTriggerAndWait_internal<TIdentifier extends string, TPayload, TOutput>(
24432649
name: string,
24442650
id: TIdentifier,

packages/trigger-sdk/src/v3/tasks.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import {
2020
SubtaskUnwrapError,
2121
trigger,
2222
triggerAndWait,
23+
triggerAndSubscribe,
2324
} from "./shared.js";
2425

2526
export { SubtaskUnwrapError };
@@ -96,6 +97,7 @@ export const tasks = {
9697
trigger,
9798
batchTrigger,
9899
triggerAndWait,
100+
triggerAndSubscribe,
99101
batchTriggerAndWait,
100102
/** @deprecated Use onStartAttempt instead */
101103
onStart,

0 commit comments

Comments
 (0)