Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/core/src/database/migration.gen.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { Effect } from "effect"
import type { DatabaseMigration } from "../migration"

export default {
id: "20260622202450_simplify_session_input",
up(tx) {
return Effect.gen(function* () {
yield* tx.run(`DELETE FROM \`session_context_epoch\`;`)
yield* tx.run(`DELETE FROM \`session_input\`;`)
yield* tx.run(`DELETE FROM \`session_message\`;`)
yield* tx.run(`DELETE FROM \`event\`;`)
yield* tx.run(`DELETE FROM \`event_sequence\`;`)
yield* tx.run(`UPDATE \`session\` SET \`workspace_id\` = NULL WHERE \`workspace_id\` IS NOT NULL;`)
yield* tx.run(`DELETE FROM \`workspace\`;`)
})
},
} satisfies DatabaseMigration.Migration
13 changes: 13 additions & 0 deletions packages/core/src/event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,19 @@ export type Payload<D extends Definition = Definition> = {
export type Subscriber<D extends Definition = Definition> = (event: Payload<D>) => Effect.Effect<void>
export type Unsubscribe = Effect.Effect<void>

export const latestSequence = Effect.fn("EventV2.latestSequence")(function* (
db: Database.Interface["db"],
aggregateID: string,
) {
const row = yield* db
.select({ seq: EventSequenceTable.seq })
.from(EventSequenceTable)
.where(eq(EventSequenceTable.aggregate_id, aggregateID))
.get()
.pipe(Effect.orDie)
return row?.seq ?? -1
})

export type SerializedEvent = {
readonly id: ID
readonly type: string
Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/session/context-epoch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ const prepareOnce = Effect.fnUntraced(function* (
return { baseline: stored.baseline, baselineSeq: stored.baseline_seq }
}
if (result._tag === "ReplacementReady") {
const baselineSeq = replacementSeq ?? (yield* SessionInput.latestSeq(db, sessionID))
const baselineSeq = replacementSeq ?? (yield* EventV2.latestSequence(db, sessionID))
yield* replace(db, sessionID, baselineSeq, result.generation)
return { baseline: result.generation.baseline, baselineSeq }
}
Expand Down Expand Up @@ -124,7 +124,7 @@ const insert = Effect.fnUntraced(function* (
sessionID: SessionSchema.ID,
generation: SystemContext.Generation,
) {
const baselineSeq = yield* SessionInput.latestSeq(db, sessionID)
const baselineSeq = yield* EventV2.latestSequence(db, sessionID)
yield* db
.insert(SessionContextEpochTable)
.values({
Expand Down
47 changes: 14 additions & 33 deletions packages/core/src/session/event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ const Base = {
timestamp: V2Schema.DateTimeUtcFromMillis,
sessionID: SessionSchema.ID,
}
const PromptFields = {
...Base,
messageID: SessionMessageID.ID,
prompt: Prompt,
delivery: Schema.Literals(["steer", "queue"]),
}

const options = {
durable: {
Expand Down Expand Up @@ -83,40 +89,16 @@ export type Moved = typeof Moved.Type
export const Prompted = EventV2.define({
type: "session.next.prompted",
...options,
schema: {
...Base,
messageID: SessionMessageID.ID,
prompt: Prompt,
delivery: Schema.Literals(["steer", "queue"]),
},
schema: PromptFields,
})
export type Prompted = typeof Prompted.Type

export namespace PromptLifecycle {
export const Admitted = EventV2.define({
type: "session.next.prompt.admitted",
...options,
schema: {
...Base,
messageID: SessionMessageID.ID,
prompt: Prompt,
delivery: Schema.Literals(["steer", "queue"]),
},
})
export type Admitted = typeof Admitted.Type

export const Promoted = EventV2.define({
type: "session.next.prompt.promoted",
...options,
schema: {
...Base,
messageID: SessionMessageID.ID,
prompt: Prompt,
timeCreated: V2Schema.DateTimeUtcFromMillis,
},
})
export type Promoted = typeof Promoted.Type
}
export const PromptAdmitted = EventV2.define({
type: "session.next.prompt.admitted",
...options,
schema: PromptFields,
})
export type PromptAdmitted = typeof PromptAdmitted.Type

export const ContextUpdated = EventV2.define({
type: "session.next.context.updated",
Expand Down Expand Up @@ -455,8 +437,7 @@ const DurableDefinitions = [
ModelSwitched,
Moved,
Prompted,
PromptLifecycle.Admitted,
PromptLifecycle.Promoted,
PromptAdmitted,
ContextUpdated,
Synthetic,
Shell.Started,
Expand Down
111 changes: 48 additions & 63 deletions packages/core/src/session/input.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import { and, asc, eq, isNull, lte } from "drizzle-orm"
import { DateTime, Effect, Schema } from "effect"
import type { Database } from "../database/database"
import type { EventV2 } from "../event"
import { EventSequenceTable } from "../event/sql"
import { NonNegativeInt } from "../schema"
import { V2Schema } from "../v2-schema"
import { SessionEvent } from "./event"
Expand Down Expand Up @@ -65,7 +64,7 @@ export const admit = Effect.fn("SessionInput.admit")(function* (
if (existing !== undefined) return existing
const timestamp = yield* DateTime.now
return yield* events
.publish(SessionEvent.PromptLifecycle.Admitted, {
.publish(SessionEvent.PromptAdmitted, {
messageID: input.id,
sessionID: input.sessionID,
timestamp,
Expand Down Expand Up @@ -93,19 +92,6 @@ export const admit = Effect.fn("SessionInput.admit")(function* (
)
})

export const latestSeq = Effect.fn("SessionInput.latestSeq")(function* (
db: DatabaseService,
sessionID: SessionSchema.ID,
) {
const row = yield* db
.select({ seq: EventSequenceTable.seq })
.from(EventSequenceTable)
.where(eq(EventSequenceTable.aggregate_id, sessionID))
.get()
.pipe(Effect.orDie)
return row?.seq ?? -1
})

export const projectAdmitted = Effect.fn("SessionInput.projectAdmitted")(function* (
db: DatabaseService,
input: {
Expand All @@ -117,6 +103,13 @@ export const projectAdmitted = Effect.fn("SessionInput.projectAdmitted")(functio
readonly timeCreated: DateTime.Utc
},
) {
const message = yield* db
.select({ id: SessionMessageTable.id })
.from(SessionMessageTable)
.where(eq(SessionMessageTable.id, input.id))
.get()
.pipe(Effect.orDie)
if (message !== undefined) return yield* Effect.die(new LifecycleConflict({ id: input.id }))
const stored = yield* db
.insert(SessionInputTable)
.values({
Expand All @@ -134,12 +127,13 @@ export const projectAdmitted = Effect.fn("SessionInput.projectAdmitted")(functio
if (!stored) return yield* Effect.die(new LifecycleConflict({ id: input.id }))
})

export const projectPromoted = Effect.fn("SessionInput.projectPromoted")(function* (
export const projectPrompted = Effect.fn("SessionInput.projectPrompted")(function* (
db: DatabaseService,
input: {
readonly id: SessionMessage.ID
readonly sessionID: SessionSchema.ID
readonly prompt: Prompt
readonly delivery: Delivery
readonly timeCreated: DateTime.Utc
readonly promotedSeq: number
},
Expand All @@ -157,14 +151,32 @@ export const projectPromoted = Effect.fn("SessionInput.projectPromoted")(functio
.returning()
.get()
.pipe(Effect.orDie)
if (!updated) return yield* Effect.die(new LifecycleConflict({ id: input.id }))
const stored = fromRow(updated)
if (
!matchesPrompt(stored, input) ||
DateTime.toEpochMillis(stored.timeCreated) !== DateTime.toEpochMillis(input.timeCreated)
)
return yield* Effect.die(new LifecycleConflict({ id: input.id }))
return toMessage(stored)
if (updated) {
const stored = fromRow(updated)
if (!matchesProjection(stored, input)) return yield* Effect.die(new LifecycleConflict({ id: input.id }))
return
}

const stored = yield* find(db, input.id)
if (stored) {
if (!matchesProjection(stored, input) || stored.promotedSeq !== input.promotedSeq)
return yield* Effect.die(new LifecycleConflict({ id: input.id }))
return
}

yield* db
.insert(SessionInputTable)
.values({
id: input.id,
session_id: input.sessionID,
prompt: encodePrompt(input.prompt),
delivery: input.delivery,
admitted_seq: input.promotedSeq,
promoted_seq: input.promotedSeq,
time_created: DateTime.toEpochMillis(input.timeCreated),
})
.run()
.pipe(Effect.orDie)
})

export const hasPending = Effect.fn("SessionInput.hasPending")(function* (
Expand Down Expand Up @@ -201,35 +213,17 @@ const matchesPrompt = (input: Admitted, expected: { readonly sessionID: SessionS
input.sessionID === expected.sessionID &&
JSON.stringify(encodePrompt(input.prompt)) === JSON.stringify(encodePrompt(expected.prompt))

export const projectLegacyPrompted = Effect.fn("SessionInput.projectLegacyPrompted")(function* (
db: DatabaseService,
input: {
readonly id: SessionMessage.ID
const matchesProjection = (
input: Admitted,
expected: {
readonly sessionID: SessionSchema.ID
readonly prompt: Prompt
readonly delivery: Delivery
readonly timeCreated: DateTime.Utc
readonly promotedSeq: number
},
) {
const inserted = yield* db
.insert(SessionInputTable)
.values({
id: input.id,
session_id: input.sessionID,
admitted_seq: input.promotedSeq,
prompt: encodePrompt(input.prompt),
delivery: input.delivery,
promoted_seq: input.promotedSeq,
time_created: DateTime.toEpochMillis(input.timeCreated),
})
.onConflictDoNothing()
.returning()
.get()
.pipe(Effect.orDie)
if (!inserted) return yield* Effect.die("Prompt projection conflicts with admitted input")
return fromRow(inserted)
})
) =>
equivalent(input, expected) &&
DateTime.toEpochMillis(input.timeCreated) === DateTime.toEpochMillis(expected.timeCreated)

const publish = Effect.fn("SessionInput.publish")(function* (
db: DatabaseService,
Expand All @@ -238,18 +232,19 @@ const publish = Effect.fn("SessionInput.publish")(function* (
rows: ReadonlyArray<typeof SessionInputTable.$inferSelect>,
) {
for (const row of rows) {
const id = SessionMessage.ID.make(row.id)
yield* events
.publish(SessionEvent.PromptLifecycle.Promoted, {
.publish(SessionEvent.Prompted, {
sessionID,
timestamp: yield* DateTime.now,
messageID: SessionMessage.ID.make(row.id),
timestamp: DateTime.makeUnsafe(row.time_created),
messageID: id,
prompt: decodePrompt(row.prompt),
timeCreated: DateTime.makeUnsafe(row.time_created),
delivery: row.delivery,
})
.pipe(
Effect.catchDefect((defect) =>
defect instanceof LifecycleConflict
? find(db, SessionMessage.ID.make(row.id)).pipe(
? find(db, id).pipe(
Effect.flatMap((stored) => (stored?.promotedSeq === undefined ? Effect.die(defect) : Effect.void)),
)
: Effect.die(defect),
Expand Down Expand Up @@ -303,13 +298,3 @@ export const promoteNextQueued = Effect.fn("SessionInput.promoteNextQueued")(fun
.pipe(Effect.orDie)
return row === undefined ? false : yield* publish(db, events, sessionID, [row]).pipe(Effect.as(true))
})

const toMessage = (input: Admitted) =>
new SessionMessage.User({
id: input.id,
type: "user",
text: input.prompt.text,
files: input.prompt.files,
agents: input.prompt.agents,
time: { created: input.timeCreated },
})
1 change: 0 additions & 1 deletion packages/core/src/session/message-updater.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ export function update(adapter: Adapter, event: SessionEvent.Event) {
)
},
"session.next.prompt.admitted": () => Effect.void,
"session.next.prompt.promoted": () => Effect.void,
"session.next.context.updated": (event) =>
adapter.appendMessage(
new SessionMessage.System({
Expand Down
33 changes: 4 additions & 29 deletions packages/core/src/session/projector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ type DatabaseService = Database.Interface["db"]
const decodeMessage = Schema.decodeUnknownSync(SessionMessage.Message)
const encodeMessage = Schema.encodeSync(SessionMessage.Message)

class PromptAlreadyProjected extends Error {}
export class SessionAlreadyProjected extends Error {}

type Usage = {
Expand Down Expand Up @@ -350,27 +349,19 @@ export const layer = Layer.effectDiscard(
)
yield* events.project(SessionEvent.Prompted, (event) =>
Effect.gen(function* () {
const messageID = event.data.messageID
const existing = yield* db
.select({ id: SessionMessageTable.id })
.from(SessionMessageTable)
.where(eq(SessionMessageTable.id, messageID))
.get()
.pipe(Effect.orDie)
if (existing) return yield* Effect.die(new PromptAlreadyProjected())
yield* run(db, event)
if (event.durable === undefined) return yield* Effect.die("Durable Session event is missing aggregate sequence")
yield* SessionInput.projectLegacyPrompted(db, {
id: messageID,
yield* SessionInput.projectPrompted(db, {
id: event.data.messageID,
sessionID: event.data.sessionID,
prompt: event.data.prompt,
delivery: event.data.delivery,
timeCreated: event.data.timestamp,
promotedSeq: event.durable.seq,
})
yield* run(db, event)
}),
)
yield* events.project(SessionEvent.PromptLifecycle.Admitted, (event) =>
yield* events.project(SessionEvent.PromptAdmitted, (event) =>
Effect.gen(function* () {
if (event.durable === undefined) return yield* Effect.die("Durable Session event is missing aggregate sequence")
yield* SessionInput.projectAdmitted(db, {
Expand All @@ -383,22 +374,6 @@ export const layer = Layer.effectDiscard(
})
}),
)
yield* events.project(SessionEvent.PromptLifecycle.Promoted, (event) =>
Effect.gen(function* () {
if (event.durable === undefined) return yield* Effect.die("Durable Session event is missing aggregate sequence")
yield* insertMessage(
db,
event,
yield* SessionInput.projectPromoted(db, {
id: event.data.messageID,
sessionID: event.data.sessionID,
prompt: event.data.prompt,
timeCreated: event.data.timeCreated,
promotedSeq: event.durable.seq,
}),
)
}),
)
yield* events.project(SessionEvent.ContextUpdated, (event) => run(db, event))
yield* events.project(SessionEvent.Synthetic, (event) => run(db, event))
yield* events.project(SessionEvent.Shell.Started, (event) => run(db, event))
Expand Down
Loading
Loading