Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .changeset/custom-agent-loop-fixes.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
"@trigger.dev/sdk": patch
---

Three fixes for custom agent loops (`chat.customAgent`, `chat.createSession`, and hand-rolled `MessageAccumulator` loops):

- Continuation runs no longer replay already-answered user messages into the first turn. The `.in` resume cursor is now seeded before any listener attaches (the same boot logic `chat.agent` uses), so a chat that continues after a cancel, crash, or upgrade only sees genuinely new messages.
- Steering a hand-rolled loop mid-stream no longer wipes the in-flight assistant response. `chat.pipeAndCapture` now stamps a server-generated message id on the stream, so a `prepareStep` injection keeps the partial text instead of replacing the message.
- Task-backed tools (`ai.toolExecute`) now work from custom agent loops: the parent's session is threaded to the child run, so child tasks can stream progress into the chat with `chat.stream.writer({ target: "root" })` instead of failing with "session handle is not initialized".
92 changes: 87 additions & 5 deletions packages/trigger-sdk/src/v3/ai.ts
Comment thread
ericallam marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ const chatTurnContextKey = locals.create<ChatTurnContext>("chat.turnContext");
* @internal
*/
const chatSessionHandleKey = locals.create<SessionHandle>("chat.sessionHandle");
// The external `chatId` from the boot payload — the value `ToolCallExecutionOptions.chatId`
// is documented to carry. Custom-agent loops never set per-turn context, so subtask tool
// metadata reads this directly rather than the Session handle id.
const chatExternalIdKey = locals.create<string>("chat.externalId");

/**
* S2 seq_num of the most recent `turn-complete` control record written by
Expand Down Expand Up @@ -221,6 +225,47 @@ export async function __findLatestSessionInCursorForTests(
return findLatestSessionInCursor(chatId);
}

/**
* Seed the `.in` resume cursor for custom-agent loops (`chat.customAgent`
* raw loops and `chat.createSession`) the way `chat.agent`'s boot does.
*
* MUST run before anything attaches a `.in` listener (`createStopSignal`,
* `chat.messages.on`, the first wait): attaching opens the SSE tail with
* `Last-Event-ID` from the seeded cursor, so attach-then-seed replays
* every record from seq 0 — already-answered user messages get delivered
* into the new run's first wait and the loop re-answers them.
*
* Seeds both cursors: `setLastSeqNum` controls the SSE `Last-Event-ID`,
* `setLastDispatchedSeqNum` gates waiter dispatch — seeding only the
* former still re-delivers records the manager buffered before the seed.
*
* No-ops on fresh boots and when a cursor is already seeded (e.g. the
* `chatCustomAgent` wrapper ran before a nested `createChatSession`).
* @internal
*/
async function seedSessionInResumeCursorForCustomLoop(
payload: Pick<ChatTaskWirePayload, "chatId" | "continuation">
): Promise<void> {
if (sessionStreams.lastSeqNum(payload.chatId, "in") !== undefined) return;
// No continuation/attempt gate: the wire may omit `continuation` on a
// run that still has prior turns (chat.agent covers that case via its
// snapshot). The scan doubles as the prior-state probe — a fresh
// session has no turn-complete on `.out`, returns no cursor, and
// seeds nothing. Cost on fresh boots is one non-blocking records read.
try {
const cursor = await findLatestSessionInCursor(payload.chatId);
if (cursor !== undefined) {
sessionStreams.setLastSeqNum(payload.chatId, "in", cursor);
sessionStreams.setLastDispatchedSeqNum(payload.chatId, "in", cursor);
}
} catch (error) {
logger.warn(
"chat session: session.in resume cursor lookup failed; old messages may replay",
{ error: error instanceof Error ? error.message : String(error) }
);
}
}

/**
* Versioned blob written to S3 after every turn completes (when no
* `hydrateMessages` hook is registered). Read at run boot to seed the
Expand Down Expand Up @@ -921,6 +966,15 @@ function createTaskToolExecuteHandler<
toolMeta.turn = chatCtx.turn;
toolMeta.continuation = chatCtx.continuation;
toolMeta.clientData = chatCtx.clientData;
} else {
// Hand-rolled chat.customAgent loops never set per-turn context, but
// the wrapper records the boot payload's external chatId at run boot
// — thread it so subtask chat helpers (`chat.stream.writer` with
// target "root") can open the parent's session.
const chatExternalId = locals.get(chatExternalIdKey);
if (chatExternalId) {
toolMeta.chatId = chatExternalId;
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

const chatLocals: Record<string, unknown> = {};
Expand Down Expand Up @@ -5104,6 +5158,7 @@ function chatCustomAgent<
// `chat.createStartSessionAction`) before this run is triggered.
// No client-side upsert needed.
locals.set(chatSessionHandleKey, sessions.open(payload.chatId));
locals.set(chatExternalIdKey, payload.chatId);
locals.set(chatAgentRunContextKey, runOptions.ctx);
// Initialize the turn-complete trim slot so `chat.writeTurnComplete`
// trims `session.out` back to the previous turn boundary. Without
Expand All @@ -5113,6 +5168,10 @@ function chatCustomAgent<
markChatAgentRunForStreamsWarning();
taskContext.setConversationId(payload.chatId);
stampConversationIdOnActiveSpan(payload.chatId);
// Seed the `.in` resume cursor before user code attaches any `.in`
// listener — otherwise a continuation boot replays already-answered
// messages into the loop's first wait.
await seedSessionInResumeCursorForCustomLoop(payload);
return userRun(payload, runOptions);
},
});
Expand Down Expand Up @@ -5213,6 +5272,7 @@ function chatAgent<
// `chat.createStartSessionAction` or browser-direct) before this
// run is triggered — no client-side upsert needed here.
locals.set(chatSessionHandleKey, sessions.open(payload.chatId));
locals.set(chatExternalIdKey, payload.chatId);
// Mutable holder; advances in `writeTurnCompleteChunk` after each turn
// and is the trim target for the NEXT turn's trim record.
locals.set(lastTurnCompleteSeqNumKey, { value: undefined });
Expand Down Expand Up @@ -8613,8 +8673,15 @@ async function pipeChatAndCapture(
resolveOnFinish = r;
});

const resolvedOptions = resolveUIMessageStreamOptions();
const uiStream = source.toUIMessageStream({
...resolveUIMessageStreamOptions(),
...resolvedOptions,
// Stamp a server-generated id on the start chunk, same as chat.agent's
// pipe. Without it the AI SDK regenerates the assistant id when a
// prepareStep injection (steering) starts a new step mid-stream, and
// the frontend replaces the partial message — wiping the
// pre-injection text from the UI and the captured response.
generateMessageId: resolvedOptions.generateMessageId ?? generateMessageId,
onFinish: ({ responseMessage }: { responseMessage: UIMessage }) => {
captured = responseMessage;
resolveOnFinish!();
Expand Down Expand Up @@ -8936,14 +9003,18 @@ export type ChatTurn = {
* signaling, and idle/suspend between turns. You control: initialization,
* model/tool selection, persistence, and any custom per-turn logic.
*
* Call from inside a `chat.customAgent()` run — the wrapper binds the
* backing Session that the iterator's stop signal and message channels
* resolve to. (A plain `task()` does not bind it, so `createSession`
* would throw "session handle is not initialized".)
*
* @example
* ```ts
* import { task } from "@trigger.dev/sdk";
* import { chat, type ChatTaskWirePayload } from "@trigger.dev/sdk/ai";
* import { streamText } from "ai";
* import { openai } from "@ai-sdk/openai";
*
* export const myChat = task({
* export const myChat = chat.customAgent({
* id: "my-chat",
* run: async (payload: ChatTaskWirePayload, { signal }) => {
* const session = chat.createSession(payload, { signal });
Expand Down Expand Up @@ -8979,13 +9050,23 @@ function createChatSession(
[Symbol.asyncIterator]() {
let currentPayload = payload;
let turn = -1;
const stop = createStopSignal();
// Created on the first next() call, AFTER the resume-cursor seed —
// createStopSignal attaches the `.in` SSE tail, and attaching
// before the seed replays every record from seq 0 (the seed is a
// no-op when the chatCustomAgent wrapper already ran it).
let stop!: ReturnType<typeof createStopSignal>;
let booted = false;
const accumulator = new ChatMessageAccumulator();
let previousTurnUsage: LanguageModelUsage | undefined;
let cumulativeUsage: LanguageModelUsage = emptyUsage();

return {
async next(): Promise<IteratorResult<ChatTurn>> {
if (!booted) {
booted = true;
await seedSessionInResumeCursorForCustomLoop(currentPayload);
stop = createStopSignal();
}
turn++;

// First turn: wait when the boot payload carries no message.
Expand Down Expand Up @@ -9328,7 +9409,8 @@ function createChatSession(
},

async return() {
stop.cleanup();
// `stop` only exists once next() has booted the iterator.
stop?.cleanup();
return { done: true, value: undefined };
},
};
Expand Down
Loading