Skip to content

Commit 491b45a

Browse files
committed
feat(chat): expose typed chat.stream, add deepResearch subtask example, per-chat model persistence, debug panel
- Export chat.stream (typed RealtimeDefinedStream<UIMessageChunk>) for writing custom data to the chat stream - Add deepResearch subtask using data-* chunks to stream progress back to parent chat via target: root - Use AI SDK data-research-progress chunk protocol with id-based updates for live progress - Add ResearchProgress component and generic data-* fallback renderer in frontend - Persist model per chat in DB (schema + onChatStart), model selector only on new chats - Add collapsible debug panel showing run ID (with dashboard link), chat ID, model, status, session info - Document chat.stream API, data-* chunks, and subtask streaming pattern in docs
1 parent ef746bb commit 491b45a

3 files changed

Lines changed: 188 additions & 4 deletions

File tree

packages/trigger-sdk/src/v3/ai.ts

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import {
1414
type TaskSchema,
1515
type TaskWithSchema,
1616
} from "@trigger.dev/core/v3";
17-
import type { ModelMessage, UIMessage } from "ai";
17+
import type { ModelMessage, UIMessage, UIMessageChunk } from "ai";
1818
import type { StreamWriteResult } from "@trigger.dev/core/v3";
1919
import { convertToModelMessages, dynamicTool, generateId as generateMessageId, jsonSchema, JSONSchema7, Schema, Tool, ToolCallOptions, zodSchema } from "ai";
2020
import { type Attributes, trace } from "@opentelemetry/api";
@@ -175,6 +175,29 @@ export const CHAT_STREAM_KEY = _CHAT_STREAM_KEY;
175175
// Re-export input stream IDs for advanced usage
176176
export { CHAT_MESSAGES_STREAM_ID, CHAT_STOP_STREAM_ID };
177177

178+
/**
 * Typed chat output stream. Provides `.writer()`, `.pipe()`, `.append()`,
 * and `.read()` methods pre-bound to the chat stream key and typed to `UIMessageChunk`.
 *
 * Use from within a `chat.task` run to write custom chunks:
 * ```ts
 * const { waitUntilComplete } = chat.stream.writer({
 *   execute: ({ write }) => {
 *     write({ type: "text-start", id: "status-1" });
 *     write({ type: "text-delta", id: "status-1", delta: "Processing..." });
 *     write({ type: "text-end", id: "status-1" });
 *   },
 * });
 * await waitUntilComplete();
 * ```
 *
 * Use from a subtask to stream back to the parent chat:
 * ```ts
 * chat.stream.pipe(myStream, { target: "root" });
 * ```
 */
const chatStream = streams.define<UIMessageChunk>({ id: _CHAT_STREAM_KEY });
200+
178201
/**
179202
* The wire payload shape sent by `TriggerChatTransport`.
180203
* Uses `metadata` to match the AI SDK's `ChatRequestOptions` field name.
@@ -1452,6 +1475,8 @@ export const chat = {
14521475
isStopped,
14531476
/** Clean up aborted parts from a UIMessage. See {@link cleanupAbortedParts}. */
14541477
cleanupAbortedParts,
1478+
/** Typed chat output stream for writing custom chunks or piping from subtasks. */
1479+
stream: chatStream,
14551480
};
14561481

14571482
/**

references/ai-chat/src/components/chat.tsx

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,46 @@ function ToolInvocation({ part }: { part: any }) {
7070
);
7171
}
7272

73+
function ResearchProgress({ part }: { part: any }) {
74+
const data = part.data as {
75+
status: "fetching" | "done";
76+
query: string;
77+
current: number;
78+
total: number;
79+
currentUrl?: string;
80+
completedUrls: string[];
81+
};
82+
83+
const isDone = data.status === "done";
84+
85+
return (
86+
<div className="my-2 rounded border border-blue-200 bg-blue-50 px-3 py-2 text-xs">
87+
<div className="flex items-center gap-2 font-medium text-blue-700">
88+
{isDone ? (
89+
<span className="text-green-600">&#10003;</span>
90+
) : (
91+
<span className="inline-block h-3 w-3 animate-spin rounded-full border-2 border-blue-300 border-t-blue-600" />
92+
)}
93+
<span>
94+
{isDone
95+
? `Research complete — ${data.total} sources fetched`
96+
: `Researching "${data.query}" (${data.current}/${data.total})`}
97+
</span>
98+
</div>
99+
{data.currentUrl && !isDone && (
100+
<div className="mt-1 truncate text-blue-500">Fetching {data.currentUrl}</div>
101+
)}
102+
{data.completedUrls.length > 0 && (
103+
<div className="mt-1 space-y-0.5 text-blue-400">
104+
{data.completedUrls.map((url, i) => (
105+
<div key={i} className="truncate">&#10003; {url}</div>
106+
))}
107+
</div>
108+
)}
109+
</div>
110+
);
111+
}
112+
73113
function DebugPanel({
74114
chatId,
75115
model,
@@ -321,10 +361,25 @@ export function Chat({
321361
);
322362
}
323363

364+
if (part.type === "data-research-progress") {
365+
return <ResearchProgress key={i} part={part} />;
366+
}
367+
324368
if (part.type.startsWith("tool-") || part.type === "dynamic-tool") {
325369
return <ToolInvocation key={i} part={part} />;
326370
}
327371

372+
if (part.type.startsWith("data-")) {
373+
return (
374+
<div key={i} className="my-1 rounded border border-gray-200 bg-gray-50 p-2 text-xs text-gray-500">
375+
<span className="font-medium">{part.type}</span>
376+
<pre className="mt-1 overflow-x-auto whitespace-pre-wrap">
377+
{JSON.stringify((part as any).data, null, 2)}
378+
</pre>
379+
</div>
380+
);
381+
}
382+
328383
return null;
329384
})}
330385
</div>

references/ai-chat/src/trigger/chat.ts

Lines changed: 107 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
import { chat } from "@trigger.dev/sdk/ai";
2-
import { streamText, tool, stepCountIs } from "ai";
1+
import { chat, ai } from "@trigger.dev/sdk/ai";
2+
import { schemaTask } from "@trigger.dev/sdk";
3+
import { streamText, tool, stepCountIs, generateId } from "ai";
34
import type { LanguageModel } from "ai";
45
import { openai } from "@ai-sdk/openai";
56
import { anthropic } from "@ai-sdk/anthropic";
@@ -135,6 +136,105 @@ const userContext = chat.local<{
135136
messageCount: number;
136137
}>();
137138

139+
// --------------------------------------------------------------------------
140+
// Subtask: deep research — fetches multiple URLs and streams progress
141+
// back to the parent chat via chat.stream using data-* chunks
142+
// --------------------------------------------------------------------------
143+
export const deepResearch = schemaTask({
144+
id: "deep-research",
145+
description:
146+
"Research a topic by fetching multiple URLs and synthesizing the results. " +
147+
"Streams progress updates to the chat as it works.",
148+
schema: z.object({
149+
query: z.string().describe("The research query or topic"),
150+
urls: z.array(z.string().url()).describe("URLs to fetch and analyze"),
151+
}),
152+
run: async ({ query, urls }) => {
153+
const partId = generateId();
154+
const results: { url: string; status: number; snippet: string }[] = [];
155+
156+
// Stream progress using data-research-progress chunks.
157+
// Using the same id means each write updates the same part in the message.
158+
function streamProgress(progress: {
159+
status: "fetching" | "done";
160+
query: string;
161+
current: number;
162+
total: number;
163+
currentUrl?: string;
164+
completedUrls: string[];
165+
}) {
166+
return chat.stream.writer({
167+
target: "root",
168+
execute: ({ write }) => {
169+
write({
170+
type: "data-research-progress" as any,
171+
id: partId,
172+
data: progress,
173+
});
174+
},
175+
});
176+
}
177+
178+
for (let i = 0; i < urls.length; i++) {
179+
const url = urls[i]!;
180+
181+
// Update progress — fetching
182+
const { waitUntilComplete } = streamProgress({
183+
status: "fetching",
184+
query,
185+
current: i + 1,
186+
total: urls.length,
187+
currentUrl: url,
188+
completedUrls: results.map((r) => r.url),
189+
});
190+
await waitUntilComplete();
191+
192+
try {
193+
const response = await fetch(url);
194+
let text = await response.text();
195+
const contentType = response.headers.get("content-type") ?? "";
196+
197+
if (contentType.includes("html")) {
198+
text = text
199+
.replace(/<script[\s\S]*?<\/script>/gi, "")
200+
.replace(/<style[\s\S]*?<\/style>/gi, "")
201+
.replace(/<[^>]+>/g, " ")
202+
.replace(/&nbsp;/g, " ")
203+
.replace(/&amp;/g, "&")
204+
.replace(/&lt;/g, "<")
205+
.replace(/&gt;/g, ">")
206+
.replace(/\s+/g, " ")
207+
.trim();
208+
}
209+
210+
results.push({
211+
url,
212+
status: response.status,
213+
snippet: text.slice(0, 500),
214+
});
215+
} catch (err) {
216+
results.push({
217+
url,
218+
status: 0,
219+
snippet: `Error: ${err instanceof Error ? err.message : String(err)}`,
220+
});
221+
}
222+
}
223+
224+
// Final progress update — done
225+
const { waitUntilComplete: waitForDone } = streamProgress({
226+
status: "done",
227+
query,
228+
current: urls.length,
229+
total: urls.length,
230+
completedUrls: results.map((r) => r.url),
231+
});
232+
await waitForDone();
233+
234+
return { query, results };
235+
},
236+
});
237+
138238
export const aiChat = chat.task({
139239
id: "ai-chat",
140240
clientDataSchema: z.object({ model: z.string().optional(), userId: z.string() }),
@@ -228,7 +328,11 @@ export const aiChat = chat.task({
228328
model: getModel(modelId),
229329
system: `You are a helpful assistant for ${userContext.name} (${userContext.plan} plan). Be concise and friendly.`,
230330
messages,
231-
tools: { inspectEnvironment, webFetch },
331+
tools: {
332+
inspectEnvironment,
333+
webFetch,
334+
deepResearch: ai.tool(deepResearch),
335+
},
232336
stopWhen: stepCountIs(10),
233337
abortSignal: stopSignal,
234338
providerOptions: {

0 commit comments

Comments
 (0)