From 63b7829639a8acd4460eb65330a75b50cb900cbf Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Mon, 22 Jun 2026 16:33:12 -0400 Subject: [PATCH 1/3] refactor(core): simplify session input admission --- packages/core/src/database/migration.gen.ts | 1 + .../20260622202450_simplify_session_input.ts | 17 + packages/core/src/session.ts | 11 +- packages/core/src/session/context-epoch.ts | 15 +- packages/core/src/session/event.ts | 28 -- packages/core/src/session/input.ts | 190 ++++---- packages/core/src/session/message-updater.ts | 2 - packages/core/src/session/projector.ts | 41 +- packages/core/src/session/runner/llm.ts | 2 +- packages/core/test/database-migration.test.ts | 6 +- packages/core/test/session-create.test.ts | 26 +- packages/core/test/session-projector.test.ts | 11 +- packages/core/test/session-prompt.test.ts | 41 +- .../core/test/session-runner-recorded.test.ts | 3 +- packages/core/test/session-runner.test.ts | 6 +- packages/sdk/js/src/v2/gen/types.gen.ts | 133 +----- packages/sdk/openapi.json | 423 +----------------- packages/tui/src/context/data.tsx | 14 - packages/tui/test/cli/tui/data.test.tsx | 67 +-- specs/v2/schema-changelog.md | 7 + specs/v2/session.md | 8 +- 21 files changed, 177 insertions(+), 875 deletions(-) create mode 100644 packages/core/src/database/migration/20260622202450_simplify_session_input.ts diff --git a/packages/core/src/database/migration.gen.ts b/packages/core/src/database/migration.gen.ts index 19b1b5684325..e6ea4eaa1477 100644 --- a/packages/core/src/database/migration.gen.ts +++ b/packages/core/src/database/migration.gen.ts @@ -39,5 +39,6 @@ export const migrations = ( import("./migration/20260612174303_project_dir_strategy"), import("./migration/20260622142730_simplify_session_context_epoch"), import("./migration/20260622170816_reset_v2_session_state"), + import("./migration/20260622202450_simplify_session_input"), ]) ).map((module) => module.default) satisfies DatabaseMigration.Migration[] diff --git a/packages/core/src/database/migration/20260622202450_simplify_session_input.ts b/packages/core/src/database/migration/20260622202450_simplify_session_input.ts new file mode 100644 index 000000000000..0b5ddd1bfde3 --- /dev/null +++ b/packages/core/src/database/migration/20260622202450_simplify_session_input.ts @@ -0,0 +1,17 @@ +import { Effect } from "effect" +import type { DatabaseMigration } from "../migration" + +export default { + id: "20260622202450_simplify_session_input", + up(tx) { + return Effect.gen(function* () { + yield* tx.run(`DELETE FROM \`session_context_epoch\`;`) + yield* tx.run(`DELETE FROM \`session_input\`;`) + yield* tx.run(`DELETE FROM \`session_message\`;`) + yield* tx.run(`DELETE FROM \`event\`;`) + yield* tx.run(`DELETE FROM \`event_sequence\`;`) + yield* tx.run(`UPDATE \`session\` SET \`workspace_id\` = NULL WHERE \`workspace_id\` IS NOT NULL;`) + yield* tx.run(`DELETE FROM \`workspace\`;`) + }) + }, +} satisfies DatabaseMigration.Migration diff --git a/packages/core/src/session.ts b/packages/core/src/session.ts index 5dae699733f8..4340f3f68644 100644 --- a/packages/core/src/session.ts +++ b/packages/core/src/session.ts @@ -330,18 +330,13 @@ export const layer = Layer.effect( const messageID = input.id ?? SessionMessage.ID.create() const delivery = input.delivery ?? "steer" const expected = { sessionID: input.sessionID, messageID, prompt: input.prompt, delivery } - const admitted = yield* SessionInput.admit(db, events, { + const admitted = yield* SessionInput.admit(db, { id: messageID, sessionID: input.sessionID, prompt: input.prompt, delivery, - }).pipe( - Effect.catchDefect((defect) => - defect instanceof SessionInput.LifecycleConflict - ? new PromptConflictError({ sessionID: input.sessionID, messageID }) - : Effect.die(defect), - ), - ) + }) + if (admitted === undefined) return yield* new PromptConflictError({ sessionID: input.sessionID, messageID }) if (!SessionInput.equivalent(admitted, expected)) return yield* new PromptConflictError({ sessionID: input.sessionID, messageID }) if (input.resume !== false) yield* execution.wake(admitted.sessionID) diff --git a/packages/core/src/session/context-epoch.ts b/packages/core/src/session/context-epoch.ts index 18624706a973..cf1c436b29f4 100644 --- a/packages/core/src/session/context-epoch.ts +++ b/packages/core/src/session/context-epoch.ts @@ -4,6 +4,7 @@ import { eq } from "drizzle-orm" import { DateTime, Effect, Schema } from "effect" import type { Database } from "../database/database" import { EventV2 } from "../event" +import { EventSequenceTable } from "../event/sql" import { SystemContext } from "../system-context/index" import { ContextSnapshotDecodeError } from "./error" import { SessionEvent } from "./event" @@ -15,6 +16,16 @@ import { SessionContextEpochTable } from "./sql" type DatabaseService = Database.Interface["db"] +const latestEventSeq = Effect.fnUntraced(function* (db: DatabaseService, sessionID: SessionSchema.ID) { + const row = yield* db + .select({ seq: EventSequenceTable.seq }) + .from(EventSequenceTable) + .where(eq(EventSequenceTable.aggregate_id, sessionID)) + .get() + .pipe(Effect.orDie) + return row?.seq ?? -1 +}) + interface Prepared { readonly baseline: string readonly baselineSeq: number @@ -64,7 +75,7 @@ const prepareOnce = Effect.fnUntraced(function* ( return { baseline: stored.baseline, baselineSeq: stored.baseline_seq } } if (result._tag === "ReplacementReady") { - const baselineSeq = replacementSeq ?? (yield* SessionInput.latestSeq(db, sessionID)) + const baselineSeq = replacementSeq ?? (yield* latestEventSeq(db, sessionID)) yield* replace(db, sessionID, baselineSeq, result.generation) return { baseline: result.generation.baseline, baselineSeq } } @@ -124,7 +135,7 @@ const insert = Effect.fnUntraced(function* ( sessionID: SessionSchema.ID, generation: SystemContext.Generation, ) { - const baselineSeq = yield* SessionInput.latestSeq(db, sessionID) + const baselineSeq = yield* latestEventSeq(db, sessionID) yield* db .insert(SessionContextEpochTable) .values({ diff --git a/packages/core/src/session/event.ts b/packages/core/src/session/event.ts index d4b773d18bf7..32f4d29445ca 100644 --- a/packages/core/src/session/event.ts +++ b/packages/core/src/session/event.ts @@ -92,32 +92,6 @@ export const Prompted = EventV2.define({ }) export type Prompted = typeof Prompted.Type -export namespace PromptLifecycle { - export const Admitted = EventV2.define({ - type: "session.next.prompt.admitted", - ...options, - schema: { - ...Base, - messageID: SessionMessageID.ID, - prompt: Prompt, - delivery: Schema.Literals(["steer", "queue"]), - }, - }) - export type Admitted = typeof Admitted.Type - - export const Promoted = EventV2.define({ - type: "session.next.prompt.promoted", - ...options, - schema: { - ...Base, - messageID: SessionMessageID.ID, - prompt: Prompt, - timeCreated: V2Schema.DateTimeUtcFromMillis, - }, - }) - export type Promoted = typeof Promoted.Type -} - export const ContextUpdated = EventV2.define({ type: "session.next.context.updated", ...options, @@ -455,8 +429,6 @@ const DurableDefinitions = [ ModelSwitched, Moved, Prompted, - PromptLifecycle.Admitted, - PromptLifecycle.Promoted, ContextUpdated, Synthetic, Shell.Started, diff --git a/packages/core/src/session/input.ts b/packages/core/src/session/input.ts index f8bc2b0e6bb0..df6080e5544d 100644 --- a/packages/core/src/session/input.ts +++ b/packages/core/src/session/input.ts @@ -1,10 +1,9 @@ export * as SessionInput from "./input" -import { and, asc, eq, isNull, lte } from "drizzle-orm" +import { and, asc, eq, isNull, lte, max } from "drizzle-orm" import { DateTime, Effect, Schema } from "effect" import type { Database } from "../database/database" import type { EventV2 } from "../event" -import { EventSequenceTable } from "../event/sql" import { NonNegativeInt } from "../schema" import { V2Schema } from "../v2-schema" import { SessionEvent } from "./event" @@ -19,7 +18,7 @@ export const Delivery = Schema.Literals(["steer", "queue"]) export type Delivery = typeof Delivery.Type export class Admitted extends Schema.Class("SessionInput.Admitted")({ - admittedSeq: NonNegativeInt, + admittedSeq: NonNegativeInt.annotate({ description: "Session-local inbox order; not an Event cursor" }), id: SessionMessage.ID, sessionID: SessionSchema.ID, prompt: Prompt, @@ -53,7 +52,6 @@ export class LifecycleConflict extends Schema.TaggedErrorClass - event.durable === undefined - ? Effect.die("Prompt admission event is missing aggregate sequence") - : Effect.succeed( - new Admitted({ - admittedSeq: event.durable.seq, - id: input.id, - sessionID: input.sessionID, - prompt: input.prompt, - delivery: input.delivery, - timeCreated: timestamp, - }), - ), - ), - Effect.catchDefect((defect) => - find(db, input.id).pipe(Effect.flatMap((stored) => (stored ? Effect.succeed(stored) : Effect.die(defect)))), - ), + return yield* db + .transaction( + (tx) => + Effect.gen(function* () { + const existing = yield* tx + .select() + .from(SessionInputTable) + .where(eq(SessionInputTable.id, input.id)) + .get() + .pipe(Effect.orDie) + if (existing !== undefined) return fromRow(existing) + const message = yield* tx + .select({ id: SessionMessageTable.id }) + .from(SessionMessageTable) + .where(eq(SessionMessageTable.id, input.id)) + .get() + .pipe(Effect.orDie) + if (message !== undefined) return undefined + const latest = yield* tx + .select({ seq: max(SessionInputTable.admitted_seq) }) + .from(SessionInputTable) + .where(eq(SessionInputTable.session_id, input.sessionID)) + .get() + .pipe(Effect.orDie) + const row = yield* tx + .insert(SessionInputTable) + .values({ + id: input.id, + session_id: input.sessionID, + prompt: encodePrompt(input.prompt), + delivery: input.delivery, + admitted_seq: (latest?.seq ?? -1) + 1, + }) + .returning() + .get() + .pipe(Effect.orDie) + return fromRow(row) + }), + { behavior: "immediate" }, ) + .pipe(Effect.orDie) }) -export const latestSeq = Effect.fn("SessionInput.latestSeq")(function* ( +export const latestAdmittedSeq = Effect.fn("SessionInput.latestAdmittedSeq")(function* ( db: DatabaseService, sessionID: SessionSchema.ID, ) { const row = yield* db - .select({ seq: EventSequenceTable.seq }) - .from(EventSequenceTable) - .where(eq(EventSequenceTable.aggregate_id, sessionID)) + .select({ seq: max(SessionInputTable.admitted_seq) }) + .from(SessionInputTable) + .where(eq(SessionInputTable.session_id, sessionID)) .get() .pipe(Effect.orDie) return row?.seq ?? -1 }) -export const projectAdmitted = Effect.fn("SessionInput.projectAdmitted")(function* ( +export const projectPrompted = Effect.fn("SessionInput.projectPrompted")(function* ( db: DatabaseService, input: { - readonly admittedSeq: number readonly id: SessionMessage.ID readonly sessionID: SessionSchema.ID readonly prompt: Prompt readonly delivery: Delivery readonly timeCreated: DateTime.Utc - }, -) { - const stored = yield* db - .insert(SessionInputTable) - .values({ - id: input.id, - session_id: input.sessionID, - admitted_seq: input.admittedSeq, - prompt: encodePrompt(input.prompt), - delivery: input.delivery, - time_created: DateTime.toEpochMillis(input.timeCreated), - }) - .onConflictDoNothing() - .returning({ id: SessionInputTable.id }) - .get() - .pipe(Effect.orDie) - if (!stored) return yield* Effect.die(new LifecycleConflict({ id: input.id })) -}) - -export const projectPromoted = Effect.fn("SessionInput.projectPromoted")(function* ( - db: DatabaseService, - input: { - readonly id: SessionMessage.ID - readonly sessionID: SessionSchema.ID - readonly prompt: Prompt - readonly timeCreated: DateTime.Utc readonly promotedSeq: number }, ) { @@ -157,14 +139,35 @@ export const projectPromoted = Effect.fn("SessionInput.projectPromoted")(functio .returning() .get() .pipe(Effect.orDie) - if (!updated) return yield* Effect.die(new LifecycleConflict({ id: input.id })) - const stored = fromRow(updated) - if ( - !matchesPrompt(stored, input) || - DateTime.toEpochMillis(stored.timeCreated) !== DateTime.toEpochMillis(input.timeCreated) - ) - return yield* Effect.die(new LifecycleConflict({ id: input.id })) - return toMessage(stored) + if (updated) { + const stored = fromRow(updated) + if (!matchesProjection(stored, input)) return yield* Effect.die(new LifecycleConflict({ id: input.id })) + return toMessage(stored) + } + + const stored = yield* find(db, input.id) + if (stored) { + if (!matchesProjection(stored, input) || stored.promotedSeq !== input.promotedSeq) + return yield* Effect.die(new LifecycleConflict({ id: input.id })) + return toMessage(stored) + } + + const admittedSeq = (yield* latestAdmittedSeq(db, input.sessionID)) + 1 + const inserted = yield* db + .insert(SessionInputTable) + .values({ + id: input.id, + session_id: input.sessionID, + prompt: encodePrompt(input.prompt), + delivery: input.delivery, + admitted_seq: admittedSeq, + promoted_seq: input.promotedSeq, + time_created: DateTime.toEpochMillis(input.timeCreated), + }) + .returning() + .get() + .pipe(Effect.orDie) + return toMessage(fromRow(inserted)) }) export const hasPending = Effect.fn("SessionInput.hasPending")(function* ( @@ -201,35 +204,17 @@ const matchesPrompt = (input: Admitted, expected: { readonly sessionID: SessionS input.sessionID === expected.sessionID && JSON.stringify(encodePrompt(input.prompt)) === JSON.stringify(encodePrompt(expected.prompt)) -export const projectLegacyPrompted = Effect.fn("SessionInput.projectLegacyPrompted")(function* ( - db: DatabaseService, - input: { - readonly id: SessionMessage.ID +const matchesProjection = ( + input: Admitted, + expected: { readonly sessionID: SessionSchema.ID readonly prompt: Prompt readonly delivery: Delivery readonly timeCreated: DateTime.Utc - readonly promotedSeq: number }, -) { - const inserted = yield* db - .insert(SessionInputTable) - .values({ - id: input.id, - session_id: input.sessionID, - admitted_seq: input.promotedSeq, - prompt: encodePrompt(input.prompt), - delivery: input.delivery, - promoted_seq: input.promotedSeq, - time_created: DateTime.toEpochMillis(input.timeCreated), - }) - .onConflictDoNothing() - .returning() - .get() - .pipe(Effect.orDie) - if (!inserted) return yield* Effect.die("Prompt projection conflicts with admitted input") - return fromRow(inserted) -}) +) => + equivalent(input, expected) && + DateTime.toEpochMillis(input.timeCreated) === DateTime.toEpochMillis(expected.timeCreated) const publish = Effect.fn("SessionInput.publish")(function* ( db: DatabaseService, @@ -238,18 +223,19 @@ const publish = Effect.fn("SessionInput.publish")(function* ( rows: ReadonlyArray, ) { for (const row of rows) { + const id = SessionMessage.ID.make(row.id) yield* events - .publish(SessionEvent.PromptLifecycle.Promoted, { + .publish(SessionEvent.Prompted, { sessionID, - timestamp: yield* DateTime.now, - messageID: SessionMessage.ID.make(row.id), + timestamp: DateTime.makeUnsafe(row.time_created), + messageID: id, prompt: decodePrompt(row.prompt), - timeCreated: DateTime.makeUnsafe(row.time_created), + delivery: row.delivery, }) .pipe( Effect.catchDefect((defect) => defect instanceof LifecycleConflict - ? find(db, SessionMessage.ID.make(row.id)).pipe( + ? find(db, id).pipe( Effect.flatMap((stored) => (stored?.promotedSeq === undefined ? Effect.die(defect) : Effect.void)), ) : Effect.die(defect), diff --git a/packages/core/src/session/message-updater.ts b/packages/core/src/session/message-updater.ts index 2c836dcd0173..ca76ae436d74 100644 --- a/packages/core/src/session/message-updater.ts +++ b/packages/core/src/session/message-updater.ts @@ -136,8 +136,6 @@ export function update(adapter: Adapter, event: SessionEvent.Event) { }), ) }, - "session.next.prompt.admitted": () => Effect.void, - "session.next.prompt.promoted": () => Effect.void, "session.next.context.updated": (event) => adapter.appendMessage( new SessionMessage.System({ diff --git a/packages/core/src/session/projector.ts b/packages/core/src/session/projector.ts index 2c87dfb8dffb..8fed07a3da05 100644 --- a/packages/core/src/session/projector.ts +++ b/packages/core/src/session/projector.ts @@ -21,7 +21,6 @@ type DatabaseService = Database.Interface["db"] const decodeMessage = Schema.decodeUnknownSync(SessionMessage.Message) const encodeMessage = Schema.encodeSync(SessionMessage.Message) -class PromptAlreadyProjected extends Error {} export class SessionAlreadyProjected extends Error {} type Usage = { @@ -349,51 +348,17 @@ export const layer = Layer.effectDiscard( }), ) yield* events.project(SessionEvent.Prompted, (event) => - Effect.gen(function* () { - const messageID = event.data.messageID - const existing = yield* db - .select({ id: SessionMessageTable.id }) - .from(SessionMessageTable) - .where(eq(SessionMessageTable.id, messageID)) - .get() - .pipe(Effect.orDie) - if (existing) return yield* Effect.die(new PromptAlreadyProjected()) - yield* run(db, event) - if (event.durable === undefined) return yield* Effect.die("Durable Session event is missing aggregate sequence") - yield* SessionInput.projectLegacyPrompted(db, { - id: messageID, - sessionID: event.data.sessionID, - prompt: event.data.prompt, - delivery: event.data.delivery, - timeCreated: event.data.timestamp, - promotedSeq: event.durable.seq, - }) - }), - ) - yield* events.project(SessionEvent.PromptLifecycle.Admitted, (event) => - Effect.gen(function* () { - if (event.durable === undefined) return yield* Effect.die("Durable Session event is missing aggregate sequence") - yield* SessionInput.projectAdmitted(db, { - admittedSeq: event.durable.seq, - id: event.data.messageID, - sessionID: event.data.sessionID, - prompt: event.data.prompt, - delivery: event.data.delivery, - timeCreated: event.data.timestamp, - }) - }), - ) - yield* events.project(SessionEvent.PromptLifecycle.Promoted, (event) => Effect.gen(function* () { if (event.durable === undefined) return yield* Effect.die("Durable Session event is missing aggregate sequence") yield* insertMessage( db, event, - yield* SessionInput.projectPromoted(db, { + yield* SessionInput.projectPrompted(db, { id: event.data.messageID, sessionID: event.data.sessionID, prompt: event.data.prompt, - timeCreated: event.data.timeCreated, + delivery: event.data.delivery, + timeCreated: event.data.timestamp, promotedSeq: event.durable.seq, }), ) diff --git a/packages/core/src/session/runner/llm.ts b/packages/core/src/session/runner/llm.ts index ddd2bf4e152a..4e3da7209e4e 100644 --- a/packages/core/src/session/runner/llm.ts +++ b/packages/core/src/session/runner/llm.ts @@ -176,7 +176,7 @@ export const layer = Layer.effect( const toolFibers = yield* FiberSet.make() let needsContinuation = false if (promotion) { - const cutoff = yield* SessionInput.latestSeq(db, session.id) + const cutoff = yield* SessionInput.latestAdmittedSeq(db, session.id) if (promotion === "steer") yield* SessionInput.promoteSteers(db, events, session.id, cutoff) if (promotion === "queue") { yield* SessionInput.promoteNextQueued(db, events, session.id) diff --git a/packages/core/test/database-migration.test.ts b/packages/core/test/database-migration.test.ts index 835f41931d60..63768719355c 100644 --- a/packages/core/test/database-migration.test.ts +++ b/packages/core/test/database-migration.test.ts @@ -14,7 +14,7 @@ import sessionMessageProjectionOrderMigration from "@opencode-ai/core/database/m import eventSourcedSessionInputMigration from "@opencode-ai/core/database/migration/20260604172448_event_sourced_session_input" import contextEpochAgentMigration from "@opencode-ai/core/database/migration/20260605042240_add_context_epoch_agent" import simplifyIntegrationCredentialsMigration from "@opencode-ai/core/database/migration/20260611192811_lush_chimera" -import resetV2SessionStateMigration from "@opencode-ai/core/database/migration/20260622170816_reset_v2_session_state" +import simplifySessionInputMigration from "@opencode-ai/core/database/migration/20260622202450_simplify_session_input" import { EventV2 } from "@opencode-ai/core/event" import { ProjectV2 } from "@opencode-ai/core/project" import { ProjectTable } from "@opencode-ai/core/project/sql" @@ -264,8 +264,8 @@ describe("DatabaseMigration", () => { yield* db.run( sql`INSERT INTO session_context_epoch (session_id, baseline, snapshot, baseline_seq) VALUES ('session', 'baseline', '{}', 9)`, ) - yield* db.run(sql`DELETE FROM migration WHERE id = ${resetV2SessionStateMigration.id}`) - yield* DatabaseMigration.applyOnly(db, [resetV2SessionStateMigration]) + yield* db.run(sql`DELETE FROM migration WHERE id = ${simplifySessionInputMigration.id}`) + yield* DatabaseMigration.applyOnly(db, [simplifySessionInputMigration]) const database = Layer.succeed(Database.Service, { db }) const events = EventV2.layer.pipe(Layer.provide(database)) diff --git a/packages/core/test/session-create.test.ts b/packages/core/test/session-create.test.ts index 6fd80c60da1d..d9ddb6d36392 100644 --- a/packages/core/test/session-create.test.ts +++ b/packages/core/test/session-create.test.ts @@ -222,11 +222,8 @@ describe("SessionV2.create", () => { yield* SessionInput.promoteSteers(db, events, created.id, Number.MAX_SAFE_INTEGER) expect( - Array.from(yield* session.events({ sessionID: created.id }).pipe(Stream.take(2), Stream.runCollect)), - ).toMatchObject([ - { durable: { seq: 1 }, type: "session.next.prompt.admitted", data: { prompt: { text: "Hello" } } }, - { durable: { seq: 2 }, type: "session.next.prompt.promoted" }, - ]) + Array.from(yield* session.events({ sessionID: created.id }).pipe(Stream.take(1), Stream.runCollect)), + ).toMatchObject([{ durable: { seq: 1 }, type: "session.next.prompted", data: { prompt: { text: "Hello" } } }]) }), ) @@ -276,24 +273,18 @@ describe("SessionV2.create", () => { .pipe(Effect.orDie) expect(yield* store.get(created.id)).toBeUndefined() - expect(yield* events.replayAll(serialized.slice(0, 2))).toBe(created.id) - expect(yield* SessionInput.find(db, admitted.id)).toMatchObject({ - id: admitted.id, - sessionID: created.id, - prompt: { text: "Replay lifecycle" }, - delivery: "steer", - admittedSeq: 1, - }) + expect(yield* events.replayAll(serialized.slice(0, 1))).toBe(created.id) + expect(yield* SessionInput.find(db, admitted.id)).toBeUndefined() expect(yield* store.context(created.id)).toEqual([]) - expect(yield* events.replayAll(serialized.slice(2))).toBe(created.id) + expect(yield* events.replayAll(serialized.slice(1))).toBe(created.id) expect(yield* SessionInput.find(db, admitted.id)).toMatchObject({ id: admitted.id, sessionID: created.id, prompt: { text: "Replay lifecycle" }, delivery: "steer", - admittedSeq: 1, - promotedSeq: 2, + admittedSeq: 0, + promotedSeq: 1, }) expect(yield* store.context(created.id)).toMatchObject([ { id: admitted.id, type: "user", text: "Replay lifecycle" }, @@ -308,8 +299,7 @@ describe("SessionV2.create", () => { .pipe(Effect.orDie)).map((event) => [event.seq, event.type]), ).toEqual([ [0, EventV2.versionedType(SessionV1.Event.Created.type, 1)], - [1, EventV2.versionedType(SessionEvent.PromptLifecycle.Admitted.type, 1)], - [2, EventV2.versionedType(SessionEvent.PromptLifecycle.Promoted.type, 1)], + [1, EventV2.versionedType(SessionEvent.Prompted.type, 1)], ]) }).pipe(Effect.provide(Layer.fresh(Layer.mergeAll(targetDatabase, targetEvents, targetProjector, targetStore)))) }), diff --git a/packages/core/test/session-projector.test.ts b/packages/core/test/session-projector.test.ts index a0894d07eb0f..c80cf54922d0 100644 --- a/packages/core/test/session-projector.test.ts +++ b/packages/core/test/session-projector.test.ts @@ -120,7 +120,7 @@ describe("SessionProjector", () => { ), ) - it.effect("marks an admitted lifecycle row promoted with the PromptPromoted event sequence", () => + it.effect("marks an inbox row promoted with the Prompted event sequence", () => Effect.gen(function* () { const { db } = yield* Database.Service yield* db @@ -142,19 +142,20 @@ describe("SessionProjector", () => { .pipe(Effect.orDie) const events = yield* EventV2.Service const id = SessionMessage.ID.make("msg_admitted") - yield* SessionInput.admit(db, events, { + const admitted = yield* SessionInput.admit(db, { id, sessionID, prompt: new Prompt({ text: "promote me" }), delivery: "steer", }) + if (!admitted) return yield* Effect.die("Prompt admission failed") - const event = yield* events.publish(SessionEvent.PromptLifecycle.Promoted, { + const event = yield* events.publish(SessionEvent.Prompted, { sessionID, - timestamp: created, + timestamp: admitted.timeCreated, messageID: id, prompt: new Prompt({ text: "promote me" }), - timeCreated: created, + delivery: "steer", }) expect( diff --git a/packages/core/test/session-prompt.test.ts b/packages/core/test/session-prompt.test.ts index 166b5deed12c..37167735046d 100644 --- a/packages/core/test/session-prompt.test.ts +++ b/packages/core/test/session-prompt.test.ts @@ -168,7 +168,7 @@ describe("SessionV2.prompt", () => { const session = yield* SessionV2.Service const events = yield* EventV2.Service const { db } = yield* Database.Service - const fiber = yield* session.events({ sessionID }).pipe(Stream.take(4), Stream.runCollect, Effect.forkScoped) + const fiber = yield* session.events({ sessionID }).pipe(Stream.take(2), Stream.runCollect, Effect.forkScoped) yield* Effect.yieldNow yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false }) @@ -177,10 +177,8 @@ describe("SessionV2.prompt", () => { const streamed = Array.from(yield* Fiber.join(fiber)) expect(streamed.map((event) => [event.durable?.seq, event.type])).toEqual([ - [0, "session.next.prompt.admitted"], - [1, "session.next.prompt.admitted"], - [2, "session.next.prompt.promoted"], - [3, "session.next.prompt.promoted"], + [0, "session.next.prompted"], + [1, "session.next.prompted"], ]) expect( Array.from( @@ -188,7 +186,7 @@ describe("SessionV2.prompt", () => { .events({ sessionID, after: streamed[0]!.durable?.seq }) .pipe(Stream.take(1), Stream.runCollect), ).map((event) => [event.durable?.seq, event.type]), - ).toEqual([[1, "session.next.prompt.admitted"]]) + ).toEqual([[1, "session.next.prompted"]]) }), ) @@ -334,7 +332,7 @@ describe("SessionV2.prompt", () => { expect(messages[1]).toEqual(messages[0]) expect(yield* session.messages({ sessionID })).toEqual([]) expect(yield* admittedCount).toBe(1) - expect(yield* eventCount(EventV2.versionedType(SessionEvent.PromptLifecycle.Admitted.type, 1))).toBe(1) + expect(yield* eventCount(EventV2.versionedType(SessionEvent.Prompted.type, 1))).toBe(0) }), ) @@ -354,22 +352,22 @@ describe("SessionV2.prompt", () => { { concurrency: "unbounded" }, ) - expect(yield* eventCount(EventV2.versionedType(SessionEvent.PromptLifecycle.Promoted.type, 1))).toBe(1) - expect(yield* admitted(messageID)).toMatchObject({ promotedSeq: 1 }) + expect(yield* eventCount(EventV2.versionedType(SessionEvent.Prompted.type, 1))).toBe(1) + expect(yield* admitted(messageID)).toMatchObject({ promotedSeq: 0 }) expect(yield* session.messages({ sessionID })).toMatchObject([ { id: messageID, type: "user", text: "Promote once" }, ]) }), ) - it.effect("promotes steers only through the captured aggregate cutoff", () => + it.effect("promotes steers only through the captured inbox cutoff", () => Effect.gen(function* () { yield* setup const { db } = yield* Database.Service const session = yield* SessionV2.Service const events = yield* EventV2.Service const first = yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Before cutoff" }), resume: false }) - const cutoff = yield* SessionInput.latestSeq(db, sessionID) + const cutoff = yield* SessionInput.latestAdmittedSeq(db, sessionID) const second = yield* session.prompt({ sessionID, prompt: new Prompt({ text: "After cutoff" }), resume: false }) yield* SessionInput.promoteSteers(db, events, sessionID, cutoff) @@ -379,12 +377,11 @@ describe("SessionV2.prompt", () => { }), ) - it.effect("reprojects one pending lifecycle without scheduling execution", () => + it.effect("keeps pending inbox input outside the replayable event stream", () => Effect.gen(function* () { yield* setup const { db } = yield* Database.Service const session = yield* SessionV2.Service - const events = yield* EventV2.Service wakeCalls.length = 0 yield* session.prompt({ id: messageID, sessionID, prompt: new Prompt({ text: "Replay pending" }), resume: false }) const recorded = yield* db @@ -394,23 +391,7 @@ describe("SessionV2.prompt", () => { .all() .pipe(Effect.orDie) - yield* events.remove(sessionID) - yield* db.delete(SessionInputTable).where(eq(SessionInputTable.session_id, sessionID)).run().pipe(Effect.orDie) - yield* db - .delete(SessionMessageTable) - .where(eq(SessionMessageTable.session_id, sessionID)) - .run() - .pipe(Effect.orDie) - yield* events.replayAll( - recorded.map((event) => ({ - id: event.id, - aggregateID: event.aggregate_id, - seq: event.seq, - type: event.type, - data: event.data, - })), - ) - + expect(recorded).toEqual([]) expect(yield* admitted(messageID)).toMatchObject({ id: messageID, prompt: { text: "Replay pending" } }) expect(yield* session.messages({ sessionID })).toEqual([]) expect(wakeCalls).toEqual([]) diff --git a/packages/core/test/session-runner-recorded.test.ts b/packages/core/test/session-runner-recorded.test.ts index 65e90cb6d093..b84930e27f1c 100644 --- a/packages/core/test/session-runner-recorded.test.ts +++ b/packages/core/test/session-runner-recorded.test.ts @@ -175,8 +175,7 @@ describe("SessionRunnerLLM recorded", () => { .orderBy(EventTable.seq) .all()).map((event) => event.type), ).toEqual([ - "session.next.prompt.admitted.1", - "session.next.prompt.promoted.1", + "session.next.prompted.1", "session.next.step.started.1", "session.next.text.started.1", "session.next.text.ended.1", diff --git a/packages/core/test/session-runner.test.ts b/packages/core/test/session-runner.test.ts index 6e97ab7939fd..ec932c904bc1 100644 --- a/packages/core/test/session-runner.test.ts +++ b/packages/core/test/session-runner.test.ts @@ -2404,7 +2404,7 @@ describe("SessionRunnerLLM", () => { const events = yield* EventV2.Service const defect = new Error("fail after prompt promotion") let fail = true - yield* events.project(SessionEvent.PromptLifecycle.Promoted, () => (fail ? Effect.die(defect) : Effect.void)) + yield* events.project(SessionEvent.Prompted, () => (fail ? Effect.die(defect) : Effect.void)) yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Recover promoted input" }), resume: false }) expect(yield* session.resume(sessionID).pipe(Effect.catchDefect(Effect.succeed))).toBe(defect) @@ -2429,9 +2429,7 @@ describe("SessionRunnerLLM", () => { const session = yield* SessionV2.Service const events = yield* EventV2.Service yield* events.listen((event) => - event.type === SessionEvent.PromptLifecycle.Promoted.type - ? Effect.die("fail after prompt promotion commits") - : Effect.void, + event.type === SessionEvent.Prompted.type ? Effect.die("fail after prompt promotion commits") : Effect.void, ) yield* session.prompt({ sessionID, diff --git a/packages/sdk/js/src/v2/gen/types.gen.ts b/packages/sdk/js/src/v2/gen/types.gen.ts index b15ff9347095..d6e7d488e1d1 100644 --- a/packages/sdk/js/src/v2/gen/types.gen.ts +++ b/packages/sdk/js/src/v2/gen/types.gen.ts @@ -19,8 +19,6 @@ export type Event = | EventSessionNextModelSwitched | EventSessionNextMoved | EventSessionNextPrompted - | EventSessionNextPromptAdmitted - | EventSessionNextPromptPromoted | EventSessionNextContextUpdated | EventSessionNextSynthetic | EventSessionNextShellStarted @@ -853,28 +851,6 @@ export type GlobalEvent = { delivery: "steer" | "queue" } } - | { - id: string - type: "session.next.prompt.admitted" - properties: { - timestamp: number - sessionID: string - messageID: string - prompt: Prompt - delivery: "steer" | "queue" - } - } - | { - id: string - type: "session.next.prompt.promoted" - properties: { - timestamp: number - sessionID: string - messageID: string - prompt: Prompt - timeCreated: number - } - } | { id: string type: "session.next.context.updated" @@ -1627,8 +1603,6 @@ export type GlobalEvent = { | SyncEventSessionNextModelSwitched | SyncEventSessionNextMoved | SyncEventSessionNextPrompted - | SyncEventSessionNextPromptAdmitted - | SyncEventSessionNextPromptPromoted | SyncEventSessionNextContextUpdated | SyncEventSessionNextSynthetic | SyncEventSessionNextShellStarted @@ -2769,8 +2743,6 @@ export type V2Event = | V2EventSessionNextModelSwitched | V2EventSessionNextMoved | V2EventSessionNextPrompted - | V2EventSessionNextPromptAdmitted - | V2EventSessionNextPromptPromoted | V2EventSessionNextContextUpdated | V2EventSessionNextSynthetic | V2EventSessionNextShellStarted @@ -3202,42 +3174,6 @@ export type SyncEventSessionNextPrompted = { } } -export type SyncEventSessionNextPromptAdmitted = { - type: "sync" - id: string - syncEvent: { - type: "session.next.prompt.admitted.1" - id: string - seq: number - aggregateID: string - data: { - timestamp: number - sessionID: string - messageID: string - prompt: Prompt - delivery: "steer" | "queue" - } - } -} - -export type SyncEventSessionNextPromptPromoted = { - type: "sync" - id: string - syncEvent: { - type: "session.next.prompt.promoted.1" - id: string - seq: number - aggregateID: string - data: { - timestamp: number - sessionID: string - messageID: string - prompt: Prompt - timeCreated: number - } - } -} - export type SyncEventSessionNextContextUpdated = { type: "sync" id: string @@ -3752,6 +3688,9 @@ export type SessionV2Info = { } export type SessionInputAdmitted = { + /** + * Session-local inbox order; not an Event cursor + */ admittedSeq: number id: string sessionID: string @@ -4502,48 +4441,6 @@ export type V2EventSessionNextPrompted = { } } -export type V2EventSessionNextPromptAdmitted = { - id: string - metadata?: { - [key: string]: unknown - } - durable?: { - aggregateID: string - seq: number - version: number - } - location?: LocationRef - type: "session.next.prompt.admitted" - data: { - timestamp: number - sessionID: string - messageID: string - prompt: Prompt - delivery: "steer" | "queue" - } -} - -export type V2EventSessionNextPromptPromoted = { - id: string - metadata?: { - [key: string]: unknown - } - durable?: { - aggregateID: string - seq: number - version: number - } - location?: LocationRef - type: "session.next.prompt.promoted" - data: { - timestamp: number - sessionID: string - messageID: string - prompt: Prompt - timeCreated: number - } -} - export type V2EventSessionNextContextUpdated = { id: string metadata?: { @@ -6156,30 +6053,6 @@ export type EventSessionNextPrompted = { } } -export type EventSessionNextPromptAdmitted = { - id: string - type: "session.next.prompt.admitted" - properties: { - timestamp: number - sessionID: string - messageID: string - prompt: Prompt - delivery: "steer" | "queue" - } -} - -export type EventSessionNextPromptPromoted = { - id: string - type: "session.next.prompt.promoted" - properties: { - timestamp: number - sessionID: string - messageID: string - prompt: Prompt - timeCreated: number - } -} - export type EventSessionNextContextUpdated = { id: string type: "session.next.context.updated" diff --git a/packages/sdk/openapi.json b/packages/sdk/openapi.json index 0d8dc1aecfe2..48ba095575ba 100644 --- a/packages/sdk/openapi.json +++ b/packages/sdk/openapi.json @@ -14683,12 +14683,6 @@ { "$ref": "#/components/schemas/EventSessionNextPrompted" }, - { - "$ref": "#/components/schemas/EventSessionNextPromptAdmitted" - }, - { - "$ref": "#/components/schemas/EventSessionNextPromptPromoted" - }, { "$ref": "#/components/schemas/EventSessionNextContextUpdated" }, @@ -17215,85 +17209,6 @@ "required": ["id", "type", "properties"], "additionalProperties": false }, - { - "type": "object", - "properties": { - "id": { - "type": "string", - "pattern": "^evt_" - }, - "type": { - "type": "string", - "enum": ["session.next.prompt.admitted"] - }, - "properties": { - "type": "object", - "properties": { - "timestamp": { - "type": "number" - }, - "sessionID": { - "type": "string", - "pattern": "^ses" - }, - "messageID": { - "type": "string", - "pattern": "^msg_" - }, - "prompt": { - "$ref": "#/components/schemas/Prompt" - }, - "delivery": { - "type": "string", - "enum": ["steer", "queue"] - } - }, - "required": ["timestamp", "sessionID", "messageID", "prompt", "delivery"], - "additionalProperties": false - } - }, - "required": ["id", "type", "properties"], - "additionalProperties": false - }, - { - "type": "object", - "properties": { - "id": { - "type": "string", - "pattern": "^evt_" - }, - "type": { - "type": "string", - "enum": ["session.next.prompt.promoted"] - }, - "properties": { - "type": "object", - "properties": { - "timestamp": { - "type": "number" - }, - "sessionID": { - "type": "string", - "pattern": "^ses" - }, - "messageID": { - "type": "string", - "pattern": "^msg_" - }, - "prompt": { - "$ref": "#/components/schemas/Prompt" - }, - "timeCreated": { - "type": "number" - } - }, - "required": ["timestamp", "sessionID", "messageID", "prompt", "timeCreated"], - "additionalProperties": false - } - }, - "required": ["id", "type", "properties"], - "additionalProperties": false - }, { "type": "object", "properties": { @@ -19829,12 +19744,6 @@ { "$ref": "#/components/schemas/SyncEventSessionNextPrompted" }, - { - "$ref": "#/components/schemas/SyncEventSessionNextPromptAdmitted" - }, - { - "$ref": "#/components/schemas/SyncEventSessionNextPromptPromoted" - }, { "$ref": "#/components/schemas/SyncEventSessionNextContextUpdated" }, @@ -23070,12 +22979,6 @@ { "$ref": "#/components/schemas/V2EventSessionNextPrompted" }, - { - "$ref": "#/components/schemas/V2EventSessionNextPromptAdmitted" - }, - { - "$ref": "#/components/schemas/V2EventSessionNextPromptPromoted" - }, { "$ref": "#/components/schemas/V2EventSessionNextContextUpdated" }, @@ -24339,127 +24242,6 @@ "required": ["type", "id", "syncEvent"], "additionalProperties": false }, - "SyncEventSessionNextPromptAdmitted": { - "type": "object", - "properties": { - "type": { - "type": "string", - "enum": ["sync"] - }, - "id": { - "type": "string", - "pattern": "^evt_" - }, - "syncEvent": { - "type": "object", - "properties": { - "type": { - "type": "string", - "enum": ["session.next.prompt.admitted.1"] - }, - "id": { - "type": "string", - "pattern": "^evt_" - }, - "seq": { - "type": "number" - }, - "aggregateID": { - "type": "string" - }, - "data": { - "type": "object", - "properties": { - "timestamp": { - "type": "number" - }, - "sessionID": { - "type": "string", - "pattern": "^ses" - }, - "messageID": { - "type": "string", - "pattern": "^msg_" - }, - "prompt": { - "$ref": "#/components/schemas/Prompt" - }, - "delivery": { - "type": "string", - "enum": ["steer", "queue"] - } - }, - "required": ["timestamp", "sessionID", "messageID", "prompt", "delivery"], - "additionalProperties": false - } - }, - "required": ["type", "id", "seq", "aggregateID", "data"], - "additionalProperties": false - } - }, - "required": ["type", "id", "syncEvent"], - "additionalProperties": false - }, - "SyncEventSessionNextPromptPromoted": { - "type": "object", - "properties": { - "type": { - "type": "string", - "enum": ["sync"] - }, - "id": { - "type": "string", - "pattern": "^evt_" - }, - "syncEvent": { - "type": "object", - "properties": { - "type": { - "type": "string", - "enum": ["session.next.prompt.promoted.1"] - }, - "id": { - "type": "string", - "pattern": "^evt_" - }, - "seq": { - "type": "number" - }, - "aggregateID": { - "type": "string" - }, - "data": { - "type": "object", - "properties": { - "timestamp": { - "type": "number" - }, - "sessionID": { - "type": "string", - "pattern": "^ses" - }, - "messageID": { - "type": "string", - "pattern": "^msg_" - }, - "prompt": { - "$ref": "#/components/schemas/Prompt" - }, - "timeCreated": { - "type": "number" - } - }, - "required": ["timestamp", "sessionID", "messageID", "prompt", "timeCreated"], - "additionalProperties": false - } - }, - "required": ["type", "id", "seq", "aggregateID", "data"], - "additionalProperties": false - } - }, - "required": ["type", "id", "syncEvent"], - "additionalProperties": false - }, "SyncEventSessionNextContextUpdated": { "type": "object", "properties": { @@ -26090,7 +25872,8 @@ "properties": { "admittedSeq": { "type": "integer", - "minimum": 0 + "minimum": 0, + "description": "Session-local inbox order; not an Event cursor" }, "id": { "type": "string", @@ -28560,129 +28343,6 @@ "required": ["id", "type", "data"], "additionalProperties": false }, - "V2EventSessionNextPromptAdmitted": { - "type": "object", - "properties": { - "id": { - "type": "string", - "pattern": "^evt_" - }, - "metadata": { - "type": "object" - }, - "durable": { - "type": "object", - "properties": { - "aggregateID": { - "type": "string" - }, - "seq": { - "type": "integer" - }, - "version": { - "type": "integer" - } - }, - "required": ["aggregateID", "seq", "version"], - "additionalProperties": false - }, - "location": { - "$ref": "#/components/schemas/LocationRef" - }, - "type": { - "type": "string", - "enum": ["session.next.prompt.admitted"] - }, - "data": { - "type": "object", - "properties": { - "timestamp": { - "type": "number" - }, - "sessionID": { - "type": "string", - "pattern": "^ses" - }, - "messageID": { - "type": "string", - "pattern": "^msg_" - }, - "prompt": { - "$ref": "#/components/schemas/Prompt" - }, - "delivery": { - "type": "string", - "enum": ["steer", "queue"] - } - }, - "required": ["timestamp", "sessionID", "messageID", "prompt", "delivery"], - "additionalProperties": false - } - }, - "required": ["id", "type", "data"], - "additionalProperties": false - }, - "V2EventSessionNextPromptPromoted": { - "type": "object", - "properties": { - "id": { - "type": "string", - "pattern": "^evt_" - }, - "metadata": { - "type": "object" - }, - "durable": { - "type": "object", - "properties": { - "aggregateID": { - "type": "string" - }, - "seq": { - "type": "integer" - }, - "version": { - "type": "integer" - } - }, - "required": ["aggregateID", "seq", "version"], - "additionalProperties": false - }, - "location": { - "$ref": "#/components/schemas/LocationRef" - }, - "type": { - "type": "string", - "enum": ["session.next.prompt.promoted"] - }, - "data": { - "type": "object", - "properties": { - "timestamp": { - "type": "number" - }, - "sessionID": { - "type": "string", - "pattern": "^ses" - }, - "messageID": { - "type": "string", - "pattern": "^msg_" - }, - "prompt": { - "$ref": "#/components/schemas/Prompt" - }, - "timeCreated": { - "type": "number" - } - }, - "required": ["timestamp", "sessionID", "messageID", "prompt", "timeCreated"], - "additionalProperties": false - } - }, - "required": ["id", "type", "data"], - "additionalProperties": false - }, "V2EventSessionNextContextUpdated": { "type": "object", "properties": { @@ -33242,85 +32902,6 @@ "required": ["id", "type", "properties"], "additionalProperties": false }, - "EventSessionNextPromptAdmitted": { - "type": "object", - "properties": { - "id": { - "type": "string", - "pattern": "^evt_" - }, - "type": { - "type": "string", - "enum": ["session.next.prompt.admitted"] - }, - "properties": { - "type": "object", - "properties": { - "timestamp": { - "type": "number" - }, - "sessionID": { - "type": "string", - "pattern": "^ses" - }, - "messageID": { - "type": "string", - "pattern": "^msg_" - }, - "prompt": { - "$ref": "#/components/schemas/Prompt" - }, - "delivery": { - "type": "string", - "enum": ["steer", "queue"] - } - }, - "required": ["timestamp", "sessionID", "messageID", "prompt", "delivery"], - "additionalProperties": false - } - }, - "required": ["id", "type", "properties"], - "additionalProperties": false - }, - "EventSessionNextPromptPromoted": { - "type": "object", - "properties": { - "id": { - "type": "string", - "pattern": "^evt_" - }, - "type": { - "type": "string", - "enum": ["session.next.prompt.promoted"] - }, - "properties": { - "type": "object", - "properties": { - "timestamp": { - "type": "number" - }, - "sessionID": { - "type": "string", - "pattern": "^ses" - }, - "messageID": { - "type": "string", - "pattern": "^msg_" - }, - "prompt": { - "$ref": "#/components/schemas/Prompt" - }, - "timeCreated": { - "type": "number" - } - }, - "required": ["timestamp", "sessionID", "messageID", "prompt", "timeCreated"], - "additionalProperties": false - } - }, - "required": ["id", "type", "properties"], - "additionalProperties": false - }, "EventSessionNextContextUpdated": { "type": "object", "properties": { diff --git a/packages/tui/src/context/data.tsx b/packages/tui/src/context/data.tsx index 05cf4afbebc6..b94661e1737c 100644 --- a/packages/tui/src/context/data.tsx +++ b/packages/tui/src/context/data.tsx @@ -162,20 +162,6 @@ export const { use: useData, provider: DataProvider } = createSimpleContext({ }) break } - case "session.next.prompt.admitted": - break - case "session.next.prompt.promoted": - message.update(event.data.sessionID, (draft) => { - message.prepend(draft, { - id: event.data.messageID, - type: "user", - text: event.data.prompt.text, - files: event.data.prompt.files, - agents: event.data.prompt.agents, - time: { created: event.data.timeCreated }, - }) - }) - break case "session.next.context.updated": message.update(event.data.sessionID, (draft) => { message.prepend(draft, { diff --git a/packages/tui/test/cli/tui/data.test.tsx b/packages/tui/test/cli/tui/data.test.tsx index 0d6ada4d161c..bfc209c164ed 100644 --- a/packages/tui/test/cli/tui/data.test.tsx +++ b/packages/tui/test/cli/tui/data.test.tsx @@ -370,7 +370,7 @@ test("settles pending tools when a live failure arrives", async () => { } }) -test("renders admitted prompts only after promotion", async () => { +test("renders a prompted user message", async () => { const events = createEventSource() const calls = createFetch(undefined, events) let sync!: ReturnType @@ -400,8 +400,8 @@ test("renders admitted prompts only after promotion", async () => { try { await mounted emitEvent(events, { - id: "evt_admitted_1", - type: "session.next.prompt.admitted", + id: "evt_prompted_1", + type: "session.next.prompted", properties: { sessionID: "session-1", messageID: "msg_user_1", @@ -410,19 +410,6 @@ test("renders admitted prompts only after promotion", async () => { delivery: "steer", }, }) - expect(sync.session.message.list("session-1") ?? []).toEqual([]) - - emitEvent(events, { - id: "evt_promoted_1", - type: "session.next.prompt.promoted", - properties: { - sessionID: "session-1", - messageID: "msg_user_1", - timestamp: 1, - prompt: { text: "hello" }, - timeCreated: 0, - }, - }) await wait(() => sync.session.message.list("session-1")?.length === 1) const message = sync.session.message.list("session-1")?.[0] @@ -434,54 +421,6 @@ test("renders admitted prompts only after promotion", async () => { } }) -test("renders a promoted prompt when admission was missed", async () => { - const events = createEventSource() - const calls = createFetch(undefined, events) - let sync!: ReturnType - let ready!: () => void - const mounted = new Promise((resolve) => { - ready = resolve - }) - - function Probe() { - sync = useData() - onMount(ready) - return - } - - const app = await testRender(() => ( - - - - - - - - - - )) - - try { - await mounted - emitEvent(events, { - id: "evt_promoted_1", - type: "session.next.prompt.promoted", - properties: { - sessionID: "session-1", - messageID: "msg_user_1", - timestamp: 1, - prompt: { text: "hello" }, - timeCreated: 0, - }, - }) - - await wait(() => sync.session.message.list("session-1")?.length === 1) - expect(sync.session.message.list("session-1")?.[0]?.id).toBe("msg_user_1") - } finally { - app.renderer.destroy() - } -}) - test("projects live context updates with their message ID", async () => { const events = createEventSource() const calls = createFetch(undefined, events) diff --git a/specs/v2/schema-changelog.md b/specs/v2/schema-changelog.md index 6d9c0efddbc8..5238655f6159 100644 --- a/specs/v2/schema-changelog.md +++ b/specs/v2/schema-changelog.md @@ -1,5 +1,12 @@ # V2 Schema Changelog +## 2026-06-22: Simplify Session Input Admission + +- Admit prompts directly into the durable `session_input` inbox without adding invisible work to the Session event stream. +- Replace `session.next.prompt.admitted.1` and `session.next.prompt.promoted.1` with the existing `session.next.prompted.1` event when input becomes model-visible. +- Preserve the prompt endpoint, admission receipt, idempotency, steer/queue ordering, and atomic user-message projection. +- Reset experimental V2 events, projections, inputs, Context Epochs, and synchronized workspace state while preserving canonical V1 `session`, `message`, and `part` rows. + ## 2026-06-22: Reset Unpublished Compaction Event - Replace the unpublished `session.next.compaction.ended.1` payload with the current checkpoint payload and remove its legacy decoder. diff --git a/specs/v2/session.md b/specs/v2/session.md index 9946758322ea..9c709e432308 100644 --- a/specs/v2/session.md +++ b/specs/v2/session.md @@ -12,8 +12,8 @@ sessions.create({ id?, location, ... }) sessions.prompt({ id?, sessionID, prompt, delivery?, resume? }) -> omitted ID generates one internal message ID - -> supplied ID admits one durable Session input when absent - -> exact reuse returns the same admitted lifecycle receipt + -> supplied ID inserts one durable Session inbox row when absent + -> exact reuse returns the same admission receipt -> reusing one message ID for another Session, prompt, or delivery mode fails -> exact retry schedules another wake unless resume is false -> resume omitted or true schedules execution after admission @@ -27,7 +27,9 @@ sessions.interrupt(sessionID) -> idle or missing Session is a no-op ``` -`session_input` is the durable admission inbox. Admitted inputs remain outside model-visible Session history until the serialized runner publishes `PromptLifecycle.Promoted`. The projector atomically writes the visible user message and marks its inbox row promoted in the same event transaction. The legacy V1-to-V2 shadow bridge continues publishing ordinary `Prompted` events for already-visible V1 prompts. +`session_input` is the durable admission inbox. Admission is a direct idempotent database transaction and does not create a transcript event. Admitted inputs remain outside model-visible Session history until the serialized runner publishes `Prompted`. Its projector atomically writes the visible user message and marks the inbox row promoted in the same event transaction. The V1-to-V2 shadow bridge publishes the same `Prompted` event for already-visible V1 prompts. + +`admittedSeq` orders inputs within one Session inbox. It is not an EventV2 cursor and may be synthesized locally when replay reconstructs a visible prompt. Execution routing starts from only the Session ID: From dc1aae035859d4297c3c7835a9c82b3f65f5f281 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Mon, 22 Jun 2026 17:38:55 -0400 Subject: [PATCH 2/3] refactor(core): preserve prompt admission events --- packages/core/src/event.ts | 13 ++ packages/core/src/session.ts | 11 +- packages/core/src/session/context-epoch.ts | 15 +- packages/core/src/session/event.ts | 21 +- packages/core/src/session/input.ts | 113 ++++----- packages/core/src/session/message-updater.ts | 1 + packages/core/src/session/projector.ts | 13 ++ packages/core/src/session/runner/llm.ts | 2 +- packages/core/test/session-create.test.ts | 26 ++- packages/core/test/session-projector.test.ts | 2 +- packages/core/test/session-prompt.test.ts | 58 ++++- .../core/test/session-runner-recorded.test.ts | 1 + packages/sdk/js/src/v2/gen/types.gen.ts | 68 +++++- packages/sdk/openapi.json | 215 +++++++++++++++++- packages/tui/src/context/data.tsx | 2 + packages/tui/test/cli/tui/data.test.tsx | 15 +- specs/v2/schema-changelog.md | 6 +- specs/v2/session.md | 4 +- 18 files changed, 483 insertions(+), 103 deletions(-) diff --git a/packages/core/src/event.ts b/packages/core/src/event.ts index 32aaeae6995c..3439a0aefc85 100644 --- a/packages/core/src/event.ts +++ b/packages/core/src/event.ts @@ -46,6 +46,19 @@ export type Payload = { export type Subscriber = (event: Payload) => Effect.Effect export type Unsubscribe = Effect.Effect +export const latestSequence = Effect.fn("EventV2.latestSequence")(function* ( + db: Database.Interface["db"], + aggregateID: string, +) { + const row = yield* db + .select({ seq: EventSequenceTable.seq }) + .from(EventSequenceTable) + .where(eq(EventSequenceTable.aggregate_id, aggregateID)) + .get() + .pipe(Effect.orDie) + return row?.seq ?? -1 +}) + export type SerializedEvent = { readonly id: ID readonly type: string diff --git a/packages/core/src/session.ts b/packages/core/src/session.ts index 4340f3f68644..5dae699733f8 100644 --- a/packages/core/src/session.ts +++ b/packages/core/src/session.ts @@ -330,13 +330,18 @@ export const layer = Layer.effect( const messageID = input.id ?? SessionMessage.ID.create() const delivery = input.delivery ?? "steer" const expected = { sessionID: input.sessionID, messageID, prompt: input.prompt, delivery } - const admitted = yield* SessionInput.admit(db, { + const admitted = yield* SessionInput.admit(db, events, { id: messageID, sessionID: input.sessionID, prompt: input.prompt, delivery, - }) - if (admitted === undefined) return yield* new PromptConflictError({ sessionID: input.sessionID, messageID }) + }).pipe( + Effect.catchDefect((defect) => + defect instanceof SessionInput.LifecycleConflict + ? new PromptConflictError({ sessionID: input.sessionID, messageID }) + : Effect.die(defect), + ), + ) if (!SessionInput.equivalent(admitted, expected)) return yield* new PromptConflictError({ sessionID: input.sessionID, messageID }) if (input.resume !== false) yield* execution.wake(admitted.sessionID) diff --git a/packages/core/src/session/context-epoch.ts b/packages/core/src/session/context-epoch.ts index cf1c436b29f4..f06b69cd5151 100644 --- a/packages/core/src/session/context-epoch.ts +++ b/packages/core/src/session/context-epoch.ts @@ -4,7 +4,6 @@ import { eq } from "drizzle-orm" import { DateTime, Effect, Schema } from "effect" import type { Database } from "../database/database" import { EventV2 } from "../event" -import { EventSequenceTable } from "../event/sql" import { SystemContext } from "../system-context/index" import { ContextSnapshotDecodeError } from "./error" import { SessionEvent } from "./event" @@ -16,16 +15,6 @@ import { SessionContextEpochTable } from "./sql" type DatabaseService = Database.Interface["db"] -const latestEventSeq = Effect.fnUntraced(function* (db: DatabaseService, sessionID: SessionSchema.ID) { - const row = yield* db - .select({ seq: EventSequenceTable.seq }) - .from(EventSequenceTable) - .where(eq(EventSequenceTable.aggregate_id, sessionID)) - .get() - .pipe(Effect.orDie) - return row?.seq ?? -1 -}) - interface Prepared { readonly baseline: string readonly baselineSeq: number @@ -75,7 +64,7 @@ const prepareOnce = Effect.fnUntraced(function* ( return { baseline: stored.baseline, baselineSeq: stored.baseline_seq } } if (result._tag === "ReplacementReady") { - const baselineSeq = replacementSeq ?? (yield* latestEventSeq(db, sessionID)) + const baselineSeq = replacementSeq ?? (yield* EventV2.latestSequence(db, sessionID)) yield* replace(db, sessionID, baselineSeq, result.generation) return { baseline: result.generation.baseline, baselineSeq } } @@ -135,7 +124,7 @@ const insert = Effect.fnUntraced(function* ( sessionID: SessionSchema.ID, generation: SystemContext.Generation, ) { - const baselineSeq = yield* latestEventSeq(db, sessionID) + const baselineSeq = yield* EventV2.latestSequence(db, sessionID) yield* db .insert(SessionContextEpochTable) .values({ diff --git a/packages/core/src/session/event.ts b/packages/core/src/session/event.ts index 32f4d29445ca..88ac4aa2679b 100644 --- a/packages/core/src/session/event.ts +++ b/packages/core/src/session/event.ts @@ -25,6 +25,12 @@ const Base = { timestamp: V2Schema.DateTimeUtcFromMillis, sessionID: SessionSchema.ID, } +const PromptFields = { + ...Base, + messageID: SessionMessageID.ID, + prompt: Prompt, + delivery: Schema.Literals(["steer", "queue"]), +} const options = { durable: { @@ -83,15 +89,17 @@ export type Moved = typeof Moved.Type export const Prompted = EventV2.define({ type: "session.next.prompted", ...options, - schema: { - ...Base, - messageID: SessionMessageID.ID, - prompt: Prompt, - delivery: Schema.Literals(["steer", "queue"]), - }, + schema: PromptFields, }) export type Prompted = typeof Prompted.Type +export const PromptAdmitted = EventV2.define({ + type: "session.next.prompt.admitted", + ...options, + schema: PromptFields, +}) +export type PromptAdmitted = typeof PromptAdmitted.Type + export const ContextUpdated = EventV2.define({ type: "session.next.context.updated", ...options, @@ -429,6 +437,7 @@ const DurableDefinitions = [ ModelSwitched, Moved, Prompted, + PromptAdmitted, ContextUpdated, Synthetic, Shell.Started, diff --git a/packages/core/src/session/input.ts b/packages/core/src/session/input.ts index df6080e5544d..068523aa4938 100644 --- a/packages/core/src/session/input.ts +++ b/packages/core/src/session/input.ts @@ -1,6 +1,6 @@ export * as SessionInput from "./input" -import { and, asc, eq, isNull, lte, max } from "drizzle-orm" +import { and, asc, eq, isNull, lte } from "drizzle-orm" import { DateTime, Effect, Schema } from "effect" import type { Database } from "../database/database" import type { EventV2 } from "../event" @@ -18,7 +18,7 @@ export const Delivery = Schema.Literals(["steer", "queue"]) export type Delivery = typeof Delivery.Type export class Admitted extends Schema.Class("SessionInput.Admitted")({ - admittedSeq: NonNegativeInt.annotate({ description: "Session-local inbox order; not an Event cursor" }), + admittedSeq: NonNegativeInt, id: SessionMessage.ID, sessionID: SessionSchema.ID, prompt: Prompt, @@ -52,6 +52,7 @@ export class LifecycleConflict extends Schema.TaggedErrorClass - Effect.gen(function* () { - const existing = yield* tx - .select() - .from(SessionInputTable) - .where(eq(SessionInputTable.id, input.id)) - .get() - .pipe(Effect.orDie) - if (existing !== undefined) return fromRow(existing) - const message = yield* tx - .select({ id: SessionMessageTable.id }) - .from(SessionMessageTable) - .where(eq(SessionMessageTable.id, input.id)) - .get() - .pipe(Effect.orDie) - if (message !== undefined) return undefined - const latest = yield* tx - .select({ seq: max(SessionInputTable.admitted_seq) }) - .from(SessionInputTable) - .where(eq(SessionInputTable.session_id, input.sessionID)) - .get() - .pipe(Effect.orDie) - const row = yield* tx - .insert(SessionInputTable) - .values({ - id: input.id, - session_id: input.sessionID, - prompt: encodePrompt(input.prompt), - delivery: input.delivery, - admitted_seq: (latest?.seq ?? -1) + 1, - }) - .returning() - .get() - .pipe(Effect.orDie) - return fromRow(row) - }), - { behavior: "immediate" }, + const existing = yield* find(db, input.id) + if (existing !== undefined) return existing + const timestamp = yield* DateTime.now + return yield* events + .publish(SessionEvent.PromptAdmitted, { + messageID: input.id, + sessionID: input.sessionID, + timestamp, + prompt: input.prompt, + delivery: input.delivery, + }) + .pipe( + Effect.flatMap((event) => + event.durable === undefined + ? Effect.die("Prompt admission event is missing aggregate sequence") + : Effect.succeed( + new Admitted({ + admittedSeq: event.durable.seq, + id: input.id, + sessionID: input.sessionID, + prompt: input.prompt, + delivery: input.delivery, + timeCreated: timestamp, + }), + ), + ), + Effect.catchDefect((defect) => + find(db, input.id).pipe(Effect.flatMap((stored) => (stored ? Effect.succeed(stored) : Effect.die(defect)))), + ), ) - .pipe(Effect.orDie) }) -export const latestAdmittedSeq = Effect.fn("SessionInput.latestAdmittedSeq")(function* ( +export const projectAdmitted = Effect.fn("SessionInput.projectAdmitted")(function* ( db: DatabaseService, - sessionID: SessionSchema.ID, + input: { + readonly admittedSeq: number + readonly id: SessionMessage.ID + readonly sessionID: SessionSchema.ID + readonly prompt: Prompt + readonly delivery: Delivery + readonly timeCreated: DateTime.Utc + }, ) { - const row = yield* db - .select({ seq: max(SessionInputTable.admitted_seq) }) - .from(SessionInputTable) - .where(eq(SessionInputTable.session_id, sessionID)) + const message = yield* db + .select({ id: SessionMessageTable.id }) + .from(SessionMessageTable) + .where(eq(SessionMessageTable.id, input.id)) + .get() + .pipe(Effect.orDie) + if (message !== undefined) return yield* Effect.die(new LifecycleConflict({ id: input.id })) + const stored = yield* db + .insert(SessionInputTable) + .values({ + id: input.id, + session_id: input.sessionID, + admitted_seq: input.admittedSeq, + prompt: encodePrompt(input.prompt), + delivery: input.delivery, + time_created: DateTime.toEpochMillis(input.timeCreated), + }) + .onConflictDoNothing() + .returning({ id: SessionInputTable.id }) .get() .pipe(Effect.orDie) - return row?.seq ?? -1 + if (!stored) return yield* Effect.die(new LifecycleConflict({ id: input.id })) }) export const projectPrompted = Effect.fn("SessionInput.projectPrompted")(function* ( @@ -152,7 +164,6 @@ export const projectPrompted = Effect.fn("SessionInput.projectPrompted")(functio return toMessage(stored) } - const admittedSeq = (yield* latestAdmittedSeq(db, input.sessionID)) + 1 const inserted = yield* db .insert(SessionInputTable) .values({ @@ -160,7 +171,7 @@ export const projectPrompted = Effect.fn("SessionInput.projectPrompted")(functio session_id: input.sessionID, prompt: encodePrompt(input.prompt), delivery: input.delivery, - admitted_seq: admittedSeq, + admitted_seq: input.promotedSeq, promoted_seq: input.promotedSeq, time_created: DateTime.toEpochMillis(input.timeCreated), }) diff --git a/packages/core/src/session/message-updater.ts b/packages/core/src/session/message-updater.ts index ca76ae436d74..4ece3c2d1954 100644 --- a/packages/core/src/session/message-updater.ts +++ b/packages/core/src/session/message-updater.ts @@ -136,6 +136,7 @@ export function update(adapter: Adapter, event: SessionEvent.Event) { }), ) }, + "session.next.prompt.admitted": () => Effect.void, "session.next.context.updated": (event) => adapter.appendMessage( new SessionMessage.System({ diff --git a/packages/core/src/session/projector.ts b/packages/core/src/session/projector.ts index 8fed07a3da05..1ab32fe31061 100644 --- a/packages/core/src/session/projector.ts +++ b/packages/core/src/session/projector.ts @@ -364,6 +364,19 @@ export const layer = Layer.effectDiscard( ) }), ) + yield* events.project(SessionEvent.PromptAdmitted, (event) => + Effect.gen(function* () { + if (event.durable === undefined) return yield* Effect.die("Durable Session event is missing aggregate sequence") + yield* SessionInput.projectAdmitted(db, { + admittedSeq: event.durable.seq, + id: event.data.messageID, + sessionID: event.data.sessionID, + prompt: event.data.prompt, + delivery: event.data.delivery, + timeCreated: event.data.timestamp, + }) + }), + ) yield* events.project(SessionEvent.ContextUpdated, (event) => run(db, event)) yield* events.project(SessionEvent.Synthetic, (event) => run(db, event)) yield* events.project(SessionEvent.Shell.Started, (event) => run(db, event)) diff --git a/packages/core/src/session/runner/llm.ts b/packages/core/src/session/runner/llm.ts index 4e3da7209e4e..9caae3849472 100644 --- a/packages/core/src/session/runner/llm.ts +++ b/packages/core/src/session/runner/llm.ts @@ -176,7 +176,7 @@ export const layer = Layer.effect( const toolFibers = yield* FiberSet.make() let needsContinuation = false if (promotion) { - const cutoff = yield* SessionInput.latestAdmittedSeq(db, session.id) + const cutoff = yield* EventV2.latestSequence(db, session.id) if (promotion === "steer") yield* SessionInput.promoteSteers(db, events, session.id, cutoff) if (promotion === "queue") { yield* SessionInput.promoteNextQueued(db, events, session.id) diff --git a/packages/core/test/session-create.test.ts b/packages/core/test/session-create.test.ts index d9ddb6d36392..1ae6c011fc4a 100644 --- a/packages/core/test/session-create.test.ts +++ b/packages/core/test/session-create.test.ts @@ -222,8 +222,11 @@ describe("SessionV2.create", () => { yield* SessionInput.promoteSteers(db, events, created.id, Number.MAX_SAFE_INTEGER) expect( - Array.from(yield* session.events({ sessionID: created.id }).pipe(Stream.take(1), Stream.runCollect)), - ).toMatchObject([{ durable: { seq: 1 }, type: "session.next.prompted", data: { prompt: { text: "Hello" } } }]) + Array.from(yield* session.events({ sessionID: created.id }).pipe(Stream.take(2), Stream.runCollect)), + ).toMatchObject([ + { durable: { seq: 1 }, type: "session.next.prompt.admitted", data: { prompt: { text: "Hello" } } }, + { durable: { seq: 2 }, type: "session.next.prompted" }, + ]) }), ) @@ -273,18 +276,24 @@ describe("SessionV2.create", () => { .pipe(Effect.orDie) expect(yield* store.get(created.id)).toBeUndefined() - expect(yield* events.replayAll(serialized.slice(0, 1))).toBe(created.id) - expect(yield* SessionInput.find(db, admitted.id)).toBeUndefined() + expect(yield* events.replayAll(serialized.slice(0, 2))).toBe(created.id) + expect(yield* SessionInput.find(db, admitted.id)).toMatchObject({ + id: admitted.id, + sessionID: created.id, + prompt: { text: "Replay lifecycle" }, + delivery: "steer", + admittedSeq: 1, + }) expect(yield* store.context(created.id)).toEqual([]) - expect(yield* events.replayAll(serialized.slice(1))).toBe(created.id) + expect(yield* events.replayAll(serialized.slice(2))).toBe(created.id) expect(yield* SessionInput.find(db, admitted.id)).toMatchObject({ id: admitted.id, sessionID: created.id, prompt: { text: "Replay lifecycle" }, delivery: "steer", - admittedSeq: 0, - promotedSeq: 1, + admittedSeq: 1, + promotedSeq: 2, }) expect(yield* store.context(created.id)).toMatchObject([ { id: admitted.id, type: "user", text: "Replay lifecycle" }, @@ -299,7 +308,8 @@ describe("SessionV2.create", () => { .pipe(Effect.orDie)).map((event) => [event.seq, event.type]), ).toEqual([ [0, EventV2.versionedType(SessionV1.Event.Created.type, 1)], - [1, EventV2.versionedType(SessionEvent.Prompted.type, 1)], + [1, EventV2.versionedType(SessionEvent.PromptAdmitted.type, 1)], + [2, EventV2.versionedType(SessionEvent.Prompted.type, 1)], ]) }).pipe(Effect.provide(Layer.fresh(Layer.mergeAll(targetDatabase, targetEvents, targetProjector, targetStore)))) }), diff --git a/packages/core/test/session-projector.test.ts b/packages/core/test/session-projector.test.ts index c80cf54922d0..76c3290a7078 100644 --- a/packages/core/test/session-projector.test.ts +++ b/packages/core/test/session-projector.test.ts @@ -142,7 +142,7 @@ describe("SessionProjector", () => { .pipe(Effect.orDie) const events = yield* EventV2.Service const id = SessionMessage.ID.make("msg_admitted") - const admitted = yield* SessionInput.admit(db, { + const admitted = yield* SessionInput.admit(db, events, { id, sessionID, prompt: new Prompt({ text: "promote me" }), diff --git a/packages/core/test/session-prompt.test.ts b/packages/core/test/session-prompt.test.ts index 37167735046d..842474396e27 100644 --- a/packages/core/test/session-prompt.test.ts +++ b/packages/core/test/session-prompt.test.ts @@ -168,7 +168,7 @@ describe("SessionV2.prompt", () => { const session = yield* SessionV2.Service const events = yield* EventV2.Service const { db } = yield* Database.Service - const fiber = yield* session.events({ sessionID }).pipe(Stream.take(2), Stream.runCollect, Effect.forkScoped) + const fiber = yield* session.events({ sessionID }).pipe(Stream.take(4), Stream.runCollect, Effect.forkScoped) yield* Effect.yieldNow yield* session.prompt({ sessionID, prompt: new Prompt({ text: "First" }), resume: false }) @@ -177,8 +177,10 @@ describe("SessionV2.prompt", () => { const streamed = Array.from(yield* Fiber.join(fiber)) expect(streamed.map((event) => [event.durable?.seq, event.type])).toEqual([ - [0, "session.next.prompted"], - [1, "session.next.prompted"], + [0, "session.next.prompt.admitted"], + [1, "session.next.prompt.admitted"], + [2, "session.next.prompted"], + [3, "session.next.prompted"], ]) expect( Array.from( @@ -186,7 +188,7 @@ describe("SessionV2.prompt", () => { .events({ sessionID, after: streamed[0]!.durable?.seq }) .pipe(Stream.take(1), Stream.runCollect), ).map((event) => [event.durable?.seq, event.type]), - ).toEqual([[1, "session.next.prompted"]]) + ).toEqual([[1, "session.next.prompt.admitted"]]) }), ) @@ -332,7 +334,7 @@ describe("SessionV2.prompt", () => { expect(messages[1]).toEqual(messages[0]) expect(yield* session.messages({ sessionID })).toEqual([]) expect(yield* admittedCount).toBe(1) - expect(yield* eventCount(EventV2.versionedType(SessionEvent.Prompted.type, 1))).toBe(0) + expect(yield* eventCount(EventV2.versionedType(SessionEvent.PromptAdmitted.type, 1))).toBe(1) }), ) @@ -353,7 +355,7 @@ describe("SessionV2.prompt", () => { ) expect(yield* eventCount(EventV2.versionedType(SessionEvent.Prompted.type, 1))).toBe(1) - expect(yield* admitted(messageID)).toMatchObject({ promotedSeq: 0 }) + expect(yield* admitted(messageID)).toMatchObject({ promotedSeq: 1 }) expect(yield* session.messages({ sessionID })).toMatchObject([ { id: messageID, type: "user", text: "Promote once" }, ]) @@ -367,7 +369,7 @@ describe("SessionV2.prompt", () => { const session = yield* SessionV2.Service const events = yield* EventV2.Service const first = yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Before cutoff" }), resume: false }) - const cutoff = yield* SessionInput.latestAdmittedSeq(db, sessionID) + const cutoff = first.admittedSeq const second = yield* session.prompt({ sessionID, prompt: new Prompt({ text: "After cutoff" }), resume: false }) yield* SessionInput.promoteSteers(db, events, sessionID, cutoff) @@ -377,11 +379,12 @@ describe("SessionV2.prompt", () => { }), ) - it.effect("keeps pending inbox input outside the replayable event stream", () => + it.effect("reprojects pending inbox input without scheduling execution", () => Effect.gen(function* () { yield* setup const { db } = yield* Database.Service const session = yield* SessionV2.Service + const events = yield* EventV2.Service wakeCalls.length = 0 yield* session.prompt({ id: messageID, sessionID, prompt: new Prompt({ text: "Replay pending" }), resume: false }) const recorded = yield* db @@ -391,7 +394,23 @@ describe("SessionV2.prompt", () => { .all() .pipe(Effect.orDie) - expect(recorded).toEqual([]) + yield* events.remove(sessionID) + yield* db.delete(SessionInputTable).where(eq(SessionInputTable.session_id, sessionID)).run().pipe(Effect.orDie) + yield* db + .delete(SessionMessageTable) + .where(eq(SessionMessageTable.session_id, sessionID)) + .run() + .pipe(Effect.orDie) + yield* events.replayAll( + recorded.map((event) => ({ + id: event.id, + aggregateID: event.aggregate_id, + seq: event.seq, + type: event.type, + data: event.data, + })), + ) + expect(yield* admitted(messageID)).toMatchObject({ id: messageID, prompt: { text: "Replay pending" } }) expect(yield* session.messages({ sessionID })).toEqual([]) expect(wakeCalls).toEqual([]) @@ -470,6 +489,27 @@ describe("SessionV2.prompt", () => { }), ) + it.effect("rejects a prompt ID already used by visible Session history", () => + Effect.gen(function* () { + yield* setup + const session = yield* SessionV2.Service + const events = yield* EventV2.Service + yield* events.publish(SessionEvent.Synthetic, { + sessionID, + messageID, + timestamp: yield* DateTime.now, + text: "Existing history", + }) + + const failure = yield* session + .prompt({ id: messageID, sessionID, prompt: new Prompt({ text: "Conflicting prompt" }), resume: false }) + .pipe(Effect.flip) + + expect(failure).toMatchObject({ _tag: "Session.PromptConflictError", sessionID, messageID }) + expect(yield* admitted(messageID)).toBeUndefined() + }), + ) + it.effect("starts execution by default after recording the prompt", () => Effect.gen(function* () { yield* setup diff --git a/packages/core/test/session-runner-recorded.test.ts b/packages/core/test/session-runner-recorded.test.ts index b84930e27f1c..331c5bb48c17 100644 --- a/packages/core/test/session-runner-recorded.test.ts +++ b/packages/core/test/session-runner-recorded.test.ts @@ -175,6 +175,7 @@ describe("SessionRunnerLLM recorded", () => { .orderBy(EventTable.seq) .all()).map((event) => event.type), ).toEqual([ + "session.next.prompt.admitted.1", "session.next.prompted.1", "session.next.step.started.1", "session.next.text.started.1", diff --git a/packages/sdk/js/src/v2/gen/types.gen.ts b/packages/sdk/js/src/v2/gen/types.gen.ts index d6e7d488e1d1..f71b010420d8 100644 --- a/packages/sdk/js/src/v2/gen/types.gen.ts +++ b/packages/sdk/js/src/v2/gen/types.gen.ts @@ -19,6 +19,7 @@ export type Event = | EventSessionNextModelSwitched | EventSessionNextMoved | EventSessionNextPrompted + | EventSessionNextPromptAdmitted | EventSessionNextContextUpdated | EventSessionNextSynthetic | EventSessionNextShellStarted @@ -851,6 +852,17 @@ export type GlobalEvent = { delivery: "steer" | "queue" } } + | { + id: string + type: "session.next.prompt.admitted" + properties: { + timestamp: number + sessionID: string + messageID: string + prompt: Prompt + delivery: "steer" | "queue" + } + } | { id: string type: "session.next.context.updated" @@ -1603,6 +1615,7 @@ export type GlobalEvent = { | SyncEventSessionNextModelSwitched | SyncEventSessionNextMoved | SyncEventSessionNextPrompted + | SyncEventSessionNextPromptAdmitted | SyncEventSessionNextContextUpdated | SyncEventSessionNextSynthetic | SyncEventSessionNextShellStarted @@ -2743,6 +2756,7 @@ export type V2Event = | V2EventSessionNextModelSwitched | V2EventSessionNextMoved | V2EventSessionNextPrompted + | V2EventSessionNextPromptAdmitted | V2EventSessionNextContextUpdated | V2EventSessionNextSynthetic | V2EventSessionNextShellStarted @@ -3174,6 +3188,24 @@ export type SyncEventSessionNextPrompted = { } } +export type SyncEventSessionNextPromptAdmitted = { + type: "sync" + id: string + syncEvent: { + type: "session.next.prompt.admitted.1" + id: string + seq: number + aggregateID: string + data: { + timestamp: number + sessionID: string + messageID: string + prompt: Prompt + delivery: "steer" | "queue" + } + } +} + export type SyncEventSessionNextContextUpdated = { type: "sync" id: string @@ -3688,9 +3720,6 @@ export type SessionV2Info = { } export type SessionInputAdmitted = { - /** - * Session-local inbox order; not an Event cursor - */ admittedSeq: number id: string sessionID: string @@ -4441,6 +4470,27 @@ export type V2EventSessionNextPrompted = { } } +export type V2EventSessionNextPromptAdmitted = { + id: string + metadata?: { + [key: string]: unknown + } + durable?: { + aggregateID: string + seq: number + version: number + } + location?: LocationRef + type: "session.next.prompt.admitted" + data: { + timestamp: number + sessionID: string + messageID: string + prompt: Prompt + delivery: "steer" | "queue" + } +} + export type V2EventSessionNextContextUpdated = { id: string metadata?: { @@ -6053,6 +6103,18 @@ export type EventSessionNextPrompted = { } } +export type EventSessionNextPromptAdmitted = { + id: string + type: "session.next.prompt.admitted" + properties: { + timestamp: number + sessionID: string + messageID: string + prompt: Prompt + delivery: "steer" | "queue" + } +} + export type EventSessionNextContextUpdated = { id: string type: "session.next.context.updated" diff --git a/packages/sdk/openapi.json b/packages/sdk/openapi.json index 48ba095575ba..0f8f5ca1980d 100644 --- a/packages/sdk/openapi.json +++ b/packages/sdk/openapi.json @@ -14683,6 +14683,9 @@ { "$ref": "#/components/schemas/EventSessionNextPrompted" }, + { + "$ref": "#/components/schemas/EventSessionNextPromptAdmitted" + }, { "$ref": "#/components/schemas/EventSessionNextContextUpdated" }, @@ -17209,6 +17212,46 @@ "required": ["id", "type", "properties"], "additionalProperties": false }, + { + "type": "object", + "properties": { + "id": { + "type": "string", + "pattern": "^evt_" + }, + "type": { + "type": "string", + "enum": ["session.next.prompt.admitted"] + }, + "properties": { + "type": "object", + "properties": { + "timestamp": { + "type": "number" + }, + "sessionID": { + "type": "string", + "pattern": "^ses" + }, + "messageID": { + "type": "string", + "pattern": "^msg_" + }, + "prompt": { + "$ref": "#/components/schemas/Prompt" + }, + "delivery": { + "type": "string", + "enum": ["steer", "queue"] + } + }, + "required": ["timestamp", "sessionID", "messageID", "prompt", "delivery"], + "additionalProperties": false + } + }, + "required": ["id", "type", "properties"], + "additionalProperties": false + }, { "type": "object", "properties": { @@ -19744,6 +19787,9 @@ { "$ref": "#/components/schemas/SyncEventSessionNextPrompted" }, + { + "$ref": "#/components/schemas/SyncEventSessionNextPromptAdmitted" + }, { "$ref": "#/components/schemas/SyncEventSessionNextContextUpdated" }, @@ -22979,6 +23025,9 @@ { "$ref": "#/components/schemas/V2EventSessionNextPrompted" }, + { + "$ref": "#/components/schemas/V2EventSessionNextPromptAdmitted" + }, { "$ref": "#/components/schemas/V2EventSessionNextContextUpdated" }, @@ -24242,6 +24291,67 @@ "required": ["type", "id", "syncEvent"], "additionalProperties": false }, + "SyncEventSessionNextPromptAdmitted": { + "type": "object", + "properties": { + "type": { + "type": "string", + "enum": ["sync"] + }, + "id": { + "type": "string", + "pattern": "^evt_" + }, + "syncEvent": { + "type": "object", + "properties": { + "type": { + "type": "string", + "enum": ["session.next.prompt.admitted.1"] + }, + "id": { + "type": "string", + "pattern": "^evt_" + }, + "seq": { + "type": "number" + }, + "aggregateID": { + "type": "string" + }, + "data": { + "type": "object", + "properties": { + "timestamp": { + "type": "number" + }, + "sessionID": { + "type": "string", + "pattern": "^ses" + }, + "messageID": { + "type": "string", + "pattern": "^msg_" + }, + "prompt": { + "$ref": "#/components/schemas/Prompt" + }, + "delivery": { + "type": "string", + "enum": ["steer", "queue"] + } + }, + "required": ["timestamp", "sessionID", "messageID", "prompt", "delivery"], + "additionalProperties": false + } + }, + "required": ["type", "id", "seq", "aggregateID", "data"], + "additionalProperties": false + } + }, + "required": ["type", "id", "syncEvent"], + "additionalProperties": false + }, "SyncEventSessionNextContextUpdated": { "type": "object", "properties": { @@ -25872,8 +25982,7 @@ "properties": { "admittedSeq": { "type": "integer", - "minimum": 0, - "description": "Session-local inbox order; not an Event cursor" + "minimum": 0 }, "id": { "type": "string", @@ -28343,6 +28452,68 @@ "required": ["id", "type", "data"], "additionalProperties": false }, + "V2EventSessionNextPromptAdmitted": { + "type": "object", + "properties": { + "id": { + "type": "string", + "pattern": "^evt_" + }, + "metadata": { + "type": "object" + }, + "durable": { + "type": "object", + "properties": { + "aggregateID": { + "type": "string" + }, + "seq": { + "type": "integer" + }, + "version": { + "type": "integer" + } + }, + "required": ["aggregateID", "seq", "version"], + "additionalProperties": false + }, + "location": { + "$ref": "#/components/schemas/LocationRef" + }, + "type": { + "type": "string", + "enum": ["session.next.prompt.admitted"] + }, + "data": { + "type": "object", + "properties": { + "timestamp": { + "type": "number" + }, + "sessionID": { + "type": "string", + "pattern": "^ses" + }, + "messageID": { + "type": "string", + "pattern": "^msg_" + }, + "prompt": { + "$ref": "#/components/schemas/Prompt" + }, + "delivery": { + "type": "string", + "enum": ["steer", "queue"] + } + }, + "required": ["timestamp", "sessionID", "messageID", "prompt", "delivery"], + "additionalProperties": false + } + }, + "required": ["id", "type", "data"], + "additionalProperties": false + }, "V2EventSessionNextContextUpdated": { "type": "object", "properties": { @@ -32902,6 +33073,46 @@ "required": ["id", "type", "properties"], "additionalProperties": false }, + "EventSessionNextPromptAdmitted": { + "type": "object", + "properties": { + "id": { + "type": "string", + "pattern": "^evt_" + }, + "type": { + "type": "string", + "enum": ["session.next.prompt.admitted"] + }, + "properties": { + "type": "object", + "properties": { + "timestamp": { + "type": "number" + }, + "sessionID": { + "type": "string", + "pattern": "^ses" + }, + "messageID": { + "type": "string", + "pattern": "^msg_" + }, + "prompt": { + "$ref": "#/components/schemas/Prompt" + }, + "delivery": { + "type": "string", + "enum": ["steer", "queue"] + } + }, + "required": ["timestamp", "sessionID", "messageID", "prompt", "delivery"], + "additionalProperties": false + } + }, + "required": ["id", "type", "properties"], + "additionalProperties": false + }, "EventSessionNextContextUpdated": { "type": "object", "properties": { diff --git a/packages/tui/src/context/data.tsx b/packages/tui/src/context/data.tsx index b94661e1737c..9b2e58907ad9 100644 --- a/packages/tui/src/context/data.tsx +++ b/packages/tui/src/context/data.tsx @@ -162,6 +162,8 @@ export const { use: useData, provider: DataProvider } = createSimpleContext({ }) break } + case "session.next.prompt.admitted": + break case "session.next.context.updated": message.update(event.data.sessionID, (draft) => { message.prepend(draft, { diff --git a/packages/tui/test/cli/tui/data.test.tsx b/packages/tui/test/cli/tui/data.test.tsx index bfc209c164ed..279ba4d065c4 100644 --- a/packages/tui/test/cli/tui/data.test.tsx +++ b/packages/tui/test/cli/tui/data.test.tsx @@ -370,7 +370,7 @@ test("settles pending tools when a live failure arrives", async () => { } }) -test("renders a prompted user message", async () => { +test("renders admitted prompts only after they become model-visible", async () => { const events = createEventSource() const calls = createFetch(undefined, events) let sync!: ReturnType @@ -399,6 +399,19 @@ test("renders a prompted user message", async () => { try { await mounted + emitEvent(events, { + id: "evt_admitted_1", + type: "session.next.prompt.admitted", + properties: { + sessionID: "session-1", + messageID: "msg_user_1", + timestamp: 0, + prompt: { text: "hello" }, + delivery: "steer", + }, + }) + expect(sync.session.message.list("session-1") ?? []).toEqual([]) + emitEvent(events, { id: "evt_prompted_1", type: "session.next.prompted", diff --git a/specs/v2/schema-changelog.md b/specs/v2/schema-changelog.md index 5238655f6159..bdd483715627 100644 --- a/specs/v2/schema-changelog.md +++ b/specs/v2/schema-changelog.md @@ -1,9 +1,9 @@ # V2 Schema Changelog -## 2026-06-22: Simplify Session Input Admission +## 2026-06-22: Simplify Session Input Promotion -- Admit prompts directly into the durable `session_input` inbox without adding invisible work to the Session event stream. -- Replace `session.next.prompt.admitted.1` and `session.next.prompt.promoted.1` with the existing `session.next.prompted.1` event when input becomes model-visible. +- Keep `session.next.prompt.admitted.1` as the durable, client-visible record of pending Session input. +- Replace `session.next.prompt.promoted.1` with the existing `session.next.prompted.1` event when input becomes model-visible. - Preserve the prompt endpoint, admission receipt, idempotency, steer/queue ordering, and atomic user-message projection. - Reset experimental V2 events, projections, inputs, Context Epochs, and synchronized workspace state while preserving canonical V1 `session`, `message`, and `part` rows. diff --git a/specs/v2/session.md b/specs/v2/session.md index 9c709e432308..6788a893eafa 100644 --- a/specs/v2/session.md +++ b/specs/v2/session.md @@ -27,9 +27,9 @@ sessions.interrupt(sessionID) -> idle or missing Session is a no-op ``` -`session_input` is the durable admission inbox. Admission is a direct idempotent database transaction and does not create a transcript event. Admitted inputs remain outside model-visible Session history until the serialized runner publishes `Prompted`. Its projector atomically writes the visible user message and marks the inbox row promoted in the same event transaction. The V1-to-V2 shadow bridge publishes the same `Prompted` event for already-visible V1 prompts. +`session_input` is the durable admission inbox. `PromptAdmitted` records and projects accepted input so pending queue state can be replayed, replicated, and observed by clients. Admitted inputs remain outside model-visible Session history until the serialized runner publishes `Prompted`. Its projector atomically writes the visible user message and marks the inbox row promoted in the same event transaction. The V1-to-V2 shadow bridge publishes the same `Prompted` event for already-visible V1 prompts. -`admittedSeq` orders inputs within one Session inbox. It is not an EventV2 cursor and may be synthesized locally when replay reconstructs a visible prompt. +`admittedSeq` is the durable Session event sequence of `PromptAdmitted`. Clients may use the admission event to represent queued input before `Prompted` makes it part of visible conversation history. Execution routing starts from only the Session ID: From 823392e6bbe841656bfa232b642d1283d6643848 Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Mon, 22 Jun 2026 17:42:56 -0400 Subject: [PATCH 3/3] refactor(core): reuse prompt message projection --- packages/core/src/session/input.ts | 20 ++++---------------- packages/core/src/session/projector.ts | 21 +++++++++------------ 2 files changed, 13 insertions(+), 28 deletions(-) diff --git a/packages/core/src/session/input.ts b/packages/core/src/session/input.ts index 068523aa4938..a45a37100376 100644 --- a/packages/core/src/session/input.ts +++ b/packages/core/src/session/input.ts @@ -154,17 +154,17 @@ export const projectPrompted = Effect.fn("SessionInput.projectPrompted")(functio if (updated) { const stored = fromRow(updated) if (!matchesProjection(stored, input)) return yield* Effect.die(new LifecycleConflict({ id: input.id })) - return toMessage(stored) + return } const stored = yield* find(db, input.id) if (stored) { if (!matchesProjection(stored, input) || stored.promotedSeq !== input.promotedSeq) return yield* Effect.die(new LifecycleConflict({ id: input.id })) - return toMessage(stored) + return } - const inserted = yield* db + yield* db .insert(SessionInputTable) .values({ id: input.id, @@ -175,10 +175,8 @@ export const projectPrompted = Effect.fn("SessionInput.projectPrompted")(functio promoted_seq: input.promotedSeq, time_created: DateTime.toEpochMillis(input.timeCreated), }) - .returning() - .get() + .run() .pipe(Effect.orDie) - return toMessage(fromRow(inserted)) }) export const hasPending = Effect.fn("SessionInput.hasPending")(function* ( @@ -300,13 +298,3 @@ export const promoteNextQueued = Effect.fn("SessionInput.promoteNextQueued")(fun .pipe(Effect.orDie) return row === undefined ? false : yield* publish(db, events, sessionID, [row]).pipe(Effect.as(true)) }) - -const toMessage = (input: Admitted) => - new SessionMessage.User({ - id: input.id, - type: "user", - text: input.prompt.text, - files: input.prompt.files, - agents: input.prompt.agents, - time: { created: input.timeCreated }, - }) diff --git a/packages/core/src/session/projector.ts b/packages/core/src/session/projector.ts index 1ab32fe31061..4a0512d4759d 100644 --- a/packages/core/src/session/projector.ts +++ b/packages/core/src/session/projector.ts @@ -350,18 +350,15 @@ export const layer = Layer.effectDiscard( yield* events.project(SessionEvent.Prompted, (event) => Effect.gen(function* () { if (event.durable === undefined) return yield* Effect.die("Durable Session event is missing aggregate sequence") - yield* insertMessage( - db, - event, - yield* SessionInput.projectPrompted(db, { - id: event.data.messageID, - sessionID: event.data.sessionID, - prompt: event.data.prompt, - delivery: event.data.delivery, - timeCreated: event.data.timestamp, - promotedSeq: event.durable.seq, - }), - ) + yield* SessionInput.projectPrompted(db, { + id: event.data.messageID, + sessionID: event.data.sessionID, + prompt: event.data.prompt, + delivery: event.data.delivery, + timeCreated: event.data.timestamp, + promotedSeq: event.durable.seq, + }) + yield* run(db, event) }), ) yield* events.project(SessionEvent.PromptAdmitted, (event) =>