---
title: Backend
sidebarTitle: Backend
description: Three approaches to building your chat backend — chat.agent(), session iterator, or raw task primitives.
---
The highest-level approach. Handles message accumulation, stop signals, turn lifecycle, and auto-piping automatically.
To set a **custom** `UIMessage` subtype or typed client data schema, use the [ChatBuilder](/ai-chat/types#chatbuilder) via `chat.withUIMessage<...>()` and/or `chat.withClientData({ schema })`. Builder-level hooks can also be chained before `.agent()`. See [Types](/ai-chat/types).

Return the `streamText` result from `run` and it's automatically piped to the frontend:
```ts
import { chat } from "@trigger.dev/sdk/ai";
import { streamText } from "ai";
import { openai } from "@ai-sdk/openai";

export const simpleChat = chat.agent({
  id: "simple-chat",
  run: async ({ messages, signal }) => {
    return streamText({
      model: openai("gpt-4o"),
      system: "You are a helpful assistant.",
      messages,
      abortSignal: signal,
    });
  },
});
```

For complex agent flows where `streamText` is called deep inside your code, use `chat.pipe()`. It works from anywhere inside a task — even nested function calls.
```ts
import { chat } from "@trigger.dev/sdk/ai";
import { streamText } from "ai";
import { openai } from "@ai-sdk/openai";
import type { ModelMessage } from "ai";

export const agentChat = chat.agent({
  id: "agent-chat",
  run: async ({ messages }) => {
    // Don't return anything — chat.pipe is called inside
    await runAgentLoop(messages);
  },
});

async function runAgentLoop(messages: ModelMessage[]) {
  // ... agent logic, tool calls, etc.
  const result = streamText({
    model: openai("gpt-4o"),
    messages,
  });
  // Pipe from anywhere — no need to return it
  await chat.pipe(result);
}
```

Every chat lifecycle callback and the run payload include `ctx`: the same run context object as `task({ run: (payload, { ctx }) => ... })`. Import the type with `import type { TaskRunContext } from "@trigger.dev/sdk"` (the `Context` export is the same type). Use `ctx` for tags, metadata, or any API that needs the full run record. The string `runId` on chat events is always `ctx.run.id` (both are provided for convenience). See Task context (`ctx`) in the API reference.
Standard task lifecycle hooks — `onWait`, `onResume`, `onComplete`, `onFailure`, etc. — are also available on `chat.agent()` with the same shapes as on a normal `task()`.

Chat agents also have two dedicated suspension hooks — `onChatSuspend` and `onChatResume` — that fire at the idle-to-suspended transition with full chat context. Use them for resource cleanup (e.g. tearing down sandboxes) and re-initialization. See `onChatSuspend` / `onChatResume` and the Code execution sandbox pattern.
Fires when a preloaded run starts — before any messages arrive. Use it to eagerly initialize state (DB records, user context) while the user is still typing.
Preloaded runs are triggered by calling `transport.preload(chatId)` on the frontend. See Preload for details.
```ts
export const myChat = chat.agent({
  id: "my-chat",
  clientDataSchema: z.object({ userId: z.string() }),
  onPreload: async ({ ctx, chatId, clientData, runId, chatAccessToken }) => {
    // Initialize early — before the first message arrives
    const user = await db.user.findUnique({ where: { id: clientData.userId } });
    userContext.init({ name: user.name, plan: user.plan });
    await db.chat.create({ data: { id: chatId, userId: clientData.userId } });
    await db.chatSession.upsert({
      where: { id: chatId },
      create: { id: chatId, runId, publicAccessToken: chatAccessToken },
      update: { runId, publicAccessToken: chatAccessToken },
    });
  },
  onChatStart: async ({ preloaded }) => {
    if (preloaded) return; // Already initialized in onPreload
    // ... non-preloaded initialization
  },
  run: async ({ messages, signal }) => {
    return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal });
  },
});
```

| Field | Type | Description |
|---|---|---|
| `ctx` | `TaskRunContext` | Full task run context — reference |
| `chatId` | `string` | Chat session ID |
| `runId` | `string` | The Trigger.dev run ID |
| `chatAccessToken` | `string` | Scoped access token for this run |
| `clientData` | Typed by `clientDataSchema` | Custom data from the frontend |
| `writer` | `ChatWriter` | Stream writer for custom chunks |
Every lifecycle callback receives a `writer` — a lazy stream writer that lets you send custom `UIMessageChunk` parts (like `data-*` parts) to the frontend without the ceremony of `chat.stream.writer()`. See `ChatWriter`.
Fires once on the first turn (turn 0) before `run()` executes. Use it to create a chat record in your database.

The `continuation` field tells you whether this is a brand new chat or a continuation of an existing one (where the previous run timed out or was cancelled). The `preloaded` field tells you whether `onPreload` already ran.
```ts
export const myChat = chat.agent({
  id: "my-chat",
  onChatStart: async ({ chatId, clientData, continuation, preloaded }) => {
    if (preloaded) return; // Already set up in onPreload
    if (continuation) return; // Chat record already exists
    const { userId } = clientData as { userId: string };
    await db.chat.create({
      data: { id: chatId, userId, title: "New chat" },
    });
  },
  run: async ({ messages, signal }) => {
    return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal });
  },
});
```

Validate or transform incoming `UIMessage[]` before they are converted to model messages. Fires once per turn with the raw messages from the wire payload (after cleanup of aborted tool parts), before accumulation and `toModelMessages()`.
Return the validated messages array. Throw to abort the turn with an error.
This is the right place to call the AI SDK's `validateUIMessages` to catch malformed messages from storage or untrusted input before they reach the model — especially useful when persisting conversations to a database, where tool schemas may drift between deploys.
| Field | Type | Description |
|---|---|---|
| `messages` | `UIMessage[]` | Incoming UI messages for this turn |
| `chatId` | `string` | Chat session ID |
| `turn` | `number` | Turn number (0-indexed) |
| `trigger` | `"submit-message" \| "regenerate-message" \| "preload" \| "close"` | The trigger type for this turn |
```ts
import { validateUIMessages } from "ai";

export const myChat = chat.agent({
  id: "my-chat",
  onValidateMessages: async ({ messages }) => {
    return validateUIMessages({ messages, tools: chatTools });
  },
  run: async ({ messages, signal }) => {
    return streamText({ model: openai("gpt-4o"), messages, tools: chatTools, abortSignal: signal });
  },
});
```

Fires at the start of every turn, after message accumulation and `onChatStart` (turn 0), but before `run()` executes. Use it to persist messages before streaming begins — so a mid-stream page refresh still shows the user's message.
| Field | Type | Description |
|---|---|---|
| `ctx` | `TaskRunContext` | Full task run context — reference |
| `chatId` | `string` | Chat session ID |
| `messages` | `ModelMessage[]` | Full accumulated conversation (model format) |
| `uiMessages` | `UIMessage[]` | Full accumulated conversation (UI format) |
| `turn` | `number` | Turn number (0-indexed) |
| `runId` | `string` | The Trigger.dev run ID |
| `chatAccessToken` | `string` | Scoped access token for this run |
| `continuation` | `boolean` | Whether this run is continuing an existing chat |
| `preloaded` | `boolean` | Whether this run was preloaded |
| `clientData` | Typed by `clientDataSchema` | Custom data from the frontend |
| `writer` | `ChatWriter` | Stream writer for custom chunks |
```ts
export const myChat = chat.agent({
  id: "my-chat",
  onTurnStart: async ({ chatId, uiMessages, runId, chatAccessToken }) => {
    await db.chat.update({
      where: { id: chatId },
      data: { messages: uiMessages },
    });
    await db.chatSession.upsert({
      where: { id: chatId },
      create: { id: chatId, runId, publicAccessToken: chatAccessToken },
      update: { runId, publicAccessToken: chatAccessToken },
    });
  },
  run: async ({ messages, signal }) => {
    return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal });
  },
});
```

Fires after the response is captured but before the stream closes. The `writer` can send custom chunks that appear in the current turn — use this for post-processing indicators, compaction progress, or any data the user should see before the turn ends.
```ts
export const myChat = chat.agent({
  id: "my-chat",
  onBeforeTurnComplete: async ({ writer, usage, uiMessages }) => {
    // Write a custom data part while the stream is still open
    writer.write({
      type: "data-usage-summary",
      data: {
        tokens: usage?.totalTokens,
        messageCount: uiMessages.length,
      },
    });
    // You can also compact messages here and write progress
    if (usage?.totalTokens && usage.totalTokens > 50_000) {
      writer.write({ type: "data-compaction", data: { status: "compacting" } });
      chat.setMessages(compactedMessages);
      writer.write({ type: "data-compaction", data: { status: "complete" } });
    }
  },
  run: async ({ messages, signal }) => {
    return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal });
  },
});
```

Receives the same fields as `TurnCompleteEvent`, plus a `writer`.
Fires after each turn completes — after the response is captured and the stream is closed. This is the primary hook for persisting the assistant's response. Does not include a `writer` since the stream is already closed.
| Field | Type | Description |
|---|---|---|
| `ctx` | `TaskRunContext` | Full task run context — reference |
| `chatId` | `string` | Chat session ID |
| `messages` | `ModelMessage[]` | Full accumulated conversation (model format) |
| `uiMessages` | `UIMessage[]` | Full accumulated conversation (UI format) |
| `newMessages` | `ModelMessage[]` | Only this turn's messages (model format) |
| `newUIMessages` | `UIMessage[]` | Only this turn's messages (UI format) |
| `responseMessage` | `UIMessage \| undefined` | The assistant's response for this turn |
| `turn` | `number` | Turn number (0-indexed) |
| `runId` | `string` | The Trigger.dev run ID |
| `chatAccessToken` | `string` | Scoped access token for this run |
| `lastEventId` | `string \| undefined` | Stream position for resumption. Persist this with the session. |
| `stopped` | `boolean` | Whether the user stopped generation during this turn |
| `continuation` | `boolean` | Whether this run is continuing an existing chat |
| `rawResponseMessage` | `UIMessage \| undefined` | The raw assistant response before abort cleanup (same as `responseMessage` when not stopped) |
```ts
export const myChat = chat.agent({
  id: "my-chat",
  onTurnComplete: async ({ chatId, uiMessages, runId, chatAccessToken, lastEventId }) => {
    await db.chat.update({
      where: { id: chatId },
      data: { messages: uiMessages },
    });
    await db.chatSession.upsert({
      where: { id: chatId },
      create: { id: chatId, runId, publicAccessToken: chatAccessToken, lastEventId },
      update: { runId, publicAccessToken: chatAccessToken, lastEventId },
    });
  },
  run: async ({ messages, signal }) => {
    return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal });
  },
});
```

Chat-specific hooks that fire at the idle-to-suspended transition — the moment the run stops using compute and waits for the next message. These replace the need for the generic `onWait` / `onResume` task hooks for chat-specific work.
The `phase` discriminator tells you when the suspend/resume happened:

- `"preload"` — after `onPreload`, waiting for the first message
- `"turn"` — after `onTurnComplete`, waiting for the next message
```ts
export const myChat = chat.agent({
  id: "my-chat",
  onChatSuspend: async (event) => {
    // Tear down expensive resources before suspending
    await disposeCodeSandbox(event.ctx.run.id);
    if (event.phase === "turn") {
      logger.info("Suspending after turn", { turn: event.turn });
    }
  },
  onChatResume: async (event) => {
    // Re-initialize after waking up
    logger.info("Resumed", { phase: event.phase });
  },
  run: async ({ messages, signal }) => {
    return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal });
  },
});
```

| Field | Type | Description |
|---|---|---|
| `phase` | `"preload" \| "turn"` | Whether this is a preload or post-turn suspension |
| `ctx` | `TaskRunContext` | Full task run context |
| `chatId` | `string` | Chat session ID |
| `runId` | `string` | The Trigger.dev run ID |
| `clientData` | Typed by `clientDataSchema` | Custom data from the frontend |
| `turn` | `number` | Turn number (`"turn"` phase only) |
| `messages` | `ModelMessage[]` | Accumulated model messages (`"turn"` phase only) |
| `uiMessages` | `UIMessage[]` | Accumulated UI messages (`"turn"` phase only) |
When set to true, a preloaded run completes successfully after the idle timeout elapses instead of suspending. Use this for "fire and forget" preloads — if the user doesn't send a message during the idle window, the run ends cleanly.
```ts
export const myChat = chat.agent({
  id: "my-chat",
  preloadIdleTimeoutInSeconds: 10,
  exitAfterPreloadIdle: true,
  onPreload: async ({ chatId, clientData }) => {
    // Eagerly set up state — if no message comes, the run just ends
    await initializeChat(chatId, clientData);
  },
  run: async ({ messages, signal }) => {
    return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal });
  },
});
```

Use AI Prompts to manage your system prompt as versioned, overridable config. Store the resolved prompt in a lifecycle hook with `chat.prompt.set()`, then spread `chat.toStreamTextOptions()` into `streamText` — it includes the system prompt, model, config, and telemetry automatically.
```ts
import { chat } from "@trigger.dev/sdk/ai";
import { prompts } from "@trigger.dev/sdk";
import { streamText, createProviderRegistry } from "ai";
import { openai } from "@ai-sdk/openai";
import { z } from "zod";

const registry = createProviderRegistry({ openai });

const systemPrompt = prompts.define({
  id: "my-chat-system",
  model: "openai:gpt-4o",
  config: { temperature: 0.7 },
  variables: z.object({ name: z.string() }),
  content: `You are a helpful assistant for {{name}}.`,
});

export const myChat = chat.agent({
  id: "my-chat",
  clientDataSchema: z.object({ userId: z.string() }),
  onChatStart: async ({ clientData }) => {
    const user = await db.user.findUnique({ where: { id: clientData.userId } });
    const resolved = await systemPrompt.resolve({ name: user.name });
    chat.prompt.set(resolved);
  },
  run: async ({ messages, signal }) => {
    return streamText({
      ...chat.toStreamTextOptions({ registry }), // system, model, config, telemetry
      messages,
      abortSignal: signal,
    });
  },
});
```

`chat.toStreamTextOptions()` returns an object with `system`, `model` (resolved via the registry), `temperature`, and `experimental_telemetry` — all from the stored prompt. Properties you set after the spread (like a client-selected model) take precedence.
Calling `stop()` from `useChat` sends a stop signal to the running task via input streams. The task's `streamText` call aborts (if you passed `signal` or `stopSignal`), but the run stays alive and waits for the next message. The partial response is captured and accumulated normally.

The `run` function receives three abort signals:
| Signal | Fires when | Use for |
|---|---|---|
| `signal` | Stop or cancel | Pass to `streamText` — handles both cases. Use this in most cases. |
| `stopSignal` | Stop only (per-turn, reset each turn) | Custom logic that should only run on user stop, not cancellation |
| `cancelSignal` | Run cancel, expire, or `maxDuration` exceeded | Cleanup that should only happen on full cancellation |
```ts
export const myChat = chat.agent({
  id: "my-chat",
  run: async ({ messages, signal, stopSignal, cancelSignal }) => {
    return streamText({
      model: openai("gpt-4o"),
      messages,
      abortSignal: signal, // Handles both stop and cancel
    });
  },
});
```

The `onTurnComplete` event includes a `stopped` boolean that indicates whether the user stopped generation during that turn:
```ts
export const myChat = chat.agent({
  id: "my-chat",
  onTurnComplete: async ({ chatId, uiMessages, stopped }) => {
    await db.chat.update({
      where: { id: chatId },
      data: { messages: uiMessages, lastStoppedAt: stopped ? new Date() : undefined },
    });
  },
  run: async ({ messages, signal }) => {
    return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal });
  },
});
```

You can also check stop status from anywhere during a turn using `chat.isStopped()`. This is useful inside `streamText`'s `onFinish` callback, where the AI SDK's `isAborted` flag can be unreliable (e.g. when using `createUIMessageStream` + `writer.merge()`):
```ts
import { chat } from "@trigger.dev/sdk/ai";
import { streamText } from "ai";

export const myChat = chat.agent({
  id: "my-chat",
  run: async ({ messages, signal }) => {
    return streamText({
      model: openai("gpt-4o"),
      messages,
      abortSignal: signal,
      onFinish: ({ isAborted }) => {
        // isAborted may be false even after stop when using createUIMessageStream
        const wasStopped = isAborted || chat.isStopped();
        if (wasStopped) {
          // handle stop — e.g. log analytics
        }
      },
    });
  },
});
```

When stop happens mid-stream, the captured response message can contain parts in an incomplete state — tool calls stuck in `partial-call`, reasoning blocks still marked as streaming, etc. These can cause UI issues like permanent spinners.
`chat.agent` automatically cleans up the `responseMessage` when stop is detected before passing it to `onTurnComplete`. If you use `chat.pipe()` manually and capture response messages yourself, use `chat.cleanupAbortedParts()`:
```ts
const cleaned = chat.cleanupAbortedParts(rawResponseMessage);
```

This removes tool invocation parts stuck in `partial-call` state and marks any streaming text or reasoning parts as done.
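To make the cleanup behavior concrete, here is a hypothetical sketch of what such a pass does. This is not the SDK's implementation, and the `Part` shape and `cleanupAbortedPartsSketch` name are simplified stand-ins for illustration only:

```typescript
// Simplified stand-in part shapes (hypothetical, not the real UIMessage parts)
type Part =
  | { type: "text"; text: string; state?: "streaming" | "done" }
  | { type: "reasoning"; text: string; state?: "streaming" | "done" }
  | { type: "tool-invocation"; state: "partial-call" | "call" | "result" };

function cleanupAbortedPartsSketch(parts: Part[]): Part[] {
  return parts
    // Drop tool calls that were still streaming their arguments when stopped
    .filter((p) => !(p.type === "tool-invocation" && p.state === "partial-call"))
    // Mark still-streaming text/reasoning as done so the UI stops spinning
    .map((p) =>
      (p.type === "text" || p.type === "reasoning") && p.state === "streaming"
        ? { ...p, state: "done" as const }
        : p
    );
}

const cleanedParts = cleanupAbortedPartsSketch([
  { type: "text", text: "Partial answer", state: "streaming" },
  { type: "tool-invocation", state: "partial-call" },
]);
// cleanedParts: the text part marked "done", the stuck tool call removed
```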
To build a chat app that survives page refreshes, you need to persist two things:

- **Messages** — The conversation history. Persisted server-side in the task via `onTurnStart` and `onTurnComplete`.
- **Sessions** — The transport's connection state (`runId`, `publicAccessToken`, `lastEventId`). Persisted server-side via `onTurnStart` and `onTurnComplete`.
```ts
export const myChat = chat.agent({
  id: "my-chat",
  clientDataSchema: z.object({
    userId: z.string(),
  }),
  onChatStart: async ({ chatId, clientData }) => {
    await db.chat.create({
      data: { id: chatId, userId: clientData.userId, title: "New chat", messages: [] },
    });
  },
  onTurnStart: async ({ chatId, uiMessages, runId, chatAccessToken }) => {
    // Persist messages + session before streaming
    await db.chat.update({
      where: { id: chatId },
      data: { messages: uiMessages },
    });
    await db.chatSession.upsert({
      where: { id: chatId },
      create: { id: chatId, runId, publicAccessToken: chatAccessToken },
      update: { runId, publicAccessToken: chatAccessToken },
    });
  },
  onTurnComplete: async ({ chatId, uiMessages, runId, chatAccessToken, lastEventId }) => {
    // Persist assistant response + stream position
    await db.chat.update({
      where: { id: chatId },
      data: { messages: uiMessages },
    });
    await db.chatSession.upsert({
      where: { id: chatId },
      create: { id: chatId, runId, publicAccessToken: chatAccessToken, lastEventId },
      update: { runId, publicAccessToken: chatAccessToken, lastEventId },
    });
  },
  run: async ({ messages, signal }) => {
    return streamText({
      model: openai("gpt-4o"),
      messages,
      abortSignal: signal,
    });
  },
});
```
```ts app/actions.ts
"use server";
import { chat } from "@trigger.dev/sdk/ai";
import type { myChat } from "@/trigger/chat";
import { db } from "@/lib/db";
export const getChatToken = () => chat.createAccessToken<typeof myChat>("my-chat");
export async function getChatMessages(chatId: string) {
const found = await db.chat.findUnique({ where: { id: chatId } });
return found?.messages ?? [];
}
export async function getAllSessions() {
const sessions = await db.chatSession.findMany();
const result: Record<
string,
{
runId: string;
publicAccessToken: string;
lastEventId?: string;
}
> = {};
for (const s of sessions) {
result[s.id] = {
runId: s.runId,
publicAccessToken: s.publicAccessToken,
lastEventId: s.lastEventId ?? undefined,
};
}
return result;
}
export async function deleteSession(chatId: string) {
await db.chatSession.delete({ where: { id: chatId } }).catch(() => {});
}
```

```tsx
"use client";
import { useChat } from "@ai-sdk/react";
import { useTriggerChatTransport } from "@trigger.dev/sdk/chat/react";
import type { myChat } from "@/trigger/chat";
import { getChatToken, deleteSession } from "@/app/actions";

export function Chat({ chatId, initialMessages, initialSessions }) {
  const transport = useTriggerChatTransport<typeof myChat>({
    task: "my-chat",
    accessToken: getChatToken,
    clientData: { userId: currentUser.id }, // Type-checked against clientDataSchema
    sessions: initialSessions,
    onSessionChange: (id, session) => {
      if (!session) deleteSession(id);
    },
  });

  const { messages, sendMessage, stop, status } = useChat({
    id: chatId,
    messages: initialMessages,
    transport,
    resume: initialMessages.length > 0,
  });

  return (
    <div>
      {messages.map((m) => (
        <div key={m.id}>
          <strong>{m.role}:</strong>
          {m.parts.map((part, i) =>
            part.type === "text" ? <span key={i}>{part.text}</span> : null
          )}
        </div>
      ))}
      <form
        onSubmit={(e) => {
          e.preventDefault();
          const input = e.currentTarget.querySelector("input");
          if (input?.value) {
            sendMessage({ text: input.value });
            input.value = "";
          }
        }}
      >
        <input placeholder="Type a message..." />
        <button type="submit" disabled={status === "streaming"}>
          Send
        </button>
        {status === "streaming" && (
          <button type="button" onClick={stop}>
            Stop
          </button>
        )}
      </form>
    </div>
  );
}
```

Users can send messages while the agent is executing tool calls. With `pendingMessages`, these messages are injected between tool-call steps, steering the agent mid-execution:
```ts
export const myChat = chat.agent({
  id: "my-chat",
  pendingMessages: {
    shouldInject: ({ steps }) => steps.length > 0,
  },
  run: async ({ messages, signal }) => {
    return streamText({
      ...chat.toStreamTextOptions({ registry }),
      messages,
      tools: {
        /* ... */
      },
      abortSignal: signal,
    });
  },
});
```

On the frontend, the `usePendingMessages` hook handles sending, tracking, and rendering injection points.
Inject context from background work into the conversation using `chat.inject()`. Combine with `chat.defer()` to run analysis between turns and inject results before the next response — self-review, RAG augmentation, safety checks, etc.
```ts
export const myChat = chat.agent({
  id: "my-chat",
  onTurnComplete: async ({ messages }) => {
    chat.defer(
      (async () => {
        const review = await generateObject({
          /* ... */
        });
        if (review.object.needsImprovement) {
          chat.inject([
            {
              role: "system",
              content: `[Self-review]\n${review.object.suggestions.join("\n")}`,
            },
          ]);
        }
      })()
    );
  },
  run: async ({ messages, signal }) => {
    return streamText({ ...chat.toStreamTextOptions({ registry }), messages, abortSignal: signal });
  },
});
```

Transform model messages before they're used anywhere — in `run()`, in compaction rebuilds, and in compaction results. Define once, applied everywhere.
Use this for Anthropic cache breaks, injecting system context, stripping PII, etc.
```ts
export const myChat = chat.agent({
  id: "my-chat",
  prepareMessages: ({ messages, reason }) => {
    // Add Anthropic cache breaks to the last message
    if (messages.length === 0) return messages;
    const last = messages[messages.length - 1];
    return [
      ...messages.slice(0, -1),
      {
        ...last,
        providerOptions: {
          ...last.providerOptions,
          anthropic: { cacheControl: { type: "ephemeral" } },
        },
      },
    ];
  },
  run: async ({ messages, signal }) => {
    return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal });
  },
});
```

The `reason` field tells you why messages are being prepared:
| Reason | Description |
|---|---|
| `"run"` | Messages being passed to `run()` for `streamText` |
| `"compaction-rebuild"` | Rebuilding from a previous compaction summary |
| `"compaction-result"` | Fresh compaction just produced these messages |
Chat agent runs are pinned to the worker version they started on. When you deploy a new version, suspended runs resume on the old code. Call `chat.requestUpgrade()` in `onTurnStart` to skip `run()` and exit immediately — the transport re-triggers the same message on the latest version. See the Version Upgrades pattern for the full guide.

Override how long the run stays suspended waiting for the next message. Call from inside `run()`:
```ts
run: async ({ messages, signal }) => {
  chat.setTurnTimeout("2h"); // Wait longer for this conversation
  return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal });
},
```

Override how long the run stays idle (active, using compute) after each turn:
```ts
run: async ({ messages, signal }) => {
  chat.setIdleTimeoutInSeconds(60); // Stay idle for 1 minute
  return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal });
},
```

Control how `streamText` results are converted to the frontend stream via `toUIMessageStream()`. Set static defaults on the task, or override per-turn.
When `streamText` encounters an error mid-stream (rate limits, API failures, network errors), the `onError` callback converts it to a string that's sent to the frontend as an `{ type: "error", errorText }` chunk. The AI SDK's `useChat` receives this via its `onError` callback.

By default, the raw error message is sent to the frontend. Use `onError` to sanitize errors and avoid leaking internal details:
```ts
export const myChat = chat.agent({
  id: "my-chat",
  uiMessageStreamOptions: {
    onError: (error) => {
      // Log the full error server-side for debugging
      console.error("Stream error:", error);
      // Return a sanitized message — this is what the frontend sees
      if (error instanceof Error && error.message.includes("rate limit")) {
        return "Rate limited — please wait a moment and try again.";
      }
      return "Something went wrong. Please try again.";
    },
  },
  run: async ({ messages, signal }) => {
    return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal });
  },
});
```

`onError` is also called for tool execution errors, so a single handler covers both LLM errors and tool failures.
On the frontend, handle the error in `useChat`:
```ts
const { messages, sendMessage } = useChat({
  transport,
  onError: (error) => {
    // error.message contains the string returned by your onError handler
    toast.error(error.message);
  },
});
```

Control which AI SDK features are forwarded to the frontend:
```ts
export const myChat = chat.agent({
  id: "my-chat",
  uiMessageStreamOptions: {
    sendReasoning: true, // Forward model reasoning (default: true)
    sendSources: true, // Forward source citations (default: false)
  },
  run: async ({ messages, signal }) => {
    return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal });
  },
});
```

Override per-turn with `chat.setUIMessageStreamOptions()` — per-turn values merge with the static config (per-turn wins on conflicts). The override is cleared automatically after each turn.
```ts
run: async ({ messages, clientData, signal }) => {
  // Enable reasoning only for certain models
  if (clientData.model?.includes("claude")) {
    chat.setUIMessageStreamOptions({ sendReasoning: true });
  }
  return streamText({ model: openai(clientData.model ?? "gpt-4o"), messages, abortSignal: signal });
},
```

`chat.setUIMessageStreamOptions()` works across all abstraction levels — `chat.agent()`, `chat.createSession()` / `turn.complete()`, and `chat.pipeAndCapture()`.
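The merge between static defaults and a per-turn override can be pictured as a shallow object spread. This is an assumption about the semantics, based only on the "per-turn wins on conflicts" description:

```typescript
// Static task-level defaults on uiMessageStreamOptions
const staticOptions = { sendReasoning: true, sendSources: false };

// A per-turn override set via chat.setUIMessageStreamOptions()
const perTurnOverride = { sendSources: true };

// Assumed merge: shallow spread where the later (per-turn) properties win
const effective = { ...staticOptions, ...perTurnOverride };
// effective keeps sendReasoning from the static config and takes
// sendSources from the per-turn override
```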
See `ChatUIMessageStreamOptions` for the full reference.
`onFinish` is managed internally for response capture and cannot be overridden here. Use `streamText`'s `onFinish` callback for custom finish handling, or use [raw task mode](#raw-task-with-primitives) for full control over `toUIMessageStream()`.

If you need full control over task options, use the standard `task()` with `ChatTaskPayload` and `chat.pipe()`:
```ts
import { task } from "@trigger.dev/sdk";
import { chat, type ChatTaskPayload } from "@trigger.dev/sdk/ai";
import { streamText } from "ai";
import { openai } from "@ai-sdk/openai";

export const manualChat = task({
  id: "manual-chat",
  retry: { maxAttempts: 3 },
  queue: { concurrencyLimit: 10 },
  run: async (payload: ChatTaskPayload) => {
    const result = streamText({
      model: openai("gpt-4o"),
      messages: payload.messages,
    });
    await chat.pipe(result);
  },
});
```

A middle ground between `chat.agent()` and raw primitives. You get an async iterator that yields `ChatTurn` objects — each turn handles stop signals, message accumulation, and turn-complete signaling automatically. You control initialization, model/tool selection, persistence, and any custom per-turn logic.
Use `chat.createSession()` inside a standard `task()`:
```ts
import { task } from "@trigger.dev/sdk";
import { chat, type ChatTaskWirePayload } from "@trigger.dev/sdk/ai";
import { streamText } from "ai";
import { openai } from "@ai-sdk/openai";

export const myChat = task({
  id: "my-chat",
  run: async (payload: ChatTaskWirePayload, { signal }) => {
    // One-time initialization — just code, no hooks
    const clientData = payload.metadata as { userId: string };
    await db.chat.create({ data: { id: payload.chatId, userId: clientData.userId } });

    const session = chat.createSession(payload, {
      signal,
      idleTimeoutInSeconds: 60,
      timeout: "1h",
    });

    for await (const turn of session) {
      const result = streamText({
        model: openai("gpt-4o"),
        messages: turn.messages,
        abortSignal: turn.signal,
      });

      // Pipe, capture, accumulate, and signal turn-complete — all in one call
      await turn.complete(result);

      // Persist after each turn
      await db.chat.update({
        where: { id: turn.chatId },
        data: { messages: turn.uiMessages },
      });
    }
  },
});
```

| Option | Type | Default | Description |
|---|---|---|---|
| `signal` | `AbortSignal` | required | Run-level cancel signal (from task context) |
| `idleTimeoutInSeconds` | `number` | `30` | Seconds to stay idle between turns |
| `timeout` | `string` | `"1h"` | Duration string for suspend timeout |
| `maxTurns` | `number` | `100` | Max turns before ending |
Each turn yielded by the iterator provides:
| Field | Type | Description |
|---|---|---|
| `number` | `number` | Turn number (0-indexed) |
| `chatId` | `string` | Chat session ID |
| `trigger` | `string` | What triggered this turn |
| `clientData` | `unknown` | Client data from the transport |
| `messages` | `ModelMessage[]` | Full accumulated model messages — pass to `streamText` |
| `uiMessages` | `UIMessage[]` | Full accumulated UI messages — use for persistence |
| `signal` | `AbortSignal` | Combined stop+cancel signal (fresh each turn) |
| `stopped` | `boolean` | Whether the user stopped generation this turn |
| `continuation` | `boolean` | Whether this is a continuation run |
| Method | Description |
|---|---|
| `turn.complete(source)` | Pipe stream, capture response, accumulate, and signal turn-complete |
| `turn.done()` | Just signal turn-complete (when you've piped manually) |
| `turn.addResponse(response)` | Add a response to the accumulator manually |
`turn.complete(result)` is the easy path — it handles piping, capturing the response, accumulating messages, cleaning up aborted parts, and writing the turn-complete chunk.
For more control, you can do each step manually:
```ts
for await (const turn of session) {
  const result = streamText({
    model: openai("gpt-4o"),
    messages: turn.messages,
    abortSignal: turn.signal,
  });

  // Manual: pipe and capture separately
  const response = await chat.pipeAndCapture(result, { signal: turn.signal });
  if (response) {
    // Custom processing before accumulating
    await turn.addResponse(response);
  }

  // Custom persistence, analytics, etc.
  await db.chat.update({ ... });

  // Must call done() when not using complete()
  await turn.done();
}
```

For full control, use a standard `task()` with the composable primitives from the `chat` namespace. You manage everything: the turn loop, stop signals, message accumulation, and turn-complete signaling.
Raw task mode also lets you call `.toUIMessageStream()` yourself with any options — including `onFinish` and `originalMessages`. This is the right choice when you need complete control over the stream conversion beyond what `chat.setUIMessageStreamOptions()` provides.
| Primitive | Description |
|---|---|
| `chat.messages` | Input stream for incoming messages — use `.waitWithIdleTimeout()` to wait for the next turn |
| `chat.createStopSignal()` | Create a managed stop signal wired to the stop input stream |
| `chat.pipeAndCapture(result)` | Pipe a `StreamTextResult` to the chat stream and capture the response |
| `chat.writeTurnComplete()` | Signal the frontend that the current turn is complete |
| `chat.MessageAccumulator` | Accumulates conversation messages across turns |
| `chat.pipe(stream)` | Pipe a stream to the frontend (no response capture) |
| `chat.cleanupAbortedParts(msg)` | Clean up incomplete parts from a stopped response |
```ts
import { task } from "@trigger.dev/sdk";
import { chat, type ChatTaskWirePayload } from "@trigger.dev/sdk/ai";
import { streamText } from "ai";
import { openai } from "@ai-sdk/openai";

export const myChat = task({
  id: "my-chat-raw",
  run: async (payload: ChatTaskWirePayload, { signal: runSignal }) => {
    let currentPayload = payload;

    // Handle preload — wait for the first real message
    if (currentPayload.trigger === "preload") {
      const result = await chat.messages.waitWithIdleTimeout({
        idleTimeoutInSeconds: 60,
        timeout: "1h",
        spanName: "waiting for first message",
      });
      if (!result.ok) return;
      currentPayload = result.output;
    }

    const stop = chat.createStopSignal();
    const conversation = new chat.MessageAccumulator();

    for (let turn = 0; turn < 100; turn++) {
      stop.reset();

      const messages = await conversation.addIncoming(
        currentPayload.messages,
        currentPayload.trigger,
        turn
      );

      const combinedSignal = AbortSignal.any([runSignal, stop.signal]);
      const result = streamText({
        model: openai("gpt-4o"),
        messages,
        abortSignal: combinedSignal,
      });

      let response;
      try {
        response = await chat.pipeAndCapture(result, { signal: combinedSignal });
      } catch (error) {
        if (error instanceof Error && error.name === "AbortError") {
          if (runSignal.aborted) break;
          // Stop — fall through to accumulate partial
        } else {
          throw error;
        }
      }

      if (response) {
        const cleaned =
          stop.signal.aborted && !runSignal.aborted ? chat.cleanupAbortedParts(response) : response;
        await conversation.addResponse(cleaned);
      }

      if (runSignal.aborted) break;

      // Persist, analytics, etc.
      await db.chat.update({
        where: { id: currentPayload.chatId },
        data: { messages: conversation.uiMessages },
      });

      await chat.writeTurnComplete();

      // Wait for the next message
      const next = await chat.messages.waitWithIdleTimeout({
        idleTimeoutInSeconds: 60,
        timeout: "1h",
        spanName: "waiting for next message",
      });
      if (!next.ok) break;
      currentPayload = next.output;
    }

    stop.cleanup();
  },
});
```

The `MessageAccumulator` handles the transport protocol automatically:
- Turn 0: replaces messages (full history from frontend)
- Subsequent turns: appends new messages (frontend only sends the new user message)
- Regenerate: replaces messages (full history minus last assistant message)
```ts
const conversation = new chat.MessageAccumulator();

// Returns full accumulated ModelMessage[] for streamText
const messages = await conversation.addIncoming(payload.messages, payload.trigger, turn);

// After piping, add the response
const response = await chat.pipeAndCapture(result);
if (response) await conversation.addResponse(response);

// Access accumulated messages for persistence
conversation.uiMessages; // UIMessage[]
conversation.modelMessages; // ModelMessage[]