Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 90 additions & 9 deletions packages/opencode/src/mcp/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ interface State {
clients: Record<string, MCPClient>
defs: Record<string, MCPToolDef[]>
instructions: Record<string, string>
generation: Record<string, number>
reconnecting: Record<string, number>
disposed: boolean
}

export interface ServerInstructions {
Expand Down Expand Up @@ -428,16 +431,24 @@ export const layer = Layer.effect(
Effect.catch(() => Effect.succeed([] as number[])),
)

function watch(s: State, name: string, client: MCPClient, bridge: EffectBridge.Shape, timeout?: number) {
function watch(
s: State,
name: string,
client: MCPClient,
bridge: EffectBridge.Shape,
mcp: ConfigMCPV1.Info,
generation: number,
) {
client.onclose = () => {
if (s.clients[name] !== client) return
if (s.disposed || s.clients[name] !== client || s.generation[name] !== generation) return
delete s.clients[name]
delete s.defs[name]
delete s.instructions[name]
s.status[name] = { status: "failed", error: "Connection closed" }
bridge.fork(
Effect.logWarning("MCP connection closed", { server: name }).pipe(
Effect.andThen(events.publish(ToolsChanged, { server: name })),
Effect.andThen(events.publish(ToolsChanged, { server: name }).pipe(Effect.ignore)),
Effect.andThen(mcp.type === "remote" ? reconnect(s, name, mcp, generation) : Effect.void),
Effect.ignore,
),
)
Expand All @@ -451,7 +462,7 @@ export const layer = Layer.effect(
client.setNotificationHandler(ToolListChangedNotificationSchema, async () => {
if (s.clients[name] !== client || s.status[name]?.status !== "connected") return

const listed = await bridge.promise(McpCatalog.defs(client, timeout))
const listed = await bridge.promise(McpCatalog.defs(client, mcp.timeout))
if (!listed) return
if (s.clients[name] !== client || s.status[name]?.status !== "connected") return

Expand Down Expand Up @@ -489,6 +500,9 @@ export const layer = Layer.effect(
clients: {},
defs: {},
instructions: {},
generation: {},
reconnecting: {},
disposed: false,
}

yield* Effect.forEach(
Expand All @@ -505,20 +519,22 @@ export const layer = Layer.effect(
return
}

const generation = nextGeneration(s, key)
const result = yield* create(key, mcp)
s.status[key] = result.status
if (result.mcpClient) {
s.clients[key] = result.mcpClient
s.defs[key] = result.defs!
if (result.instructions) s.instructions[key] = result.instructions
watch(s, key, result.mcpClient, bridge, mcp.timeout)
watch(s, key, result.mcpClient, bridge, mcp, generation)
}
}),
{ concurrency: "unbounded" },
)

yield* Effect.addFinalizer(() =>
Effect.gen(function* () {
s.disposed = true
const clients = Object.values(s.clients)
s.clients = {}
s.defs = {}
Expand Down Expand Up @@ -563,7 +579,8 @@ export const layer = Layer.effect(
client: MCPClient,
listed: MCPToolDef[],
instructions: string | undefined,
timeout?: number,
mcp: ConfigMCPV1.Info,
generation: number,
) {
const bridge = yield* EffectBridge.make()
const previous = s.clients[name]
Expand All @@ -572,11 +589,67 @@ export const layer = Layer.effect(
s.defs[name] = listed
if (instructions) s.instructions[name] = instructions
else delete s.instructions[name]
watch(s, name, client, bridge, timeout)
watch(s, name, client, bridge, mcp, generation)
if (previous) yield* Effect.tryPromise(() => previous.close()).pipe(Effect.ignore)
return s.status[name]
})

const reconnect = Effect.fnUntraced(function* (
s: State,
name: string,
mcp: ConfigMCPV1.Info & { type: "remote" },
generation: number,
) {
if (s.reconnecting[name] === generation) return
s.reconnecting[name] = generation

yield* reconnectAttempt(s, name, mcp, generation, 0).pipe(
Effect.flatMap((result) => {
if (!result?.mcpClient || !result.defs) return Effect.void
const client = result.mcpClient
if (!ownsGeneration(s, name, generation)) {
return Effect.tryPromise(() => client.close()).pipe(Effect.ignore)
}
return storeClient(s, name, client, result.defs, result.instructions, mcp, generation).pipe(
Effect.andThen(events.publish(ToolsChanged, { server: name })),
Effect.ignore,
)
}),
Effect.ensuring(
Effect.sync(() => {
if (s.reconnecting[name] === generation) delete s.reconnecting[name]
}),
),
)
})

function reconnectAttempt(
s: State,
name: string,
mcp: ConfigMCPV1.Info & { type: "remote" },
generation: number,
attempt: number,
): Effect.Effect<CreateResult | undefined> {
return Effect.gen(function* () {
if (!ownsGeneration(s, name, generation)) return undefined
const result = yield* create(name, mcp)
if (result.mcpClient) return result
if (result.status.status !== "failed" || attempt >= 4) return undefined
yield* Effect.sleep(Math.min(250 * 2 ** attempt, 2_000))
return yield* reconnectAttempt(s, name, mcp, generation, attempt + 1)
})
}

function nextGeneration(s: State, name: string) {
const generation = (s.generation[name] ?? 0) + 1
s.generation[name] = generation
return generation
}

function ownsGeneration(s: State, name: string, generation: number) {
return !s.disposed && s.generation[name] === generation
}

const status = Effect.fn("MCP.status")(function* () {
const s = yield* InstanceState.get(state)

Expand Down Expand Up @@ -615,16 +688,22 @@ export const layer = Layer.effect(

const createAndStore = Effect.fn("MCP.createAndStore")(function* (name: string, mcp: ConfigMCPV1.Info) {
const s = yield* InstanceState.get(state)
const generation = nextGeneration(s, name)
const result = yield* create(name, mcp)

if (!ownsGeneration(s, name, generation)) {
const client = result.mcpClient
if (client) yield* Effect.tryPromise(() => client.close()).pipe(Effect.ignore)
return s.status[name]
}
s.status[name] = result.status
if (!result.mcpClient) {
yield* closeClient(s, name)
delete s.clients[name]
return result.status
}

return yield* storeClient(s, name, result.mcpClient, result.defs!, result.instructions, mcp.timeout)
return yield* storeClient(s, name, result.mcpClient, result.defs!, result.instructions, mcp, generation)
})

const add = Effect.fn("MCP.add")(function* (name: string, mcp: ConfigMCPV1.Info) {
Expand All @@ -642,6 +721,7 @@ export const layer = Layer.effect(
const disconnect = Effect.fn("MCP.disconnect")(function* (name: string) {
yield* requireMcpConfig(name)
const s = yield* InstanceState.get(state)
nextGeneration(s, name)
yield* closeClient(s, name)
delete s.clients[name]
s.status[name] = { status: "disabled" }
Expand Down Expand Up @@ -878,7 +958,8 @@ export const layer = Layer.effect(

const s = yield* InstanceState.get(state)
yield* auth.clearOAuthState(mcpName)
return yield* storeClient(s, mcpName, client, listed, client.getInstructions()?.trim(), mcpConfig.timeout)
const generation = nextGeneration(s, mcpName)
return yield* storeClient(s, mcpName, client, listed, client.getInstructions()?.trim(), mcpConfig, generation)
}

const callbackPromise = McpOAuthCallback.waitForCallback(result.oauthState, mcpName)
Expand Down
120 changes: 120 additions & 0 deletions packages/opencode/test/fixture/mcp-reconnect-scenario.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import path from "node:path"
import { expect } from "bun:test"
import { Effect, Exit, Fiber } from "effect"
import { MCP } from "../../src/mcp/index"
import { testEffect } from "../lib/effect"

const it = testEffect(MCP.defaultLayer)

function server() {
return Effect.acquireRelease(
Effect.promise(
() =>
new Promise<{ child: ReturnType<typeof Bun.spawn>; url: string }>((resolve, reject) => {
const child = Bun.spawn([process.execPath, path.join(import.meta.dir, "mcp-reconnect-server.ts")], {
cwd: path.join(import.meta.dir, "../.."),
stdout: "inherit",
stderr: "inherit",
ipc(message) {
if (
typeof message === "object" &&
message !== null &&
"url" in message &&
typeof message.url === "string"
) {
resolve({ child, url: message.url })
}
},
})
child.exited.then((code) => reject(new Error(`MCP test server exited before readiness with code ${code}`)))
}),
),
({ child }) =>
Effect.promise(async () => {
child.kill()
await child.exited
}).pipe(Effect.ignore),
)
}

function control(url: string, action: "block" | "release" | "wait", kind: string, count: number) {
return Effect.tryPromise(() => fetch(`${url}control/${action}?kind=${kind}&count=${count}`, { method: "POST" })).pipe(
Effect.filterOrFail(
(response) => response.ok,
(response) => new Error(`control request failed: ${response.status}`),
),
)
}

function state(url: string) {
return Effect.promise(() => fetch(`${url}control/state`).then((response) => response.json())) as Effect.Effect<{
initialize: number
list: number
call: number
}>
}

it.instance(
"reconnects once without replaying an ambiguous tool call and publishes the replacement",
() =>
Effect.scoped(
Effect.gen(function* () {
const fixture = yield* server()
const mcp = yield* MCP.Service
yield* mcp.add("remote", { type: "remote", url: fixture.url, oauth: false })
yield* control(fixture.url, "block", "call", 1)

const execute = (yield* mcp.tools()).remote_probe?.execute
if (!execute) return yield* Effect.die("initial tool missing")
const call = yield* Effect.promise(() => execute({}, { toolCallId: "first", messages: [] })).pipe(
Effect.exit,
Effect.forkScoped,
)
yield* control(fixture.url, "wait", "call", 1)
const original = (yield* mcp.clients()).remote
const transport = original?.transport
if (!transport) return yield* Effect.die("initial client transport missing")
yield* Effect.promise(() => transport.close())
yield* control(fixture.url, "release", "call", 1)
const callExit = yield* Fiber.await(call)
expect(Exit.isSuccess(callExit) && Exit.isFailure(callExit.value)).toBe(true)

yield* control(fixture.url, "wait", "list", 2)
const replacement = (yield* mcp.clients()).remote
expect(replacement).toBeDefined()
expect(replacement).not.toBe(original)
const executeLater = (yield* mcp.tools()).remote_probe?.execute
if (!executeLater) return yield* Effect.die("replacement tool missing")
const result = yield* Effect.promise(() => executeLater({}, { toolCallId: "later", messages: [] }))
expect(result).toMatchObject({ content: [{ text: "call-2-initialize-2" }] })
expect(yield* state(fixture.url)).toEqual({ initialize: 2, list: 2, call: 2 })
}),
),
{ config: { mcp: {} } },
)

it.instance(
"disconnect fences a reconnect that finishes late",
() =>
Effect.scoped(
Effect.gen(function* () {
const fixture = yield* server()
const mcp = yield* MCP.Service
yield* mcp.add("remote", { type: "remote", url: fixture.url, oauth: false })
yield* control(fixture.url, "block", "initialize", 2)

const transport = (yield* mcp.clients()).remote?.transport
if (!transport) return yield* Effect.die("initial client transport missing")
yield* Effect.promise(() => transport.close())
yield* control(fixture.url, "wait", "initialize", 2)
yield* mcp.disconnect("remote")
yield* control(fixture.url, "release", "initialize", 2)
yield* control(fixture.url, "wait", "list", 2)

expect((yield* mcp.status()).remote).toEqual({ status: "disabled" })
expect((yield* mcp.clients()).remote).toBeUndefined()
expect(yield* state(fixture.url)).toEqual({ initialize: 2, list: 2, call: 0 })
}),
),
{ config: { mcp: {} } },
)
Loading
Loading