Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .server-changes/session-route-hardening.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: fix
---

Hardening fixes for realtime sessions: stricter authorization on snapshot URLs and out-channel appends, environment-scoped message delivery for waiting runs, and idempotent appends via the X-Part-Id header. Session creation now rejects expired sessions, externalId can no longer be changed after creation, and the sessions list returns friendly run ids.
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,14 @@ const { action, loader } = createActionApiRoute(
});

// Step 2: Register the waitpoint on the session channel so the next
// append fires it. Keyed by (addressingKey, io) — the canonical
// string for the row. The append handler drains by the same
// canonical key, so writers and readers converge regardless of
// which URL form the agent vs. the appending caller used.
// append fires it. Keyed by (environmentId, addressingKey, io) — the
// canonical string for the row, scoped to the environment because
// externalIds are only unique per environment. The append handler
// drains by the same key, so writers and readers converge regardless
// of which URL form the agent vs. the appending caller used.
const ttlMs = timeout ? timeout.getTime() - Date.now() : undefined;
await addSessionStreamWaitpoint(
authentication.environment.id,
addressingKey,
body.io,
result.waitpoint.id,
Expand Down Expand Up @@ -152,6 +154,7 @@ const { action, loader } = createActionApiRoute(
});

await removeSessionStreamWaitpoint(
authentication.environment.id,
addressingKey,
body.io,
result.waitpoint.id
Expand Down
18 changes: 18 additions & 0 deletions apps/webapp/app/routes/api.v1.sessions.$session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,24 @@ const { action } = createActionApiRoute(
return json({ error: "Session not found" }, { status: 404 });
}

// The externalId is the canonical addressing key once set: the S2
// stream names, the waitpoint cache key, and the minted session PAT
// scope all derive from it. Re-keying a session would orphan its
// streams (the chat goes silent) and invalidate the PAT's scope, so
// reject any change. Same-value PATCHes stay idempotent.
if (
body.externalId !== undefined &&
body.externalId !== existing.externalId
) {
return json(
{
error:
"externalId cannot be changed after creation; close this session and create a new one with the desired externalId",
},
{ status: 422 }
);
}
Comment thread
ericallam marked this conversation as resolved.

try {
const updated = await prisma.session.update({
where: { id: existing.id },
Expand Down
36 changes: 34 additions & 2 deletions apps/webapp/app/routes/api.v1.sessions.$sessionId.snapshot-url.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { $replica } from "~/db.server";
import { chatSnapshotStorageKey } from "~/services/realtime/chatSnapshot.server";
import { resolveSessionByIdOrExternalId } from "~/services/realtime/sessions.server";
import {
anyResource,
createActionApiRoute,
createLoaderApiRoute,
} from "~/services/routeBuilders/apiBuilder.server";
Expand All @@ -21,8 +22,31 @@ const routeConfig = {
resolveSessionByIdOrExternalId($replica, auth.environment.id, params.sessionId),
};

// Authorize against the union of the URL form, friendlyId, and externalId —
// same shape as the sibling session routes. Without an authorization block
// the route builder skips scope checks entirely, so any session-scoped JWT
// in the environment could presign URLs for any other session's snapshot.
function sessionResource(
paramId: string,
session: { friendlyId: string; externalId: string | null } | null | undefined
) {
const ids = new Set<string>([paramId]);
if (session) {
ids.add(session.friendlyId);
if (session.externalId) ids.add(session.externalId);
}
return anyResource([...ids].map((id) => ({ type: "sessions" as const, id })));
}

export const { action } = createActionApiRoute(
{ ...routeConfig, method: "PUT" },
{
...routeConfig,
method: "PUT",
authorization: {
action: "write",
resource: (params, _, __, ___, session) => sessionResource(params.sessionId, session),
},
},
async ({ authentication, resource: session }) => {
if (!session) {
return json({ error: "Session not found" }, { status: 404 });
Expand All @@ -42,7 +66,15 @@ export const { action } = createActionApiRoute(
}
);

export const loader = createLoaderApiRoute(routeConfig, async ({ authentication, resource: session }) => {
export const loader = createLoaderApiRoute(
{
...routeConfig,
authorization: {
action: "read",
resource: (session, params) => sessionResource(params.sessionId, session),
},
Comment thread
ericallam marked this conversation as resolved.
},
async ({ authentication, resource: session }) => {
if (!session) {
return json({ error: "Session not found" }, { status: 404 });
}
Expand Down
48 changes: 37 additions & 11 deletions apps/webapp/app/routes/api.v1.sessions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ import {
type SessionTriggerConfig,
} from "~/services/realtime/sessionRunManager.server";
import { chatSnapshotStoragePathForSession } from "~/services/realtime/chatSnapshot.server";
import { serializeSession } from "~/services/realtime/sessions.server";
import {
serializeSession,
serializeSessionsWithFriendlyRunIds,
} from "~/services/realtime/sessions.server";
import { SessionsRepository } from "~/services/sessionsRepository/sessionsRepository.server";
import {
anyResource,
Expand Down Expand Up @@ -91,17 +94,29 @@ export const loader = createLoaderApiRoute(
},
});

return json<ListSessionsResponseBody>({
data: rows.map((row) =>
serializeSession({
...row,
// Columns the list query doesn't select — filled so `serializeSession`
// can operate on a narrowed payload without type errors.
projectId: authentication.environment.projectId,
environmentType: authentication.environment.type,
organizationId: authentication.environment.organizationId,
} as Session)
// Batched friendlyId translation: `currentRunId` on the wire is the
// public `run_*` form, matching the single-session routes. One `IN`
// lookup per page.
const data = await serializeSessionsWithFriendlyRunIds(
rows.map(
(row) =>
({
...row,
// Columns the list query doesn't select — filled so the
// serializer can operate on a narrowed payload without type errors.
projectId: authentication.environment.projectId,
environmentType: authentication.environment.type,
organizationId: authentication.environment.organizationId,
}) as Session
),
{
projectId: authentication.environment.projectId,
runtimeEnvironmentId: authentication.environment.id,
}
);

return json<ListSessionsResponseBody>({
data,
pagination: {
...(pagination.nextCursor ? { next: pagination.nextCursor } : {}),
...(pagination.previousCursor ? { previous: pagination.previousCursor } : {}),
Expand Down Expand Up @@ -225,6 +240,17 @@ const { action } = createActionApiRoute(
);
}

// Same guard as the append / end-and-continue handlers: an expired
// row must not spawn a run, because every subsequent `.in/append`
// would 400 on the expiry check — a run boots but the chat can
// never receive input.
if (session.expiresAt && session.expiresAt.getTime() < Date.now()) {
return json(
{ error: "Session is expired; use a different externalId to create a new session" },
{ status: 409 }
);
}

// Session is task-bound — every session has a live run by
// construction. `ensureRunForSession` is idempotent: on the
// cached path it sees `currentRunId` is alive and returns it
Expand Down
76 changes: 60 additions & 16 deletions apps/webapp/app/routes/realtime.v1.sessions.$session.$io.append.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@ import {
resolveSessionByIdOrExternalId,
} from "~/services/realtime/sessions.server";
import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server";
import { drainSessionStreamWaitpoints } from "~/services/sessionStreamWaitpointCache.server";
import {
claimSessionStreamPart,
drainSessionStreamWaitpoints,
releaseSessionStreamPart,
} from "~/services/sessionStreamWaitpointCache.server";
import {
anyResource,
createActionApiRoute,
Expand Down Expand Up @@ -91,6 +95,17 @@ const { action, loader } = createActionApiRoute(
);
}

// `.out` is the agent→client channel. Only PRIVATE (secret key) auth —
// i.e. the agent run itself — may write to it. Session-scoped JWTs carry
// `write:sessions:<key>` for `.in`; without this gate they could forge
// assistant chunks and complete `.out` waitpoints on their own session.
if (params.io === "out" && authentication.type !== "PRIVATE") {
return json(
{ ok: false, error: "Appending to the out channel requires secret key authentication" },
{ status: 403 }
);
}

const realtimeStream = getRealtimeStreamInstance(authentication.environment, "v2", {
session,
});
Expand Down Expand Up @@ -132,25 +147,54 @@ const { action, loader } = createActionApiRoute(
const addressingKey = canonicalSessionAddressingKey(session, params.session);

const part = await request.text();
const partId = request.headers.get("X-Part-Id") ?? nanoid(7);
const clientPartId = request.headers.get("X-Part-Id");
const partId = clientPartId ?? nanoid(7);

const [appendError] = await tryCatch(
realtimeStream.appendPartToSessionStream(part, partId, addressingKey, params.io)
);
// Idempotency on client-supplied part ids: atomically claim the id before
// appending. A concurrent or retried POST that loses the claim skips the
// append (no duplicate record) but still falls through to the drain below,
// so a retry whose first attempt died before waking the waitpoint can still
// recover it. The claim is released on append failure so a genuine retry
// can re-claim and proceed.
const wonClaim = clientPartId
? await claimSessionStreamPart(
authentication.environment.id,
addressingKey,
params.io,
clientPartId
)
: true;

if (wonClaim) {
const [appendError] = await tryCatch(
realtimeStream.appendPartToSessionStream(part, partId, addressingKey, params.io)
);

if (appendError) {
if (appendError instanceof ServiceValidationError) {
if (appendError) {
if (clientPartId) {
await releaseSessionStreamPart(
authentication.environment.id,
addressingKey,
params.io,
clientPartId
);
}
if (appendError instanceof ServiceValidationError) {
return json(
{ ok: false, error: appendError.message },
{ status: appendError.status ?? 422 }
);
}
logger.error("Failed to append to session stream", {
sessionId: session.id,
io: params.io,
error: appendError,
});
return json(
{ ok: false, error: appendError.message },
{ status: appendError.status ?? 422 }
{ ok: false, error: "Something went wrong, please try again." },
{ status: 500 }
);
}
logger.error("Failed to append to session stream", {
sessionId: session.id,
io: params.io,
error: appendError,
});
return json({ ok: false, error: "Something went wrong, please try again." }, { status: 500 });
}

// Fire any run-scoped waitpoints registered against this channel. Best
Expand All @@ -160,7 +204,7 @@ const { action, loader } = createActionApiRoute(
// `sessions.open(...).in.wait()`, so writers and readers converge
// regardless of which URL form they used.
const [drainError, waitpointIds] = await tryCatch(
drainSessionStreamWaitpoints(addressingKey, params.io)
drainSessionStreamWaitpoints(authentication.environment.id, addressingKey, params.io)
);
if (drainError) {
logger.error("Failed to drain session stream waitpoints", {
Expand Down
Comment thread
ericallam marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ export async function action({ request, params }: ActionFunctionArgs) {
// Drain any waitpoints registered for this channel — same as the
// public append. Best-effort; failure doesn't fail the append.
const [drainError, waitpointIds] = await tryCatch(
drainSessionStreamWaitpoints(addressingKey, io)
drainSessionStreamWaitpoints(environment.id, addressingKey, io)
);
if (drainError) {
logger.error("Failed to drain session stream waitpoints (playground)", {
Expand Down
43 changes: 39 additions & 4 deletions apps/webapp/app/services/realtime/sessions.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,10 @@ export function canonicalSessionAddressingKey(
*
* Note: `currentRunId` is left as-is — Prisma stores the internal run id
* (cuid), but `SessionItem.currentRunId` is the *friendly* form. Routes
* that emit a single `SessionItem` should use
* {@link serializeSessionWithFriendlyRunId} instead, which resolves the
* friendlyId via a TaskRun lookup. List endpoints stay on this raw form
* to avoid N+1 lookups when paginating.
* that emit `SessionItem`s must translate: single-row endpoints via
* {@link serializeSessionWithFriendlyRunId}, list endpoints via the
* batched {@link serializeSessionsWithFriendlyRunIds}. Never put this
* raw form on the wire directly.
*/
export function serializeSession(session: Session): SessionItem {
return {
Expand Down Expand Up @@ -125,3 +125,38 @@ export async function serializeSessionWithFriendlyRunId(
currentRunId: run?.friendlyId ?? null,
};
}

/**
* Batched form of {@link serializeSessionWithFriendlyRunId} for list
* endpoints: one `IN` lookup per page instead of N+1. `currentRunId` on
* the wire is always the public `run_*` friendlyId — the raw
* {@link serializeSession} form leaks the internal cuid, which customers
* can't use with `runs.retrieve(...)`.
*/
export async function serializeSessionsWithFriendlyRunIds(
sessions: Session[],
scope: { projectId: string; runtimeEnvironmentId: string }
): Promise<SessionItem[]> {
const runIds = [...new Set(sessions.map((s) => s.currentRunId).filter((id): id is string => !!id))];

// `currentRunId` is a plain string pointer (no FK), so scope the lookup to
// the caller's tenant — a stale value must not resolve a run in another env.
const runs = runIds.length
? await $replica.taskRun.findMany({
where: {
id: { in: runIds },
projectId: scope.projectId,
runtimeEnvironmentId: scope.runtimeEnvironmentId,
},
select: { id: true, friendlyId: true },
})
Comment thread
ericallam marked this conversation as resolved.
: [];
const friendlyIdByRunId = new Map(runs.map((run) => [run.id, run.friendlyId]));

return sessions.map((session) => ({
...serializeSession(session),
currentRunId: session.currentRunId
? friendlyIdByRunId.get(session.currentRunId) ?? null
: null,
}));
}
Loading
Loading