Skip to content

Commit bf924bc

Browse files
committed
feat(chat): add stop handling, abort cleanup, continuation support, and reference project enhancements
- Fix onFinish race condition: await onFinishPromise so capturedResponseMessage is set before accumulation - Add chat.isStopped() helper accessible from anywhere during a turn - Add chat.cleanupAbortedParts() to remove incomplete tool/reasoning/text parts on stop - Auto-cleanup aborted parts before passing to onTurnComplete - Clean incoming messages from frontend to prevent tool_use without tool_result API errors - Add stopped and rawResponseMessage fields to TurnCompleteEvent - Add continuation and previousRunId fields to all lifecycle hooks and run payload - Add span attributes (chat.id, chat.turn, chat.stopped, chat.continuation, chat.previous_run_id, etc.) - Add webFetch tool and reasoning model support to ai-chat reference project - Render reasoning parts in frontend chat component - Document all new fields in ai-chat guide
1 parent 5133699 commit bf924bc

5 files changed

Lines changed: 289 additions & 19 deletions

File tree

packages/trigger-sdk/src/v3/ai.ts

Lines changed: 203 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,10 @@ type ChatTaskWirePayload<TMessage extends UIMessage = UIMessage, TMetadata = unk
186186
trigger: "submit-message" | "regenerate-message";
187187
messageId?: string;
188188
metadata?: TMetadata;
189+
/** Whether this run is continuing an existing chat whose previous run ended. */
190+
continuation?: boolean;
191+
/** The run ID of the previous run (only set when `continuation` is true). */
192+
previousRunId?: string;
189193
};
190194

191195
/**
@@ -217,6 +221,11 @@ export type ChatTaskPayload<TClientData = unknown> = {
217221

218222
/** Custom data from the frontend (passed via `metadata` on `sendMessage()` or the transport). */
219223
clientData?: TClientData;
224+
225+
/** Whether this run is continuing an existing chat (previous run timed out or was cancelled). False for brand new chats. */
226+
continuation: boolean;
227+
/** The run ID of the previous run (only set when `continuation` is true). */
228+
previousRunId?: string;
220229
};
221230

222231
/**
@@ -247,6 +256,7 @@ const stopInput = streams.input<{ stop: true; message?: string }>({ id: CHAT_STO
247256
* @internal
248257
*/
249258
const chatPipeCountKey = locals.create<number>("chat.pipeCount");
259+
const chatStopControllerKey = locals.create<AbortController>("chat.stopController");
250260

251261
/**
252262
* Options for `pipeChat`.
@@ -397,6 +407,10 @@ export type ChatStartEvent<TClientData = unknown> = {
397407
runId: string;
398408
/** A scoped access token for this chat run. Persist this for frontend reconnection. */
399409
chatAccessToken: string;
410+
/** Whether this run is continuing an existing chat (previous run timed out or was cancelled). False for brand new chats. */
411+
continuation: boolean;
412+
/** The run ID of the previous run (only set when `continuation` is true). */
413+
previousRunId?: string;
400414
};
401415

402416
/**
@@ -417,6 +431,10 @@ export type TurnStartEvent<TClientData = unknown> = {
417431
chatAccessToken: string;
418432
/** Custom data from the frontend. */
419433
clientData?: TClientData;
434+
/** Whether this run is continuing an existing chat (previous run timed out or was cancelled). False for brand new chats. */
435+
continuation: boolean;
436+
/** The run ID of the previous run (only set when `continuation` is true). */
437+
previousRunId?: string;
420438
};
421439

422440
/**
@@ -442,8 +460,14 @@ export type TurnCompleteEvent<TClientData = unknown> = {
442460
* Useful for inserting individual message records instead of overwriting the full history.
443461
*/
444462
newUIMessages: UIMessage[];
445-
/** The assistant's response for this turn (undefined if `pipeChat` was used manually). */
463+
/** The assistant's response for this turn, with aborted parts cleaned up when `stopped` is true. Undefined if `pipeChat` was used manually. */
446464
responseMessage: UIMessage | undefined;
465+
/**
466+
* The raw assistant response before abort cleanup. Includes incomplete tool parts
467+
* (`input-available`, `partial-call`) and streaming reasoning/text parts.
468+
* Use this if you need custom cleanup logic. Same as `responseMessage` when not stopped.
469+
*/
470+
rawResponseMessage: UIMessage | undefined;
447471
/** The turn number (0-indexed). */
448472
turn: number;
449473
/** The Trigger.dev run ID for this conversation. */
@@ -454,6 +478,12 @@ export type TurnCompleteEvent<TClientData = unknown> = {
454478
lastEventId?: string;
455479
/** Custom data from the frontend. */
456480
clientData?: TClientData;
481+
/** Whether the user stopped generation during this turn. */
482+
stopped: boolean;
483+
/** Whether this run is continuing an existing chat (previous run timed out or was cancelled). False for brand new chats. */
484+
continuation: boolean;
485+
/** The run ID of the previous run (only set when `continuation` is true). */
486+
previousRunId?: string;
457487
};
458488

459489
export type ChatTaskOptions<
@@ -637,6 +667,8 @@ function chatTask<
637667
}
638668

639669
let currentWirePayload = payload;
670+
const continuation = payload.continuation ?? false;
671+
const previousRunId = payload.previousRunId;
640672

641673
// Accumulated model messages across turns. Turn 1 initialises from the
642674
// full history the frontend sends; subsequent turns append only the new
@@ -704,6 +736,7 @@ function chatTask<
704736
// Per-turn stop controller (reset each turn)
705737
const stopController = new AbortController();
706738
currentStopController = stopController;
739+
locals.set(chatStopControllerKey, stopController);
707740

708741
// Three signals for the user's run function
709742
const stopSignal = stopController.signal;
@@ -716,37 +749,46 @@ function chatTask<
716749
pendingMessages.push(msg);
717750
});
718751

752+
// Clean up any incomplete tool parts in the incoming history.
753+
// When a previous run was stopped mid-tool-call, the frontend's
754+
// useChat state may still contain assistant messages with tool parts
755+
// in partial/input-available state. These cause API errors (e.g.
756+
// Anthropic requires every tool_use to have a matching tool_result).
757+
const cleanedUIMessages = uiMessages.map((msg) =>
758+
msg.role === "assistant" ? cleanupAbortedParts(msg) : msg
759+
);
760+
719761
// Convert the incoming UIMessages to model messages and update the accumulator.
720762
// Turn 1: full history from the frontend → replaces the accumulator.
721763
// Turn 2+: only the new message(s) → appended to the accumulator.
722-
const incomingModelMessages = await convertToModelMessages(uiMessages);
764+
const incomingModelMessages = await convertToModelMessages(cleanedUIMessages);
723765

724766
// Track new messages for this turn (user input + assistant response).
725767
const turnNewModelMessages: ModelMessage[] = [];
726768
const turnNewUIMessages: UIMessage[] = [];
727769

728770
if (turn === 0) {
729771
accumulatedMessages = incomingModelMessages;
730-
accumulatedUIMessages = [...uiMessages];
772+
accumulatedUIMessages = [...cleanedUIMessages];
731773
// On first turn, the "new" messages are just the last user message
732774
// (the rest is history). We'll add the response after streaming.
733-
if (uiMessages.length > 0) {
734-
turnNewUIMessages.push(uiMessages[uiMessages.length - 1]!);
775+
if (cleanedUIMessages.length > 0) {
776+
turnNewUIMessages.push(cleanedUIMessages[cleanedUIMessages.length - 1]!);
735777
const lastModel = incomingModelMessages[incomingModelMessages.length - 1];
736778
if (lastModel) turnNewModelMessages.push(lastModel);
737779
}
738780
} else if (currentWirePayload.trigger === "regenerate-message") {
739781
// Regenerate: frontend sent full history with last assistant message
740782
// removed. Reset the accumulator to match.
741783
accumulatedMessages = incomingModelMessages;
742-
accumulatedUIMessages = [...uiMessages];
784+
accumulatedUIMessages = [...cleanedUIMessages];
743785
// No new user messages for regenerate — just the response (added below)
744786
} else {
745787
// Submit: frontend sent only the new user message(s). Append to accumulator.
746788
accumulatedMessages.push(...incomingModelMessages);
747-
accumulatedUIMessages.push(...uiMessages);
789+
accumulatedUIMessages.push(...cleanedUIMessages);
748790
turnNewModelMessages.push(...incomingModelMessages);
749-
turnNewUIMessages.push(...uiMessages);
791+
turnNewUIMessages.push(...cleanedUIMessages);
750792
}
751793

752794
// Mint a scoped public access token once per turn, reused for
@@ -778,12 +820,18 @@ function chatTask<
778820
clientData,
779821
runId: currentRunId,
780822
chatAccessToken: turnAccessToken,
823+
continuation,
824+
previousRunId,
781825
});
782826
},
783827
{
784828
attributes: {
785829
[SemanticInternalAttributes.STYLE_ICON]: "task-hook-onStart",
786830
[SemanticInternalAttributes.COLLAPSED]: true,
831+
"chat.id": currentWirePayload.chatId,
832+
"chat.messages.count": accumulatedMessages.length,
833+
"chat.continuation": continuation,
834+
...(previousRunId ? { "chat.previous_run_id": previousRunId } : {}),
787835
},
788836
}
789837
);
@@ -803,12 +851,20 @@ function chatTask<
803851
runId: currentRunId,
804852
chatAccessToken: turnAccessToken,
805853
clientData,
854+
continuation,
855+
previousRunId,
806856
});
807857
},
808858
{
809859
attributes: {
810860
[SemanticInternalAttributes.STYLE_ICON]: "task-hook-onStart",
811861
[SemanticInternalAttributes.COLLAPSED]: true,
862+
"chat.id": currentWirePayload.chatId,
863+
"chat.turn": turn + 1,
864+
"chat.messages.count": accumulatedMessages.length,
865+
"chat.trigger": currentWirePayload.trigger,
866+
"chat.continuation": continuation,
867+
...(previousRunId ? { "chat.previous_run_id": previousRunId } : {}),
812868
},
813869
}
814870
);
@@ -817,11 +873,22 @@ function chatTask<
817873
// Captured by the onFinish callback below — works even on abort/stop.
818874
let capturedResponseMessage: UIMessage | undefined;
819875

876+
// Promise that resolves when the AI SDK's onFinish fires.
877+
// On abort, the stream's cancel() handler calls onFinish
878+
// asynchronously AFTER pipeChat resolves, so we must await
879+
// this to avoid a race where we check capturedResponseMessage
880+
// before it's been set.
881+
let resolveOnFinish: () => void;
882+
const onFinishPromise = new Promise<void>((r) => { resolveOnFinish = r; });
883+
let onFinishAttached = false;
884+
820885
try {
821886
const result = await userRun({
822887
...restWire,
823888
messages: accumulatedMessages,
824889
clientData,
890+
continuation,
891+
previousRunId,
825892
signal: combinedSignal,
826893
cancelSignal,
827894
stopSignal,
@@ -831,9 +898,11 @@ function chatTask<
831898
// but only if pipeChat() wasn't already called manually during this turn.
832899
// We call toUIMessageStream ourselves to attach onFinish for response capture.
833900
if ((locals.get(chatPipeCountKey) ?? 0) === 0 && isUIMessageStreamable(result)) {
901+
onFinishAttached = true;
834902
const uiStream = result.toUIMessageStream({
835903
onFinish: ({ responseMessage }: { responseMessage: UIMessage }) => {
836904
capturedResponseMessage = responseMessage;
905+
resolveOnFinish!();
837906
},
838907
});
839908
await pipeChat(uiStream, { signal: combinedSignal, spanName: "stream response" });
@@ -852,10 +921,26 @@ function chatTask<
852921
msgSub.off();
853922
}
854923

924+
// Wait for onFinish to fire — on abort this may resolve slightly
925+
// after pipeChat, since the stream's cancel() handler is async.
926+
if (onFinishAttached) {
927+
await onFinishPromise;
928+
}
929+
930+
// Determine if the user stopped generation this turn (not a full run cancel).
931+
const wasStopped = stopController.signal.aborted && !runSignal.aborted;
932+
855933
// Append the assistant's response (partial or complete) to the accumulator.
856934
// The onFinish callback fires even on abort/stop, so partial responses
857935
// from stopped generation are captured correctly.
936+
let rawResponseMessage: UIMessage | undefined;
858937
if (capturedResponseMessage) {
938+
// Keep the raw message before cleanup for users who want custom handling
939+
rawResponseMessage = capturedResponseMessage;
940+
// Clean up aborted parts (streaming tool calls, reasoning) when stopped
941+
if (wasStopped) {
942+
capturedResponseMessage = cleanupAbortedParts(capturedResponseMessage);
943+
}
859944
// Ensure the response message has an ID (the stream's onFinish
860945
// may produce a message with an empty ID since IDs are normally
861946
// assigned by the frontend's useChat).
@@ -900,17 +985,29 @@ function chatTask<
900985
newMessages: turnNewModelMessages,
901986
newUIMessages: turnNewUIMessages,
902987
responseMessage: capturedResponseMessage,
988+
rawResponseMessage,
903989
turn,
904990
runId: currentRunId,
905991
chatAccessToken: turnAccessToken,
906992
lastEventId: turnCompleteResult.lastEventId,
907993
clientData,
994+
stopped: wasStopped,
995+
continuation,
996+
previousRunId,
908997
});
909998
},
910999
{
9111000
attributes: {
9121001
[SemanticInternalAttributes.STYLE_ICON]: "task-hook-onComplete",
9131002
[SemanticInternalAttributes.COLLAPSED]: true,
1003+
"chat.id": currentWirePayload.chatId,
1004+
"chat.turn": turn + 1,
1005+
"chat.stopped": wasStopped,
1006+
"chat.continuation": continuation,
1007+
...(previousRunId ? { "chat.previous_run_id": previousRunId } : {}),
1008+
"chat.messages.count": accumulatedMessages.length,
1009+
"chat.response.parts.count": capturedResponseMessage?.parts?.length ?? 0,
1010+
"chat.new_messages.count": turnNewUIMessages.length,
9141011
},
9151012
}
9161013
);
@@ -1062,6 +1159,100 @@ function setWarmTimeoutInSeconds(seconds: number): void {
10621159
metadata.set(WARM_TIMEOUT_METADATA_KEY, seconds);
10631160
}
10641161

1162+
// ---------------------------------------------------------------------------
1163+
// Stop detection
1164+
// ---------------------------------------------------------------------------
1165+
1166+
/**
1167+
* Check whether the user stopped generation during the current turn.
1168+
*
1169+
* Works from **anywhere** inside a `chat.task` run — including inside
1170+
* `streamText`'s `onFinish` callback — without needing to thread the
1171+
* `stopSignal` through closures.
1172+
*
1173+
* This is especially useful when the AI SDK's `isAborted` flag is unreliable
1174+
* (e.g. when using `createUIMessageStream` + `writer.merge()`).
1175+
*
1176+
* @example
1177+
* ```ts
1178+
* onFinish: ({ isAborted }) => {
1179+
* const wasStopped = isAborted || chat.isStopped();
1180+
* if (wasStopped) {
1181+
* // handle stop
1182+
* }
1183+
* }
1184+
* ```
1185+
*/
1186+
function isStopped(): boolean {
1187+
const controller = locals.get(chatStopControllerKey);
1188+
return controller?.signal.aborted ?? false;
1189+
}
1190+
1191+
// ---------------------------------------------------------------------------
1192+
// Aborted message cleanup
1193+
// ---------------------------------------------------------------------------
1194+
1195+
/**
1196+
* Clean up a UIMessage that was captured during an aborted/stopped turn.
1197+
*
1198+
* When generation is stopped mid-stream, the captured message may contain:
1199+
* - Tool parts stuck in incomplete states (`partial-call`, `input-available`,
1200+
* `input-streaming`) that cause permanent UI spinners
1201+
* - Reasoning parts with `state: "streaming"` instead of `"done"`
1202+
* - Text parts with `state: "streaming"` instead of `"done"`
1203+
*
1204+
* This function returns a cleaned copy with:
1205+
* - Incomplete tool parts removed entirely
1206+
* - Reasoning and text parts marked as `"done"`
1207+
*
1208+
* `chat.task` calls this automatically when stop is detected before passing
1209+
* the response to `onTurnComplete`. Use this manually when calling `pipeChat`
1210+
* directly and capturing response messages yourself.
1211+
*
1212+
* @example
1213+
* ```ts
1214+
* onTurnComplete: async ({ responseMessage, stopped }) => {
1215+
* // Already cleaned automatically by chat.task — but if you captured
1216+
* // your own message via pipeChat, clean it manually:
1217+
* const cleaned = chat.cleanupAbortedParts(myMessage);
1218+
* await db.messages.save(cleaned);
1219+
* }
1220+
* ```
1221+
*/
1222+
function cleanupAbortedParts(message: UIMessage): UIMessage {
1223+
if (!message.parts) return message;
1224+
1225+
const isToolPart = (part: any) =>
1226+
part.type === "tool-invocation" ||
1227+
part.type?.startsWith("tool-") ||
1228+
part.type === "dynamic-tool";
1229+
1230+
return {
1231+
...message,
1232+
parts: message.parts
1233+
.filter((part: any) => {
1234+
if (!isToolPart(part)) return true;
1235+
// Remove tool parts that never completed execution.
1236+
// partial-call: input was still streaming when aborted.
1237+
// input-available: input was complete but tool never ran.
1238+
// input-streaming: input was mid-stream.
1239+
const state = part.toolInvocation?.state ?? part.state;
1240+
return state !== "partial-call" && state !== "input-available" && state !== "input-streaming";
1241+
})
1242+
.map((part: any) => {
1243+
// Mark streaming reasoning as done
1244+
if (part.type === "reasoning" && part.state === "streaming") {
1245+
return { ...part, state: "done" };
1246+
}
1247+
// Mark streaming text as done
1248+
if (part.type === "text" && part.state === "streaming") {
1249+
return { ...part, state: "done" };
1250+
}
1251+
return part;
1252+
}),
1253+
};
1254+
}
1255+
10651256
// ---------------------------------------------------------------------------
10661257
// chat.local — per-run typed data with Proxy access
10671258
// ---------------------------------------------------------------------------
@@ -1257,6 +1448,10 @@ export const chat = {
12571448
setTurnTimeoutInSeconds,
12581449
/** Override the warm timeout at runtime. See {@link setWarmTimeoutInSeconds}. */
12591450
setWarmTimeoutInSeconds,
1451+
/** Check if the current turn was stopped by the user. See {@link isStopped}. */
1452+
isStopped,
1453+
/** Clean up aborted parts from a UIMessage. See {@link cleanupAbortedParts}. */
1454+
cleanupAbortedParts,
12601455
};
12611456

12621457
/**

0 commit comments

Comments
 (0)