Skip to content

Commit ce2b2d8

Browse files
committed
fix(sdk): custom agent loop parity with chat.agent for continuations, steering, and subtasks
Three fixes for chat.customAgent raw loops and chat.createSession: Continuation boots replayed already-answered user messages into the first wait: the .in SSE tail attached (via createStopSignal or any listener) before a resume cursor existed, so S2 replayed from seq 0. The custom-agent wrapper and createChatSession's first next() now seed both manager cursors from the latest turn-complete header before anything attaches, the same boot logic chat.agent uses. Seeding only setLastSeqNum after attach (the reverted earlier attempt) does not work because dispatch is gated on the other cursor. Steering a hand-rolled loop mid-stream wiped the in-flight assistant text: pipeChatAndCapture called toUIMessageStream without generateMessageId, so a prepareStep injection starting a new step regenerated the assistant id and the frontend replaced the partial message. It now stamps the server-generated id like chat.agent's pipe. Task-backed tools (ai.toolExecute) failed from custom agent loops with "session handle is not initialized" on the child run: the chatId only threaded from the per-turn context that raw loops never set. It now falls back to the session handle the customAgent wrapper binds at boot, so child tasks can stream into the parent's chat with chat.stream.writer({ target: "root" }).
1 parent 43b4936 commit ce2b2d8

2 files changed

Lines changed: 81 additions & 3 deletions

File tree

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
---
2+
"@trigger.dev/sdk": patch
3+
---
4+
5+
Three fixes for custom agent loops (`chat.customAgent`, `chat.createSession`, and hand-rolled `MessageAccumulator` loops):
6+
7+
- Continuation runs no longer replay already-answered user messages into the first turn. The `.in` resume cursor is now seeded before any listener attaches (the same boot logic `chat.agent` uses), so a chat that continues after a cancel, crash, or upgrade only sees genuinely new messages.
8+
- Steering a hand-rolled loop mid-stream no longer wipes the in-flight assistant response. `chat.pipeAndCapture` now stamps a server-generated message id on the stream, so a `prepareStep` injection keeps the partial text instead of replacing the message.
9+
- Task-backed tools (`ai.toolExecute`) now work from custom agent loops: the parent's session is threaded to the child run, so child tasks can stream progress into the chat with `chat.stream.writer({ target: "root" })` instead of failing with "session handle is not initialized".

packages/trigger-sdk/src/v3/ai.ts

Lines changed: 72 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,44 @@ export async function __findLatestSessionInCursorForTests(
221221
return findLatestSessionInCursor(chatId);
222222
}
223223

224+
/**
225+
* Seed the `.in` resume cursor for custom-agent loops (`chat.customAgent`
226+
* raw loops and `chat.createSession`) the way `chat.agent`'s boot does.
227+
*
228+
* MUST run before anything attaches a `.in` listener (`createStopSignal`,
229+
* `chat.messages.on`, the first wait): attaching opens the SSE tail with
230+
* `Last-Event-ID` from the seeded cursor, so attach-then-seed replays
231+
* every record from seq 0 — already-answered user messages get delivered
232+
* into the new run's first wait and the loop re-answers them.
233+
*
234+
* Seeds both cursors: `setLastSeqNum` controls the SSE `Last-Event-ID`,
235+
* `setLastDispatchedSeqNum` gates waiter dispatch — seeding only the
236+
* former still re-delivers records the manager buffered before the seed.
237+
*
238+
* No-ops on fresh boots and when a cursor is already seeded (e.g. the
239+
* `chatCustomAgent` wrapper ran before a nested `createChatSession`).
240+
* @internal
241+
*/
242+
async function seedSessionInResumeCursorForCustomLoop(
243+
payload: Pick<ChatTaskWirePayload, "chatId" | "continuation">
244+
): Promise<void> {
245+
const attemptNumber = taskContext.ctx?.attempt.number ?? 1;
246+
if (payload.continuation !== true && attemptNumber <= 1) return;
247+
if (sessionStreams.lastSeqNum(payload.chatId, "in") !== undefined) return;
248+
try {
249+
const cursor = await findLatestSessionInCursor(payload.chatId);
250+
if (cursor !== undefined) {
251+
sessionStreams.setLastSeqNum(payload.chatId, "in", cursor);
252+
sessionStreams.setLastDispatchedSeqNum(payload.chatId, "in", cursor);
253+
}
254+
} catch (error) {
255+
logger.warn(
256+
"chat session: session.in resume cursor lookup failed; old messages may replay",
257+
{ error: error instanceof Error ? error.message : String(error) }
258+
);
259+
}
260+
}
261+
224262
/**
225263
* Versioned blob written to S3 after every turn completes (when no
226264
* `hydrateMessages` hook is registered). Read at run boot to seed the
@@ -921,6 +959,15 @@ function createTaskToolExecuteHandler<
921959
toolMeta.turn = chatCtx.turn;
922960
toolMeta.continuation = chatCtx.continuation;
923961
toolMeta.clientData = chatCtx.clientData;
962+
} else {
963+
// Hand-rolled chat.customAgent loops never set per-turn context, but
964+
// the wrapper binds the session handle at run boot — thread the
965+
// chatId from it so subtask chat helpers (`chat.stream.writer`
966+
// with target "root") can open the parent's session.
967+
const sessionHandle = locals.get(chatSessionHandleKey);
968+
if (sessionHandle) {
969+
toolMeta.chatId = sessionHandle.id;
970+
}
924971
}
925972

926973
const chatLocals: Record<string, unknown> = {};
@@ -5113,6 +5160,10 @@ function chatCustomAgent<
51135160
markChatAgentRunForStreamsWarning();
51145161
taskContext.setConversationId(payload.chatId);
51155162
stampConversationIdOnActiveSpan(payload.chatId);
5163+
// Seed the `.in` resume cursor before user code attaches any `.in`
5164+
// listener — otherwise a continuation boot replays already-answered
5165+
// messages into the loop's first wait.
5166+
await seedSessionInResumeCursorForCustomLoop(payload);
51165167
return userRun(payload, runOptions);
51175168
},
51185169
});
@@ -8613,8 +8664,15 @@ async function pipeChatAndCapture(
86138664
resolveOnFinish = r;
86148665
});
86158666

8667+
const resolvedOptions = resolveUIMessageStreamOptions();
86168668
const uiStream = source.toUIMessageStream({
8617-
...resolveUIMessageStreamOptions(),
8669+
...resolvedOptions,
8670+
// Stamp a server-generated id on the start chunk, same as chat.agent's
8671+
// pipe. Without it the AI SDK regenerates the assistant id when a
8672+
// prepareStep injection (steering) starts a new step mid-stream, and
8673+
// the frontend replaces the partial message — wiping the
8674+
// pre-injection text from the UI and the captured response.
8675+
generateMessageId: resolvedOptions.generateMessageId ?? generateMessageId,
86188676
onFinish: ({ responseMessage }: { responseMessage: UIMessage }) => {
86198677
captured = responseMessage;
86208678
resolveOnFinish!();
@@ -8979,13 +9037,23 @@ function createChatSession(
89799037
[Symbol.asyncIterator]() {
89809038
let currentPayload = payload;
89819039
let turn = -1;
8982-
const stop = createStopSignal();
9040+
// Created on the first next() call, AFTER the resume-cursor seed —
9041+
// createStopSignal attaches the `.in` SSE tail, and attaching
9042+
// before the seed replays every record from seq 0 (the seed is a
9043+
// no-op when the chatCustomAgent wrapper already ran it).
9044+
let stop!: ReturnType<typeof createStopSignal>;
9045+
let booted = false;
89839046
const accumulator = new ChatMessageAccumulator();
89849047
let previousTurnUsage: LanguageModelUsage | undefined;
89859048
let cumulativeUsage: LanguageModelUsage = emptyUsage();
89869049

89879050
return {
89889051
async next(): Promise<IteratorResult<ChatTurn>> {
9052+
if (!booted) {
9053+
booted = true;
9054+
await seedSessionInResumeCursorForCustomLoop(currentPayload);
9055+
stop = createStopSignal();
9056+
}
89899057
turn++;
89909058

89919059
// First turn: wait when the boot payload carries no message.
@@ -9328,7 +9396,8 @@ function createChatSession(
93289396
},
93299397

93309398
async return() {
9331-
stop.cleanup();
9399+
// `stop` only exists once next() has booted the iterator.
9400+
stop?.cleanup();
93329401
return { done: true, value: undefined };
93339402
},
93349403
};

0 commit comments

Comments
 (0)