From 9603cca6690230b50d73e2c6b7eb27061dd35d7c Mon Sep 17 00:00:00 2001 From: xiaxinyu <76138768+Xiaxinyuuu@users.noreply.github.com> Date: Wed, 24 Jun 2026 03:32:08 +0000 Subject: [PATCH] fix: implement v2 session wait Co-authored-by: Starchild --- packages/core/src/session.ts | 6 +-- packages/core/src/session/execution.ts | 4 +- packages/core/src/session/execution/local.ts | 1 + packages/core/src/session/run-coordinator.ts | 11 ++++- packages/core/test/session-prompt.test.ts | 1 + .../core/test/session-run-coordinator.test.ts | 40 +++++++++++++++++++ .../core/test/session-runner-recorded.test.ts | 1 + packages/core/test/session-runner.test.ts | 1 + .../server/httpapi-public-openapi.test.ts | 5 +-- .../test/server/httpapi-session.test.ts | 9 +---- packages/server/src/groups/session.ts | 2 +- packages/server/src/handlers/session.ts | 8 ---- 12 files changed, 64 insertions(+), 25 deletions(-) diff --git a/packages/core/src/session.ts b/packages/core/src/session.ts index 888052411a87..2c15fccf242c 100644 --- a/packages/core/src/session.ts +++ b/packages/core/src/session.ts @@ -84,7 +84,7 @@ export class NotFoundError extends Schema.TaggedErrorClass()("Ses export class OperationUnavailableError extends Schema.TaggedErrorClass()( "Session.OperationUnavailableError", { - operation: Schema.Literals(["move", "shell", "skill", "switchAgent", "compact", "wait"]), + operation: Schema.Literals(["move", "shell", "skill", "switchAgent", "compact"]), }, ) {} @@ -146,7 +146,7 @@ export interface Interface { resume?: boolean }) => Effect.Effect readonly compact: (input: CompactInput) => Effect.Effect - readonly wait: (id: SessionSchema.ID) => Effect.Effect + readonly wait: (id: SessionSchema.ID) => Effect.Effect readonly resume: (sessionID: SessionSchema.ID) => Effect.Effect readonly interrupt: (sessionID: SessionSchema.ID) => Effect.Effect } @@ -375,7 +375,7 @@ export const layer = Layer.effect( }), wait: Effect.fn("V2Session.wait")(function* (sessionID) { yield* result.get(sessionID) - return yield* new OperationUnavailableError({ operation: "wait" }) + yield* execution.wait(sessionID) }), resume: Effect.fn("V2Session.resume")(function* (sessionID) { yield* result.get(sessionID) diff --git a/packages/core/src/session/execution.ts b/packages/core/src/session/execution.ts index a08912e95653..0a7fd68c1196 100644 --- a/packages/core/src/session/execution.ts +++ b/packages/core/src/session/execution.ts @@ -9,6 +9,8 @@ export interface Interface { readonly resume: (sessionID: SessionSchema.ID) => Effect.Effect /** Registers newly recorded work. Repeated wakeups may coalesce. */ readonly wake: (sessionID: SessionSchema.ID) => Effect.Effect + /** Waits for active work owned by this process to finish. Idle wait is a no-op. */ + readonly wait: (sessionID: SessionSchema.ID) => Effect.Effect /** Interrupt active work owned by this process. Idle interruption is a no-op. */ readonly interrupt: (sessionID: SessionSchema.ID) => Effect.Effect } @@ -19,5 +21,5 @@ export class Service extends Context.Service()("@opencode/v2 /** Low-level compatibility layer for callers that only need durable Session recording. */ export const noopLayer = Layer.succeed( Service, - Service.of({ resume: () => Effect.void, wake: () => Effect.void, interrupt: () => Effect.void }), + Service.of({ resume: () => Effect.void, wake: () => Effect.void, wait: () => Effect.void, interrupt: () => Effect.void }), ) diff --git a/packages/core/src/session/execution/local.ts b/packages/core/src/session/execution/local.ts index 7e0e3ca70039..7fcfc563135a 100644 --- a/packages/core/src/session/execution/local.ts +++ b/packages/core/src/session/execution/local.ts @@ -31,6 +31,7 @@ export const layer = Layer.effect( interrupt: coordinator.interrupt, resume: coordinator.run, wake: coordinator.wake, + wait: coordinator.wait, }) }), ) diff --git a/packages/core/src/session/run-coordinator.ts b/packages/core/src/session/run-coordinator.ts index cfab42300ef8..212c27f09ec0 100644 --- a/packages/core/src/session/run-coordinator.ts +++ b/packages/core/src/session/run-coordinator.ts @@ -8,6 +8,8 @@ export interface Coordinator { readonly run: (key: Key) => Effect.Effect /** Registers one coalesced follow-up after newly recorded work. */ readonly wake: (key: Key) => Effect.Effect + /** Waits for active execution to finish without starting new work. */ + readonly wait: (key: Key) => Effect.Effect /** Stops active execution and waits for its cleanup. */ readonly interrupt: (key: Key) => Effect.Effect } @@ -89,6 +91,13 @@ export const make = (options: { start(key, next, false) }) + const wait = (key: Key): Effect.Effect => + Effect.suspend(() => { + const entry = active.get(key) + if (entry === undefined) return Effect.void + return Effect.ignore(Deferred.await(entry.done)).pipe(Effect.andThen(wait(key))) + }) + const interrupt = (key: Key): Effect.Effect => Effect.suspend(() => { const entry = active.get(key) @@ -98,5 +107,5 @@ export const make = (options: { return Fiber.interrupt(entry.owner) }) - return { run, wake, interrupt } + return { run, wake, wait, interrupt } }) diff --git a/packages/core/test/session-prompt.test.ts b/packages/core/test/session-prompt.test.ts index 1ab6c64c61f0..aef2ca5241d9 100644 --- a/packages/core/test/session-prompt.test.ts +++ b/packages/core/test/session-prompt.test.ts @@ -32,6 +32,7 @@ const execution = Layer.succeed( Effect.sync(() => { interruptCalls.push(sessionID) }), + wait: () => Effect.void, wake: (sessionID) => Effect.sync(() => { wakeCalls.push(sessionID) diff --git a/packages/core/test/session-run-coordinator.test.ts b/packages/core/test/session-run-coordinator.test.ts index 909ba648702d..1e8e2eab68de 100644 --- a/packages/core/test/session-run-coordinator.test.ts +++ b/packages/core/test/session-run-coordinator.test.ts @@ -66,6 +66,46 @@ describe("SessionRunCoordinator", () => { ), ) + it.effect("waits for active execution without starting new work", () => + Effect.scoped( + Effect.gen(function* () { + const started = yield* Deferred.make() + const gate = yield* Deferred.make() + let runs = 0 + const coordinator = yield* SessionRunCoordinator.make({ + drain: () => + Effect.sync(() => runs++).pipe( + Effect.andThen(Deferred.succeed(started, undefined)), + Effect.andThen(Deferred.await(gate)), + ), + }) + + yield* coordinator.wake("session") + yield* Deferred.await(started) + const waiter = yield* coordinator.wait("session").pipe(Effect.forkChild) + yield* Effect.yieldNow + + expect(runs).toBe(1) + yield* Deferred.succeed(gate, undefined) + yield* Fiber.join(waiter) + expect(runs).toBe(1) + }), + ), + ) + + it.effect("does nothing when waiting while idle", () => + Effect.scoped( + Effect.gen(function* () { + let runs = 0 + const coordinator = yield* SessionRunCoordinator.make({ drain: () => Effect.sync(() => runs++) }) + + yield* coordinator.wait("session") + + expect(runs).toBe(0) + }), + ), + ) + it.effect("coalesces wakes received during active execution", () => Effect.scoped( Effect.gen(function* () { diff --git a/packages/core/test/session-runner-recorded.test.ts b/packages/core/test/session-runner-recorded.test.ts index 926248c0b0a9..14a737fa48ac 100644 --- a/packages/core/test/session-runner-recorded.test.ts +++ b/packages/core/test/session-runner-recorded.test.ts @@ -94,6 +94,7 @@ const execution = Layer.effect( return SessionExecution.Service.of({ resume: coordinator.run, wake: coordinator.wake, + wait: coordinator.wait, interrupt: coordinator.interrupt, }) }), diff --git a/packages/core/test/session-runner.test.ts b/packages/core/test/session-runner.test.ts index afb4f9ad1f16..d7d887e5d2a3 100644 --- a/packages/core/test/session-runner.test.ts +++ b/packages/core/test/session-runner.test.ts @@ -253,6 +253,7 @@ const execution = Layer.effect( return SessionExecution.Service.of({ resume: coordinator.run, wake: coordinator.wake, + wait: coordinator.wait, interrupt: coordinator.interrupt, }) }), diff --git a/packages/opencode/test/server/httpapi-public-openapi.test.ts b/packages/opencode/test/server/httpapi-public-openapi.test.ts index a8f6f8d1c855..012bdea49e7d 100644 --- a/packages/opencode/test/server/httpapi-public-openapi.test.ts +++ b/packages/opencode/test/server/httpapi-public-openapi.test.ts @@ -220,10 +220,7 @@ describe("PublicApi OpenAPI v2 errors", () => { test("documents v2 unfinished session mutation errors", () => { const spec = OpenApi.fromApi(PublicApi) as OpenApiSpec - for (const route of [ - ["post", "/api/session/{sessionID}/compact"], - ["post", "/api/session/{sessionID}/wait"], - ] as const) { + for (const route of [["post", "/api/session/{sessionID}/compact"]] as const) { expect(componentName(responseRef(spec.paths[route[1]]?.[route[0]]?.responses?.["503"]) ?? "")).toBe( "ServiceUnavailableError", ) diff --git a/packages/opencode/test/server/httpapi-session.test.ts b/packages/opencode/test/server/httpapi-session.test.ts index ae72bc4d0acc..d41bdb9dd366 100644 --- a/packages/opencode/test/server/httpapi-session.test.ts +++ b/packages/opencode/test/server/httpapi-session.test.ts @@ -629,7 +629,7 @@ describe("session HttpApi", () => { ) it.instance( - "returns v2 public unavailable errors for unfinished session mutations", + "returns v2 public unavailable errors for unfinished session mutations and waits idle sessions", () => Effect.gen(function* () { const test = yield* TestInstance @@ -645,12 +645,7 @@ describe("session HttpApi", () => { }) const wait = yield* request(`/api/session/${session.id}/wait`, { method: "POST", headers }) - expect(wait.status).toBe(503) - expect(yield* responseJson(wait)).toEqual({ - _tag: "ServiceUnavailableError", - message: "Session wait is not available yet", - service: "session.wait", - }) + expect(wait.status).toBe(204) }), { git: true, config: { formatter: false, lsp: false } }, ) diff --git a/packages/server/src/groups/session.ts b/packages/server/src/groups/session.ts index 6a4f4aff609c..14443d962e01 100644 --- a/packages/server/src/groups/session.ts +++ b/packages/server/src/groups/session.ts @@ -212,7 +212,7 @@ export const SessionGroup = HttpApiGroup.make("server.session") HttpApiEndpoint.post("session.wait", "/api/session/:sessionID/wait", { params: { sessionID: SessionV2.ID }, success: HttpApiSchema.NoContent, - error: [SessionNotFoundError, ServiceUnavailableError], + error: SessionNotFoundError, }) .middleware(SessionLocationMiddleware) .annotateMerge( diff --git a/packages/server/src/handlers/session.ts b/packages/server/src/handlers/session.ts index 1fe860e5284e..e64814616e41 100644 --- a/packages/server/src/handlers/session.ts +++ b/packages/server/src/handlers/session.ts @@ -193,14 +193,6 @@ export const SessionHandler = HttpApiBuilder.group(Api, "server.session", (handl }), ), ), - Effect.catchTag("Session.OperationUnavailableError", (error) => - Effect.fail( - new ServiceUnavailableError({ - message: `Session ${error.operation} is not available yet`, - service: `session.${error.operation}`, - }), - ), - ), ) return HttpApiSchema.NoContent.make() }),