Skip to content

Commit d8deb44

Browse files
committed
feat(ai): add triggerAndSubscribe method and use it in ai.tool
Replace triggerAndWait with triggerAndSubscribe in ai.tool to fix: - Parallel tool calls (no more preventMultipleWaits errors) - Stop signal while suspended (parent stays alive, child gets cancelled) New task.triggerAndSubscribe() method: trigger + subscribeToRun in a single span, with abort signal support and cancelOnAbort option. Convert deepResearch to a schemaTask + ai.tool in the reference app. refs TRI-7986
1 parent 112fda7 commit d8deb44

7 files changed

Lines changed: 544 additions & 10 deletions

File tree

packages/core/src/v3/types/tasks.ts

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -641,6 +641,30 @@ export interface Task<TIdentifier extends string, TInput = void, TOutput = any>
641641
requestOptions?: TriggerApiRequestOptions
642642
) => TaskRunPromise<TIdentifier, TOutput>;
643643

644+
/**
645+
* Trigger a task and subscribe to its updates via realtime. Unlike `triggerAndWait`,
646+
* this does NOT suspend the parent run — the parent stays alive and polls for updates.
647+
* This enables parallel tool calls and proper abort signal handling.
648+
*
649+
* @param payload
650+
* @param options - Options for the task run, including an optional `signal` to cancel the subscription and child run
651+
* @returns TaskRunPromise
652+
* @example
653+
* ```
654+
* const result = await task.triggerAndSubscribe({ foo: "bar" }, { signal: abortSignal });
655+
*
656+
* if (result.ok) {
657+
* console.log(result.output);
658+
* } else {
659+
* console.error(result.error);
660+
* }
661+
* ```
662+
*/
663+
triggerAndSubscribe: (
664+
payload: TInput,
665+
options?: TriggerAndSubscribeOptions,
666+
) => TaskRunPromise<TIdentifier, TOutput>;
667+
644668
/**
645669
* Batch trigger multiple task runs with the given payloads, and wait for the results. Returns the results of the task runs.
646670
* @param items - Array, AsyncIterable, or ReadableStream of batch items
@@ -989,6 +1013,16 @@ export type TriggerOptions = {
9891013
};
9901014

9911015
export type TriggerAndWaitOptions = Omit<TriggerOptions, "version">;
1016+
1017+
export type TriggerAndSubscribeOptions = Omit<TriggerOptions, "version"> & {
1018+
/** An AbortSignal to cancel the subscription. When fired, the subscription closes and the promise rejects. */
1019+
signal?: AbortSignal;
1020+
/**
1021+
* Whether to cancel the child run when the abort signal fires.
1022+
* @default true
1023+
*/
1024+
cancelOnAbort?: boolean;
1025+
};
9921026
export type BatchTriggerOptions = {
9931027
/**
9941028
* If no idempotencyKey is set on an individual item in the batch, it will use this key on each item + the array index.

packages/trigger-sdk/src/v3/ai.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,11 +147,12 @@ function toolFromTask<
147147
}
148148

149149
return await task
150-
.triggerAndWait(input as inferSchemaIn<TTaskSchema>, {
150+
.triggerAndSubscribe(input as inferSchemaIn<TTaskSchema>, {
151151
metadata: {
152152
[METADATA_KEY]: toolMeta as any,
153153
},
154154
tags: options?.toolCallId ? [`toolCallId:${options.toolCallId}`] : undefined,
155+
signal: options?.abortSignal,
155156
})
156157
.unwrap();
157158
},

packages/trigger-sdk/src/v3/runs.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,14 @@ export type SubscribeToRunOptions = {
358358
* ```
359359
*/
360360
skipColumns?: RealtimeRunSkipColumns;
361+
362+
/**
363+
* An AbortSignal to cancel the subscription.
364+
*
365+
* When the signal is aborted, the underlying SSE connection is closed
366+
* and the async iterator completes.
367+
*/
368+
signal?: AbortSignal;
361369
};
362370

363371
/**
@@ -403,6 +411,7 @@ function subscribeToRun<TRunId extends AnyRunHandle | AnyTask | string>(
403411
closeOnComplete:
404412
typeof options?.stopOnCompletion === "boolean" ? options.stopOnCompletion : true,
405413
skipColumns: options?.skipColumns,
414+
signal: options?.signal,
406415
});
407416
}
408417

packages/trigger-sdk/src/v3/shared.ts

Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ import type {
9090
TaskWithToolOptions,
9191
ToolTask,
9292
ToolTaskParameters,
93+
TriggerAndSubscribeOptions,
9394
TriggerAndWaitOptions,
9495
TriggerApiRequestOptions,
9596
TriggerOptions,
@@ -214,6 +215,26 @@ export function createTask<
214215
});
215216
}, params.id);
216217
},
218+
triggerAndSubscribe: (payload, options) => {
219+
return new TaskRunPromise<TIdentifier, TOutput>((resolve, reject) => {
220+
triggerAndSubscribe_internal<TIdentifier, TInput, TOutput>(
221+
"triggerAndSubscribe()",
222+
params.id,
223+
payload,
224+
undefined,
225+
{
226+
queue: params.queue?.name,
227+
...options,
228+
}
229+
)
230+
.then((result) => {
231+
resolve(result);
232+
})
233+
.catch((error) => {
234+
reject(error);
235+
});
236+
}, params.id);
237+
},
217238
batchTriggerAndWait: async (items, options) => {
218239
return await batchTriggerAndWait_internal<TIdentifier, TInput, TOutput>(
219240
"batchTriggerAndWait()",
@@ -346,6 +367,26 @@ export function createSchemaTask<
346367
});
347368
}, params.id);
348369
},
370+
triggerAndSubscribe: (payload, options) => {
371+
return new TaskRunPromise<TIdentifier, TOutput>((resolve, reject) => {
372+
triggerAndSubscribe_internal<TIdentifier, inferSchemaIn<TSchema>, TOutput>(
373+
"triggerAndSubscribe()",
374+
params.id,
375+
payload,
376+
parsePayload,
377+
{
378+
queue: params.queue?.name,
379+
...options,
380+
}
381+
)
382+
.then((result) => {
383+
resolve(result);
384+
})
385+
.catch((error) => {
386+
reject(error);
387+
});
388+
}, params.id);
389+
},
349390
batchTriggerAndWait: async (items, options) => {
350391
return await batchTriggerAndWait_internal<TIdentifier, inferSchemaIn<TSchema>, TOutput>(
351392
"batchTriggerAndWait()",
@@ -465,6 +506,49 @@ export function triggerAndWait<TTask extends AnyTask>(
465506
}, id);
466507
}
467508

509+
/**
510+
* Trigger a task and subscribe to its updates via realtime. Unlike `triggerAndWait`,
511+
* this does NOT suspend the parent run — the parent stays alive and subscribes to updates.
512+
* This enables parallel execution and proper abort signal handling.
513+
*
514+
* @param id - The id of the task to trigger
515+
* @param payload
516+
* @param options - Options for the task run, including an optional `signal` to cancel the subscription and child run
517+
* @returns TaskRunPromise
518+
* @example
519+
* ```ts
520+
* import { tasks } from "@trigger.dev/sdk/v3";
521+
* const result = await tasks.triggerAndSubscribe("my-task", { foo: "bar" });
522+
*
523+
* if (result.ok) {
524+
* console.log(result.output);
525+
* } else {
526+
* console.error(result.error);
527+
* }
528+
* ```
529+
*/
530+
export function triggerAndSubscribe<TTask extends AnyTask>(
531+
id: TaskIdentifier<TTask>,
532+
payload: TaskPayload<TTask>,
533+
options?: TriggerAndSubscribeOptions
534+
): TaskRunPromise<TaskIdentifier<TTask>, TaskOutput<TTask>> {
535+
return new TaskRunPromise<TaskIdentifier<TTask>, TaskOutput<TTask>>((resolve, reject) => {
536+
triggerAndSubscribe_internal<TaskIdentifier<TTask>, TaskPayload<TTask>, TaskOutput<TTask>>(
537+
"tasks.triggerAndSubscribe()",
538+
id,
539+
payload,
540+
undefined,
541+
options
542+
)
543+
.then((result) => {
544+
resolve(result);
545+
})
546+
.catch((error) => {
547+
reject(error);
548+
});
549+
}, id);
550+
}
551+
468552
/**
469553
* Batch trigger multiple task runs with the given payloads, and wait for the results. Returns the results of the task runs.
470554
* @param id - The id of the task to trigger
@@ -2441,6 +2525,128 @@ async function triggerAndWait_internal<TIdentifier extends string, TPayload, TOu
24412525
);
24422526
}
24432527

2528+
async function triggerAndSubscribe_internal<TIdentifier extends string, TPayload, TOutput>(
2529+
name: string,
2530+
id: TIdentifier,
2531+
payload: TPayload,
2532+
parsePayload?: SchemaParseFn<TPayload>,
2533+
options?: TriggerAndSubscribeOptions
2534+
): Promise<TaskRunResult<TIdentifier, TOutput>> {
2535+
const ctx = taskContext.ctx;
2536+
2537+
if (!ctx) {
2538+
throw new Error("triggerAndSubscribe can only be used from inside a task.run()");
2539+
}
2540+
2541+
const apiClient = apiClientManager.clientOrThrow();
2542+
2543+
const parsedPayload = parsePayload ? await parsePayload(payload) : payload;
2544+
const payloadPacket = await stringifyIO(parsedPayload);
2545+
2546+
const processedIdempotencyKey = await makeIdempotencyKey(options?.idempotencyKey);
2547+
const idempotencyKeyOptions = processedIdempotencyKey
2548+
? getIdempotencyKeyOptions(processedIdempotencyKey)
2549+
: undefined;
2550+
2551+
return await tracer.startActiveSpan(
2552+
name,
2553+
async (span) => {
2554+
const response = await apiClient.triggerTask(
2555+
id,
2556+
{
2557+
payload: payloadPacket.data,
2558+
options: {
2559+
lockToVersion: taskContext.worker?.version,
2560+
queue: options?.queue ? { name: options.queue } : undefined,
2561+
concurrencyKey: options?.concurrencyKey,
2562+
test: taskContext.ctx?.run.isTest,
2563+
payloadType: payloadPacket.dataType,
2564+
delay: options?.delay,
2565+
ttl: options?.ttl,
2566+
tags: options?.tags,
2567+
maxAttempts: options?.maxAttempts,
2568+
metadata: options?.metadata,
2569+
maxDuration: options?.maxDuration,
2570+
parentRunId: ctx.run.id,
2571+
// NOTE: no resumeParentOnCompletion — parent stays alive and subscribes
2572+
idempotencyKey: processedIdempotencyKey?.toString(),
2573+
idempotencyKeyTTL: options?.idempotencyKeyTTL,
2574+
idempotencyKeyOptions,
2575+
machine: options?.machine,
2576+
priority: options?.priority,
2577+
region: options?.region,
2578+
debounce: options?.debounce,
2579+
},
2580+
},
2581+
{}
2582+
);
2583+
2584+
// Set attributes after trigger so the dashboard can link to the child run
2585+
span.setAttribute("messaging.message.id", response.id);
2586+
span.setAttribute("runId", response.id);
2587+
span.setAttribute(SemanticInternalAttributes.ENTITY_TYPE, "run");
2588+
span.setAttribute(SemanticInternalAttributes.ENTITY_ID, response.id);
2589+
2590+
// Optionally cancel the child run when the abort signal fires (default: true)
2591+
const cancelOnAbort = options?.cancelOnAbort !== false;
2592+
if (options?.signal && cancelOnAbort) {
2593+
const onAbort = () => {
2594+
apiClient.cancelRun(response.id).catch(() => {});
2595+
};
2596+
if (options.signal.aborted) {
2597+
await apiClient.cancelRun(response.id).catch(() => {});
2598+
throw new Error("Aborted");
2599+
}
2600+
options.signal.addEventListener("abort", onAbort, { once: true });
2601+
}
2602+
2603+
for await (const run of apiClient.subscribeToRun(response.id, {
2604+
closeOnComplete: true,
2605+
signal: options?.signal,
2606+
skipColumns: ["payload"],
2607+
})) {
2608+
if (run.isSuccess) {
2609+
// run.output from subscribeToRun is already deserialized
2610+
return {
2611+
ok: true as const,
2612+
id: response.id,
2613+
taskIdentifier: id as TIdentifier,
2614+
output: run.output as TOutput,
2615+
};
2616+
}
2617+
if (run.isFailed || run.isCancelled) {
2618+
const error = new Error(run.error?.message ?? `Task ${id} failed (${run.status})`);
2619+
if (run.error?.name) error.name = run.error.name;
2620+
2621+
return {
2622+
ok: false as const,
2623+
id: response.id,
2624+
taskIdentifier: id as TIdentifier,
2625+
error,
2626+
};
2627+
}
2628+
}
2629+
2630+
throw new Error(`Task ${id}: subscription ended without completion`);
2631+
},
2632+
{
2633+
kind: SpanKind.PRODUCER,
2634+
attributes: {
2635+
[SemanticInternalAttributes.STYLE_ICON]: "trigger",
2636+
...accessoryAttributes({
2637+
items: [
2638+
{
2639+
text: id,
2640+
variant: "normal",
2641+
},
2642+
],
2643+
style: "codepath",
2644+
}),
2645+
},
2646+
}
2647+
);
2648+
}
2649+
24442650
async function batchTriggerAndWait_internal<TIdentifier extends string, TPayload, TOutput>(
24452651
name: string,
24462652
id: TIdentifier,

packages/trigger-sdk/src/v3/tasks.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import {
2020
SubtaskUnwrapError,
2121
trigger,
2222
triggerAndWait,
23+
triggerAndSubscribe,
2324
} from "./shared.js";
2425

2526
export { SubtaskUnwrapError };
@@ -96,6 +97,7 @@ export const tasks = {
9697
trigger,
9798
batchTrigger,
9899
triggerAndWait,
100+
triggerAndSubscribe,
99101
batchTriggerAndWait,
100102
/** @deprecated Use onStartAttempt instead */
101103
onStart,

0 commit comments

Comments
 (0)