Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -154,5 +154,6 @@ const table = sqliteTable("session", {
- Preserve one explicit `llm.stream(request)` call per provider turn and reload projected history before durable continuation. Do not bridge through legacy `SessionPrompt.loop(...)` or delegate orchestration to an in-memory tool loop.
- Keep local Session drains process-local until clustering is implemented. `SessionRunCoordinator` joins explicit same-Session resumes, coalesces prompt wakeups, and allows different Sessions to run concurrently. Advisory wakes drain eligible durable inbox rows only; post-crash continuation recovery requires a separate explicit design before it may retry provider work. A drain has no durable identity or transcript boundary.
- Keep delivery vocabulary explicit. Prompts steer by default and promote at the next safe provider-turn boundary while the current drain requires continuation. An explicit `queue` input remains pending until the Session would otherwise become idle; promote one queued input at that boundary, then reevaluate continuation before promoting another. Promoting any new user input resets the selected agent's provider-turn allowance; a batch of steers resets it once.
- A durably projected terminal provider failure ends the current drain without promoting pending steering or queued input. Preserve that inbox state for an explicit resume instead of silently advancing past the failure.
- Keep EventV2 replay owner claims separate from clustered Session execution ownership.
- Keep the System Context algebra, registry, and built-ins in `src/system-context`; keep Context Source producers with their observed domains, and keep Session History selection plus Context Epoch persistence Session-owned.
1 change: 1 addition & 0 deletions CONTEXT.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ The host-supplied environment overlay applied by the server when creating a PTY,
- **Prompt Promotion** atomically consumes the pending inbox entry and appends its model-visible user message.
- Steering prompts promote at the next **Safe Provider-Turn Boundary** while the current **Session Drain** still requires continuation. Promoting any newly admitted user input resets the selected agent's provider-turn allowance; multiple prompts promoted at one boundary reset it once.
- A queued prompt does not promote while the current **Session Drain** requires continuation. The runner promotes one queued prompt when the Session would otherwise become idle, then reevaluates continuation before promoting another.
- A durably projected terminal provider failure stops the current **Session Drain** without promoting pending input. Explicit resume may later promote that input from the durable inbox.
- A **Session Drain** is process-local coordination rather than a durable domain entity. Durable recovery must reason from prompts, projected history, provider attempts, and tool state rather than inventing an enclosing execution identity.
- The first provider turn renders the latest complete **Baseline System Context** and initializes its **Context Snapshot** without emitting a redundant **Mid-Conversation System Message**; unavailable initial context blocks the turn instead of persisting an incomplete baseline.
- Initial **System Context** preparation precedes the first durable input promotion so an unavailable baseline leaves that input pending and retryable; ordinary reconciliation remains after promotion.
Expand Down
13 changes: 10 additions & 3 deletions packages/core/src/session/runner/llm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -308,15 +308,21 @@ export const layer = Layer.effect(
yield* withPublication(publisher.failUnsettledTools("Provider did not return a tool result", true))
if (stream._tag === "Failure") return yield* Effect.failCause(stream.cause)
if (settled._tag === "Failure") return yield* Effect.failCause(settled.cause)
return { needsContinuation: !publisher.hasProviderError() && needsContinuation, step: currentStep }
return {
outcome: publisher.hasProviderError() ? "failed" : needsContinuation ? "continue" : "complete",
step: currentStep,
} as const
}),
)
}, Effect.scoped)
type RunTurn = (
sessionID: SessionSchema.ID,
promotion: SessionInput.Delivery | undefined,
step: number,
) => Effect.Effect<{ readonly needsContinuation: boolean; readonly step: number }, RunError>
) => Effect.Effect<
{ readonly outcome: "continue" | "complete" | "failed"; readonly step: number },
RunError
>

const runAfterOverflowCompaction: RunTurn = Effect.fnUntraced(function* (sessionID, promotion, step) {
return yield* runTurnAttempt(sessionID, promotion, step).pipe(
Expand Down Expand Up @@ -361,7 +367,8 @@ export const layer = Layer.effect(
let step = 1
while (needsContinuation) {
const result = yield* runTurn(input.sessionID, promotion, step)
needsContinuation = result.needsContinuation
if (result.outcome === "failed") return
needsContinuation = result.outcome === "continue"
step = result.step + 1
promotion = "steer"
if (!needsContinuation) needsContinuation = yield* SessionInput.hasPending(db, input.sessionID, "steer")
Expand Down
37 changes: 37 additions & 0 deletions packages/core/test/session-runner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2927,6 +2927,43 @@ describe("SessionRunnerLLM", () => {
}),
)

it.effect("leaves queued input pending after a terminal provider error", () =>
Effect.gen(function* () {
yield* setup
const session = yield* SessionV2.Service
const { db } = yield* Database.Service
yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Fail first" }), resume: false })
yield* session.prompt({
sessionID,
prompt: new Prompt({ text: "Run after explicit resume" }),
delivery: "queue",
resume: false,
})

requests.length = 0
responses = [
[LLMEvent.stepStart({ index: 0 }), LLMEvent.providerError({ message: "Provider unavailable" })],
[
LLMEvent.stepStart({ index: 0 }),
LLMEvent.stepFinish({ index: 0, reason: "stop" }),
LLMEvent.finish({ reason: "stop" }),
],
]

yield* session.resume(sessionID)

expect(requests).toHaveLength(1)
expect(yield* SessionInput.hasPending(db, sessionID, "queue")).toBe(true)
expect(userTexts(requests[0]!)).toEqual(["Fail first"])

yield* session.resume(sessionID)

expect(requests).toHaveLength(2)
expect(yield* SessionInput.hasPending(db, sessionID, "queue")).toBe(false)
expect(userTexts(requests[1]!)).toEqual(["Fail first", "Run after explicit resume"])
}),
)

it.effect("projects provider errors emitted before assistant step start", () =>
Effect.gen(function* () {
yield* setup
Expand Down
2 changes: 2 additions & 0 deletions specs/v2/session.md
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ A process-global `SessionRunCoordinator` serializes execution for each local Ses

Inbox promotion coalesces pending steers in durable admission order. Once continuation would otherwise end, it promotes one queued input at a time in FIFO order. Add explicit inbox backlog and steering-batch limits before exposing broad multi-caller admission or untrusted queue growth.

A durably projected terminal provider failure stops the current drain without promoting pending steering or queued input. The failure remains visible in Session history, and explicit `run` may later promote the preserved inbox work. Raw stream failures, tool execution failures, and interruption already stop through their Effect failure or interruption paths.

Eager local-tool execution is intentionally unbounded in the current local slice. This minimizes tool latency but does not increase SQLite settlement throughput: Session-event publication remains serialized per provider turn. Before broadening exposure, revisit per-turn call limits, output truncation, and operational backpressure using observed workloads. The `session.next.*` event schemas remain experimental and unshipped; databases created by earlier experimental builds are disposable rather than compatibility targets.

The synchronized `session.next.*` event family and projected Session-message model predate this branch. This slice refines their replay contract: projected Session messages retain their source aggregate sequence so canonical context ordering and `sessions.messages(...)` pagination follow durable event order even when caller-supplied IDs or timestamps do not. Consumers can use `sessions.events({ sessionID, after? })` to replay durable `session.next.*` events after an aggregate sequence cursor, then tail durable events without a race. Live-only text, reasoning, and tool-input fragments remain available through EventV2 subscriptions for connected renderers; they are intentionally absent from the replayable Session stream.
Expand Down
Loading