Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 106 additions & 11 deletions packages/opencode/src/mcp/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import {
ListRootsRequestSchema,
type LoggingMessageNotification,
LoggingMessageNotificationSchema,
ResourceListChangedNotificationSchema,
ResourceUpdatedNotificationSchema,
type Tool as MCPToolDef,
ToolListChangedNotificationSchema,
} from "@modelcontextprotocol/sdk/types.js"
Expand Down Expand Up @@ -60,7 +62,8 @@ export const Resource = Schema.Struct({
export type Resource = Schema.Schema.Type<typeof Resource>

export const ToolsChanged = McpEvent.ToolsChanged

export const ResourcesChanged = McpEvent.ResourcesChanged
export const ResourceUpdated = McpEvent.ResourceUpdated
export const BrowserOpenFailed = McpEvent.BrowserOpenFailed

export const Failed = NamedError.create("MCPFailed", {
Expand All @@ -72,6 +75,7 @@ export class NotFoundError extends Schema.TaggedErrorClass<NotFoundError>()("MCP
}) {}

type MCPClient = Client
type CompleteParams = Parameters<MCPClient["complete"]>[0]

function createClient(directory: string) {
const client = new Client({ name: "opencode", version: InstallationVersion }, CLIENT_OPTIONS)
Expand Down Expand Up @@ -146,6 +150,7 @@ interface State {
clients: Record<string, MCPClient>
defs: Record<string, MCPToolDef[]>
instructions: Record<string, string>
resourceSubscriptions: Record<string, Set<string>>
}

export interface ServerInstructions {
Expand Down Expand Up @@ -176,6 +181,14 @@ export interface Interface {
clientName: string,
resourceUri: string,
) => Effect.Effect<Awaited<ReturnType<MCPClient["readResource"]>> | undefined>
readonly complete: (
clientName: string,
ref: CompleteParams["ref"],
argument: CompleteParams["argument"],
context?: CompleteParams["context"],
) => Effect.Effect<Awaited<ReturnType<MCPClient["complete"]>> | undefined>
readonly subscribeResource: (clientName: string, resourceUri: string) => Effect.Effect<void>
readonly unsubscribeResource: (clientName: string, resourceUri: string) => Effect.Effect<void>
readonly startAuth: (
mcpName: string,
) => Effect.Effect<{ authorizationUrl: string; oauthState: string }, NotFoundError>
Expand Down Expand Up @@ -437,6 +450,7 @@ export const layer = Layer.effect(
delete s.clients[name]
delete s.defs[name]
delete s.instructions[name]
delete s.resourceSubscriptions[name]
s.status[name] = { status: "failed", error: "Connection closed" }
bridge.fork(
Effect.logWarning("MCP connection closed", { server: name }).pipe(
Expand All @@ -450,17 +464,33 @@ export const layer = Layer.effect(
bridge.promise(serverLog(name, notification.params)),
)

if (!client.getServerCapabilities()?.tools) return
client.setNotificationHandler(ToolListChangedNotificationSchema, async () => {
if (s.clients[name] !== client || s.status[name]?.status !== "connected") return
const capabilities = client.getServerCapabilities()
if (capabilities?.resources) {
client.setNotificationHandler(ResourceListChangedNotificationSchema, async () => {
if (s.clients[name] !== client || s.status[name]?.status !== "connected") return
await bridge.promise(events.publish(ResourcesChanged, { server: name }).pipe(Effect.ignore))
})
if (capabilities.resources.subscribe) {
client.setNotificationHandler(ResourceUpdatedNotificationSchema, async (notification) => {
if (s.clients[name] !== client || s.status[name]?.status !== "connected") return
await bridge.promise(
events.publish(ResourceUpdated, { server: name, uri: notification.params.uri }).pipe(Effect.ignore),
)
})
}
}
if (capabilities?.tools) {
client.setNotificationHandler(ToolListChangedNotificationSchema, async () => {
if (s.clients[name] !== client || s.status[name]?.status !== "connected") return

const listed = await bridge.promise(McpCatalog.defs(client, timeout))
if (!listed) return
if (s.clients[name] !== client || s.status[name]?.status !== "connected") return
const listed = await bridge.promise(McpCatalog.defs(client, timeout))
if (!listed) return
if (s.clients[name] !== client || s.status[name]?.status !== "connected") return

s.defs[name] = listed
await bridge.promise(events.publish(ToolsChanged, { server: name }).pipe(Effect.ignore))
})
s.defs[name] = listed
await bridge.promise(events.publish(ToolsChanged, { server: name }).pipe(Effect.ignore))
})
}
}

function serverLog(name: string, params: LoggingMessageNotification["params"]) {
Expand Down Expand Up @@ -492,6 +522,7 @@ export const layer = Layer.effect(
clients: {},
defs: {},
instructions: {},
resourceSubscriptions: {},
}

yield* Effect.forEach(
Expand All @@ -514,6 +545,7 @@ export const layer = Layer.effect(
s.clients[key] = result.mcpClient
s.defs[key] = result.defs!
if (result.instructions) s.instructions[key] = result.instructions
s.resourceSubscriptions[key] = new Set()
watch(s, key, result.mcpClient, bridge, mcp.timeout)
}
}),
Expand All @@ -526,6 +558,7 @@ export const layer = Layer.effect(
s.clients = {}
s.defs = {}
s.instructions = {}
s.resourceSubscriptions = {}
yield* Effect.forEach(
clients,
(client) =>
Expand Down Expand Up @@ -556,6 +589,7 @@ export const layer = Layer.effect(
delete s.clients[name]
delete s.defs[name]
delete s.instructions[name]
delete s.resourceSubscriptions[name]
if (!client) return Effect.void
return Effect.tryPromise(() => client.close()).pipe(Effect.ignore)
}
Expand All @@ -575,6 +609,7 @@ export const layer = Layer.effect(
s.defs[name] = listed
if (instructions) s.instructions[name] = instructions
else delete s.instructions[name]
s.resourceSubscriptions[name] = new Set()
watch(s, name, client, bridge, timeout)
if (previous) yield* Effect.tryPromise(() => previous.close()).pipe(Effect.ignore)
return s.status[name]
Expand Down Expand Up @@ -771,13 +806,70 @@ export const layer = Layer.effect(
)
})

const subscribeResource = Effect.fn("MCP.subscribeResource")(function* (clientName: string, resourceUri: string) {
const s = yield* InstanceState.get(state)
if (s.resourceSubscriptions[clientName]?.has(resourceUri)) return

const result = yield* withClient(
clientName,
async (client, timeout) => {
if (!client.getServerCapabilities()?.resources?.subscribe) return false
await client.subscribeResource({ uri: resourceUri }, { timeout })
return true
},
"subscribeResource",
{ resourceUri },
)
if (result) (s.resourceSubscriptions[clientName] ??= new Set()).add(resourceUri)
})

const readResource = Effect.fn("MCP.readResource")(function* (clientName: string, resourceUri: string) {
return yield* withClient(
const result = yield* withClient(
clientName,
(client, timeout) => client.readResource({ uri: resourceUri }, { timeout }),
"readResource",
{ resourceUri },
)
if (result) yield* subscribeResource(clientName, resourceUri).pipe(Effect.ignore)
return result
})

const complete = Effect.fn("MCP.complete")(function* (
clientName: string,
ref: CompleteParams["ref"],
argument: CompleteParams["argument"],
context?: CompleteParams["context"],
) {
return yield* withClient(
clientName,
(client, timeout) => {
if (!client.getServerCapabilities()?.completions) return Promise.resolve(undefined)
return client.complete(context === undefined ? { ref, argument } : { ref, argument, context }, { timeout })
},
"complete",
{ ref, argumentName: argument.name },
)
})

const unsubscribeResource = Effect.fn("MCP.unsubscribeResource")(function* (
clientName: string,
resourceUri: string,
) {
const s = yield* InstanceState.get(state)
const subscribed = s.resourceSubscriptions[clientName]
if (!subscribed?.has(resourceUri)) return

const result = yield* withClient(
clientName,
async (client, timeout) => {
if (!client.getServerCapabilities()?.resources?.subscribe) return false
await client.unsubscribeResource({ uri: resourceUri }, { timeout })
return true
},
"unsubscribeResource",
{ resourceUri },
)
if (result) subscribed.delete(resourceUri)
})

const getMcpConfig = Effect.fnUntraced(function* (mcpName: string) {
Expand Down Expand Up @@ -982,6 +1074,9 @@ export const layer = Layer.effect(
disconnect,
getPrompt,
readResource,
complete,
subscribeResource,
unsubscribeResource,
startAuth,
authenticate,
finishAuth,
Expand Down
Loading
Loading