Skip to content

Commit b216841

Browse files
committed
use input streams and rename chatTask and chatState to chat.task and chat.state
1 parent 93c5a63 commit b216841

8 files changed

Lines changed: 275 additions & 180 deletions

File tree

packages/trigger-sdk/src/v3/ai.ts

Lines changed: 177 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,11 @@ import { auth } from "./auth.js";
1515
import { metadata } from "./metadata.js";
1616
import { streams } from "./streams.js";
1717
import { createTask } from "./shared.js";
18-
import { wait } from "./wait.js";
18+
import {
19+
CHAT_STREAM_KEY as _CHAT_STREAM_KEY,
20+
CHAT_MESSAGES_STREAM_ID,
21+
CHAT_STOP_STREAM_ID,
22+
} from "./chat-constants.js";
1923

2024
const METADATA_KEY = "tool.execute.options";
2125

@@ -136,13 +140,13 @@ export const ai = {
136140
* ```ts
137141
* // actions.ts
138142
* "use server";
139-
* import { createChatAccessToken } from "@trigger.dev/sdk/ai";
140-
* import type { chat } from "@/trigger/chat";
143+
* import { chat } from "@trigger.dev/sdk/ai";
144+
* import type { myChat } from "@/trigger/chat";
141145
*
142-
* export const getChatToken = () => createChatAccessToken<typeof chat>("ai-chat");
146+
* export const getChatToken = () => chat.createAccessToken<typeof myChat>("my-chat");
143147
* ```
144148
*/
145-
export async function createChatAccessToken<TTask extends AnyTask>(
149+
function createChatAccessToken<TTask extends AnyTask>(
146150
taskId: TaskIdentifier<TTask>
147151
): Promise<string> {
148152
return auth.createTriggerPublicToken(taskId as string, { multipleUse: true });
@@ -157,7 +161,10 @@ export async function createChatAccessToken<TTask extends AnyTask>(
157161
* Both `TriggerChatTransport` (frontend) and `pipeChat`/`chatTask` (backend)
158162
* use this key by default.
159163
*/
160-
export const CHAT_STREAM_KEY = "chat";
164+
export const CHAT_STREAM_KEY = _CHAT_STREAM_KEY;
165+
166+
// Re-export input stream IDs for advanced usage
167+
export { CHAT_MESSAGES_STREAM_ID, CHAT_STOP_STREAM_ID };
161168

162169
/**
163170
* The payload shape that the chat transport sends to the triggered task.
@@ -187,6 +194,28 @@ export type ChatTaskPayload<TMessage extends UIMessage = UIMessage> = {
187194
metadata?: unknown;
188195
};
189196

197+
/**
198+
* Abort signals provided to the `chatTask` run function.
199+
*/
200+
export type ChatTaskSignals = {
201+
/** Combined signal — fires on run cancel OR stop generation. Pass to `streamText`. */
202+
signal: AbortSignal;
203+
/** Fires only when the run is cancelled, expired, or exceeds maxDuration. */
204+
cancelSignal: AbortSignal;
205+
/** Fires only when the frontend stops generation for this turn (per-turn, reset each turn). */
206+
stopSignal: AbortSignal;
207+
};
208+
209+
/**
210+
* The full payload passed to a `chatTask` run function.
211+
* Extends `ChatTaskPayload` (the wire payload) with abort signals.
212+
*/
213+
export type ChatTaskRunPayload = ChatTaskPayload & ChatTaskSignals;
214+
215+
// Input streams for bidirectional chat communication
216+
const messagesInput = streams.input<ChatTaskPayload>({ id: CHAT_MESSAGES_STREAM_ID });
217+
const stopInput = streams.input<{ stop: true; message?: string }>({ id: CHAT_STOP_STREAM_ID });
218+
190219
/**
191220
* Tracks how many times `pipeChat` has been called in the current `chatTask` run.
192221
* Used to prevent double-piping when a user both calls `pipeChat()` manually
@@ -253,7 +282,7 @@ function isReadableStream(value: unknown): value is ReadableStream<unknown> {
253282
* @example
254283
* ```ts
255284
* import { task } from "@trigger.dev/sdk";
256-
* import { pipeChat, type ChatTaskPayload } from "@trigger.dev/sdk/ai";
285+
* import { chat, type ChatTaskPayload } from "@trigger.dev/sdk/ai";
257286
* import { streamText, convertToModelMessages } from "ai";
258287
*
259288
* export const myChatTask = task({
@@ -264,7 +293,7 @@ function isReadableStream(value: unknown): value is ReadableStream<unknown> {
264293
* messages: convertToModelMessages(payload.messages),
265294
* });
266295
*
267-
* await pipeChat(result);
296+
* await chat.pipe(result);
268297
* },
269298
* });
270299
* ```
@@ -274,11 +303,11 @@ function isReadableStream(value: unknown): value is ReadableStream<unknown> {
274303
* // Works from anywhere inside a task — even deep in your agent code
275304
* async function runAgentLoop(messages: CoreMessage[]) {
276305
* const result = streamText({ model, messages });
277-
* await pipeChat(result);
306+
* await chat.pipe(result);
278307
* }
279308
* ```
280309
*/
281-
export async function pipeChat(
310+
async function pipeChat(
282311
source: UIMessageStreamable | AsyncIterable<unknown> | ReadableStream<unknown>,
283312
options?: PipeChatOptions
284313
): Promise<void> {
@@ -314,16 +343,15 @@ export async function pipeChat(
314343
* Options for defining a chat task.
315344
*
316345
* Extends the standard `TaskOptions` but pre-types the payload as `ChatTaskPayload`
317-
* and overrides `run` to accept `ChatTaskPayload` directly.
346+
* and overrides `run` to accept `ChatTaskRunPayload` (with abort signals).
318347
*
319348
* **Auto-piping:** If the `run` function returns a value with `.toUIMessageStream()`
320349
* (like a `StreamTextResult`), the stream is automatically piped to the frontend.
321-
* For complex flows, use `pipeChat()` manually from anywhere in your code.
322350
*
323-
* **Single-run mode:** By default, the task runs a waitpoint loop so that the
351+
* **Single-run mode:** By default, the task uses input streams so that the
324352
* entire conversation lives inside one run. After each AI response, the task
325-
* emits a control chunk and pauses via `wait.forToken`. The frontend transport
326-
* resumes the same run by completing the token with the next set of messages.
353+
* emits a control chunk and suspends via `messagesInput.wait()`. The frontend
354+
* transport resumes the same run by sending the next message via input streams.
327355
*/
328356
export type ChatTaskOptions<TIdentifier extends string> = Omit<
329357
TaskOptions<TIdentifier, ChatTaskPayload, unknown>,
@@ -332,13 +360,13 @@ export type ChatTaskOptions<TIdentifier extends string> = Omit<
332360
/**
333361
* The run function for the chat task.
334362
*
335-
* Receives a `ChatTaskPayload` with the conversation messages, chat session ID,
336-
* and trigger type.
363+
* Receives a `ChatTaskRunPayload` with the conversation messages, chat session ID,
364+
* trigger type, and abort signals (`signal`, `cancelSignal`, `stopSignal`).
337365
*
338366
* **Auto-piping:** If this function returns a value with `.toUIMessageStream()`,
339367
* the stream is automatically piped to the frontend.
340368
*/
341-
run: (payload: ChatTaskPayload) => Promise<unknown>;
369+
run: (payload: ChatTaskRunPayload) => Promise<unknown>;
342370

343371
/**
344372
* Maximum number of conversational turns (message round-trips) a single run
@@ -351,7 +379,7 @@ export type ChatTaskOptions<TIdentifier extends string> = Omit<
351379

352380
/**
353381
* How long to wait for the next message before timing out and ending the run.
354-
* Accepts any duration string recognised by `wait.createToken` (e.g. `"1h"`, `"30m"`).
382+
* Accepts any duration string (e.g. `"1h"`, `"30m"`).
355383
*
356384
* @default "1h"
357385
*/
@@ -361,87 +389,164 @@ export type ChatTaskOptions<TIdentifier extends string> = Omit<
361389
/**
362390
* Creates a Trigger.dev task pre-configured for AI SDK chat.
363391
*
364-
* - **Pre-types the payload** as `ChatTaskPayload` — no manual typing needed
392+
* - **Pre-types the payload** as `ChatTaskRunPayload` — includes abort signals
365393
* - **Auto-pipes the stream** if `run` returns a `StreamTextResult`
394+
* - **Multi-turn**: keeps the conversation in a single run using input streams
395+
* - **Stop support**: frontend can stop generation mid-stream via the stop input stream
366396
* - For complex flows, use `pipeChat()` from anywhere inside your task code
367397
*
368398
* @example
369399
* ```ts
370-
* import { chatTask } from "@trigger.dev/sdk/ai";
400+
* import { chat } from "@trigger.dev/sdk/ai";
371401
* import { streamText, convertToModelMessages } from "ai";
372402
* import { openai } from "@ai-sdk/openai";
373403
*
374-
* // Simple: return streamText result — auto-piped to the frontend
375-
* export const myChatTask = chatTask({
376-
* id: "my-chat-task",
377-
* run: async ({ messages }) => {
404+
* export const myChat = chat.task({
405+
* id: "my-chat",
406+
* run: async ({ messages, signal }) => {
378407
* return streamText({
379408
* model: openai("gpt-4o"),
380409
* messages: convertToModelMessages(messages),
410+
* abortSignal: signal, // fires on stop or run cancel
381411
* });
382412
* },
383413
* });
384414
* ```
385-
*
386-
* @example
387-
* ```ts
388-
* import { chatTask, pipeChat } from "@trigger.dev/sdk/ai";
389-
*
390-
* // Complex: pipeChat() from deep in your agent code
391-
* export const myAgentTask = chatTask({
392-
* id: "my-agent-task",
393-
* run: async ({ messages }) => {
394-
* await runComplexAgentLoop(messages);
395-
* },
396-
* });
397-
* ```
398415
*/
399-
export function chatTask<TIdentifier extends string>(
416+
function chatTask<TIdentifier extends string>(
400417
options: ChatTaskOptions<TIdentifier>
401418
): Task<TIdentifier, ChatTaskPayload, unknown> {
402419
const { run: userRun, maxTurns = 100, turnTimeout = "1h", ...restOptions } = options;
403420

404421
return createTask<TIdentifier, ChatTaskPayload, unknown>({
405422
...restOptions,
406-
run: async (payload: ChatTaskPayload) => {
423+
run: async (payload: ChatTaskPayload, { signal: runSignal }) => {
407424
let currentPayload = payload;
408425

409-
for (let turn = 0; turn < maxTurns; turn++) {
410-
_chatPipeCount = 0;
411-
412-
const result = await userRun(currentPayload);
413-
414-
// Auto-pipe if the run function returned a StreamTextResult or similar,
415-
// but only if pipeChat() wasn't already called manually during this turn
416-
if (_chatPipeCount === 0 && isUIMessageStreamable(result)) {
417-
await pipeChat(result);
418-
}
419-
420-
// Create a waitpoint token and emit a control chunk so the frontend
421-
// knows to resume this run instead of triggering a new one.
422-
const token = await wait.createToken({ timeout: turnTimeout });
423-
424-
const { waitUntilComplete } = streams.writer(CHAT_STREAM_KEY, {
425-
execute: ({ write }) => {
426-
write({
427-
type: "__trigger_waitpoint_ready",
428-
tokenId: token.id,
429-
publicAccessToken: token.publicAccessToken,
426+
// Mutable reference to the current turn's stop controller so the
427+
// stop input stream listener (registered once) can abort the right turn.
428+
let currentStopController: AbortController | undefined;
429+
430+
// Listen for stop signals for the lifetime of the run
431+
const stopSub = stopInput.on((data) => {
432+
currentStopController?.abort(data?.message || "stopped");
433+
});
434+
435+
try {
436+
for (let turn = 0; turn < maxTurns; turn++) {
437+
_chatPipeCount = 0;
438+
439+
// Per-turn stop controller (reset each turn)
440+
const stopController = new AbortController();
441+
currentStopController = stopController;
442+
443+
// Three signals for the user's run function
444+
const stopSignal = stopController.signal;
445+
const cancelSignal = runSignal;
446+
const combinedSignal = AbortSignal.any([runSignal, stopController.signal]);
447+
448+
// Buffer messages that arrive during streaming
449+
const pendingMessages: ChatTaskPayload[] = [];
450+
const msgSub = messagesInput.on((msg) => {
451+
pendingMessages.push(msg as ChatTaskPayload);
452+
});
453+
454+
try {
455+
const result = await userRun({
456+
...currentPayload,
457+
signal: combinedSignal,
458+
cancelSignal,
459+
stopSignal,
430460
});
431-
},
432-
});
433-
await waitUntilComplete();
434461

435-
// Pause until the frontend completes the token with the next message
436-
const next = await wait.forToken<ChatTaskPayload>(token);
437-
438-
if (!next.ok) {
439-
// Timed out waiting for the next message — end the conversation
440-
return;
462+
// Auto-pipe if the run function returned a StreamTextResult or similar,
463+
// but only if pipeChat() wasn't already called manually during this turn
464+
if (_chatPipeCount === 0 && isUIMessageStreamable(result)) {
465+
await pipeChat(result, { signal: combinedSignal });
466+
}
467+
} catch (error) {
468+
// Handle AbortError from streamText gracefully
469+
if (error instanceof Error && error.name === "AbortError") {
470+
if (runSignal.aborted) {
471+
return; // Full run cancellation — exit
472+
}
473+
// Stop generation — fall through to continue the loop
474+
} else {
475+
throw error;
476+
}
477+
} finally {
478+
msgSub.off();
479+
}
480+
481+
if (runSignal.aborted) return;
482+
483+
// Write turn-complete control chunk so frontend closes its stream
484+
await writeTurnCompleteChunk();
485+
486+
// If messages arrived during streaming, use the first one immediately
487+
if (pendingMessages.length > 0) {
488+
currentPayload = pendingMessages[0]!;
489+
continue;
490+
}
491+
492+
// Suspend the task (frees compute) until the next message arrives
493+
const next = await messagesInput.wait({ timeout: turnTimeout });
494+
495+
if (!next.ok) {
496+
// Timed out waiting for the next message — end the conversation
497+
return;
498+
}
499+
500+
currentPayload = next.output as ChatTaskPayload;
441501
}
442-
443-
currentPayload = next.output;
502+
} finally {
503+
stopSub.off();
444504
}
445505
},
446506
});
447507
}
508+
509+
/**
510+
* Namespace for AI SDK chat integration.
511+
*
512+
* @example
513+
* ```ts
514+
* import { chat } from "@trigger.dev/sdk/ai";
515+
*
516+
* // Define a chat task
517+
* export const myChat = chat.task({
518+
* id: "my-chat",
519+
* run: async ({ messages, signal }) => {
520+
* return streamText({ model, messages, abortSignal: signal });
521+
* },
522+
* });
523+
*
524+
* // Pipe a stream manually (from inside a task)
525+
* await chat.pipe(streamTextResult);
526+
*
527+
* // Create an access token (from a server action)
528+
* const token = await chat.createAccessToken("my-chat");
529+
* ```
530+
*/
531+
export const chat = {
532+
/** Create a chat task. See {@link chatTask}. */
533+
task: chatTask,
534+
/** Pipe a stream to the chat transport. See {@link pipeChat}. */
535+
pipe: pipeChat,
536+
/** Create a public access token for a chat task. See {@link createChatAccessToken}. */
537+
createAccessToken: createChatAccessToken,
538+
};
539+
540+
/**
541+
* Writes a turn-complete control chunk to the chat output stream.
542+
* The frontend transport intercepts this to close the ReadableStream for the current turn.
543+
* @internal
544+
*/
545+
async function writeTurnCompleteChunk(): Promise<void> {
546+
const { waitUntilComplete } = streams.writer(CHAT_STREAM_KEY, {
547+
execute: ({ write }) => {
548+
write({ type: "__trigger_turn_complete" });
549+
},
550+
});
551+
await waitUntilComplete();
552+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
/**
2+
* Stream IDs used for bidirectional chat communication.
3+
* Shared between backend (ai.ts) and frontend (chat.ts).
4+
*/
5+
6+
/** The output stream key where UIMessageChunks are written. */
7+
export const CHAT_STREAM_KEY = "chat";
8+
9+
/** Input stream ID for sending chat messages to the running task. */
10+
export const CHAT_MESSAGES_STREAM_ID = "chat-messages";
11+
12+
/** Input stream ID for sending stop signals to abort the current generation. */
13+
export const CHAT_STOP_STREAM_ID = "chat-stop";

packages/trigger-sdk/src/v3/chat-react.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ export type UseTriggerChatTransportOptions<TTask extends AnyTask = AnyTask> = Om
5151
*
5252
* The transport is created once on first render and reused for the lifetime
5353
* of the component. This avoids the need for `useMemo` and ensures the
54-
* transport's internal session state (waitpoint tokens, lastEventId, etc.)
54+
* transport's internal session state (run IDs, lastEventId, etc.)
5555
* is preserved across re-renders.
5656
*
5757
* For dynamic access tokens, pass a function — it will be called on each

0 commit comments

Comments
 (0)