Skip to content

Commit 3e21899

Browse files
committed
fix use chat race + headless trace issues
1 parent df15ebd commit 3e21899

9 files changed

Lines changed: 355 additions & 42 deletions

File tree

apps/sim/app/api/mcp/copilot/route.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import { type NextRequest, NextResponse } from 'next/server'
1818
import { validateOAuthAccessToken } from '@/lib/auth/oauth-token'
1919
import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription'
2020
import { ORCHESTRATION_TIMEOUT_MS, SIM_AGENT_API_URL } from '@/lib/copilot/constants'
21-
import { runCopilotLifecycle } from '@/lib/copilot/request/lifecycle/run'
21+
import { runHeadlessCopilotLifecycle } from '@/lib/copilot/request/lifecycle/headless'
2222
import { orchestrateSubagentStream } from '@/lib/copilot/request/subagent'
2323
import { ensureHandlersRegistered, executeTool } from '@/lib/copilot/tool-executor'
2424
import { prepareExecutionContext } from '@/lib/copilot/tools/handlers/context'
@@ -724,7 +724,7 @@ async function handleBuildToolCall(
724724
chatId,
725725
}
726726

727-
const result = await runCopilotLifecycle(requestPayload, {
727+
const result = await runHeadlessCopilotLifecycle(requestPayload, {
728728
userId,
729729
workflowId: resolved.workflowId,
730730
chatId,

apps/sim/app/api/mothership/execute/route.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { z } from 'zod'
44
import { checkInternalAuth } from '@/lib/auth/hybrid'
55
import { buildIntegrationToolSchemas } from '@/lib/copilot/chat/payload'
66
import { generateWorkspaceContext } from '@/lib/copilot/chat/workspace-context'
7-
import { runCopilotLifecycle } from '@/lib/copilot/request/lifecycle/run'
7+
import { runHeadlessCopilotLifecycle } from '@/lib/copilot/request/lifecycle/headless'
88
import { generateId } from '@/lib/core/utils/uuid'
99
import {
1010
assertActiveWorkspaceAccess,
@@ -72,7 +72,7 @@ export async function POST(req: NextRequest) {
7272
...(userPermission ? { userPermission } : {}),
7373
}
7474

75-
const result = await runCopilotLifecycle(requestPayload, {
75+
const result = await runHeadlessCopilotLifecycle(requestPayload, {
7676
userId,
7777
workspaceId,
7878
chatId: effectiveChatId,

apps/sim/app/api/v1/copilot/chat/route.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { createLogger } from '@sim/logger'
22
import { type NextRequest, NextResponse } from 'next/server'
33
import { z } from 'zod'
44
import { COPILOT_REQUEST_MODES } from '@/lib/copilot/constants'
5-
import { runCopilotLifecycle } from '@/lib/copilot/request/lifecycle/run'
5+
import { runHeadlessCopilotLifecycle } from '@/lib/copilot/request/lifecycle/headless'
66
import { generateId } from '@/lib/core/utils/uuid'
77
import { getWorkflowById, resolveWorkflowIdForUser } from '@/lib/workflows/utils'
88
import { authenticateV1Request } from '@/app/api/v1/auth'
@@ -82,7 +82,7 @@ export async function POST(req: NextRequest) {
8282
// Always generate a chatId - required for artifacts system to work with subagents
8383
const chatId = parsed.chatId || generateId()
8484

85-
messageId = crypto.randomUUID()
85+
messageId = generateId()
8686
logger.info(
8787
messageId
8888
? `Received headless copilot chat start request [messageId:${messageId}]`
@@ -106,7 +106,7 @@ export async function POST(req: NextRequest) {
106106
chatId,
107107
}
108108

109-
const result = await runCopilotLifecycle(requestPayload, {
109+
const result = await runHeadlessCopilotLifecycle(requestPayload, {
110110
userId: auth.userId,
111111
workflowId: resolved.workflowId,
112112
chatId,

apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts

Lines changed: 54 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { useCallback, useEffect, useMemo, useRef, useState } from 'react'
22
import { createLogger } from '@sim/logger'
33
import { useQueryClient } from '@tanstack/react-query'
4-
import { usePathname } from 'next/navigation'
4+
import { usePathname, useRouter } from 'next/navigation'
55
import { toDisplayMessage } from '@/lib/copilot/chat/display-message'
66
import type {
77
PersistedFileAttachment,
@@ -709,6 +709,7 @@ export function useChat(
709709
options?: UseChatOptions
710710
): UseChatReturn {
711711
const pathname = usePathname()
712+
const router = useRouter()
712713
const queryClient = useQueryClient()
713714
const [messages, setMessages] = useState<ChatMessage[]>([])
714715
const [isSending, setIsSending] = useState(false)
@@ -962,6 +963,8 @@ export function useChat(
962963
const recoveringClientWorkflowToolIdsRef = useRef<Set<string>>(new Set())
963964
const executionStream = useExecutionStream()
964965
const isHomePage = pathname.endsWith('/home')
966+
const wasHomePageRef = useRef(isHomePage)
967+
const pendingHomeResetRef = useRef(false)
965968

966969
const setTransportIdle = useCallback(() => {
967970
sendingRef.current = false
@@ -994,6 +997,25 @@ export function useChat(
994997
resetStreamingBuffers()
995998
}, [resetStreamingBuffers])
996999

1000+
const resetHomeChatState = useCallback(() => {
1001+
streamGenRef.current++
1002+
chatIdRef.current = undefined
1003+
lastCursorRef.current = '0'
1004+
locallyTerminalStreamIdRef.current = undefined
1005+
clearActiveTurn()
1006+
setResolvedChatId(undefined)
1007+
appliedChatHistoryKeyRef.current = undefined
1008+
abortControllerRef.current = null
1009+
setMessages([])
1010+
setError(null)
1011+
setTransportIdle()
1012+
setResources([])
1013+
setActiveResourceId(null)
1014+
resetEphemeralPreviewState()
1015+
setMessageQueue([])
1016+
clearQueueDispatchState()
1017+
}, [clearActiveTurn, clearQueueDispatchState, resetEphemeralPreviewState, setTransportIdle])
1018+
9971019
const mergeServerMessagesWithActiveTurn = useCallback(
9981020
(serverMessages: ChatMessage[], previousMessages: ChatMessage[]) => {
9991021
const activeTurn = activeTurnRef.current
@@ -1196,31 +1218,33 @@ export function useChat(
11961218
])
11971219

11981220
useEffect(() => {
1199-
if (workflowIdRef.current) return
1200-
if (!isHomePage || !chatIdRef.current) return
1201-
streamGenRef.current++
1202-
chatIdRef.current = undefined
1203-
lastCursorRef.current = '0'
1204-
locallyTerminalStreamIdRef.current = undefined
1205-
clearActiveTurn()
1206-
setResolvedChatId(undefined)
1207-
appliedChatHistoryKeyRef.current = undefined
1208-
abortControllerRef.current = null
1209-
setMessages([])
1210-
setError(null)
1211-
setTransportIdle()
1212-
setResources([])
1213-
setActiveResourceId(null)
1214-
resetEphemeralPreviewState()
1215-
setMessageQueue([])
1216-
clearQueueDispatchState()
1217-
}, [
1218-
isHomePage,
1219-
resetEphemeralPreviewState,
1220-
clearQueueDispatchState,
1221-
clearActiveTurn,
1222-
setTransportIdle,
1223-
])
1221+
const wasHomePage = wasHomePageRef.current
1222+
wasHomePageRef.current = isHomePage
1223+
1224+
if (!isHomePage) {
1225+
pendingHomeResetRef.current = false
1226+
return
1227+
}
1228+
if (workflowIdRef.current || !chatIdRef.current) return
1229+
1230+
const shouldHandleHomeReset = pendingHomeResetRef.current || !wasHomePage
1231+
if (!shouldHandleHomeReset) return
1232+
1233+
const hasActiveTransport =
1234+
isSending ||
1235+
sendingRef.current ||
1236+
isReconnecting ||
1237+
abortControllerRef.current !== null ||
1238+
streamReaderRef.current !== null
1239+
1240+
if (hasActiveTransport) {
1241+
pendingHomeResetRef.current = true
1242+
return
1243+
}
1244+
1245+
pendingHomeResetRef.current = false
1246+
resetHomeChatState()
1247+
}, [isHomePage, isReconnecting, isSending, resetHomeChatState])
12241248

12251249
useEffect(() => {
12261250
if (!chatHistory) return
@@ -1583,11 +1607,9 @@ export function useChat(
15831607
})
15841608
}
15851609
if (!workflowIdRef.current) {
1586-
window.history.replaceState(
1587-
null,
1588-
'',
1589-
`/workspace/${workspaceId}/task/${payloadChatId}`
1590-
)
1610+
router.replace(`/workspace/${workspaceId}/task/${payloadChatId}`, {
1611+
scroll: false,
1612+
})
15911613
}
15921614
}
15931615
}
@@ -2153,6 +2175,7 @@ export function useChat(
21532175
},
21542176
[
21552177
workspaceId,
2178+
router,
21562179
queryClient,
21572180
addResource,
21582181
removeResource,

apps/sim/lib/copilot/request/go/stream.test.ts

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
*/
44
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
55
import {
6+
MothershipStreamV1CompletionStatus,
67
MothershipStreamV1EventType,
78
MothershipStreamV1ToolExecutor,
89
MothershipStreamV1ToolMode,
@@ -145,8 +146,18 @@ describe('copilot go stream helpers', () => {
145146
output: { value: 'ok' },
146147
},
147148
})
149+
const complete = createEvent({
150+
streamId: 'stream-1',
151+
cursor: '2',
152+
seq: 2,
153+
requestId: 'req-1',
154+
type: MothershipStreamV1EventType.complete,
155+
payload: {
156+
status: MothershipStreamV1CompletionStatus.complete,
157+
},
158+
})
148159

149-
vi.mocked(fetch).mockResolvedValueOnce(createSseResponse([toolResult, toolResult]))
160+
vi.mocked(fetch).mockResolvedValueOnce(createSseResponse([toolResult, toolResult, complete]))
150161

151162
const onEvent = vi.fn()
152163
const context = createStreamingContext()
@@ -160,7 +171,10 @@ describe('copilot go stream helpers', () => {
160171
timeout: 1000,
161172
})
162173

163-
expect(onEvent).toHaveBeenCalledTimes(1)
174+
expect(onEvent.mock.calls.map(([event]) => event.type)).toEqual([
175+
MothershipStreamV1EventType.tool,
176+
MothershipStreamV1EventType.complete,
177+
])
164178
expect(onEvent).toHaveBeenCalledWith(
165179
expect.objectContaining({
166180
type: MothershipStreamV1EventType.tool,
@@ -180,6 +194,39 @@ describe('copilot go stream helpers', () => {
180194
)
181195
})
182196

197+
it('fails closed when the shared stream ends before a terminal event', async () => {
198+
const textEvent = createEvent({
199+
streamId: 'stream-1',
200+
cursor: '1',
201+
seq: 1,
202+
requestId: 'req-1',
203+
type: MothershipStreamV1EventType.text,
204+
payload: {
205+
channel: 'assistant',
206+
text: 'partial response',
207+
},
208+
})
209+
210+
vi.mocked(fetch).mockResolvedValueOnce(createSseResponse([textEvent]))
211+
212+
const context = createStreamingContext()
213+
const execContext: ExecutionContext = {
214+
userId: 'user-1',
215+
workflowId: 'workflow-1',
216+
}
217+
218+
await expect(
219+
runStreamLoop('https://example.com/mothership/stream', {}, context, execContext, {
220+
timeout: 1000,
221+
})
222+
).rejects.toThrow('Copilot backend stream ended before a terminal event')
223+
expect(
224+
context.errors.some((message) =>
225+
message.includes('Copilot backend stream ended before a terminal event')
226+
)
227+
).toBe(true)
228+
})
229+
183230
it('fails closed when the shared stream receives an invalid event', async () => {
184231
vi.mocked(fetch).mockResolvedValueOnce(
185232
createSseResponse([

apps/sim/lib/copilot/request/go/stream.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,18 @@ export async function runStreamLoop(
277277
}
278278
return context.streamComplete || undefined
279279
})
280+
281+
if (!context.streamComplete && !abortSignal?.aborted && !context.wasAborted) {
282+
const streamPath = new URL(fetchUrl).pathname
283+
const message = `Copilot backend stream ended before a terminal event on ${streamPath}`
284+
context.errors.push(message)
285+
logger.error('Copilot backend stream ended before a terminal event', {
286+
path: streamPath,
287+
requestId: context.requestId,
288+
messageId: context.messageId,
289+
})
290+
throw new CopilotBackendError(message, { status: 503 })
291+
}
280292
} catch (error) {
281293
if (error instanceof FatalSseEventError && !context.errors.includes(error.message)) {
282294
context.errors.push(error.message)

0 commit comments

Comments
 (0)