Skip to content

Commit a04cdff

Browse files
authored
fix(webapp): stop replica lag from double-triggering session runs and 404ing fresh sessions (#3914)
## Summary Two read-replica races on the session APIs could break chats whose first activity lands inside the replication window (or any time the replica lags): 1. A session's first `.in` append or `.out` subscribe could fail with a 404 for a session that exists on the writer, because the route resolved the Session row on the replica only. 2. `ensureRunForSession` probed run liveness on the replica, so a probe miss on a run triggered moments earlier was judged "run is dead" and a second live run was spawned for the same session. Both runs then consumed the same input stream, producing duplicated turns and doubled responses (and doubled LLM cost). ## Fix Liveness now re-probes the writer before declaring the current run dead (the old code already fell back to the writer, but only to recover the friendlyId, after the wrong verdict was made). Session resolution on the append and subscribe/init routes goes through a new `resolveSessionWithWriterFallback`, which stays replica-first on the hot path and only touches the writer on a miss. Reproduced and verified against a local streaming replica with an artificial apply delay: pre-fix, a send immediately after session creation reliably produced either the 404 or two executing runs with a doubled response; post-fix, the same flow produces exactly one run and one response. Also rides along: the local docker replica's default apply delay drops from 150ms to a realistic 20ms (override via `REPLICA_APPLY_DELAY` when you want to deliberately widen the race window).
1 parent eb498d1 commit a04cdff

6 files changed

Lines changed: 53 additions & 32 deletions

File tree

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: fix
4+
---
5+
6+
Fix read-replica races on the session APIs: a fresh session's first append or subscribe no longer fails with a 404, and a just-triggered session run is no longer mistaken for dead, which could double-trigger the run and duplicate chat responses.

apps/webapp/app/routes/realtime.v1.sessions.$session.$io.append.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,12 @@ import { json } from "@remix-run/server-runtime";
22
import { tryCatch } from "@trigger.dev/core/utils";
33
import { nanoid } from "nanoid";
44
import { z } from "zod";
5-
import { $replica } from "~/db.server";
65
import { logger } from "~/services/logger.server";
76
import { S2RealtimeStreams } from "~/services/realtime/s2realtimeStreams.server";
87
import { ensureRunForSession } from "~/services/realtime/sessionRunManager.server";
98
import {
109
canonicalSessionAddressingKey,
11-
resolveSessionByIdOrExternalId,
10+
resolveSessionWithWriterFallback,
1211
} from "~/services/realtime/sessions.server";
1312
import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server";
1413
import {
@@ -55,9 +54,10 @@ const { action, loader } = createActionApiRoute(
5554
// also triggers the first run). The row exists before any caller
5655
// can reach `.in/append` — no row, no append. Resolved here so the
5756
// authorization scope can expand to both addressing forms (friendlyId
58-
// + externalId) and the handler can skip its own lookup.
57+
// + externalId) and the handler can skip its own lookup. Writer
58+
// fallback: a first append can land inside the replica apply window.
5959
findResource: async (params, auth) =>
60-
resolveSessionByIdOrExternalId($replica, auth.environment.id, params.session),
60+
resolveSessionWithWriterFallback(auth.environment.id, params.session),
6161
authorization: {
6262
action: "write",
6363
// Authorize against the union of the URL form, friendlyId, and

apps/webapp/app/routes/realtime.v1.sessions.$session.$io.ts

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
import { json } from "@remix-run/server-runtime";
22
import { z } from "zod";
3-
import { $replica } from "~/db.server";
43
import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
54
import { S2RealtimeStreams } from "~/services/realtime/s2realtimeStreams.server";
65
import {
76
canonicalSessionAddressingKey,
87
isSessionFriendlyIdForm,
9-
resolveSessionByIdOrExternalId,
8+
resolveSessionWithWriterFallback,
109
} from "~/services/realtime/sessions.server";
1110
import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server";
1211
import {
@@ -43,8 +42,7 @@ const { action } = createActionApiRoute(
4342
// when a row exists. The S2 stream key is built from the row's
4443
// canonical key (externalId if set, else friendlyId) so writers
4544
// and readers converge regardless of URL form.
46-
const maybeSession = await resolveSessionByIdOrExternalId(
47-
$replica,
45+
const maybeSession = await resolveSessionWithWriterFallback(
4846
authentication.environment.id,
4947
params.session
5048
);
@@ -100,8 +98,7 @@ const loader = createLoaderApiRoute(
10098
allowJWT: true,
10199
corsStrategy: "all",
102100
findResource: async (params, auth) => {
103-
const row = await resolveSessionByIdOrExternalId(
104-
$replica,
101+
const row = await resolveSessionWithWriterFallback(
105102
auth.environment.id,
106103
params.session
107104
);

apps/webapp/app/services/realtime/sessionRunManager.server.ts

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -111,31 +111,32 @@ export async function ensureRunForSession(
111111
// 1. Probe currentRunId.
112112
let priorDeadRunFriendlyId: string | undefined;
113113
if (session.currentRunId) {
114-
const probe = await getRunStatusAndFriendlyId(session.currentRunId);
114+
let probe = await getRunStatusAndFriendlyId(session.currentRunId);
115+
if (!probe) {
116+
// Replica miss on a row we just observed via `currentRunId` — the
117+
// run was likely triggered moments ago and hasn't replicated yet.
118+
// Re-probe the writer BEFORE deciding liveness: treating a lagging
119+
// replica as "row vanished" double-triggers the session (a fast
120+
// first append after session create races the replica apply delay
121+
// and spawns a second live run consuming the same `.in`).
122+
probe = await prisma.taskRun.findFirst({
123+
where: { id: session.currentRunId },
124+
select: { status: true, friendlyId: true },
125+
});
126+
}
115127
if (probe && !isFinalRunStatus(probe.status)) {
116128
return { runId: session.currentRunId, triggered: false };
117129
}
118-
// Either the row vanished (probe null) or its status is final. Either
119-
// way the prior run isn't going to consume new appends — but the
120-
// session may still hold conversation state on `session.out` and an
121-
// S3 snapshot keyed on `session.friendlyId`. Forward the prior run's
122-
// public-form id (friendlyId — same shape as `ctx.run.id`) to the
123-
// agent as `previousRunId` so its boot gate flips
130+
// Either the row vanished on the writer too (probe null) or its status
131+
// is final. Either way the prior run isn't going to consume new
132+
// appends — but the session may still hold conversation state on
133+
// `session.out` and an S3 snapshot keyed on `session.friendlyId`.
134+
// Forward the prior run's public-form id (friendlyId — same shape as
135+
// `ctx.run.id`) to the agent as `previousRunId` so its boot gate flips
124136
// `couldHavePriorState` and replays the persisted state instead of
125137
// treating this as a fresh chat. See `chat.agent`'s boot orchestration
126138
// in `packages/trigger-sdk/src/v3/ai.ts`.
127-
if (probe?.friendlyId) {
128-
priorDeadRunFriendlyId = probe.friendlyId;
129-
} else {
130-
// Replica miss on a row we just observed via `currentRunId`. Retry
131-
// on the writer so the customer's `runs.retrieve(previousRunId)`
132-
// gets the public `run_*` form rather than the internal cuid.
133-
const writerProbe = await prisma.taskRun.findFirst({
134-
where: { id: session.currentRunId },
135-
select: { friendlyId: true },
136-
});
137-
priorDeadRunFriendlyId = writerProbe?.friendlyId ?? session.currentRunId;
138-
}
139+
priorDeadRunFriendlyId = probe?.friendlyId ?? session.currentRunId;
139140
}
140141

141142
// 2. Validate config + trigger upfront. Continuation overrides

apps/webapp/app/services/realtime/sessions.server.ts

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import type { PrismaClient, Session } from "@trigger.dev/database";
22
import type { SessionItem } from "@trigger.dev/core/v3";
3-
import { $replica } from "~/db.server";
3+
import { $replica, prisma } from "~/db.server";
44

55
/**
66
* Prefix that {@link SessionId.generate} attaches to every Session friendlyId.
@@ -36,6 +36,22 @@ export async function resolveSessionByIdOrExternalId(
3636
});
3737
}
3838

39+
/**
40+
* Replica-first session resolution with a writer fallback on miss. For the
41+
* hot realtime routes (append / SSE subscribe / end-and-continue): a fresh
42+
* session's first append or subscribe can arrive inside the replica's apply
43+
* window, and a bare replica miss there surfaces as a 404 (or a subscribe
44+
* that never finds its session) for a session that exists on the writer.
45+
*/
46+
export async function resolveSessionWithWriterFallback(
47+
runtimeEnvironmentId: string,
48+
idOrExternalId: string
49+
): Promise<Session | null> {
50+
const row = await resolveSessionByIdOrExternalId($replica, runtimeEnvironmentId, idOrExternalId);
51+
if (row) return row;
52+
return resolveSessionByIdOrExternalId(prisma, runtimeEnvironmentId, idOrExternalId);
53+
}
54+
3955
/** True for `session_*` friendlyId form, false for everything else. */
4056
export function isSessionFriendlyIdForm(value: string): boolean {
4157
return value.startsWith(SESSION_FRIENDLY_ID_PREFIX);

docker/docker-compose.yml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,8 @@ services:
5858
# docker exec database bash -c 'grep -q "host replication" "$PGDATA/pg_hba.conf" || echo "host replication all all md5" >> "$PGDATA/pg_hba.conf"'
5959
# docker exec database psql -U postgres -c "SELECT pg_reload_conf()"
6060
# Then point the webapp at it: DATABASE_READ_REPLICA_URL=postgresql://postgres:postgres@localhost:5433/postgres
61-
# Tune the lag via REPLICA_APPLY_DELAY (e.g. 150ms, 2s). Wipe database-replica-data to re-init.
61+
# Tune the lag via REPLICA_APPLY_DELAY (default 20ms ~ realistic prod lag; crank to 150ms/2s to
62+
# shake out replica races). Wipe database-replica-data to re-init.
6263
database-replica:
6364
container_name: ${CONTAINER_PREFIX:-}database-replica
6465
profiles: ["replica"]
@@ -72,7 +73,7 @@ services:
7273
- ${DB_REPLICA_VOLUME:-database-replica-data}:/var/lib/postgresql/data/
7374
environment:
7475
PGPASSWORD: postgres
75-
REPLICA_APPLY_DELAY: ${REPLICA_APPLY_DELAY:-150ms}
76+
REPLICA_APPLY_DELAY: ${REPLICA_APPLY_DELAY:-20ms}
7677
networks:
7778
- app_network
7879
ports:

0 commit comments

Comments
 (0)