Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions packages/core/src/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ export class NotFoundError extends Schema.TaggedErrorClass<NotFoundError>()("Ses
export class OperationUnavailableError extends Schema.TaggedErrorClass<OperationUnavailableError>()(
"Session.OperationUnavailableError",
{
operation: Schema.Literals(["move", "shell", "skill", "switchAgent", "compact", "wait"]),
operation: Schema.Literals(["move", "shell", "skill", "switchAgent", "compact"]),
},
) {}

Expand Down Expand Up @@ -146,7 +146,7 @@ export interface Interface {
resume?: boolean
}) => Effect.Effect<void, OperationUnavailableError>
readonly compact: (input: CompactInput) => Effect.Effect<void, NotFoundError | OperationUnavailableError>
readonly wait: (id: SessionSchema.ID) => Effect.Effect<void, NotFoundError | OperationUnavailableError>
readonly wait: (id: SessionSchema.ID) => Effect.Effect<void, NotFoundError>
readonly resume: (sessionID: SessionSchema.ID) => Effect.Effect<void, NotFoundError | SessionRunner.RunError>
readonly interrupt: (sessionID: SessionSchema.ID) => Effect.Effect<void>
}
Expand Down Expand Up @@ -375,7 +375,7 @@ export const layer = Layer.effect(
}),
wait: Effect.fn("V2Session.wait")(function* (sessionID) {
yield* result.get(sessionID)
return yield* new OperationUnavailableError({ operation: "wait" })
yield* execution.wait(sessionID)
}),
resume: Effect.fn("V2Session.resume")(function* (sessionID) {
yield* result.get(sessionID)
Expand Down
4 changes: 3 additions & 1 deletion packages/core/src/session/execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ export interface Interface {
readonly resume: (sessionID: SessionSchema.ID) => Effect.Effect<void, SessionRunner.RunError>
/** Registers newly recorded work. Repeated wakeups may coalesce. */
readonly wake: (sessionID: SessionSchema.ID) => Effect.Effect<void>
/** Waits for active work owned by this process to finish. Idle wait is a no-op. */
readonly wait: (sessionID: SessionSchema.ID) => Effect.Effect<void>
/** Interrupt active work owned by this process. Idle interruption is a no-op. */
readonly interrupt: (sessionID: SessionSchema.ID) => Effect.Effect<void>
}
Expand All @@ -19,5 +21,5 @@ export class Service extends Context.Service<Service, Interface>()("@opencode/v2
/** Low-level compatibility layer for callers that only need durable Session recording. */
export const noopLayer = Layer.succeed(
Service,
Service.of({ resume: () => Effect.void, wake: () => Effect.void, interrupt: () => Effect.void }),
Service.of({ resume: () => Effect.void, wake: () => Effect.void, wait: () => Effect.void, interrupt: () => Effect.void }),
)
1 change: 1 addition & 0 deletions packages/core/src/session/execution/local.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ export const layer = Layer.effect(
interrupt: coordinator.interrupt,
resume: coordinator.run,
wake: coordinator.wake,
wait: coordinator.wait,
})
}),
)
Expand Down
11 changes: 10 additions & 1 deletion packages/core/src/session/run-coordinator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ export interface Coordinator<Key, E> {
readonly run: (key: Key) => Effect.Effect<void, E>
/** Registers one coalesced follow-up after newly recorded work. */
readonly wake: (key: Key) => Effect.Effect<void>
/** Waits for active execution to finish without starting new work. */
readonly wait: (key: Key) => Effect.Effect<void>
/** Stops active execution and waits for its cleanup. */
readonly interrupt: (key: Key) => Effect.Effect<void>
}
Expand Down Expand Up @@ -89,6 +91,13 @@ export const make = <Key, E>(options: {
start(key, next, false)
})

const wait = (key: Key): Effect.Effect<void> =>
Effect.suspend(() => {
const entry = active.get(key)
if (entry === undefined) return Effect.void
return Effect.ignore(Deferred.await(entry.done)).pipe(Effect.andThen(wait(key)))
})

const interrupt = (key: Key): Effect.Effect<void> =>
Effect.suspend(() => {
const entry = active.get(key)
Expand All @@ -98,5 +107,5 @@ export const make = <Key, E>(options: {
return Fiber.interrupt(entry.owner)
})

return { run, wake, interrupt }
return { run, wake, wait, interrupt }
})
1 change: 1 addition & 0 deletions packages/core/test/session-prompt.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const execution = Layer.succeed(
Effect.sync(() => {
interruptCalls.push(sessionID)
}),
wait: () => Effect.void,
wake: (sessionID) =>
Effect.sync(() => {
wakeCalls.push(sessionID)
Expand Down
40 changes: 40 additions & 0 deletions packages/core/test/session-run-coordinator.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,46 @@ describe("SessionRunCoordinator", () => {
),
)

it.effect("waits for active execution without starting new work", () =>
Effect.scoped(
Effect.gen(function* () {
const started = yield* Deferred.make<void>()
const gate = yield* Deferred.make<void>()
let runs = 0
const coordinator = yield* SessionRunCoordinator.make({
drain: () =>
Effect.sync(() => runs++).pipe(
Effect.andThen(Deferred.succeed(started, undefined)),
Effect.andThen(Deferred.await(gate)),
),
})

yield* coordinator.wake("session")
yield* Deferred.await(started)
const waiter = yield* coordinator.wait("session").pipe(Effect.forkChild)
yield* Effect.yieldNow

expect(runs).toBe(1)
yield* Deferred.succeed(gate, undefined)
yield* Fiber.join(waiter)
expect(runs).toBe(1)
}),
),
)

it.effect("does nothing when waiting while idle", () =>
Effect.scoped(
Effect.gen(function* () {
let runs = 0
const coordinator = yield* SessionRunCoordinator.make({ drain: () => Effect.sync(() => runs++) })

yield* coordinator.wait("session")

expect(runs).toBe(0)
}),
),
)

it.effect("coalesces wakes received during active execution", () =>
Effect.scoped(
Effect.gen(function* () {
Expand Down
1 change: 1 addition & 0 deletions packages/core/test/session-runner-recorded.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ const execution = Layer.effect(
return SessionExecution.Service.of({
resume: coordinator.run,
wake: coordinator.wake,
wait: coordinator.wait,
interrupt: coordinator.interrupt,
})
}),
Expand Down
1 change: 1 addition & 0 deletions packages/core/test/session-runner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ const execution = Layer.effect(
return SessionExecution.Service.of({
resume: coordinator.run,
wake: coordinator.wake,
wait: coordinator.wait,
interrupt: coordinator.interrupt,
})
}),
Expand Down
5 changes: 1 addition & 4 deletions packages/opencode/test/server/httpapi-public-openapi.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -220,10 +220,7 @@ describe("PublicApi OpenAPI v2 errors", () => {
test("documents v2 unfinished session mutation errors", () => {
const spec = OpenApi.fromApi(PublicApi) as OpenApiSpec

for (const route of [
["post", "/api/session/{sessionID}/compact"],
["post", "/api/session/{sessionID}/wait"],
] as const) {
for (const route of [["post", "/api/session/{sessionID}/compact"]] as const) {
expect(componentName(responseRef(spec.paths[route[1]]?.[route[0]]?.responses?.["503"]) ?? "")).toBe(
"ServiceUnavailableError",
)
Expand Down
9 changes: 2 additions & 7 deletions packages/opencode/test/server/httpapi-session.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,7 @@ describe("session HttpApi", () => {
)

it.instance(
"returns v2 public unavailable errors for unfinished session mutations",
"returns v2 public unavailable errors for unfinished session mutations and waits idle sessions",
() =>
Effect.gen(function* () {
const test = yield* TestInstance
Expand All @@ -645,12 +645,7 @@ describe("session HttpApi", () => {
})

const wait = yield* request(`/api/session/${session.id}/wait`, { method: "POST", headers })
expect(wait.status).toBe(503)
expect(yield* responseJson(wait)).toEqual({
_tag: "ServiceUnavailableError",
message: "Session wait is not available yet",
service: "session.wait",
})
expect(wait.status).toBe(204)
}),
{ git: true, config: { formatter: false, lsp: false } },
)
Expand Down
2 changes: 1 addition & 1 deletion packages/server/src/groups/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ export const SessionGroup = HttpApiGroup.make("server.session")
HttpApiEndpoint.post("session.wait", "/api/session/:sessionID/wait", {
params: { sessionID: SessionV2.ID },
success: HttpApiSchema.NoContent,
error: [SessionNotFoundError, ServiceUnavailableError],
error: SessionNotFoundError,
})
.middleware(SessionLocationMiddleware)
.annotateMerge(
Expand Down
8 changes: 0 additions & 8 deletions packages/server/src/handlers/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -193,14 +193,6 @@ export const SessionHandler = HttpApiBuilder.group(Api, "server.session", (handl
}),
),
),
Effect.catchTag("Session.OperationUnavailableError", (error) =>
Effect.fail(
new ServiceUnavailableError({
message: `Session ${error.operation} is not available yet`,
service: `session.${error.operation}`,
}),
),
),
)
return HttpApiSchema.NoContent.make()
}),
Expand Down
Loading