diff --git a/packages/core/src/database/migration.gen.ts b/packages/core/src/database/migration.gen.ts index 19b1b5684325..e6ea4eaa1477 100644 --- a/packages/core/src/database/migration.gen.ts +++ b/packages/core/src/database/migration.gen.ts @@ -39,5 +39,6 @@ export const migrations = ( import("./migration/20260612174303_project_dir_strategy"), import("./migration/20260622142730_simplify_session_context_epoch"), import("./migration/20260622170816_reset_v2_session_state"), + import("./migration/20260622202450_simplify_session_input"), ]) ).map((module) => module.default) satisfies DatabaseMigration.Migration[] diff --git a/packages/core/src/database/migration/20260622202450_simplify_session_input.ts b/packages/core/src/database/migration/20260622202450_simplify_session_input.ts new file mode 100644 index 000000000000..0b5ddd1bfde3 --- /dev/null +++ b/packages/core/src/database/migration/20260622202450_simplify_session_input.ts @@ -0,0 +1,17 @@ +import { Effect } from "effect" +import type { DatabaseMigration } from "../migration" + +export default { + id: "20260622202450_simplify_session_input", + up(tx) { + return Effect.gen(function* () { + yield* tx.run(`DELETE FROM \`session_context_epoch\`;`) + yield* tx.run(`DELETE FROM \`session_input\`;`) + yield* tx.run(`DELETE FROM \`session_message\`;`) + yield* tx.run(`DELETE FROM \`event\`;`) + yield* tx.run(`DELETE FROM \`event_sequence\`;`) + yield* tx.run(`UPDATE \`session\` SET \`workspace_id\` = NULL WHERE \`workspace_id\` IS NOT NULL;`) + yield* tx.run(`DELETE FROM \`workspace\`;`) + }) + }, +} satisfies DatabaseMigration.Migration diff --git a/packages/core/src/event.ts b/packages/core/src/event.ts index 32aaeae6995c..3439a0aefc85 100644 --- a/packages/core/src/event.ts +++ b/packages/core/src/event.ts @@ -46,6 +46,19 @@ export type Payload = { export type Subscriber = (event: Payload) => Effect.Effect export type Unsubscribe = Effect.Effect +export const latestSequence = Effect.fn("EventV2.latestSequence")(function* ( + db: Database.Interface["db"], + aggregateID: string, +) { + const row = yield* db + .select({ seq: EventSequenceTable.seq }) + .from(EventSequenceTable) + .where(eq(EventSequenceTable.aggregate_id, aggregateID)) + .get() + .pipe(Effect.orDie) + return row?.seq ?? -1 +}) + export type SerializedEvent = { readonly id: ID readonly type: string diff --git a/packages/core/src/session/context-epoch.ts b/packages/core/src/session/context-epoch.ts index 18624706a973..f06b69cd5151 100644 --- a/packages/core/src/session/context-epoch.ts +++ b/packages/core/src/session/context-epoch.ts @@ -64,7 +64,7 @@ const prepareOnce = Effect.fnUntraced(function* ( return { baseline: stored.baseline, baselineSeq: stored.baseline_seq } } if (result._tag === "ReplacementReady") { - const baselineSeq = replacementSeq ?? (yield* SessionInput.latestSeq(db, sessionID)) + const baselineSeq = replacementSeq ?? (yield* EventV2.latestSequence(db, sessionID)) yield* replace(db, sessionID, baselineSeq, result.generation) return { baseline: result.generation.baseline, baselineSeq } } @@ -124,7 +124,7 @@ const insert = Effect.fnUntraced(function* ( sessionID: SessionSchema.ID, generation: SystemContext.Generation, ) { - const baselineSeq = yield* SessionInput.latestSeq(db, sessionID) + const baselineSeq = yield* EventV2.latestSequence(db, sessionID) yield* db .insert(SessionContextEpochTable) .values({ diff --git a/packages/core/src/session/event.ts b/packages/core/src/session/event.ts index d4b773d18bf7..88ac4aa2679b 100644 --- a/packages/core/src/session/event.ts +++ b/packages/core/src/session/event.ts @@ -25,6 +25,12 @@ const Base = { timestamp: V2Schema.DateTimeUtcFromMillis, sessionID: SessionSchema.ID, } +const PromptFields = { + ...Base, + messageID: SessionMessageID.ID, + prompt: Prompt, + delivery: Schema.Literals(["steer", "queue"]), +} const options = { durable: { @@ -83,40 +89,16 @@ export type Moved = typeof Moved.Type export const Prompted = EventV2.define({ type: "session.next.prompted", ...options, - schema: { - ...Base, - messageID: SessionMessageID.ID, - prompt: Prompt, - delivery: Schema.Literals(["steer", "queue"]), - }, + schema: PromptFields, }) export type Prompted = typeof Prompted.Type -export namespace PromptLifecycle { - export const Admitted = EventV2.define({ - type: "session.next.prompt.admitted", - ...options, - schema: { - ...Base, - messageID: SessionMessageID.ID, - prompt: Prompt, - delivery: Schema.Literals(["steer", "queue"]), - }, - }) - export type Admitted = typeof Admitted.Type - - export const Promoted = EventV2.define({ - type: "session.next.prompt.promoted", - ...options, - schema: { - ...Base, - messageID: SessionMessageID.ID, - prompt: Prompt, - timeCreated: V2Schema.DateTimeUtcFromMillis, - }, - }) - export type Promoted = typeof Promoted.Type -} +export const PromptAdmitted = EventV2.define({ + type: "session.next.prompt.admitted", + ...options, + schema: PromptFields, +}) +export type PromptAdmitted = typeof PromptAdmitted.Type export const ContextUpdated = EventV2.define({ type: "session.next.context.updated", @@ -455,8 +437,7 @@ const DurableDefinitions = [ ModelSwitched, Moved, Prompted, - PromptLifecycle.Admitted, - PromptLifecycle.Promoted, + PromptAdmitted, ContextUpdated, Synthetic, Shell.Started, diff --git a/packages/core/src/session/input.ts b/packages/core/src/session/input.ts index f8bc2b0e6bb0..a45a37100376 100644 --- a/packages/core/src/session/input.ts +++ b/packages/core/src/session/input.ts @@ -4,7 +4,6 @@ import { and, asc, eq, isNull, lte } from "drizzle-orm" import { DateTime, Effect, Schema } from "effect" import type { Database } from "../database/database" import type { EventV2 } from "../event" -import { EventSequenceTable } from "../event/sql" import { NonNegativeInt } from "../schema" import { V2Schema } from "../v2-schema" import { SessionEvent } from "./event" @@ -65,7 +64,7 @@ export const admit = Effect.fn("SessionInput.admit")(function* ( if (existing !== undefined) return existing const timestamp = yield* DateTime.now return yield* events - .publish(SessionEvent.PromptLifecycle.Admitted, { + .publish(SessionEvent.PromptAdmitted, { messageID: input.id, sessionID: input.sessionID, timestamp, @@ -93,19 +92,6 @@ export const admit = Effect.fn("SessionInput.admit")(function* ( ) }) -export const latestSeq = Effect.fn("SessionInput.latestSeq")(function* ( - db: DatabaseService, - sessionID: SessionSchema.ID, -) { - const row = yield* db - .select({ seq: EventSequenceTable.seq }) - .from(EventSequenceTable) - .where(eq(EventSequenceTable.aggregate_id, sessionID)) - .get() - .pipe(Effect.orDie) - return row?.seq ?? -1 -}) - export const projectAdmitted = Effect.fn("SessionInput.projectAdmitted")(function* ( db: DatabaseService, input: { @@ -117,6 +103,13 @@ export const projectAdmitted = Effect.fn("SessionInput.projectAdmitted")(functio readonly timeCreated: DateTime.Utc }, ) { + const message = yield* db + .select({ id: SessionMessageTable.id }) + .from(SessionMessageTable) + .where(eq(SessionMessageTable.id, input.id)) + .get() + .pipe(Effect.orDie) + if (message !== undefined) return yield* Effect.die(new LifecycleConflict({ id: input.id })) const stored = yield* db .insert(SessionInputTable) .values({ @@ -134,12 +127,13 @@ export const projectAdmitted = Effect.fn("SessionInput.projectAdmitted")(functio if (!stored) return yield* Effect.die(new LifecycleConflict({ id: input.id })) }) -export const projectPromoted = Effect.fn("SessionInput.projectPromoted")(function* ( +export const projectPrompted = Effect.fn("SessionInput.projectPrompted")(function* ( db: DatabaseService, input: { readonly id: SessionMessage.ID readonly sessionID: SessionSchema.ID readonly prompt: Prompt + readonly delivery: Delivery readonly timeCreated: DateTime.Utc readonly promotedSeq: number }, @@ -157,14 +151,32 @@ export const projectPromoted = Effect.fn("SessionInput.projectPromoted")(functio .returning() .get() .pipe(Effect.orDie) - if (!updated) return yield* Effect.die(new LifecycleConflict({ id: input.id })) - const stored = fromRow(updated) - if ( - !matchesPrompt(stored, input) || - DateTime.toEpochMillis(stored.timeCreated) !== DateTime.toEpochMillis(input.timeCreated) - ) - return yield* Effect.die(new LifecycleConflict({ id: input.id })) - return toMessage(stored) + if (updated) { + const stored = fromRow(updated) + if (!matchesProjection(stored, input)) return yield* Effect.die(new LifecycleConflict({ id: input.id })) + return + } + + const stored = yield* find(db, input.id) + if (stored) { + if (!matchesProjection(stored, input) || stored.promotedSeq !== input.promotedSeq) + return yield* Effect.die(new LifecycleConflict({ id: input.id })) + return + } + + yield* db + .insert(SessionInputTable) + .values({ + id: input.id, + session_id: input.sessionID, + prompt: encodePrompt(input.prompt), + delivery: input.delivery, + admitted_seq: input.promotedSeq, + promoted_seq: input.promotedSeq, + time_created: DateTime.toEpochMillis(input.timeCreated), + }) + .run() + .pipe(Effect.orDie) }) export const hasPending = Effect.fn("SessionInput.hasPending")(function* ( @@ -201,35 +213,17 @@ const matchesPrompt = (input: Admitted, expected: { readonly sessionID: SessionS input.sessionID === expected.sessionID && JSON.stringify(encodePrompt(input.prompt)) === JSON.stringify(encodePrompt(expected.prompt)) -export const projectLegacyPrompted = Effect.fn("SessionInput.projectLegacyPrompted")(function* ( - db: DatabaseService, - input: { - readonly id: SessionMessage.ID +const matchesProjection = ( + input: Admitted, + expected: { readonly sessionID: SessionSchema.ID readonly prompt: Prompt readonly delivery: Delivery readonly timeCreated: DateTime.Utc - readonly promotedSeq: number }, -) { - const inserted = yield* db - .insert(SessionInputTable) - .values({ - id: input.id, - session_id: input.sessionID, - admitted_seq: input.promotedSeq, - prompt: encodePrompt(input.prompt), - delivery: input.delivery, - promoted_seq: input.promotedSeq, - time_created: DateTime.toEpochMillis(input.timeCreated), - }) - .onConflictDoNothing() - .returning() - .get() - .pipe(Effect.orDie) - if (!inserted) return yield* Effect.die("Prompt projection conflicts with admitted input") - return fromRow(inserted) -}) +) => + equivalent(input, expected) && + DateTime.toEpochMillis(input.timeCreated) === DateTime.toEpochMillis(expected.timeCreated) const publish = Effect.fn("SessionInput.publish")(function* ( db: DatabaseService, @@ -238,18 +232,19 @@ const publish = Effect.fn("SessionInput.publish")(function* ( rows: ReadonlyArray, ) { for (const row of rows) { + const id = SessionMessage.ID.make(row.id) yield* events - .publish(SessionEvent.PromptLifecycle.Promoted, { + .publish(SessionEvent.Prompted, { sessionID, - timestamp: yield* DateTime.now, - messageID: SessionMessage.ID.make(row.id), + timestamp: DateTime.makeUnsafe(row.time_created), + messageID: id, prompt: decodePrompt(row.prompt), - timeCreated: DateTime.makeUnsafe(row.time_created), + delivery: row.delivery, }) .pipe( Effect.catchDefect((defect) => defect instanceof LifecycleConflict - ? find(db, SessionMessage.ID.make(row.id)).pipe( + ? find(db, id).pipe( Effect.flatMap((stored) => (stored?.promotedSeq === undefined ? Effect.die(defect) : Effect.void)), ) : Effect.die(defect), @@ -303,13 +298,3 @@ export const promoteNextQueued = Effect.fn("SessionInput.promoteNextQueued")(fun .pipe(Effect.orDie) return row === undefined ? false : yield* publish(db, events, sessionID, [row]).pipe(Effect.as(true)) }) - -const toMessage = (input: Admitted) => - new SessionMessage.User({ - id: input.id, - type: "user", - text: input.prompt.text, - files: input.prompt.files, - agents: input.prompt.agents, - time: { created: input.timeCreated }, - }) diff --git a/packages/core/src/session/message-updater.ts b/packages/core/src/session/message-updater.ts index 2c836dcd0173..4ece3c2d1954 100644 --- a/packages/core/src/session/message-updater.ts +++ b/packages/core/src/session/message-updater.ts @@ -137,7 +137,6 @@ export function update(adapter: Adapter, event: SessionEvent.Event) { ) }, "session.next.prompt.admitted": () => Effect.void, - "session.next.prompt.promoted": () => Effect.void, "session.next.context.updated": (event) => adapter.appendMessage( new SessionMessage.System({ diff --git a/packages/core/src/session/projector.ts b/packages/core/src/session/projector.ts index 2c87dfb8dffb..4a0512d4759d 100644 --- a/packages/core/src/session/projector.ts +++ b/packages/core/src/session/projector.ts @@ -21,7 +21,6 @@ type DatabaseService = Database.Interface["db"] const decodeMessage = Schema.decodeUnknownSync(SessionMessage.Message) const encodeMessage = Schema.encodeSync(SessionMessage.Message) -class PromptAlreadyProjected extends Error {} export class SessionAlreadyProjected extends Error {} type Usage = { @@ -350,27 +349,19 @@ export const layer = Layer.effectDiscard( ) yield* events.project(SessionEvent.Prompted, (event) => Effect.gen(function* () { - const messageID = event.data.messageID - const existing = yield* db - .select({ id: SessionMessageTable.id }) - .from(SessionMessageTable) - .where(eq(SessionMessageTable.id, messageID)) - .get() - .pipe(Effect.orDie) - if (existing) return yield* Effect.die(new PromptAlreadyProjected()) - yield* run(db, event) if (event.durable === undefined) return yield* Effect.die("Durable Session event is missing aggregate sequence") - yield* SessionInput.projectLegacyPrompted(db, { - id: messageID, + yield* SessionInput.projectPrompted(db, { + id: event.data.messageID, sessionID: event.data.sessionID, prompt: event.data.prompt, delivery: event.data.delivery, timeCreated: event.data.timestamp, promotedSeq: event.durable.seq, }) + yield* run(db, event) }), ) - yield* events.project(SessionEvent.PromptLifecycle.Admitted, (event) => + yield* events.project(SessionEvent.PromptAdmitted, (event) => Effect.gen(function* () { if (event.durable === undefined) return yield* Effect.die("Durable Session event is missing aggregate sequence") yield* SessionInput.projectAdmitted(db, { @@ -383,22 +374,6 @@ export const layer = Layer.effectDiscard( }) }), ) - yield* events.project(SessionEvent.PromptLifecycle.Promoted, (event) => - Effect.gen(function* () { - if (event.durable === undefined) return yield* Effect.die("Durable Session event is missing aggregate sequence") - yield* insertMessage( - db, - event, - yield* SessionInput.projectPromoted(db, { - id: event.data.messageID, - sessionID: event.data.sessionID, - prompt: event.data.prompt, - timeCreated: event.data.timeCreated, - promotedSeq: event.durable.seq, - }), - ) - }), - ) yield* events.project(SessionEvent.ContextUpdated, (event) => run(db, event)) yield* events.project(SessionEvent.Synthetic, (event) => run(db, event)) yield* events.project(SessionEvent.Shell.Started, (event) => run(db, event)) diff --git a/packages/core/src/session/runner/llm.ts b/packages/core/src/session/runner/llm.ts index ddd2bf4e152a..9caae3849472 100644 --- a/packages/core/src/session/runner/llm.ts +++ b/packages/core/src/session/runner/llm.ts @@ -176,7 +176,7 @@ export const layer = Layer.effect( const toolFibers = yield* FiberSet.make() let needsContinuation = false if (promotion) { - const cutoff = yield* SessionInput.latestSeq(db, session.id) + const cutoff = yield* EventV2.latestSequence(db, session.id) if (promotion === "steer") yield* SessionInput.promoteSteers(db, events, session.id, cutoff) if (promotion === "queue") { yield* SessionInput.promoteNextQueued(db, events, session.id) diff --git a/packages/core/test/database-migration.test.ts b/packages/core/test/database-migration.test.ts index 835f41931d60..63768719355c 100644 --- a/packages/core/test/database-migration.test.ts +++ b/packages/core/test/database-migration.test.ts @@ -14,7 +14,7 @@ import sessionMessageProjectionOrderMigration from "@opencode-ai/core/database/m import eventSourcedSessionInputMigration from "@opencode-ai/core/database/migration/20260604172448_event_sourced_session_input" import contextEpochAgentMigration from "@opencode-ai/core/database/migration/20260605042240_add_context_epoch_agent" import simplifyIntegrationCredentialsMigration from "@opencode-ai/core/database/migration/20260611192811_lush_chimera" -import resetV2SessionStateMigration from "@opencode-ai/core/database/migration/20260622170816_reset_v2_session_state" +import simplifySessionInputMigration from "@opencode-ai/core/database/migration/20260622202450_simplify_session_input" import { EventV2 } from "@opencode-ai/core/event" import { ProjectV2 } from "@opencode-ai/core/project" import { ProjectTable } from "@opencode-ai/core/project/sql" @@ -264,8 +264,8 @@ describe("DatabaseMigration", () => { yield* db.run( sql`INSERT INTO session_context_epoch (session_id, baseline, snapshot, baseline_seq) VALUES ('session', 'baseline', '{}', 9)`, ) - yield* db.run(sql`DELETE FROM migration WHERE id = ${resetV2SessionStateMigration.id}`) - yield* DatabaseMigration.applyOnly(db, [resetV2SessionStateMigration]) + yield* db.run(sql`DELETE FROM migration WHERE id = ${simplifySessionInputMigration.id}`) + yield* DatabaseMigration.applyOnly(db, [simplifySessionInputMigration]) const database = Layer.succeed(Database.Service, { db }) const events = EventV2.layer.pipe(Layer.provide(database)) diff --git a/packages/core/test/session-create.test.ts b/packages/core/test/session-create.test.ts index 6fd80c60da1d..1ae6c011fc4a 100644 --- a/packages/core/test/session-create.test.ts +++ b/packages/core/test/session-create.test.ts @@ -225,7 +225,7 @@ describe("SessionV2.create", () => { Array.from(yield* session.events({ sessionID: created.id }).pipe(Stream.take(2), Stream.runCollect)), ).toMatchObject([ { durable: { seq: 1 }, type: "session.next.prompt.admitted", data: { prompt: { text: "Hello" } } }, - { durable: { seq: 2 }, type: "session.next.prompt.promoted" }, + { durable: { seq: 2 }, type: "session.next.prompted" }, ]) }), ) @@ -308,8 +308,8 @@ describe("SessionV2.create", () => { .pipe(Effect.orDie)).map((event) => [event.seq, event.type]), ).toEqual([ [0, EventV2.versionedType(SessionV1.Event.Created.type, 1)], - [1, EventV2.versionedType(SessionEvent.PromptLifecycle.Admitted.type, 1)], - [2, EventV2.versionedType(SessionEvent.PromptLifecycle.Promoted.type, 1)], + [1, EventV2.versionedType(SessionEvent.PromptAdmitted.type, 1)], + [2, EventV2.versionedType(SessionEvent.Prompted.type, 1)], ]) }).pipe(Effect.provide(Layer.fresh(Layer.mergeAll(targetDatabase, targetEvents, targetProjector, targetStore)))) }), diff --git a/packages/core/test/session-projector.test.ts b/packages/core/test/session-projector.test.ts index a0894d07eb0f..76c3290a7078 100644 --- a/packages/core/test/session-projector.test.ts +++ b/packages/core/test/session-projector.test.ts @@ -120,7 +120,7 @@ describe("SessionProjector", () => { ), ) - it.effect("marks an admitted lifecycle row promoted with the PromptPromoted event sequence", () => + it.effect("marks an inbox row promoted with the Prompted event sequence", () => Effect.gen(function* () { const { db } = yield* Database.Service yield* db @@ -142,19 +142,20 @@ describe("SessionProjector", () => { .pipe(Effect.orDie) const events = yield* EventV2.Service const id = SessionMessage.ID.make("msg_admitted") - yield* SessionInput.admit(db, events, { + const admitted = yield* SessionInput.admit(db, events, { id, sessionID, prompt: new Prompt({ text: "promote me" }), delivery: "steer", }) + if (!admitted) return yield* Effect.die("Prompt admission failed") - const event = yield* events.publish(SessionEvent.PromptLifecycle.Promoted, { + const event = yield* events.publish(SessionEvent.Prompted, { sessionID, - timestamp: created, + timestamp: admitted.timeCreated, messageID: id, prompt: new Prompt({ text: "promote me" }), - timeCreated: created, + delivery: "steer", }) expect( diff --git a/packages/core/test/session-prompt.test.ts b/packages/core/test/session-prompt.test.ts index 166b5deed12c..842474396e27 100644 --- a/packages/core/test/session-prompt.test.ts +++ b/packages/core/test/session-prompt.test.ts @@ -179,8 +179,8 @@ describe("SessionV2.prompt", () => { expect(streamed.map((event) => [event.durable?.seq, event.type])).toEqual([ [0, "session.next.prompt.admitted"], [1, "session.next.prompt.admitted"], - [2, "session.next.prompt.promoted"], - [3, "session.next.prompt.promoted"], + [2, "session.next.prompted"], + [3, "session.next.prompted"], ]) expect( Array.from( @@ -334,7 +334,7 @@ describe("SessionV2.prompt", () => { expect(messages[1]).toEqual(messages[0]) expect(yield* session.messages({ sessionID })).toEqual([]) expect(yield* admittedCount).toBe(1) - expect(yield* eventCount(EventV2.versionedType(SessionEvent.PromptLifecycle.Admitted.type, 1))).toBe(1) + expect(yield* eventCount(EventV2.versionedType(SessionEvent.PromptAdmitted.type, 1))).toBe(1) }), ) @@ -354,7 +354,7 @@ describe("SessionV2.prompt", () => { { concurrency: "unbounded" }, ) - expect(yield* eventCount(EventV2.versionedType(SessionEvent.PromptLifecycle.Promoted.type, 1))).toBe(1) + expect(yield* eventCount(EventV2.versionedType(SessionEvent.Prompted.type, 1))).toBe(1) expect(yield* admitted(messageID)).toMatchObject({ promotedSeq: 1 }) expect(yield* session.messages({ sessionID })).toMatchObject([ { id: messageID, type: "user", text: "Promote once" }, @@ -362,14 +362,14 @@ describe("SessionV2.prompt", () => { }), ) - it.effect("promotes steers only through the captured aggregate cutoff", () => + it.effect("promotes steers only through the captured inbox cutoff", () => Effect.gen(function* () { yield* setup const { db } = yield* Database.Service const session = yield* SessionV2.Service const events = yield* EventV2.Service const first = yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Before cutoff" }), resume: false }) - const cutoff = yield* SessionInput.latestSeq(db, sessionID) + const cutoff = first.admittedSeq const second = yield* session.prompt({ sessionID, prompt: new Prompt({ text: "After cutoff" }), resume: false }) yield* SessionInput.promoteSteers(db, events, sessionID, cutoff) @@ -379,7 +379,7 @@ describe("SessionV2.prompt", () => { }), ) - it.effect("reprojects one pending lifecycle without scheduling execution", () => + it.effect("reprojects pending inbox input without scheduling execution", () => Effect.gen(function* () { yield* setup const { db } = yield* Database.Service @@ -489,6 +489,27 @@ describe("SessionV2.prompt", () => { }), ) + it.effect("rejects a prompt ID already used by visible Session history", () => + Effect.gen(function* () { + yield* setup + const session = yield* SessionV2.Service + const events = yield* EventV2.Service + yield* events.publish(SessionEvent.Synthetic, { + sessionID, + messageID, + timestamp: yield* DateTime.now, + text: "Existing history", + }) + + const failure = yield* session + .prompt({ id: messageID, sessionID, prompt: new Prompt({ text: "Conflicting prompt" }), resume: false }) + .pipe(Effect.flip) + + expect(failure).toMatchObject({ _tag: "Session.PromptConflictError", sessionID, messageID }) + expect(yield* admitted(messageID)).toBeUndefined() + }), + ) + it.effect("starts execution by default after recording the prompt", () => Effect.gen(function* () { yield* setup diff --git a/packages/core/test/session-runner-recorded.test.ts b/packages/core/test/session-runner-recorded.test.ts index 65e90cb6d093..331c5bb48c17 100644 --- a/packages/core/test/session-runner-recorded.test.ts +++ b/packages/core/test/session-runner-recorded.test.ts @@ -176,7 +176,7 @@ describe("SessionRunnerLLM recorded", () => { .all()).map((event) => event.type), ).toEqual([ "session.next.prompt.admitted.1", - "session.next.prompt.promoted.1", + "session.next.prompted.1", "session.next.step.started.1", "session.next.text.started.1", "session.next.text.ended.1", diff --git a/packages/core/test/session-runner.test.ts b/packages/core/test/session-runner.test.ts index 6e97ab7939fd..ec932c904bc1 100644 --- a/packages/core/test/session-runner.test.ts +++ b/packages/core/test/session-runner.test.ts @@ -2404,7 +2404,7 @@ describe("SessionRunnerLLM", () => { const events = yield* EventV2.Service const defect = new Error("fail after prompt promotion") let fail = true - yield* events.project(SessionEvent.PromptLifecycle.Promoted, () => (fail ? Effect.die(defect) : Effect.void)) + yield* events.project(SessionEvent.Prompted, () => (fail ? Effect.die(defect) : Effect.void)) yield* session.prompt({ sessionID, prompt: new Prompt({ text: "Recover promoted input" }), resume: false }) expect(yield* session.resume(sessionID).pipe(Effect.catchDefect(Effect.succeed))).toBe(defect) @@ -2429,9 +2429,7 @@ describe("SessionRunnerLLM", () => { const session = yield* SessionV2.Service const events = yield* EventV2.Service yield* events.listen((event) => - event.type === SessionEvent.PromptLifecycle.Promoted.type - ? Effect.die("fail after prompt promotion commits") - : Effect.void, + event.type === SessionEvent.Prompted.type ? Effect.die("fail after prompt promotion commits") : Effect.void, ) yield* session.prompt({ sessionID, diff --git a/packages/sdk/js/src/v2/gen/types.gen.ts b/packages/sdk/js/src/v2/gen/types.gen.ts index b15ff9347095..f71b010420d8 100644 --- a/packages/sdk/js/src/v2/gen/types.gen.ts +++ b/packages/sdk/js/src/v2/gen/types.gen.ts @@ -20,7 +20,6 @@ export type Event = | EventSessionNextMoved | EventSessionNextPrompted | EventSessionNextPromptAdmitted - | EventSessionNextPromptPromoted | EventSessionNextContextUpdated | EventSessionNextSynthetic | EventSessionNextShellStarted @@ -864,17 +863,6 @@ export type GlobalEvent = { delivery: "steer" | "queue" } } - | { - id: string - type: "session.next.prompt.promoted" - properties: { - timestamp: number - sessionID: string - messageID: string - prompt: Prompt - timeCreated: number - } - } | { id: string type: "session.next.context.updated" @@ -1628,7 +1616,6 @@ export type GlobalEvent = { | SyncEventSessionNextMoved | SyncEventSessionNextPrompted | SyncEventSessionNextPromptAdmitted - | SyncEventSessionNextPromptPromoted | SyncEventSessionNextContextUpdated | SyncEventSessionNextSynthetic | SyncEventSessionNextShellStarted @@ -2770,7 +2757,6 @@ export type V2Event = | V2EventSessionNextMoved | V2EventSessionNextPrompted | V2EventSessionNextPromptAdmitted - | V2EventSessionNextPromptPromoted | V2EventSessionNextContextUpdated | V2EventSessionNextSynthetic | V2EventSessionNextShellStarted @@ -3220,24 +3206,6 @@ export type SyncEventSessionNextPromptAdmitted = { } } -export type SyncEventSessionNextPromptPromoted = { - type: "sync" - id: string - syncEvent: { - type: "session.next.prompt.promoted.1" - id: string - seq: number - aggregateID: string - data: { - timestamp: number - sessionID: string - messageID: string - prompt: Prompt - timeCreated: number - } - } -} - export type SyncEventSessionNextContextUpdated = { type: "sync" id: string @@ -4523,27 +4491,6 @@ export type V2EventSessionNextPromptAdmitted = { } } -export type V2EventSessionNextPromptPromoted = { - id: string - metadata?: { - [key: string]: unknown - } - durable?: { - aggregateID: string - seq: number - version: number - } - location?: LocationRef - type: "session.next.prompt.promoted" - data: { - timestamp: number - sessionID: string - messageID: string - prompt: Prompt - timeCreated: number - } -} - export type V2EventSessionNextContextUpdated = { id: string metadata?: { @@ -6168,18 +6115,6 @@ export type EventSessionNextPromptAdmitted = { } } -export type EventSessionNextPromptPromoted = { - id: string - type: "session.next.prompt.promoted" - properties: { - timestamp: number - sessionID: string - messageID: string - prompt: Prompt - timeCreated: number - } -} - export type EventSessionNextContextUpdated = { id: string type: "session.next.context.updated" diff --git a/packages/sdk/openapi.json b/packages/sdk/openapi.json index 0d8dc1aecfe2..0f8f5ca1980d 100644 --- a/packages/sdk/openapi.json +++ b/packages/sdk/openapi.json @@ -14686,9 +14686,6 @@ { "$ref": "#/components/schemas/EventSessionNextPromptAdmitted" }, - { - "$ref": "#/components/schemas/EventSessionNextPromptPromoted" - }, { "$ref": "#/components/schemas/EventSessionNextContextUpdated" }, @@ -17255,45 +17252,6 @@ "required": ["id", "type", "properties"], "additionalProperties": false }, - { - "type": "object", - "properties": { - "id": { - "type": "string", - "pattern": "^evt_" - }, - "type": { - "type": "string", - "enum": ["session.next.prompt.promoted"] - }, - "properties": { - "type": "object", - "properties": { - "timestamp": { - "type": "number" - }, - "sessionID": { - "type": "string", - "pattern": "^ses" - }, - "messageID": { - "type": "string", - "pattern": "^msg_" - }, - "prompt": { - "$ref": "#/components/schemas/Prompt" - }, - "timeCreated": { - "type": "number" - } - }, - "required": ["timestamp", "sessionID", "messageID", "prompt", "timeCreated"], - "additionalProperties": false - } - }, - "required": ["id", "type", "properties"], - "additionalProperties": false - }, { "type": "object", "properties": { @@ -19832,9 +19790,6 @@ { "$ref": "#/components/schemas/SyncEventSessionNextPromptAdmitted" }, - { - "$ref": "#/components/schemas/SyncEventSessionNextPromptPromoted" - }, { "$ref": "#/components/schemas/SyncEventSessionNextContextUpdated" }, @@ -23073,9 +23028,6 @@ { "$ref": "#/components/schemas/V2EventSessionNextPromptAdmitted" }, - { - "$ref": "#/components/schemas/V2EventSessionNextPromptPromoted" - }, { "$ref": "#/components/schemas/V2EventSessionNextContextUpdated" }, @@ -24400,66 +24352,6 @@ "required": ["type", "id", "syncEvent"], "additionalProperties": false }, - "SyncEventSessionNextPromptPromoted": { - "type": "object", - "properties": { - "type": { - "type": "string", - "enum": ["sync"] - }, - "id": { - "type": "string", - "pattern": "^evt_" - }, - "syncEvent": { - "type": "object", - "properties": { - "type": { - "type": "string", - "enum": ["session.next.prompt.promoted.1"] - }, - "id": { - "type": "string", - "pattern": "^evt_" - }, - "seq": { - "type": "number" - }, - "aggregateID": { - "type": "string" - }, - "data": { - "type": "object", - "properties": { - "timestamp": { - "type": "number" - }, - "sessionID": { - "type": "string", - "pattern": "^ses" - }, - "messageID": { - "type": "string", - "pattern": "^msg_" - }, - "prompt": { - "$ref": "#/components/schemas/Prompt" - }, - "timeCreated": { - "type": "number" - } - }, - "required": ["timestamp", "sessionID", "messageID", "prompt", "timeCreated"], - "additionalProperties": false - } - }, - "required": ["type", "id", "seq", "aggregateID", "data"], - "additionalProperties": false - } - }, - "required": ["type", "id", "syncEvent"], - "additionalProperties": false - }, "SyncEventSessionNextContextUpdated": { "type": "object", "properties": { @@ -28622,67 +28514,6 @@ "required": ["id", "type", "data"], "additionalProperties": false }, - "V2EventSessionNextPromptPromoted": { - "type": "object", - "properties": { - "id": { - "type": "string", - "pattern": "^evt_" - }, - "metadata": { - "type": "object" - }, - "durable": { - "type": "object", - "properties": { - "aggregateID": { - "type": "string" - }, - "seq": { - "type": "integer" - }, - "version": { - "type": "integer" - } - }, - "required": ["aggregateID", "seq", "version"], - "additionalProperties": false - }, - "location": { - "$ref": "#/components/schemas/LocationRef" - }, - "type": { - "type": "string", - "enum": ["session.next.prompt.promoted"] - }, - "data": { - "type": "object", - "properties": { - "timestamp": { - "type": "number" - }, - "sessionID": { - "type": "string", - "pattern": "^ses" - }, - "messageID": { - "type": "string", - "pattern": "^msg_" - }, - "prompt": { - "$ref": "#/components/schemas/Prompt" - }, - "timeCreated": { - "type": "number" - } - }, - "required": ["timestamp", "sessionID", "messageID", "prompt", "timeCreated"], - "additionalProperties": false - } - }, - "required": ["id", "type", "data"], - "additionalProperties": false - }, "V2EventSessionNextContextUpdated": { "type": "object", "properties": { @@ -33282,45 +33113,6 @@ "required": ["id", "type", "properties"], "additionalProperties": false }, - "EventSessionNextPromptPromoted": { - "type": "object", - "properties": { - "id": { - "type": "string", - "pattern": "^evt_" - }, - "type": { - "type": "string", - "enum": ["session.next.prompt.promoted"] - }, - "properties": { - "type": "object", - "properties": { - "timestamp": { - "type": "number" - }, - "sessionID": { - "type": "string", - "pattern": "^ses" - }, - "messageID": { - "type": "string", - "pattern": "^msg_" - }, - "prompt": { - "$ref": "#/components/schemas/Prompt" - }, - "timeCreated": { - "type": "number" - } - }, - "required": ["timestamp", "sessionID", "messageID", "prompt", "timeCreated"], - "additionalProperties": false - } - }, - "required": ["id", "type", "properties"], - "additionalProperties": false - }, "EventSessionNextContextUpdated": { "type": "object", "properties": { diff --git a/packages/tui/src/context/data.tsx b/packages/tui/src/context/data.tsx index 05cf4afbebc6..9b2e58907ad9 100644 --- a/packages/tui/src/context/data.tsx +++ b/packages/tui/src/context/data.tsx @@ -164,18 +164,6 @@ export const { use: useData, provider: DataProvider } = createSimpleContext({ } case "session.next.prompt.admitted": break - case "session.next.prompt.promoted": - message.update(event.data.sessionID, (draft) => { - message.prepend(draft, { - id: event.data.messageID, - type: "user", - text: event.data.prompt.text, - files: event.data.prompt.files, - agents: event.data.prompt.agents, - time: { created: event.data.timeCreated }, - }) - }) - break case "session.next.context.updated": message.update(event.data.sessionID, (draft) => { message.prepend(draft, { diff --git a/packages/tui/test/cli/tui/data.test.tsx b/packages/tui/test/cli/tui/data.test.tsx index 0d6ada4d161c..279ba4d065c4 100644 --- a/packages/tui/test/cli/tui/data.test.tsx +++ b/packages/tui/test/cli/tui/data.test.tsx @@ -370,7 +370,7 @@ test("settles pending tools when a live failure arrives", async () => { } }) -test("renders admitted prompts only after promotion", async () => { +test("renders admitted prompts only after they become model-visible", async () => { const events = createEventSource() const calls = createFetch(undefined, events) let sync!: ReturnType @@ -413,14 +413,14 @@ test("renders admitted prompts only after promotion", async () => { expect(sync.session.message.list("session-1") ?? []).toEqual([]) emitEvent(events, { - id: "evt_promoted_1", - type: "session.next.prompt.promoted", + id: "evt_prompted_1", + type: "session.next.prompted", properties: { sessionID: "session-1", messageID: "msg_user_1", - timestamp: 1, + timestamp: 0, prompt: { text: "hello" }, - timeCreated: 0, + delivery: "steer", }, }) @@ -434,54 +434,6 @@ test("renders admitted prompts only after promotion", async () => { } }) -test("renders a promoted prompt when admission was missed", async () => { - const events = createEventSource() - const calls = createFetch(undefined, events) - let sync!: ReturnType - let ready!: () => void - const mounted = new Promise((resolve) => { - ready = resolve - }) - - function Probe() { - sync = useData() - onMount(ready) - return - } - - const app = await testRender(() => ( - - - - - - - - - - )) - - try { - await mounted - emitEvent(events, { - id: "evt_promoted_1", - type: "session.next.prompt.promoted", - properties: { - sessionID: "session-1", - messageID: "msg_user_1", - timestamp: 1, - prompt: { text: "hello" }, - timeCreated: 0, - }, - }) - - await wait(() => sync.session.message.list("session-1")?.length === 1) - expect(sync.session.message.list("session-1")?.[0]?.id).toBe("msg_user_1") - } finally { - app.renderer.destroy() - } -}) - test("projects live context updates with their message ID", async () => { const events = createEventSource() const calls = createFetch(undefined, events) diff --git a/specs/v2/schema-changelog.md b/specs/v2/schema-changelog.md index 6d9c0efddbc8..bdd483715627 100644 --- a/specs/v2/schema-changelog.md +++ b/specs/v2/schema-changelog.md @@ -1,5 +1,12 @@ # V2 Schema Changelog +## 2026-06-22: Simplify Session Input Promotion + +- Keep `session.next.prompt.admitted.1` as the durable, client-visible record of pending Session input. +- Replace `session.next.prompt.promoted.1` with the existing `session.next.prompted.1` event when input becomes model-visible. +- Preserve the prompt endpoint, admission receipt, idempotency, steer/queue ordering, and atomic user-message projection. +- Reset experimental V2 events, projections, inputs, Context Epochs, and synchronized workspace state while preserving canonical V1 `session`, `message`, and `part` rows. + ## 2026-06-22: Reset Unpublished Compaction Event - Replace the unpublished `session.next.compaction.ended.1` payload with the current checkpoint payload and remove its legacy decoder. diff --git a/specs/v2/session.md b/specs/v2/session.md index 9946758322ea..6788a893eafa 100644 --- a/specs/v2/session.md +++ b/specs/v2/session.md @@ -12,8 +12,8 @@ sessions.create({ id?, location, ... }) sessions.prompt({ id?, sessionID, prompt, delivery?, resume? }) -> omitted ID generates one internal message ID - -> supplied ID admits one durable Session input when absent - -> exact reuse returns the same admitted lifecycle receipt + -> supplied ID inserts one durable Session inbox row when absent + -> exact reuse returns the same admission receipt -> reusing one message ID for another Session, prompt, or delivery mode fails -> exact retry schedules another wake unless resume is false -> resume omitted or true schedules execution after admission @@ -27,7 +27,9 @@ sessions.interrupt(sessionID) -> idle or missing Session is a no-op ``` -`session_input` is the durable admission inbox. Admitted inputs remain outside model-visible Session history until the serialized runner publishes `PromptLifecycle.Promoted`. The projector atomically writes the visible user message and marks its inbox row promoted in the same event transaction. The legacy V1-to-V2 shadow bridge continues publishing ordinary `Prompted` events for already-visible V1 prompts. +`session_input` is the durable admission inbox. `PromptAdmitted` records and projects accepted input so pending queue state can be replayed, replicated, and observed by clients. Admitted inputs remain outside model-visible Session history until the serialized runner publishes `Prompted`. Its projector atomically writes the visible user message and marks the inbox row promoted in the same event transaction. The V1-to-V2 shadow bridge publishes the same `Prompted` event for already-visible V1 prompts. + +`admittedSeq` is the durable Session event sequence of `PromptAdmitted`. Clients may use the admission event to represent queued input before `Prompted` makes it part of visible conversation history. Execution routing starts from only the Session ID: