---
title: Backend
sidebarTitle: Backend
description: Three approaches to building your chat backend — chat.agent(), session iterator, or raw task primitives.
---

## chat.agent()

The highest-level approach. Handles message accumulation, stop signals, turn lifecycle, and auto-piping automatically.

To fix a **custom** `UIMessage` subtype or typed client data schema, use the [ChatBuilder](/ai-chat/types#chatbuilder) via `chat.withUIMessage<...>()` and/or `chat.withClientData({ schema })`. Builder-level hooks can also be chained before `.agent()`. See [Types](/ai-chat/types).

### Simple: return a StreamTextResult

Return the `streamText` result from `run` and it's automatically piped to the frontend:

```ts
import { chat } from "@trigger.dev/sdk/ai";
import { streamText } from "ai";
import { openai } from "@ai-sdk/openai";

export const simpleChat = chat.agent({
  id: "simple-chat",
  run: async ({ messages, signal }) => {
    return streamText({
      model: openai("gpt-4o"),
      system: "You are a helpful assistant.",
      messages,
      abortSignal: signal,
    });
  },
});
```

### Using chat.pipe() for complex flows

For complex agent flows where `streamText` is called deep inside your code, use `chat.pipe()`. It works from anywhere inside a task — even nested function calls.

```ts
import { chat } from "@trigger.dev/sdk/ai";
import { streamText } from "ai";
import { openai } from "@ai-sdk/openai";
import type { ModelMessage } from "ai";

export const agentChat = chat.agent({
  id: "agent-chat",
  run: async ({ messages }) => {
    // Don't return anything — chat.pipe is called inside
    await runAgentLoop(messages);
  },
});

async function runAgentLoop(messages: ModelMessage[]) {
  // ... agent logic, tool calls, etc.

  const result = streamText({
    model: openai("gpt-4o"),
    messages,
  });

  // Pipe from anywhere — no need to return it
  await chat.pipe(result);
}
```

### Lifecycle hooks

#### Task context (ctx)

Every chat lifecycle callback and the `run` payload include `ctx`: the same run context object as `task({ run: (payload, { ctx }) => ... })`. Import the type with `import type { TaskRunContext } from "@trigger.dev/sdk"` (the `Context` export is the same type). Use `ctx` for tags, metadata, or any API that needs the full run record. The string `runId` on chat events is always `ctx.run.id` (both are provided for convenience). See Task context (ctx) in the API reference.

Standard task lifecycle hooks — `onWait`, `onResume`, `onComplete`, `onFailure`, etc. — are also available on `chat.agent()` with the same shapes as on a normal `task()`.

Chat agents also have two dedicated suspension hooks — onChatSuspend and onChatResume — that fire at the idle-to-suspended transition with full chat context. Use them for resource cleanup (e.g. tearing down sandboxes) and re-initialization. See onChatSuspend / onChatResume and the Code execution sandbox pattern.

#### onPreload

Fires when a preloaded run starts — before any messages arrive. Use it to eagerly initialize state (DB records, user context) while the user is still typing.

Preloaded runs are triggered by calling transport.preload(chatId) on the frontend. See Preload for details.

```ts
export const myChat = chat.agent({
  id: "my-chat",
  clientDataSchema: z.object({ userId: z.string() }),
  onPreload: async ({ ctx, chatId, clientData, runId, chatAccessToken }) => {
    // Initialize early — before the first message arrives
    const user = await db.user.findUnique({ where: { id: clientData.userId } });
    userContext.init({ name: user.name, plan: user.plan });

    await db.chat.create({ data: { id: chatId, userId: clientData.userId } });
    await db.chatSession.upsert({
      where: { id: chatId },
      create: { id: chatId, runId, publicAccessToken: chatAccessToken },
      update: { runId, publicAccessToken: chatAccessToken },
    });
  },
  onChatStart: async ({ preloaded }) => {
    if (preloaded) return; // Already initialized in onPreload
    // ... non-preloaded initialization
  },
  run: async ({ messages, signal }) => {
    return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal });
  },
});
```

| Field | Type | Description |
| --- | --- | --- |
| `ctx` | `TaskRunContext` | Full task run context — reference |
| `chatId` | `string` | Chat session ID |
| `runId` | `string` | The Trigger.dev run ID |
| `chatAccessToken` | `string` | Scoped access token for this run |
| `clientData` | Typed by `clientDataSchema` | Custom data from the frontend |
| `writer` | `ChatWriter` | Stream writer for custom chunks |

Every lifecycle callback receives a writer — a lazy stream writer that lets you send custom UIMessageChunk parts (like data-* parts) to the frontend without the ceremony of chat.stream.writer(). See ChatWriter.

#### onChatStart

Fires once on the first turn (turn 0) before run() executes. Use it to create a chat record in your database.

The continuation field tells you whether this is a brand new chat or a continuation of an existing one (where the previous run timed out or was cancelled). The preloaded field tells you whether onPreload already ran.

```ts
export const myChat = chat.agent({
  id: "my-chat",
  onChatStart: async ({ chatId, clientData, continuation, preloaded }) => {
    if (preloaded) return; // Already set up in onPreload
    if (continuation) return; // Chat record already exists

    const { userId } = clientData as { userId: string };
    await db.chat.create({
      data: { id: chatId, userId, title: "New chat" },
    });
  },
  run: async ({ messages, signal }) => {
    return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal });
  },
});
```

`clientData` contains custom data from the frontend — either the `clientData` option on the transport constructor (sent with every message) or the `metadata` option on `sendMessage()` (per-message). See [Client data and metadata](/ai-chat/frontend#client-data-and-metadata).

#### onValidateMessages

Validate or transform incoming UIMessage[] before they are converted to model messages. Fires once per turn with the raw messages from the wire payload (after cleanup of aborted tool parts), before accumulation and toModelMessages().

Return the validated messages array. Throw to abort the turn with an error.

This is the right place to call the AI SDK's validateUIMessages to catch malformed messages from storage or untrusted input before they reach the model — especially useful when persisting conversations to a database, where tool schemas may drift between deploys.

| Field | Type | Description |
| --- | --- | --- |
| `messages` | `UIMessage[]` | Incoming UI messages for this turn |
| `chatId` | `string` | Chat session ID |
| `turn` | `number` | Turn number (0-indexed) |
| `trigger` | `"submit-message" \| "regenerate-message" \| "preload" \| "close"` | The trigger type for this turn |

```ts
import { validateUIMessages } from "ai";

export const myChat = chat.agent({
  id: "my-chat",
  onValidateMessages: async ({ messages }) => {
    return validateUIMessages({ messages, tools: chatTools });
  },
  run: async ({ messages, signal }) => {
    return streamText({ model: openai("gpt-4o"), messages, tools: chatTools, abortSignal: signal });
  },
});
```

`onValidateMessages` fires **before** `onTurnStart` and message accumulation. If you need to validate messages loaded from a database, do the loading in `onChatStart` or `onPreload` and let `onValidateMessages` validate the full incoming set each turn.

#### onTurnStart

Fires at the start of every turn, after message accumulation and onChatStart (turn 0), but before run() executes. Use it to persist messages before streaming begins — so a mid-stream page refresh still shows the user's message.

| Field | Type | Description |
| --- | --- | --- |
| `ctx` | `TaskRunContext` | Full task run context — reference |
| `chatId` | `string` | Chat session ID |
| `messages` | `ModelMessage[]` | Full accumulated conversation (model format) |
| `uiMessages` | `UIMessage[]` | Full accumulated conversation (UI format) |
| `turn` | `number` | Turn number (0-indexed) |
| `runId` | `string` | The Trigger.dev run ID |
| `chatAccessToken` | `string` | Scoped access token for this run |
| `continuation` | `boolean` | Whether this run is continuing an existing chat |
| `preloaded` | `boolean` | Whether this run was preloaded |
| `clientData` | Typed by `clientDataSchema` | Custom data from the frontend |
| `writer` | `ChatWriter` | Stream writer for custom chunks |

```ts
export const myChat = chat.agent({
  id: "my-chat",
  onTurnStart: async ({ chatId, uiMessages, runId, chatAccessToken }) => {
    await db.chat.update({
      where: { id: chatId },
      data: { messages: uiMessages },
    });
    await db.chatSession.upsert({
      where: { id: chatId },
      create: { id: chatId, runId, publicAccessToken: chatAccessToken },
      update: { runId, publicAccessToken: chatAccessToken },
    });
  },
  run: async ({ messages, signal }) => {
    return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal });
  },
});
```

By persisting in `onTurnStart`, the user's message is saved to your database before the AI starts streaming. If the user refreshes mid-stream, the message is already there.

#### onBeforeTurnComplete

Fires after the response is captured but before the stream closes. The writer can send custom chunks that appear in the current turn — use this for post-processing indicators, compaction progress, or any data the user should see before the turn ends.

```ts
export const myChat = chat.agent({
  id: "my-chat",
  onBeforeTurnComplete: async ({ writer, usage, uiMessages }) => {
    // Write a custom data part while the stream is still open
    writer.write({
      type: "data-usage-summary",
      data: {
        tokens: usage?.totalTokens,
        messageCount: uiMessages.length,
      },
    });

    // You can also compact messages here and write progress
    if (usage?.totalTokens && usage.totalTokens > 50_000) {
      writer.write({ type: "data-compaction", data: { status: "compacting" } });
      // compactedMessages comes from your own compaction logic
      chat.setMessages(compactedMessages);
      writer.write({ type: "data-compaction", data: { status: "complete" } });
    }
  },
  run: async ({ messages, signal }) => {
    return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal });
  },
});
```

Receives the same fields as TurnCompleteEvent, plus a writer.

#### onTurnComplete

Fires after each turn completes — after the response is captured and the stream is closed. This is the primary hook for persisting the assistant's response. Does not include a writer since the stream is already closed.

| Field | Type | Description |
| --- | --- | --- |
| `ctx` | `TaskRunContext` | Full task run context — reference |
| `chatId` | `string` | Chat session ID |
| `messages` | `ModelMessage[]` | Full accumulated conversation (model format) |
| `uiMessages` | `UIMessage[]` | Full accumulated conversation (UI format) |
| `newMessages` | `ModelMessage[]` | Only this turn's messages (model format) |
| `newUIMessages` | `UIMessage[]` | Only this turn's messages (UI format) |
| `responseMessage` | `UIMessage \| undefined` | The assistant's response for this turn |
| `turn` | `number` | Turn number (0-indexed) |
| `runId` | `string` | The Trigger.dev run ID |
| `chatAccessToken` | `string` | Scoped access token for this run |
| `lastEventId` | `string \| undefined` | Stream position for resumption. Persist this with the session. |
| `stopped` | `boolean` | Whether the user stopped generation during this turn |
| `continuation` | `boolean` | Whether this run is continuing an existing chat |
| `rawResponseMessage` | `UIMessage \| undefined` | The raw assistant response before abort cleanup (same as `responseMessage` when not stopped) |

```ts
export const myChat = chat.agent({
  id: "my-chat",
  onTurnComplete: async ({ chatId, uiMessages, runId, chatAccessToken, lastEventId }) => {
    await db.chat.update({
      where: { id: chatId },
      data: { messages: uiMessages },
    });
    await db.chatSession.upsert({
      where: { id: chatId },
      create: { id: chatId, runId, publicAccessToken: chatAccessToken, lastEventId },
      update: { runId, publicAccessToken: chatAccessToken, lastEventId },
    });
  },
  run: async ({ messages, signal }) => {
    return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal });
  },
});
```

Use `uiMessages` to overwrite the full conversation each turn (simplest). Use `newUIMessages` if you prefer to store messages individually — for example, one database row per message. Persist `lastEventId` alongside the session. When the transport reconnects after a page refresh, it uses this to skip past already-seen events — preventing duplicate messages. For a full **conversation + session** persistence pattern (including preload, continuation, and token renewal), see [Database persistence](/ai-chat/patterns/database-persistence).
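The `lastEventId` skip is the standard resume-from-cursor pattern: on reconnect, only events after the stored id are replayed. The transport does this internally; the sketch below is a plain-TypeScript illustration with a hypothetical event shape, not the SDK's implementation:

```typescript
// Hypothetical event shape for illustration
type StreamEvent = { id: string; data: string };

// Replay only events strictly after the persisted cursor
function eventsAfter(events: StreamEvent[], lastEventId?: string): StreamEvent[] {
  if (lastEventId === undefined) return events;
  const idx = events.findIndex((e) => e.id === lastEventId);
  return idx === -1 ? events : events.slice(idx + 1);
}

const all = [
  { id: "evt_1", data: "Hello" },
  { id: "evt_2", data: " world" },
  { id: "evt_3", data: "!" },
];
const resumed = eventsAfter(all, "evt_2"); // only evt_3 is replayed
```

This is why persisting a stale or missing `lastEventId` results in duplicated chunks on refresh: without the cursor, the whole event list is replayed.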

#### onChatSuspend / onChatResume

Chat-specific hooks that fire at the idle-to-suspended transition — the moment the run stops using compute and waits for the next message. These replace the need for the generic onWait / onResume task hooks for chat-specific work.

The phase discriminator tells you when the suspend/resume happened:

- `"preload"` — after `onPreload`, waiting for the first message
- `"turn"` — after `onTurnComplete`, waiting for the next message

```ts
export const myChat = chat.agent({
  id: "my-chat",
  onChatSuspend: async (event) => {
    // Tear down expensive resources before suspending
    await disposeCodeSandbox(event.ctx.run.id);
    if (event.phase === "turn") {
      logger.info("Suspending after turn", { turn: event.turn });
    }
  },
  onChatResume: async (event) => {
    // Re-initialize after waking up
    logger.info("Resumed", { phase: event.phase });
  },
  run: async ({ messages, signal }) => {
    return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal });
  },
});
```

| Field | Type | Description |
| --- | --- | --- |
| `phase` | `"preload" \| "turn"` | Whether this is a preload or post-turn suspension |
| `ctx` | `TaskRunContext` | Full task run context |
| `chatId` | `string` | Chat session ID |
| `runId` | `string` | The Trigger.dev run ID |
| `clientData` | Typed by `clientDataSchema` | Custom data from the frontend |
| `turn` | `number` | Turn number (`"turn"` phase only) |
| `messages` | `ModelMessage[]` | Accumulated model messages (`"turn"` phase only) |
| `uiMessages` | `UIMessage[]` | Accumulated UI messages (`"turn"` phase only) |

Unlike `onWait` (which fires for all wait types — duration, task, batch, token), `onChatSuspend` fires only at chat suspension points with full chat context. No need to filter on `wait.type`.

#### exitAfterPreloadIdle

When set to true, a preloaded run completes successfully after the idle timeout elapses instead of suspending. Use this for "fire and forget" preloads — if the user doesn't send a message during the idle window, the run ends cleanly.

```ts
export const myChat = chat.agent({
  id: "my-chat",
  preloadIdleTimeoutInSeconds: 10,
  exitAfterPreloadIdle: true,
  onPreload: async ({ chatId, clientData }) => {
    // Eagerly set up state — if no message comes, the run just ends
    await initializeChat(chatId, clientData);
  },
  run: async ({ messages, signal }) => {
    return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal });
  },
});
```

### Using prompts

Use AI Prompts to manage your system prompt as versioned, overridable config. Store the resolved prompt in a lifecycle hook with chat.prompt.set(), then spread chat.toStreamTextOptions() into streamText — it includes the system prompt, model, config, and telemetry automatically.

```ts
import { chat } from "@trigger.dev/sdk/ai";
import { prompts } from "@trigger.dev/sdk";
import { streamText, createProviderRegistry } from "ai";
import { openai } from "@ai-sdk/openai";
import { z } from "zod";

const registry = createProviderRegistry({ openai });

const systemPrompt = prompts.define({
  id: "my-chat-system",
  model: "openai:gpt-4o",
  config: { temperature: 0.7 },
  variables: z.object({ name: z.string() }),
  content: `You are a helpful assistant for {{name}}.`,
});

export const myChat = chat.agent({
  id: "my-chat",
  clientDataSchema: z.object({ userId: z.string() }),
  onChatStart: async ({ clientData }) => {
    const user = await db.user.findUnique({ where: { id: clientData.userId } });
    const resolved = await systemPrompt.resolve({ name: user.name });
    chat.prompt.set(resolved);
  },
  run: async ({ messages, signal }) => {
    return streamText({
      ...chat.toStreamTextOptions({ registry }), // system, model, config, telemetry
      messages,
      abortSignal: signal,
    });
  },
});
```

`chat.toStreamTextOptions()` returns an object with `system`, `model` (resolved via the registry), `temperature`, and `experimental_telemetry` — all from the stored prompt. Properties you set after the spread (like a client-selected model) take precedence.
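This precedence follows plain object-spread semantics: properties written after the spread override spread-in values, while untouched keys pass through. A minimal illustration in generic TypeScript (the object literal stands in for the SDK's output, it is not the real shape):

```typescript
// Spread-in defaults, standing in for chat.toStreamTextOptions() output
const fromPrompt = { model: "openai:gpt-4o", temperature: 0.7, system: "Be helpful." };

// A property written after the spread overrides the spread value
const options = { ...fromPrompt, model: "openai:gpt-4o-mini" };

console.log(options.model); // "openai:gpt-4o-mini" (the later property wins)
console.log(options.temperature); // 0.7 (untouched values pass through)
```

The same rule means order matters: spreading `chat.toStreamTextOptions()` *after* your own properties would let the prompt's values win instead.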

See [Prompts](/ai/prompts) for the full guide — defining templates, variable schemas, dashboard overrides, and the management SDK.

### Stop generation

#### How stop works

Calling `stop()` from `useChat` sends a stop signal to the running task via input streams. The task's `streamText` call aborts (if you passed `signal` or `stopSignal`), but the run stays alive and waits for the next message. The partial response is captured and accumulated normally.

Abort signals

The run function receives three abort signals:

| Signal | Fires when | Use for |
| --- | --- | --- |
| `signal` | Stop or cancel | Pass to `streamText` — handles both cases. Use this in most cases. |
| `stopSignal` | Stop only (per-turn, reset each turn) | Custom logic that should only run on user stop, not cancellation |
| `cancelSignal` | Run cancel, expire, or `maxDuration` exceeded | Cleanup that should only happen on full cancellation |

```ts
export const myChat = chat.agent({
  id: "my-chat",
  run: async ({ messages, signal, stopSignal, cancelSignal }) => {
    return streamText({
      model: openai("gpt-4o"),
      messages,
      abortSignal: signal, // Handles both stop and cancel
    });
  },
});
```

Use `signal` (the combined signal) in most cases. The separate `stopSignal` and `cancelSignal` are only needed if you want different behavior for stop vs cancel.

#### Detecting stop in callbacks

The onTurnComplete event includes a stopped boolean that indicates whether the user stopped generation during that turn:

```ts
export const myChat = chat.agent({
  id: "my-chat",
  onTurnComplete: async ({ chatId, uiMessages, stopped }) => {
    await db.chat.update({
      where: { id: chatId },
      data: { messages: uiMessages, lastStoppedAt: stopped ? new Date() : undefined },
    });
  },
  run: async ({ messages, signal }) => {
    return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal });
  },
});
```

You can also check stop status from anywhere during a turn using `chat.isStopped()`. This is useful inside `streamText`'s `onFinish` callback, where the AI SDK's `isAborted` flag can be unreliable (e.g. when using `createUIMessageStream` + `writer.merge()`):

```ts
import { chat } from "@trigger.dev/sdk/ai";
import { streamText } from "ai";

export const myChat = chat.agent({
  id: "my-chat",
  run: async ({ messages, signal }) => {
    return streamText({
      model: openai("gpt-4o"),
      messages,
      abortSignal: signal,
      onFinish: ({ isAborted }) => {
        // isAborted may be false even after stop when using createUIMessageStream
        const wasStopped = isAborted || chat.isStopped();
        if (wasStopped) {
          // handle stop — e.g. log analytics
        }
      },
    });
  },
});
```

#### Cleaning up aborted messages

When stop happens mid-stream, the captured response message can contain parts in an incomplete state — tool calls stuck in `partial-call`, reasoning blocks still marked as streaming, etc. These can cause UI issues like permanent spinners.

`chat.agent()` automatically cleans up the `responseMessage` when stop is detected, before passing it to `onTurnComplete`. If you use `chat.pipe()` manually and capture response messages yourself, use `chat.cleanupAbortedParts()`:

```ts
const cleaned = chat.cleanupAbortedParts(rawResponseMessage);
```

This removes tool invocation parts stuck in partial-call state and marks any streaming text or reasoning parts as done.
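Conceptually, that cleanup is a filter-and-finalize pass over the message's parts. The sketch below illustrates the idea with deliberately simplified part shapes — it is not the SDK's implementation, and the real `UIMessage` part types are richer than this:

```typescript
// Simplified part shapes for illustration only (not the AI SDK's real types)
type Part =
  | { type: "tool-invocation"; state: "partial-call" | "call" | "result" }
  | { type: "text" | "reasoning"; text: string; state: "streaming" | "done" };

// Drop tool calls stuck mid-stream; mark still-streaming text/reasoning as done
function cleanupSketch(parts: Part[]): Part[] {
  return parts
    .filter((p) => !(p.type === "tool-invocation" && p.state === "partial-call"))
    .map((p) =>
      p.type !== "tool-invocation" && p.state === "streaming"
        ? { ...p, state: "done" as const }
        : p
    );
}

const cleanedParts = cleanupSketch([
  { type: "tool-invocation", state: "partial-call" }, // removed
  { type: "text", text: "Partial answer", state: "streaming" }, // marked done
]);
```

The key property is that completed parts (tool results, finished text) pass through untouched, so a stopped turn still renders everything the model actually produced.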

Stop signal delivery is best-effort. There is a small race window where the model may finish before the stop signal arrives, in which case the turn completes normally with `stopped: false`. This is expected and does not require special handling.

### Persistence

#### What needs to be persisted

To build a chat app that survives page refreshes, you need to persist two things:

  1. Messages — The conversation history. Persisted server-side in the task via onTurnStart and onTurnComplete.
  2. Sessions — The transport's connection state (runId, publicAccessToken, lastEventId). Persisted server-side via onTurnStart and onTurnComplete.
Sessions let the transport reconnect to an existing run after a page refresh. Without them, every page load would start a new run — losing the conversation context that was accumulated in the previous run.

#### Full persistence example

```ts trigger/chat.ts
import { chat } from "@trigger.dev/sdk/ai";
import { streamText } from "ai";
import { openai } from "@ai-sdk/openai";
import { z } from "zod";
import { db } from "@/lib/db";

export const myChat = chat.agent({
  id: "my-chat",
  clientDataSchema: z.object({
    userId: z.string(),
  }),
  onChatStart: async ({ chatId, clientData }) => {
    await db.chat.create({
      data: { id: chatId, userId: clientData.userId, title: "New chat", messages: [] },
    });
  },
  onTurnStart: async ({ chatId, uiMessages, runId, chatAccessToken }) => {
    // Persist messages + session before streaming
    await db.chat.update({
      where: { id: chatId },
      data: { messages: uiMessages },
    });
    await db.chatSession.upsert({
      where: { id: chatId },
      create: { id: chatId, runId, publicAccessToken: chatAccessToken },
      update: { runId, publicAccessToken: chatAccessToken },
    });
  },
  onTurnComplete: async ({ chatId, uiMessages, runId, chatAccessToken, lastEventId }) => {
    // Persist assistant response + stream position
    await db.chat.update({
      where: { id: chatId },
      data: { messages: uiMessages },
    });
    await db.chatSession.upsert({
      where: { id: chatId },
      create: { id: chatId, runId, publicAccessToken: chatAccessToken, lastEventId },
      update: { runId, publicAccessToken: chatAccessToken, lastEventId },
    });
  },
  run: async ({ messages, signal }) => {
    return streamText({
      model: openai("gpt-4o"),
      messages,
      abortSignal: signal,
    });
  },
});
```


```ts app/actions.ts
"use server";

import { chat } from "@trigger.dev/sdk/ai";
import type { myChat } from "@/trigger/chat";
import { db } from "@/lib/db";

export const getChatToken = () => chat.createAccessToken<typeof myChat>("my-chat");

export async function getChatMessages(chatId: string) {
  const found = await db.chat.findUnique({ where: { id: chatId } });
  return found?.messages ?? [];
}

export async function getAllSessions() {
  const sessions = await db.chatSession.findMany();
  const result: Record<
    string,
    {
      runId: string;
      publicAccessToken: string;
      lastEventId?: string;
    }
  > = {};
  for (const s of sessions) {
    result[s.id] = {
      runId: s.runId,
      publicAccessToken: s.publicAccessToken,
      lastEventId: s.lastEventId ?? undefined,
    };
  }
  return result;
}

export async function deleteSession(chatId: string) {
  await db.chatSession.delete({ where: { id: chatId } }).catch(() => {});
}
```

```tsx
"use client";

import { useChat } from "@ai-sdk/react";
import { useTriggerChatTransport } from "@trigger.dev/sdk/chat/react";
import type { myChat } from "@/trigger/chat";
import { getChatToken, deleteSession } from "@/app/actions";

export function Chat({ chatId, initialMessages, initialSessions }) {
  const transport = useTriggerChatTransport<typeof myChat>({
    task: "my-chat",
    accessToken: getChatToken,
    clientData: { userId: currentUser.id }, // Type-checked against clientDataSchema
    sessions: initialSessions,
    onSessionChange: (id, session) => {
      if (!session) deleteSession(id);
    },
  });

  const { messages, sendMessage, stop, status } = useChat({
    id: chatId,
    messages: initialMessages,
    transport,
    resume: initialMessages.length > 0,
  });

  return (
    <div>
      {messages.map((m) => (
        <div key={m.id}>
          <strong>{m.role}:</strong>
          {m.parts.map((part, i) =>
            part.type === "text" ? <span key={i}>{part.text}</span> : null
          )}
        </div>
      ))}

      <form
        onSubmit={(e) => {
          e.preventDefault();
          const input = e.currentTarget.querySelector("input");
          if (input?.value) {
            sendMessage({ text: input.value });
            input.value = "";
          }
        }}
      >
        <input placeholder="Type a message..." />
        <button type="submit" disabled={status === "streaming"}>
          Send
        </button>
        {status === "streaming" && (
          <button type="button" onClick={stop}>
            Stop
          </button>
        )}
      </form>
    </div>
  );
}
```

### Pending messages (steering)

Users can send messages while the agent is executing tool calls. With pendingMessages, these messages are injected between tool-call steps, steering the agent mid-execution:

```ts
export const myChat = chat.agent({
  id: "my-chat",
  pendingMessages: {
    shouldInject: ({ steps }) => steps.length > 0,
  },
  run: async ({ messages, signal }) => {
    return streamText({
      ...chat.toStreamTextOptions({ registry }),
      messages,
      tools: {
        /* ... */
      },
      abortSignal: signal,
    });
  },
});
```

On the frontend, the usePendingMessages hook handles sending, tracking, and rendering injection points.

See [Pending Messages](/ai-chat/pending-messages) for the full guide — backend configuration, frontend hook, queuing vs steering, and how injection works with all three chat variants.

### Background injection

Inject context from background work into the conversation using chat.inject(). Combine with chat.defer() to run analysis between turns and inject results before the next response — self-review, RAG augmentation, safety checks, etc.

```ts
export const myChat = chat.agent({
  id: "my-chat",
  onTurnComplete: async ({ messages }) => {
    chat.defer(
      (async () => {
        const review = await generateObject({
          /* ... */
        });
        if (review.object.needsImprovement) {
          chat.inject([
            {
              role: "system",
              content: `[Self-review]\n${review.object.suggestions.join("\n")}`,
            },
          ]);
        }
      })()
    );
  },
  run: async ({ messages, signal }) => {
    return streamText({ ...chat.toStreamTextOptions({ registry }), messages, abortSignal: signal });
  },
});
```

See [Background Injection](/ai-chat/background-injection) for the full guide — timing, self-review example, and how it differs from pending messages.

### prepareMessages

Transform model messages before they're used anywhere — in run(), in compaction rebuilds, and in compaction results. Define once, applied everywhere.

Use this for Anthropic cache breaks, injecting system context, stripping PII, etc.

```ts
export const myChat = chat.agent({
  id: "my-chat",
  prepareMessages: ({ messages, reason }) => {
    // Add Anthropic cache breaks to the last message
    if (messages.length === 0) return messages;
    const last = messages[messages.length - 1];
    return [
      ...messages.slice(0, -1),
      {
        ...last,
        providerOptions: {
          ...last.providerOptions,
          anthropic: { cacheControl: { type: "ephemeral" } },
        },
      },
    ];
  },
  run: async ({ messages, signal }) => {
    return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal });
  },
});
```

The reason field tells you why messages are being prepared:

| Reason | Description |
| --- | --- |
| `"run"` | Messages being passed to `run()` for `streamText` |
| `"compaction-rebuild"` | Rebuilding from a previous compaction summary |
| `"compaction-result"` | Fresh compaction just produced these messages |

### Version upgrades

Chat agent runs are pinned to the worker version they started on. When you deploy a new version, suspended runs resume on the old code. Call `chat.requestUpgrade()` in `onTurnStart` to skip `run()` and exit immediately — the transport re-triggers the same message on the latest version. See the Version Upgrades pattern for the full guide.

### Runtime configuration

#### chat.setTurnTimeout()

Override how long the run stays suspended waiting for the next message. Call from inside run():

```ts
run: async ({ messages, signal }) => {
  chat.setTurnTimeout("2h"); // Wait longer for this conversation
  return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal });
},
```

#### chat.setIdleTimeoutInSeconds()

Override how long the run stays idle (active, using compute) after each turn:

```ts
run: async ({ messages, signal }) => {
  chat.setIdleTimeoutInSeconds(60); // Stay idle for 1 minute
  return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal });
},
```

Longer idle timeout means faster responses but more compute usage. Set to `0` to suspend immediately after each turn (minimum latency cost, slight delay on next message).

### Stream options

Control how streamText results are converted to the frontend stream via toUIMessageStream(). Set static defaults on the task, or override per-turn.

#### Error handling with onError

When `streamText` encounters an error mid-stream (rate limits, API failures, network errors), the `onError` callback converts it to a string that's sent to the frontend as an `{ type: "error", errorText }` chunk. The AI SDK's `useChat` receives this via its `onError` callback.

By default, the raw error message is sent to the frontend. Use `onError` to sanitize errors and avoid leaking internal details:

```ts
export const myChat = chat.agent({
  id: "my-chat",
  uiMessageStreamOptions: {
    onError: (error) => {
      // Log the full error server-side for debugging
      console.error("Stream error:", error);
      // Return a sanitized message — this is what the frontend sees
      if (error instanceof Error && error.message.includes("rate limit")) {
        return "Rate limited — please wait a moment and try again.";
      }
      return "Something went wrong. Please try again.";
    },
  },
  run: async ({ messages, signal }) => {
    return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal });
  },
});
```

onError is also called for tool execution errors, so a single handler covers both LLM errors and tool failures.

On the frontend, handle the error in useChat:

```ts
const { messages, sendMessage } = useChat({
  transport,
  onError: (error) => {
    // error.message contains the string returned by your onError handler
    toast.error(error.message);
  },
});
```

#### Reasoning and sources

Control which AI SDK features are forwarded to the frontend:

```ts
export const myChat = chat.agent({
  id: "my-chat",
  uiMessageStreamOptions: {
    sendReasoning: true, // Forward model reasoning (default: true)
    sendSources: true, // Forward source citations (default: false)
  },
  run: async ({ messages, signal }) => {
    return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal });
  },
});
```

#### Per-turn overrides

Override per-turn with chat.setUIMessageStreamOptions() — per-turn values merge with the static config (per-turn wins on conflicts). The override is cleared automatically after each turn.

```ts
run: async ({ messages, clientData, signal }) => {
  // Enable reasoning only for certain models
  if (clientData.model?.includes("claude")) {
    chat.setUIMessageStreamOptions({ sendReasoning: true });
  }
  return streamText({ model: openai(clientData.model ?? "gpt-4o"), messages, abortSignal: signal });
},
```

chat.setUIMessageStreamOptions() works across all abstraction levels — chat.agent(), chat.createSession() / turn.complete(), and chat.pipeAndCapture().

See ChatUIMessageStreamOptions for the full reference.

`onFinish` is managed internally for response capture and cannot be overridden here. Use `streamText`'s `onFinish` callback for custom finish handling, or use [raw task mode](#raw-task-with-primitives) for full control over `toUIMessageStream()`.

## Manual mode with task()

If you need full control over task options, use the standard task() with ChatTaskPayload and chat.pipe():

```ts
import { task } from "@trigger.dev/sdk";
import { chat, type ChatTaskPayload } from "@trigger.dev/sdk/ai";
import { streamText } from "ai";
import { openai } from "@ai-sdk/openai";

export const manualChat = task({
  id: "manual-chat",
  retry: { maxAttempts: 3 },
  queue: { concurrencyLimit: 10 },
  run: async (payload: ChatTaskPayload) => {
    const result = streamText({
      model: openai("gpt-4o"),
      messages: payload.messages,
    });

    await chat.pipe(result);
  },
});
```

Manual mode does not get automatic message accumulation or the `onTurnComplete`/`onChatStart` lifecycle hooks. The `responseMessage` field in `onTurnComplete` will be `undefined` when using `chat.pipe()` directly. Use `chat.agent()` for the full multi-turn experience.

## chat.createSession()

A middle ground between `chat.agent()` and raw primitives. You get an async iterator that yields `ChatTurn` objects — each turn handles stop signals, message accumulation, and turn-complete signaling automatically. You control initialization, model/tool selection, persistence, and any custom per-turn logic.

Use `chat.createSession()` inside a standard `task()`:

```ts
import { task } from "@trigger.dev/sdk";
import { chat, type ChatTaskWirePayload } from "@trigger.dev/sdk/ai";
import { streamText } from "ai";
import { openai } from "@ai-sdk/openai";

export const myChat = task({
  id: "my-chat",
  run: async (payload: ChatTaskWirePayload, { signal }) => {
    // One-time initialization — just code, no hooks
    const clientData = payload.metadata as { userId: string };
    await db.chat.create({ data: { id: payload.chatId, userId: clientData.userId } });

    const session = chat.createSession(payload, {
      signal,
      idleTimeoutInSeconds: 60,
      timeout: "1h",
    });

    for await (const turn of session) {
      const result = streamText({
        model: openai("gpt-4o"),
        messages: turn.messages,
        abortSignal: turn.signal,
      });

      // Pipe, capture, accumulate, and signal turn-complete — all in one call
      await turn.complete(result);

      // Persist after each turn
      await db.chat.update({
        where: { id: turn.chatId },
        data: { messages: turn.uiMessages },
      });
    }
  },
});
```

### ChatSessionOptions

| Option                 | Type          | Default  | Description                                  |
| ---------------------- | ------------- | -------- | -------------------------------------------- |
| `signal`               | `AbortSignal` | required | Run-level cancel signal (from task context)  |
| `idleTimeoutInSeconds` | `number`      | `30`     | Seconds to stay idle between turns           |
| `timeout`              | `string`      | `"1h"`   | Duration string for suspend timeout          |
| `maxTurns`             | `number`      | `100`    | Max turns before ending                      |

### ChatTurn

Each turn yielded by the iterator provides:

| Field          | Type             | Description                                               |
| -------------- | ---------------- | --------------------------------------------------------- |
| `number`       | `number`         | Turn number (0-indexed)                                   |
| `chatId`       | `string`         | Chat session ID                                           |
| `trigger`      | `string`         | What triggered this turn                                  |
| `clientData`   | `unknown`        | Client data from the transport                            |
| `messages`     | `ModelMessage[]` | Full accumulated model messages — pass to `streamText`    |
| `uiMessages`   | `UIMessage[]`    | Full accumulated UI messages — use for persistence        |
| `signal`       | `AbortSignal`    | Combined stop+cancel signal (fresh each turn)             |
| `stopped`      | `boolean`        | Whether the user stopped generation this turn             |
| `continuation` | `boolean`        | Whether this is a continuation run                        |

| Method                       | Description                                                         |
| ---------------------------- | ------------------------------------------------------------------- |
| `turn.complete(source)`      | Pipe stream, capture response, accumulate, and signal turn-complete |
| `turn.done()`                | Just signal turn-complete (when you've piped manually)              |
| `turn.addResponse(response)` | Add a response to the accumulator manually                          |
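A minimal sketch of branching on the per-turn fields after completion (`logTurn` is a hypothetical helper, not part of the SDK):

```typescript
for await (const turn of session) {
  const result = streamText({
    model: openai("gpt-4o"),
    messages: turn.messages,
    abortSignal: turn.signal,
  });
  await turn.complete(result);

  // stopped and trigger are fresh each turn, so this records
  // partial vs. full generations per turn
  await logTurn({
    chatId: turn.chatId,
    turn: turn.number,
    trigger: turn.trigger,
    partial: turn.stopped,
  });
}
```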

### turn.complete() vs manual control

`turn.complete(result)` is the easy path — it handles piping, capturing the response, accumulating messages, cleaning up aborted parts, and writing the turn-complete chunk.

For more control, you can do each step manually:

```ts
for await (const turn of session) {
  const result = streamText({
    model: openai("gpt-4o"),
    messages: turn.messages,
    abortSignal: turn.signal,
  });

  // Manual: pipe and capture separately
  const response = await chat.pipeAndCapture(result, { signal: turn.signal });

  if (response) {
    // Custom processing before accumulating
    await turn.addResponse(response);
  }

  // Custom persistence, analytics, etc.
  await db.chat.update({ ... });

  // Must call done() when not using complete()
  await turn.done();
}
```

## Raw task with primitives

For full control, use a standard `task()` with the composable primitives from the `chat` namespace. You manage everything: the turn loop, stop signals, message accumulation, and turn-complete signaling.

Raw task mode also lets you call `.toUIMessageStream()` yourself with any options — including `onFinish` and `originalMessages`. This is the right choice when you need complete control over the stream conversion beyond what `chat.setUIMessageStreamOptions()` provides.

### Primitives

| Primitive                       | Description                                                                                 |
| ------------------------------- | ------------------------------------------------------------------------------------------- |
| `chat.messages`                 | Input stream for incoming messages — use `.waitWithIdleTimeout()` to wait for the next turn |
| `chat.createStopSignal()`       | Create a managed stop signal wired to the stop input stream                                 |
| `chat.pipeAndCapture(result)`   | Pipe a `StreamTextResult` to the chat stream and capture the response                       |
| `chat.writeTurnComplete()`      | Signal the frontend that the current turn is complete                                       |
| `chat.MessageAccumulator`       | Accumulates conversation messages across turns                                              |
| `chat.pipe(stream)`             | Pipe a stream to the frontend (no response capture)                                         |
| `chat.cleanupAbortedParts(msg)` | Clean up incomplete parts from a stopped response                                           |

### Example

```ts
import { task } from "@trigger.dev/sdk";
import { chat, type ChatTaskWirePayload } from "@trigger.dev/sdk/ai";
import { streamText } from "ai";
import { openai } from "@ai-sdk/openai";

export const myChat = task({
  id: "my-chat-raw",
  run: async (payload: ChatTaskWirePayload, { signal: runSignal }) => {
    let currentPayload = payload;

    // Handle preload — wait for the first real message
    if (currentPayload.trigger === "preload") {
      const result = await chat.messages.waitWithIdleTimeout({
        idleTimeoutInSeconds: 60,
        timeout: "1h",
        spanName: "waiting for first message",
      });
      if (!result.ok) return;
      currentPayload = result.output;
    }

    const stop = chat.createStopSignal();
    const conversation = new chat.MessageAccumulator();

    for (let turn = 0; turn < 100; turn++) {
      stop.reset();

      const messages = await conversation.addIncoming(
        currentPayload.messages,
        currentPayload.trigger,
        turn
      );

      const combinedSignal = AbortSignal.any([runSignal, stop.signal]);

      const result = streamText({
        model: openai("gpt-4o"),
        messages,
        abortSignal: combinedSignal,
      });

      let response;
      try {
        response = await chat.pipeAndCapture(result, { signal: combinedSignal });
      } catch (error) {
        if (error instanceof Error && error.name === "AbortError") {
          if (runSignal.aborted) break;
          // Stop — fall through to accumulate partial
        } else {
          throw error;
        }
      }

      if (response) {
        const cleaned =
          stop.signal.aborted && !runSignal.aborted ? chat.cleanupAbortedParts(response) : response;
        await conversation.addResponse(cleaned);
      }

      if (runSignal.aborted) break;

      // Persist, analytics, etc.
      await db.chat.update({
        where: { id: currentPayload.chatId },
        data: { messages: conversation.uiMessages },
      });

      await chat.writeTurnComplete();

      // Wait for the next message
      const next = await chat.messages.waitWithIdleTimeout({
        idleTimeoutInSeconds: 60,
        timeout: "1h",
        spanName: "waiting for next message",
      });
      if (!next.ok) break;
      currentPayload = next.output;
    }

    stop.cleanup();
  },
});
```

### MessageAccumulator

The `MessageAccumulator` handles the transport protocol automatically:

- **Turn 0**: replaces messages (full history from frontend)
- **Subsequent turns**: appends new messages (frontend only sends the new user message)
- **Regenerate**: replaces messages (full history minus last assistant message)

```ts
const conversation = new chat.MessageAccumulator();

// Returns full accumulated ModelMessage[] for streamText
const messages = await conversation.addIncoming(payload.messages, payload.trigger, turn);

// After piping, add the response
const response = await chat.pipeAndCapture(result);
if (response) await conversation.addResponse(response);

// Access accumulated messages for persistence
conversation.uiMessages; // UIMessage[]
conversation.modelMessages; // ModelMessage[]
```
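The replace/append rule above can be sketched as a pure decision function (a simplified illustration of the protocol, not the SDK's actual implementation; the `trigger` string values here are illustrative, not the exact wire values):

```typescript
// Simplified model of the MessageAccumulator's replace/append rule.
type Msg = { role: string; content: string };

function accumulate(
  existing: Msg[],
  incoming: Msg[],
  trigger: string,
  turn: number
): Msg[] {
  // Turn 0 and regenerations carry the full history: replace.
  if (turn === 0 || trigger === "regenerate") {
    return [...incoming];
  }
  // Later turns carry only the new user message: append.
  return [...existing, ...incoming];
}
```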