From 03c629010341d1fee851cc90acdc75aa3675a87c Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Wed, 10 Jun 2026 13:45:38 +0100 Subject: [PATCH 1/4] fix(webapp): harden the realtime session routes Scope session stream waitpoint delivery to the environment so two environments using the same session externalId can never complete each other's waitpoints. Add the missing authorization checks to the session snapshot-url routes and restrict out-channel appends to secret key auth, so a session-scoped token cannot read other sessions' snapshots or forge assistant output. Appends that carry an X-Part-Id header are now deduplicated on retry, session creation rejects expired sessions, externalId is immutable after creation, and the sessions list endpoint returns friendly run ids. --- .server-changes/session-route-hardening.md | 6 + ...uns.$runFriendlyId.session-streams.wait.ts | 11 +- .../app/routes/api.v1.sessions.$session.ts | 18 +++ ...api.v1.sessions.$sessionId.snapshot-url.ts | 36 +++++- apps/webapp/app/routes/api.v1.sessions.ts | 44 +++++-- ...ealtime.v1.sessions.$session.$io.append.ts | 49 +++++++- ...ealtime.v1.sessions.$session.$io.append.ts | 2 +- .../app/services/realtime/sessions.server.ts | 36 +++++- .../sessionStreamWaitpointCache.server.ts | 110 +++++++++++++++--- 9 files changed, 274 insertions(+), 38 deletions(-) create mode 100644 .server-changes/session-route-hardening.md diff --git a/.server-changes/session-route-hardening.md b/.server-changes/session-route-hardening.md new file mode 100644 index 00000000000..2734b35a784 --- /dev/null +++ b/.server-changes/session-route-hardening.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: fix +--- + +Hardening fixes for realtime sessions: stricter authorization on snapshot URLs and out-channel appends, environment-scoped message delivery for waiting runs, and idempotent appends via the X-Part-Id header. Session creation now rejects expired sessions, externalId can no longer be changed after creation, and the sessions list returns friendly run ids. diff --git a/apps/webapp/app/routes/api.v1.runs.$runFriendlyId.session-streams.wait.ts b/apps/webapp/app/routes/api.v1.runs.$runFriendlyId.session-streams.wait.ts index 4fbdb454d92..39c30894416 100644 --- a/apps/webapp/app/routes/api.v1.runs.$runFriendlyId.session-streams.wait.ts +++ b/apps/webapp/app/routes/api.v1.runs.$runFriendlyId.session-streams.wait.ts @@ -106,12 +106,14 @@ const { action, loader } = createActionApiRoute( }); // Step 2: Register the waitpoint on the session channel so the next - // append fires it. Keyed by (addressingKey, io) — the canonical - // string for the row. The append handler drains by the same - // canonical key, so writers and readers converge regardless of - // which URL form the agent vs. the appending caller used. + // append fires it. Keyed by (environmentId, addressingKey, io) — the + // canonical string for the row, scoped to the environment because + // externalIds are only unique per environment. The append handler + // drains by the same key, so writers and readers converge regardless + // of which URL form the agent vs. the appending caller used. const ttlMs = timeout ? timeout.getTime() - Date.now() : undefined; await addSessionStreamWaitpoint( + authentication.environment.id, addressingKey, body.io, result.waitpoint.id, @@ -152,6 +154,7 @@ const { action, loader } = createActionApiRoute( }); await removeSessionStreamWaitpoint( + authentication.environment.id, addressingKey, body.io, result.waitpoint.id diff --git a/apps/webapp/app/routes/api.v1.sessions.$session.ts b/apps/webapp/app/routes/api.v1.sessions.$session.ts index 9b6fb339989..58208de35e3 100644 --- a/apps/webapp/app/routes/api.v1.sessions.$session.ts +++ b/apps/webapp/app/routes/api.v1.sessions.$session.ts @@ -74,6 +74,24 @@ const { action } = createActionApiRoute( return json({ error: "Session not found" }, { status: 404 }); } + // The externalId is the canonical addressing key once set: the S2 + // stream names, the waitpoint cache key, and the minted session PAT + // scope all derive from it. Re-keying a session would orphan its + // streams (the chat goes silent) and invalidate the PAT's scope, so + // reject any change. Same-value PATCHes stay idempotent. + if ( + body.externalId !== undefined && + body.externalId !== existing.externalId + ) { + return json( + { + error: + "externalId cannot be changed after creation; close this session and create a new one with the desired externalId", + }, + { status: 422 } + ); + } + try { const updated = await prisma.session.update({ where: { id: existing.id }, diff --git a/apps/webapp/app/routes/api.v1.sessions.$sessionId.snapshot-url.ts b/apps/webapp/app/routes/api.v1.sessions.$sessionId.snapshot-url.ts index fc0335847bb..80140f05d20 100644 --- a/apps/webapp/app/routes/api.v1.sessions.$sessionId.snapshot-url.ts +++ b/apps/webapp/app/routes/api.v1.sessions.$sessionId.snapshot-url.ts @@ -4,6 +4,7 @@ import { $replica } from "~/db.server"; import { chatSnapshotStorageKey } from "~/services/realtime/chatSnapshot.server"; import { resolveSessionByIdOrExternalId } from "~/services/realtime/sessions.server"; import { + anyResource, createActionApiRoute, createLoaderApiRoute, } from "~/services/routeBuilders/apiBuilder.server"; @@ -21,8 +22,31 @@ const routeConfig = { resolveSessionByIdOrExternalId($replica, auth.environment.id, params.sessionId), }; +// Authorize against the union of the URL form, friendlyId, and externalId — +// same shape as the sibling session routes. Without an authorization block +// the route builder skips scope checks entirely, so any session-scoped JWT +// in the environment could presign URLs for any other session's snapshot. +function sessionResource( + paramId: string, + session: { friendlyId: string; externalId: string | null } | null | undefined +) { + const ids = new Set([paramId]); + if (session) { + ids.add(session.friendlyId); + if (session.externalId) ids.add(session.externalId); + } + return anyResource([...ids].map((id) => ({ type: "sessions" as const, id }))); +} + export const { action } = createActionApiRoute( - { ...routeConfig, method: "PUT" }, + { + ...routeConfig, + method: "PUT", + authorization: { + action: "write", + resource: (params, _, __, ___, session) => sessionResource(params.sessionId, session), + }, + }, async ({ authentication, resource: session }) => { if (!session) { return json({ error: "Session not found" }, { status: 404 }); @@ -42,7 +66,15 @@ export const { action } = createActionApiRoute( } ); -export const loader = createLoaderApiRoute(routeConfig, async ({ authentication, resource: session }) => { +export const loader = createLoaderApiRoute( + { + ...routeConfig, + authorization: { + action: "read", + resource: (session, params) => sessionResource(params.sessionId, session), + }, + }, + async ({ authentication, resource: session }) => { if (!session) { return json({ error: "Session not found" }, { status: 404 }); } diff --git a/apps/webapp/app/routes/api.v1.sessions.ts b/apps/webapp/app/routes/api.v1.sessions.ts index 308901b0874..5a3952f8201 100644 --- a/apps/webapp/app/routes/api.v1.sessions.ts +++ b/apps/webapp/app/routes/api.v1.sessions.ts @@ -18,7 +18,10 @@ import { type SessionTriggerConfig, } from "~/services/realtime/sessionRunManager.server"; import { chatSnapshotStoragePathForSession } from "~/services/realtime/chatSnapshot.server"; -import { serializeSession } from "~/services/realtime/sessions.server"; +import { + serializeSession, + serializeSessionsWithFriendlyRunIds, +} from "~/services/realtime/sessions.server"; import { SessionsRepository } from "~/services/sessionsRepository/sessionsRepository.server"; import { anyResource, @@ -91,17 +94,25 @@ export const loader = createLoaderApiRoute( }, }); + // Batched friendlyId translation: `currentRunId` on the wire is the + // public `run_*` form, matching the single-session routes. One `IN` + // lookup per page. + const data = await serializeSessionsWithFriendlyRunIds( + rows.map( + (row) => + ({ + ...row, + // Columns the list query doesn't select — filled so the + // serializer can operate on a narrowed payload without type errors. + projectId: authentication.environment.projectId, + environmentType: authentication.environment.type, + organizationId: authentication.environment.organizationId, + }) as Session + ) + ); + return json({ - data: rows.map((row) => - serializeSession({ - ...row, - // Columns the list query doesn't select — filled so `serializeSession` - // can operate on a narrowed payload without type errors. - projectId: authentication.environment.projectId, - environmentType: authentication.environment.type, - organizationId: authentication.environment.organizationId, - } as Session) - ), + data, pagination: { ...(pagination.nextCursor ? { next: pagination.nextCursor } : {}), ...(pagination.previousCursor ? { previous: pagination.previousCursor } : {}), @@ -225,6 +236,17 @@ const { action } = createActionApiRoute( ); } + // Same guard as the append / end-and-continue handlers: an expired + // row must not spawn a run, because every subsequent `.in/append` + // would 400 on the expiry check — a run boots but the chat can + // never receive input. + if (session.expiresAt && session.expiresAt.getTime() < Date.now()) { + return json( + { error: "Session is expired; use a different externalId to create a new session" }, + { status: 409 } + ); + } + // Session is task-bound — every session has a live run by // construction. `ensureRunForSession` is idempotent: on the // cached path it sees `currentRunId` is alive and returns it diff --git a/apps/webapp/app/routes/realtime.v1.sessions.$session.$io.append.ts b/apps/webapp/app/routes/realtime.v1.sessions.$session.$io.append.ts index dbdb9f47d52..39d3eb65019 100644 --- a/apps/webapp/app/routes/realtime.v1.sessions.$session.$io.append.ts +++ b/apps/webapp/app/routes/realtime.v1.sessions.$session.$io.append.ts @@ -11,7 +11,11 @@ import { resolveSessionByIdOrExternalId, } from "~/services/realtime/sessions.server"; import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server"; -import { drainSessionStreamWaitpoints } from "~/services/sessionStreamWaitpointCache.server"; +import { + drainSessionStreamWaitpoints, + markSessionStreamPartAppended, + wasSessionStreamPartAppended, +} from "~/services/sessionStreamWaitpointCache.server"; import { anyResource, createActionApiRoute, @@ -91,6 +95,17 @@ const { action, loader } = createActionApiRoute( ); } + // `.out` is the agent→client channel. Only PRIVATE (secret key) auth — + // i.e. the agent run itself — may write to it. Session-scoped JWTs carry + // `write:sessions:` for `.in`; without this gate they could forge + // assistant chunks and complete `.out` waitpoints on their own session. + if (params.io === "out" && authentication.type !== "PRIVATE") { + return json( + { ok: false, error: "Appending to the out channel requires secret key authentication" }, + { status: 403 } + ); + } + const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2", { session, }); @@ -132,7 +147,26 @@ const { action, loader } = createActionApiRoute( const addressingKey = canonicalSessionAddressingKey(session, params.session); const part = await request.text(); - const partId = request.headers.get("X-Part-Id") ?? nanoid(7); + const clientPartId = request.headers.get("X-Part-Id"); + const partId = clientPartId ?? nanoid(7); + + // Idempotency on client-supplied part ids: a retried POST whose first + // attempt committed is acknowledged without a second append (which + // would duplicate the record and double-fire the waitpoint drain). + // The marker is only written after a successful append, so retries of + // genuinely failed appends still go through. Server-generated ids are + // per-request and carry no dedupe meaning. + if ( + clientPartId && + (await wasSessionStreamPartAppended( + authentication.environment.id, + addressingKey, + params.io, + clientPartId + )) + ) { + return json({ ok: true }, { status: 200 }); + } const [appendError] = await tryCatch( realtimeStream.appendPartToSessionStream(part, partId, addressingKey, params.io) @@ -153,6 +187,15 @@ const { action, loader } = createActionApiRoute( return json({ ok: false, error: "Something went wrong, please try again." }, { status: 500 }); } + if (clientPartId) { + await markSessionStreamPartAppended( + authentication.environment.id, + addressingKey, + params.io, + clientPartId + ); + } + // Fire any run-scoped waitpoints registered against this channel. Best // effort — a failure here must not fail the append (the record is // durable in S2; the SSE tail will still deliver it). Waitpoints are @@ -160,7 +203,7 @@ const { action, loader } = createActionApiRoute( // `sessions.open(...).in.wait()`, so writers and readers converge // regardless of which URL form they used. const [drainError, waitpointIds] = await tryCatch( - drainSessionStreamWaitpoints(addressingKey, params.io) + drainSessionStreamWaitpoints(authentication.environment.id, addressingKey, params.io) ); if (drainError) { logger.error("Failed to drain session stream waitpoints", { diff --git a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.playground.realtime.v1.sessions.$session.$io.append.ts b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.playground.realtime.v1.sessions.$session.$io.append.ts index 77f03f4ce5c..038e053a8b0 100644 --- a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.playground.realtime.v1.sessions.$session.$io.append.ts +++ b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.playground.realtime.v1.sessions.$session.$io.append.ts @@ -128,7 +128,7 @@ export async function action({ request, params }: ActionFunctionArgs) { // Drain any waitpoints registered for this channel — same as the // public append. Best-effort; failure doesn't fail the append. const [drainError, waitpointIds] = await tryCatch( - drainSessionStreamWaitpoints(addressingKey, io) + drainSessionStreamWaitpoints(environment.id, addressingKey, io) ); if (drainError) { logger.error("Failed to drain session stream waitpoints (playground)", { diff --git a/apps/webapp/app/services/realtime/sessions.server.ts b/apps/webapp/app/services/realtime/sessions.server.ts index 594d417292c..47e0dd247ea 100644 --- a/apps/webapp/app/services/realtime/sessions.server.ts +++ b/apps/webapp/app/services/realtime/sessions.server.ts @@ -75,10 +75,10 @@ export function canonicalSessionAddressingKey( * * Note: `currentRunId` is left as-is — Prisma stores the internal run id * (cuid), but `SessionItem.currentRunId` is the *friendly* form. Routes - * that emit a single `SessionItem` should use - * {@link serializeSessionWithFriendlyRunId} instead, which resolves the - * friendlyId via a TaskRun lookup. List endpoints stay on this raw form - * to avoid N+1 lookups when paginating. + * that emit `SessionItem`s must translate: single-row endpoints via + * {@link serializeSessionWithFriendlyRunId}, list endpoints via the + * batched {@link serializeSessionsWithFriendlyRunIds}. Never put this + * raw form on the wire directly. */ export function serializeSession(session: Session): SessionItem { return { @@ -125,3 +125,31 @@ export async function serializeSessionWithFriendlyRunId( currentRunId: run?.friendlyId ?? null, }; } + +/** + * Batched form of {@link serializeSessionWithFriendlyRunId} for list + * endpoints: one `IN` lookup per page instead of N+1. `currentRunId` on + * the wire is always the public `run_*` friendlyId — the raw + * {@link serializeSession} form leaks the internal cuid, which customers + * can't use with `runs.retrieve(...)`. + */ +export async function serializeSessionsWithFriendlyRunIds( + sessions: Session[] +): Promise { + const runIds = [...new Set(sessions.map((s) => s.currentRunId).filter((id): id is string => !!id))]; + + const runs = runIds.length + ? await $replica.taskRun.findMany({ + where: { id: { in: runIds } }, + select: { id: true, friendlyId: true }, + }) + : []; + const friendlyIdByRunId = new Map(runs.map((run) => [run.id, run.friendlyId])); + + return sessions.map((session) => ({ + ...serializeSession(session), + currentRunId: session.currentRunId + ? friendlyIdByRunId.get(session.currentRunId) ?? null + : null, + })); +} diff --git a/apps/webapp/app/services/sessionStreamWaitpointCache.server.ts b/apps/webapp/app/services/sessionStreamWaitpointCache.server.ts index 93f4b397481..bea4df4e127 100644 --- a/apps/webapp/app/services/sessionStreamWaitpointCache.server.ts +++ b/apps/webapp/app/services/sessionStreamWaitpointCache.server.ts @@ -5,14 +5,18 @@ import { singleton } from "~/utils/singleton"; import { logger } from "./logger.server"; // "ssw" — session-stream-waitpoint. Parallel to the input-stream variant -// (`isw:{runFriendlyId}:{streamId}`). Keyed purely on `{sessionId, io}` so -// a send() lands on the channel regardless of which run is waiting, and +// (`isw:{runFriendlyId}:{streamId}`). Keyed on `{environmentId, addressingKey, io}` +// so a send() lands on the channel regardless of which run is waiting, and // multiple concurrent waiters (e.g. two agents on one chat) all wake. +// The environmentId prefix is load-bearing: the addressing key is the +// user-supplied externalId (unique only per environment), and this Redis +// is shared — without it, two environments using the same externalId +// would drain each other's waitpoints. const KEY_PREFIX = "ssw:"; const DEFAULT_TTL_MS = 7 * 24 * 60 * 60 * 1000; // 7 days -function buildKey(sessionFriendlyId: string, io: "out" | "in"): string { - return `${KEY_PREFIX}${sessionFriendlyId}:${io}`; +function buildKey(environmentId: string, addressingKey: string, io: "out" | "in"): string { + return `${KEY_PREFIX}${environmentId}:${addressingKey}:${io}`; } function initializeRedis(): Redis | undefined { @@ -67,7 +71,8 @@ const ADD_WAITPOINT_SCRIPT = ` * channel are allowed (stored as a Redis set). */ export async function addSessionStreamWaitpoint( - sessionFriendlyId: string, + environmentId: string, + addressingKey: string, io: "out" | "in", waitpointId: string, ttlMs?: number @@ -75,7 +80,7 @@ export async function addSessionStreamWaitpoint( if (!redis) return; try { - const key = buildKey(sessionFriendlyId, io); + const key = buildKey(environmentId, addressingKey, io); await redis.eval( ADD_WAITPOINT_SCRIPT, 1, @@ -85,7 +90,8 @@ export async function addSessionStreamWaitpoint( ); } catch (error) { logger.error("Failed to set session stream waitpoint cache", { - sessionFriendlyId, + environmentId, + addressingKey, io, error, }); @@ -98,13 +104,14 @@ export async function addSessionStreamWaitpoint( * empty set even if two appends race. */ export async function drainSessionStreamWaitpoints( - sessionFriendlyId: string, + environmentId: string, + addressingKey: string, io: "out" | "in" ): Promise { if (!redis) return []; try { - const key = buildKey(sessionFriendlyId, io); + const key = buildKey(environmentId, addressingKey, io); const pipeline = redis.multi(); pipeline.smembers(key); pipeline.del(key); @@ -117,7 +124,8 @@ export async function drainSessionStreamWaitpoints( return Array.isArray(members) ? (members as string[]) : []; } catch (error) { logger.error("Failed to drain session stream waitpoint cache", { - sessionFriendlyId, + environmentId, + addressingKey, io, error, }); @@ -129,19 +137,95 @@ export async function drainSessionStreamWaitpoints( * Remove a single waitpoint from the pending set. Called after a race * where `.wait()` completed the waitpoint from pre-arrived data. */ +// "ssa" — session-stream-append. Best-effort idempotency marker for the +// append route: when a caller supplies an `X-Part-Id`, a retried POST +// whose first attempt actually committed is skipped instead of producing +// a duplicate record (and double-firing the waitpoint drain). The marker +// is only written AFTER a successful S2 append, so a retry of a genuinely +// failed append still goes through. 5-minute window — this covers HTTP +// retry storms, not a permanent idempotency store. +const APPEND_DEDUPE_PREFIX = "ssa:"; +const APPEND_DEDUPE_TTL_SECONDS = 5 * 60; + +function buildAppendDedupeKey( + environmentId: string, + addressingKey: string, + io: "out" | "in", + partId: string +): string { + return `${APPEND_DEDUPE_PREFIX}${environmentId}:${addressingKey}:${io}:${partId}`; +} + +/** + * True if a part with this id was already successfully appended to the + * channel within the dedupe window. Fails open (returns false) when Redis + * is unavailable — appends degrade to at-least-once, never to dropped. + */ +export async function wasSessionStreamPartAppended( + environmentId: string, + addressingKey: string, + io: "out" | "in", + partId: string +): Promise { + if (!redis) return false; + + try { + const value = await redis.get(buildAppendDedupeKey(environmentId, addressingKey, io, partId)); + return value !== null; + } catch (error) { + logger.error("Failed to read session stream append dedupe marker", { + environmentId, + addressingKey, + io, + partId, + error, + }); + return false; + } +} + +/** Record a successful append so a retried POST with the same part id is skipped. */ +export async function markSessionStreamPartAppended( + environmentId: string, + addressingKey: string, + io: "out" | "in", + partId: string +): Promise { + if (!redis) return; + + try { + await redis.set( + buildAppendDedupeKey(environmentId, addressingKey, io, partId), + "1", + "EX", + APPEND_DEDUPE_TTL_SECONDS + ); + } catch (error) { + logger.error("Failed to write session stream append dedupe marker", { + environmentId, + addressingKey, + io, + partId, + error, + }); + } +} + export async function removeSessionStreamWaitpoint( - sessionFriendlyId: string, + environmentId: string, + addressingKey: string, io: "out" | "in", waitpointId: string ): Promise { if (!redis) return; try { - const key = buildKey(sessionFriendlyId, io); + const key = buildKey(environmentId, addressingKey, io); await redis.srem(key, waitpointId); } catch (error) { logger.error("Failed to remove session stream waitpoint cache entry", { - sessionFriendlyId, + environmentId, + addressingKey, io, error, }); From 3fc6d4da292b03eee3a1f7518ef60147c666bf7e Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Wed, 10 Jun 2026 15:42:54 +0100 Subject: [PATCH 2/4] fix(webapp): dual-read the legacy waitpoint cache key during rollout Drain both the environment-scoped session-stream waitpoint key and the previous unscoped key, so a waitpoint registered by the prior deploy still wakes its run across the deploy boundary. The legacy read can be dropped a release later once no pre-deploy waitpoints remain. --- .../sessionStreamWaitpointCache.server.ts | 24 +++++++++++++++---- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/apps/webapp/app/services/sessionStreamWaitpointCache.server.ts b/apps/webapp/app/services/sessionStreamWaitpointCache.server.ts index bea4df4e127..71602a03c46 100644 --- a/apps/webapp/app/services/sessionStreamWaitpointCache.server.ts +++ b/apps/webapp/app/services/sessionStreamWaitpointCache.server.ts @@ -19,6 +19,12 @@ function buildKey(environmentId: string, addressingKey: string, io: "out" | "in" return `${KEY_PREFIX}${environmentId}:${addressingKey}:${io}`; } +// Pre-env-scoping key format, drained for one release so waitpoints from the +// previous deploy still wake. Removable once this has been live > turn timeout. +function buildLegacyKey(addressingKey: string, io: "out" | "in"): string { + return `${KEY_PREFIX}${addressingKey}:${io}`; +} + function initializeRedis(): Redis | undefined { const host = env.CACHE_REDIS_HOST; if (!host) { @@ -112,16 +118,24 @@ export async function drainSessionStreamWaitpoints( try { const key = buildKey(environmentId, addressingKey, io); + const legacyKey = buildLegacyKey(addressingKey, io); const pipeline = redis.multi(); pipeline.smembers(key); pipeline.del(key); + pipeline.smembers(legacyKey); + pipeline.del(legacyKey); const results = await pipeline.exec(); if (!results) return []; - const [smembersResult] = results; - if (!smembersResult) return []; - const [err, members] = smembersResult; - if (err) return []; - return Array.isArray(members) ? (members as string[]) : []; + // Union members from the env-scoped key and the legacy key (dual-read). + const ids = new Set(); + for (const idx of [0, 2]) { + const entry = results[idx]; + if (!entry) continue; + const [err, members] = entry; + if (err || !Array.isArray(members)) continue; + for (const m of members as string[]) ids.add(m); + } + return [...ids]; } catch (error) { logger.error("Failed to drain session stream waitpoint cache", { environmentId, From 22680b7de68300e95f832a3e32bcedc0a643114e Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Wed, 10 Jun 2026 16:40:45 +0100 Subject: [PATCH 3/4] fix(webapp): address review feedback on session route hardening Scope the batched session run-id lookup to the caller environment and project so a stale currentRunId pointer cannot resolve a run in another tenant. Escape the user-supplied segments of the append idempotency key so a colon in an externalId or X-Part-Id cannot collide and falsely suppress an append. Keep the waitpoint drain running on an idempotent retry: a duplicate append is skipped but still drains, so a retry whose first attempt died before waking the waitpoint can still recover it. --- apps/webapp/app/routes/api.v1.sessions.ts | 6 +- ...ealtime.v1.sessions.$session.$io.append.ts | 64 +++++++++---------- .../app/services/realtime/sessions.server.ts | 11 +++- .../sessionStreamWaitpointCache.server.ts | 7 +- 4 files changed, 52 insertions(+), 36 deletions(-) diff --git a/apps/webapp/app/routes/api.v1.sessions.ts b/apps/webapp/app/routes/api.v1.sessions.ts index 5a3952f8201..44f8c7ef69f 100644 --- a/apps/webapp/app/routes/api.v1.sessions.ts +++ b/apps/webapp/app/routes/api.v1.sessions.ts @@ -108,7 +108,11 @@ export const loader = createLoaderApiRoute( environmentType: authentication.environment.type, organizationId: authentication.environment.organizationId, }) as Session - ) + ), + { + projectId: authentication.environment.projectId, + runtimeEnvironmentId: authentication.environment.id, + } ); return json({ diff --git a/apps/webapp/app/routes/realtime.v1.sessions.$session.$io.append.ts b/apps/webapp/app/routes/realtime.v1.sessions.$session.$io.append.ts index 39d3eb65019..b133c666404 100644 --- a/apps/webapp/app/routes/realtime.v1.sessions.$session.$io.append.ts +++ b/apps/webapp/app/routes/realtime.v1.sessions.$session.$io.append.ts @@ -151,49 +151,49 @@ const { action, loader } = createActionApiRoute( const partId = clientPartId ?? nanoid(7); // Idempotency on client-supplied part ids: a retried POST whose first - // attempt committed is acknowledged without a second append (which - // would duplicate the record and double-fire the waitpoint drain). - // The marker is only written after a successful append, so retries of - // genuinely failed appends still go through. Server-generated ids are - // per-request and carry no dedupe meaning. - if ( - clientPartId && + // attempt committed skips the second append (which would duplicate the + // record), but still falls through to the drain below — so a retry whose + // first attempt died before waking the waitpoint can still recover it. + const alreadyAppended = + !!clientPartId && (await wasSessionStreamPartAppended( authentication.environment.id, addressingKey, params.io, clientPartId - )) - ) { - return json({ ok: true }, { status: 200 }); - } + )); - const [appendError] = await tryCatch( - realtimeStream.appendPartToSessionStream(part, partId, addressingKey, params.io) - ); + if (!alreadyAppended) { + const [appendError] = await tryCatch( + realtimeStream.appendPartToSessionStream(part, partId, addressingKey, params.io) + ); - if (appendError) { - if (appendError instanceof ServiceValidationError) { + if (appendError) { + if (appendError instanceof ServiceValidationError) { + return json( + { ok: false, error: appendError.message }, + { status: appendError.status ?? 422 } + ); + } + logger.error("Failed to append to session stream", { + sessionId: session.id, + io: params.io, + error: appendError, + }); return json( - { ok: false, error: appendError.message }, - { status: appendError.status ?? 422 } + { ok: false, error: "Something went wrong, please try again." }, + { status: 500 } ); } - logger.error("Failed to append to session stream", { - sessionId: session.id, - io: params.io, - error: appendError, - }); - return json({ ok: false, error: "Something went wrong, please try again." }, { status: 500 }); - } - if (clientPartId) { - await markSessionStreamPartAppended( - authentication.environment.id, - addressingKey, - params.io, - clientPartId - ); + if (clientPartId) { + await markSessionStreamPartAppended( + authentication.environment.id, + addressingKey, + params.io, + clientPartId + ); + } } // Fire any run-scoped waitpoints registered against this channel. Best diff --git a/apps/webapp/app/services/realtime/sessions.server.ts b/apps/webapp/app/services/realtime/sessions.server.ts index 47e0dd247ea..226ec443d4c 100644 --- a/apps/webapp/app/services/realtime/sessions.server.ts +++ b/apps/webapp/app/services/realtime/sessions.server.ts @@ -134,13 +134,20 @@ export async function serializeSessionWithFriendlyRunId( * can't use with `runs.retrieve(...)`. */ export async function serializeSessionsWithFriendlyRunIds( - sessions: Session[] + sessions: Session[], + scope: { projectId: string; runtimeEnvironmentId: string } ): Promise { const runIds = [...new Set(sessions.map((s) => s.currentRunId).filter((id): id is string => !!id))]; + // `currentRunId` is a plain string pointer (no FK), so scope the lookup to + // the caller's tenant — a stale value must not resolve a run in another env. const runs = runIds.length ? await $replica.taskRun.findMany({ - where: { id: { in: runIds } }, + where: { + id: { in: runIds }, + projectId: scope.projectId, + runtimeEnvironmentId: scope.runtimeEnvironmentId, + }, select: { id: true, friendlyId: true }, }) : []; diff --git a/apps/webapp/app/services/sessionStreamWaitpointCache.server.ts b/apps/webapp/app/services/sessionStreamWaitpointCache.server.ts index 71602a03c46..3d396041b35 100644 --- a/apps/webapp/app/services/sessionStreamWaitpointCache.server.ts +++ b/apps/webapp/app/services/sessionStreamWaitpointCache.server.ts @@ -167,7 +167,12 @@ function buildAppendDedupeKey( io: "out" | "in", partId: string ): string { - return `${APPEND_DEDUPE_PREFIX}${environmentId}:${addressingKey}:${io}:${partId}`; + // Encode the free-form segments — `addressingKey` (externalId) and `partId` + // (X-Part-Id) are user-supplied and may contain `:`, which would otherwise + // let different tuples collide and falsely suppress an append. + return `${APPEND_DEDUPE_PREFIX}${encodeURIComponent(environmentId)}:${encodeURIComponent( + addressingKey + )}:${io}:${encodeURIComponent(partId)}`; } /** From 4593ef81ae5e1beae84f5a7f5091bae84aac92e7 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Wed, 10 Jun 2026 17:20:45 +0100 Subject: [PATCH 4/4] fix(webapp): make session append idempotency atomic Claim the part id with SET NX before appending instead of a read then write, so two concurrent or retried POSTs with the same X-Part-Id can never both write a record. The claim is released when the append fails so a genuine retry still proceeds. --- ...ealtime.v1.sessions.$session.$io.append.ts | 49 ++++++++-------- .../sessionStreamWaitpointCache.server.ts | 56 ++++++++++--------- 2 files changed, 56 insertions(+), 49 deletions(-) diff --git a/apps/webapp/app/routes/realtime.v1.sessions.$session.$io.append.ts b/apps/webapp/app/routes/realtime.v1.sessions.$session.$io.append.ts index b133c666404..e5fc0d5c344 100644 --- a/apps/webapp/app/routes/realtime.v1.sessions.$session.$io.append.ts +++ b/apps/webapp/app/routes/realtime.v1.sessions.$session.$io.append.ts @@ -12,9 +12,9 @@ import { } from "~/services/realtime/sessions.server"; import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server"; import { + claimSessionStreamPart, drainSessionStreamWaitpoints, - markSessionStreamPartAppended, - wasSessionStreamPartAppended, + releaseSessionStreamPart, } from "~/services/sessionStreamWaitpointCache.server"; import { anyResource, @@ -150,25 +150,35 @@ const { action, loader } = createActionApiRoute( const clientPartId = request.headers.get("X-Part-Id"); const partId = clientPartId ?? nanoid(7); - // Idempotency on client-supplied part ids: a retried POST whose first - // attempt committed skips the second append (which would duplicate the - // record), but still falls through to the drain below — so a retry whose - // first attempt died before waking the waitpoint can still recover it. - const alreadyAppended = - !!clientPartId && - (await wasSessionStreamPartAppended( - authentication.environment.id, - addressingKey, - params.io, - clientPartId - )); + // Idempotency on client-supplied part ids: atomically claim the id before + // appending. A concurrent or retried POST that loses the claim skips the + // append (no duplicate record) but still falls through to the drain below, + // so a retry whose first attempt died before waking the waitpoint can still + // recover it. The claim is released on append failure so a genuine retry + // can re-claim and proceed. + const wonClaim = clientPartId + ? await claimSessionStreamPart( + authentication.environment.id, + addressingKey, + params.io, + clientPartId + ) + : true; - if (!alreadyAppended) { + if (wonClaim) { const [appendError] = await tryCatch( realtimeStream.appendPartToSessionStream(part, partId, addressingKey, params.io) ); if (appendError) { + if (clientPartId) { + await releaseSessionStreamPart( + authentication.environment.id, + addressingKey, + params.io, + clientPartId + ); + } if (appendError instanceof ServiceValidationError) { return json( { ok: false, error: appendError.message }, @@ -185,15 +195,6 @@ const { action, loader } = createActionApiRoute( { status: 500 } ); } - - if (clientPartId) { - await markSessionStreamPartAppended( - authentication.environment.id, - addressingKey, - params.io, - clientPartId - ); - } } // Fire any run-scoped waitpoints registered against this channel. Best diff --git a/apps/webapp/app/services/sessionStreamWaitpointCache.server.ts b/apps/webapp/app/services/sessionStreamWaitpointCache.server.ts index 3d396041b35..31121678282 100644 --- a/apps/webapp/app/services/sessionStreamWaitpointCache.server.ts +++ b/apps/webapp/app/services/sessionStreamWaitpointCache.server.ts @@ -151,13 +151,14 @@ export async function drainSessionStreamWaitpoints( * Remove a single waitpoint from the pending set. Called after a race * where `.wait()` completed the waitpoint from pre-arrived data. */ -// "ssa" — session-stream-append. Best-effort idempotency marker for the -// append route: when a caller supplies an `X-Part-Id`, a retried POST -// whose first attempt actually committed is skipped instead of producing -// a duplicate record (and double-firing the waitpoint drain). The marker -// is only written AFTER a successful S2 append, so a retry of a genuinely -// failed append still goes through. 5-minute window — this covers HTTP -// retry storms, not a permanent idempotency store. +// "ssa" — session-stream-append. Idempotency claim for the append route: +// when a caller supplies an `X-Part-Id`, the first request atomically claims +// the key (SET NX) before appending; a concurrent or retried POST with the +// same id fails the claim and skips the append, so it never produces a +// duplicate record (or double-fires the waitpoint drain). The claim is +// released if the append fails, so a retry of a genuinely failed append +// still goes through. 5-minute window — covers retry storms, not a +// permanent idempotency store. const APPEND_DEDUPE_PREFIX = "ssa:"; const APPEND_DEDUPE_TTL_SECONDS = 5 * 60; @@ -176,35 +177,45 @@ function buildAppendDedupeKey( } /** - * True if a part with this id was already successfully appended to the - * channel within the dedupe window. Fails open (returns false) when Redis - * is unavailable — appends degrade to at-least-once, never to dropped. + * Atomically claim a part id before appending. Returns true if this caller + * won the claim (first to see this id) and should perform the append, false + * if the id was already claimed (a concurrent or retried POST) and the append + * should be skipped. Fails open (returns true) when Redis is unavailable — + * appends degrade to at-least-once, never to dropped. */ -export async function wasSessionStreamPartAppended( +export async function claimSessionStreamPart( environmentId: string, addressingKey: string, io: "out" | "in", partId: string ): Promise { - if (!redis) return false; + if (!redis) return true; try { - const value = await redis.get(buildAppendDedupeKey(environmentId, addressingKey, io, partId)); - return value !== null; + // SET NX is the atomic claim: "OK" when set (we won), null when the key + // already exists (someone else owns this id). + const result = await redis.set( + buildAppendDedupeKey(environmentId, addressingKey, io, partId), + "1", + "EX", + APPEND_DEDUPE_TTL_SECONDS, + "NX" + ); + return result === "OK"; } catch (error) { - logger.error("Failed to read session stream append dedupe marker", { + logger.error("Failed to claim session stream append part", { environmentId, addressingKey, io, partId, error, }); - return false; + return true; } } -/** Record a successful append so a retried POST with the same part id is skipped. */ -export async function markSessionStreamPartAppended( +/** Release a claim so a retry can proceed — called when the append itself failed. */ +export async function releaseSessionStreamPart( environmentId: string, addressingKey: string, io: "out" | "in", @@ -213,14 +224,9 @@ export async function markSessionStreamPartAppended( if (!redis) return; try { - await redis.set( - buildAppendDedupeKey(environmentId, addressingKey, io, partId), - "1", - "EX", - APPEND_DEDUPE_TTL_SECONDS - ); + await redis.del(buildAppendDedupeKey(environmentId, addressingKey, io, partId)); } catch (error) { - logger.error("Failed to write session stream append dedupe marker", { + logger.error("Failed to release session stream append part", { environmentId, addressingKey, io,