From 79dd2da03c6606b3c0ff69a6b7aba67922e44661 Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Wed, 10 Jun 2026 12:42:13 +0100 Subject: [PATCH 01/21] feat(testcontainers): schemaOnlyPrisma fixture - empty template clone for replica-lag tests Co-Authored-By: Claude Fable 5 --- internal-packages/testcontainers/src/index.ts | 38 +++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/internal-packages/testcontainers/src/index.ts b/internal-packages/testcontainers/src/index.ts index 8b687402f6d..1950c999bab 100644 --- a/internal-packages/testcontainers/src/index.ts +++ b/internal-packages/testcontainers/src/index.ts @@ -216,6 +216,42 @@ const clonedPostgresContainer = async ({}, use: Use) } }; +// A second migrated-but-empty database on the same worker postgres, cloned from the schema +// template. For tests that need to simulate a read replica that hasn't caught up: schema +// present, rows absent. Lazy - only booted when a test destructures it. +const schemaOnlyPrismaFixture = async ({}: {}, use: Use) => { + const container = await getWorkerPostgresContainer(); + const baseUri = container.getConnectionUri(); + const cloneDb = `schema_only_${pgCloneCounter++}`; + + const admin = new PrismaClient({ + datasources: { db: { url: postgresUriWithDatabase(baseUri, "postgres") } }, + }); + await admin.$executeRawUnsafe(`CREATE DATABASE "${cloneDb}" TEMPLATE "${POSTGRES_TEMPLATE_DB}"`); + await admin.$disconnect(); + + const prisma = new PrismaClient({ + datasources: { db: { url: postgresUriWithDatabase(baseUri, cloneDb) } }, + }); + try { + await use(prisma); + } finally { + await logCleanup("schemaOnlyPrisma", prisma.$disconnect()); + // Best-effort drop, mirroring clonedPostgresContainer cleanup - the container is reaped on + // worker exit anyway, so never let cleanup fail the test. + const cleanup = new PrismaClient({ + datasources: { db: { url: postgresUriWithDatabase(baseUri, "postgres") } }, + }); + try { + await cleanup.$executeRawUnsafe(`DROP DATABASE IF EXISTS "${cloneDb}" WITH (FORCE)`); + } catch { + // ignore - reaped with the container anyway + } finally { + await cleanup.$disconnect(); + } + } +}; + const prismaFromContainer = async ( { postgresContainer }: { postgresContainer: StartedPostgreSqlContainer }, use: Use @@ -454,6 +490,7 @@ export const postgresAndRedisTest = test.extend({ type ContainerTestContext = { postgresContainer: StartedPostgreSqlContainer; prisma: PrismaClient; + schemaOnlyPrisma: PrismaClient; redisContainer: StartedRedisContainer; resetRedis: void; redisOptions: RedisOptions; @@ -468,6 +505,7 @@ type ContainerTestContext = { export const containerTest = test.extend({ postgresContainer: clonedPostgresContainer, prisma: prismaFromContainer, + schemaOnlyPrisma: schemaOnlyPrismaFixture, redisContainer: [bootWorkerRedis, { scope: "worker" }], resetRedis: [flushRedis, { auto: true }], redisOptions, From d44f4f3d3dedcb5d119e76c41baeb74f782e8cb3 Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Wed, 10 Jun 2026 12:48:04 +0100 Subject: [PATCH 02/21] test(run-engine): cover getSnapshotsSince replica-miss fallback (red) Co-Authored-By: Claude Fable 5 --- .../engine/tests/getSnapshotsSince.test.ts | 168 ++++++++++++++++++ 1 file changed, 168 insertions(+) diff --git a/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts b/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts index 77867b1b1b1..68008bb9589 100644 --- a/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts +++ b/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts @@ -14,6 +14,7 @@ import { generateLargeOutput, } from "./helpers/snapshotTestHelpers.js"; import { generateFriendlyId } from "@trigger.dev/core/v3/isomorphic"; +import type { PrismaReplicaClient } from "@trigger.dev/database"; vi.setConfig({ testTimeout: 120_000 }); @@ -679,4 +680,171 @@ describe("RunEngine getSnapshotsSince", () => { } } ); + + containerTest( + "falls back to the primary when the replica is missing the since snapshot", + async ({ prisma, schemaOnlyPrisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const engine = new RunEngine({ + prisma, + // An empty (schema-only) database stands in for a read replica that has not + // caught up: every lookup on it misses, so the engine must fall back to the + // primary instead of failing the poll. + readOnlyPrisma: schemaOnlyPrisma as PrismaReplicaClient, + readReplicaSnapshotsSinceEnabled: true, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + try { + const taskIdentifier = "test-task"; + await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier); + + const runFriendlyId = generateFriendlyId("run"); + const run = await engine.trigger( + { + number: 1, + friendlyId: runFriendlyId, + environment: authenticatedEnvironment, + taskIdentifier, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t_replica_fallback", + spanId: "s_replica_fallback", + workerQueue: "main", + queue: "task/test-task", + isTest: false, + tags: [], + }, + prisma + ); + + await setTimeout(500); + await engine.dequeueFromWorkerQueue({ + consumerId: "test_replica_fallback", + workerQueue: "main", + }); + + const allSnapshots = await prisma.taskRunExecutionSnapshot.findMany({ + where: { runId: run.id, isValid: true }, + orderBy: { createdAt: "asc" }, + }); + expect(allSnapshots.length).toBeGreaterThan(1); + + // The since-snapshot exists only on the primary - the replica misses it. + const firstSnapshot = allSnapshots[0]; + const result = await engine.getSnapshotsSince({ + runId: run.id, + snapshotId: firstSnapshot.id, + }); + + // Served by the primary fallback, not a failed poll. + expect(result).not.toBeNull(); + expect(result!.length).toBe(allSnapshots.length - 1); + expect(result!.map((s) => s.snapshot.id)).toEqual(allSnapshots.slice(1).map((s) => s.id)); + } finally { + await engine.quit(); + } + } + ); + + containerTest( + "returns null when the snapshot is missing on both replica and primary", + async ({ prisma, schemaOnlyPrisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const engine = new RunEngine({ + prisma, + readOnlyPrisma: schemaOnlyPrisma as PrismaReplicaClient, + readReplicaSnapshotsSinceEnabled: true, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + try { + const taskIdentifier = "test-task"; + await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier); + + const runFriendlyId = generateFriendlyId("run"); + const run = await engine.trigger( + { + number: 1, + friendlyId: runFriendlyId, + environment: authenticatedEnvironment, + taskIdentifier, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t_replica_both_miss", + spanId: "s_replica_both_miss", + workerQueue: "main", + queue: "task/test-task", + isTest: false, + tags: [], + }, + prisma + ); + + // A snapshot id that exists nowhere - a genuine miss stays an error (null). + const result = await engine.getSnapshotsSince({ + runId: run.id, + snapshotId: "snapshot_does_not_exist", + }); + + expect(result).toBeNull(); + } finally { + await engine.quit(); + } + } + ); }); From 516533ce1d2a39b79e966823dd9000982346a79d Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Wed, 10 Jun 2026 12:56:37 +0100 Subject: [PATCH 03/21] test(run-engine): tie-proof replica fallback window assertion, drop redundant cast Co-Authored-By: Claude Fable 5 --- .../src/engine/tests/getSnapshotsSince.test.ts | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts b/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts index 68008bb9589..e09ea1211a0 100644 --- a/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts +++ b/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts @@ -14,7 +14,7 @@ import { generateLargeOutput, } from "./helpers/snapshotTestHelpers.js"; import { generateFriendlyId } from "@trigger.dev/core/v3/isomorphic"; -import type { PrismaReplicaClient } from "@trigger.dev/database"; + vi.setConfig({ testTimeout: 120_000 }); @@ -691,7 +691,7 @@ describe("RunEngine getSnapshotsSince", () => { // An empty (schema-only) database stands in for a read replica that has not // caught up: every lookup on it misses, so the engine must fall back to the // primary instead of failing the poll. - readOnlyPrisma: schemaOnlyPrisma as PrismaReplicaClient, + readOnlyPrisma: schemaOnlyPrisma, readReplicaSnapshotsSinceEnabled: true, worker: { redis: redisOptions, @@ -766,8 +766,12 @@ describe("RunEngine getSnapshotsSince", () => { // Served by the primary fallback, not a failed poll. expect(result).not.toBeNull(); - expect(result!.length).toBe(allSnapshots.length - 1); - expect(result!.map((s) => s.snapshot.id)).toEqual(allSnapshots.slice(1).map((s) => s.id)); + const expectedSnapshots = allSnapshots.filter( + (s) => s.createdAt.getTime() > firstSnapshot.createdAt.getTime() + ); + expect(expectedSnapshots.length).toBeGreaterThan(0); + expect(result!.length).toBe(expectedSnapshots.length); + expect(result!.map((s) => s.snapshot.id)).toEqual(expectedSnapshots.map((s) => s.id)); } finally { await engine.quit(); } @@ -781,7 +785,7 @@ describe("RunEngine getSnapshotsSince", () => { const engine = new RunEngine({ prisma, - readOnlyPrisma: schemaOnlyPrisma as PrismaReplicaClient, + readOnlyPrisma: schemaOnlyPrisma, readReplicaSnapshotsSinceEnabled: true, worker: { redis: redisOptions, From ade8c553f2764b644bd3400188f04b8c550cc01f Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Wed, 10 Jun 2026 12:57:56 +0100 Subject: [PATCH 04/21] feat(run-engine): typed ExecutionSnapshotNotFoundError from getExecutionSnapshotsSince Co-Authored-By: Claude Fable 5 --- internal-packages/run-engine/src/engine/errors.ts | 7 +++++++ .../src/engine/systems/executionSnapshotSystem.ts | 3 ++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/internal-packages/run-engine/src/engine/errors.ts b/internal-packages/run-engine/src/engine/errors.ts index 9a41cba11ee..2bb05a304c9 100644 --- a/internal-packages/run-engine/src/engine/errors.ts +++ b/internal-packages/run-engine/src/engine/errors.ts @@ -104,3 +104,10 @@ export class RunOneTimeUseTokenError extends Error { this.name = "RunOneTimeUseTokenError"; } } + +export class ExecutionSnapshotNotFoundError extends Error { + constructor(public readonly snapshotId: string) { + super(`No execution snapshot found for id ${snapshotId}`); + this.name = "ExecutionSnapshotNotFoundError"; + } +} diff --git a/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts b/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts index d615c066b85..c8674a60530 100644 --- a/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts @@ -10,6 +10,7 @@ import { TaskRunStatus, Waitpoint, } from "@trigger.dev/database"; +import { ExecutionSnapshotNotFoundError } from "../errors.js"; import { HeartbeatTimeouts } from "../types.js"; import { SystemResources } from "./systems.js"; @@ -278,7 +279,7 @@ export async function getExecutionSnapshotsSince( }); if (!sinceSnapshot) { - throw new Error(`No execution snapshot found for id ${sinceSnapshotId}`); + throw new ExecutionSnapshotNotFoundError(sinceSnapshotId); } // Step 2: Fetch snapshots WITHOUT waitpoints to avoid N×M data explosion From 73d5c55bf21b78350a1ef727d519928ca0a822cd Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Wed, 10 Jun 2026 13:01:29 +0100 Subject: [PATCH 05/21] feat(run-engine): retry getSnapshotsSince on the primary when the replica misses the since snapshot Co-Authored-By: Claude Fable 5 --- .../run-engine/src/engine/index.ts | 52 ++++++++++++++++--- 1 file changed, 46 insertions(+), 6 deletions(-) diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index 835ff90cc48..8126005a339 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -1,5 +1,5 @@ import { createRedisClient, Redis } from "@internal/redis"; -import { getMeter, Meter, startSpan, trace, Tracer } from "@internal/tracing"; +import { type Counter, getMeter, Meter, startSpan, trace, Tracer } from "@internal/tracing"; import { Logger } from "@trigger.dev/core/logger"; import { CheckpointInput, @@ -46,7 +46,12 @@ import { RunQueue } from "../run-queue/index.js"; import { RunQueueFullKeyProducer } from "../run-queue/keyProducer.js"; import { AuthenticatedEnvironment, MinimalAuthenticatedEnvironment } from "../shared/index.js"; import { BillingCache } from "./billingCache.js"; -import { NotImplementedError, RunDuplicateIdempotencyKeyError, RunOneTimeUseTokenError } from "./errors.js"; +import { + ExecutionSnapshotNotFoundError, + NotImplementedError, + RunDuplicateIdempotencyKeyError, + RunOneTimeUseTokenError, +} from "./errors.js"; import { EventBus, EventBusEvents } from "./eventBus.js"; import { RunLocker } from "./locking.js"; import { getFinalRunStatuses } from "./statuses.js"; @@ -88,6 +93,7 @@ export class RunEngine { private logger: Logger; private tracer: Tracer; private meter: Meter; + private snapshotsSinceReplicaMissCounter: Counter; private heartbeatTimeouts: HeartbeatTimeouts; private repairSnapshotTimeoutMs: number; private batchQueue: BatchQueue; @@ -272,6 +278,14 @@ export class RunEngine { this.tracer = options.tracer; this.meter = options.meter ?? getMeter("run-engine"); + this.snapshotsSinceReplicaMissCounter = this.meter.createCounter( + "run_engine.snapshots_since.replica_miss", + { + description: + "getSnapshotsSince reads where the since snapshot was not yet on the read replica and the query was retried on the primary", + } + ); + const defaultHeartbeatTimeouts: HeartbeatTimeouts = { PENDING_EXECUTING: 60_000, PENDING_CANCEL: 60_000, @@ -1918,13 +1932,39 @@ export class RunEngine { snapshotId: string; tx?: PrismaClientOrTransaction; }): Promise { - const prisma = - tx ?? (this.options.readReplicaSnapshotsSinceEnabled ? this.readOnlyPrisma : this.prisma); + const useReplica = !tx && this.options.readReplicaSnapshotsSinceEnabled === true; + const prisma = tx ?? (useReplica ? this.readOnlyPrisma : this.prisma); - try { - const snapshots = await getExecutionSnapshotsSince(prisma, runId, snapshotId); + const query = async (client: PrismaClientOrTransaction) => { + const snapshots = await getExecutionSnapshotsSince(client, runId, snapshotId); return snapshots.map(executionDataFromSnapshot); + }; + + try { + return await query(prisma); } catch (e) { + if (useReplica && e instanceof ExecutionSnapshotNotFoundError) { + // Expected during replica lag: the runner learned the snapshot id from the writer + // before the replica caught up. Serve the read from the writer instead of failing + // the poll. + this.snapshotsSinceReplicaMissCounter.add(1); + this.logger.warn("getSnapshotsSince: snapshot not yet on replica, retrying on primary", { + runId, + snapshotId, + }); + + try { + return await query(this.prisma); + } catch (retryError) { + this.logger.error("Failed to getSnapshotsSince", { + message: retryError instanceof Error ? retryError.message : retryError, + runId, + snapshotId, + }); + return null; + } + } + this.logger.error("Failed to getSnapshotsSince", { message: e instanceof Error ? e.message : e, runId, From d680da8d5356f641a0aa509d6f456b776681bd98 Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Wed, 10 Jun 2026 13:07:35 +0100 Subject: [PATCH 06/21] fix(run-engine): only count replica misses the primary can serve; skip fallback without a replica Co-Authored-By: Claude Fable 5 --- .../run-engine/src/engine/index.ts | 24 +++++++++++-------- .../engine/tests/getSnapshotsSince.test.ts | 1 - 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index 8126005a339..7ad1dfe4e68 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -282,7 +282,7 @@ export class RunEngine { "run_engine.snapshots_since.replica_miss", { description: - "getSnapshotsSince reads where the since snapshot was not yet on the read replica and the query was retried on the primary", + "getSnapshotsSince reads where the since snapshot was not yet on the read replica and was served from the primary", } ); @@ -1932,7 +1932,10 @@ export class RunEngine { snapshotId: string; tx?: PrismaClientOrTransaction; }): Promise { - const useReplica = !tx && this.options.readReplicaSnapshotsSinceEnabled === true; + const useReplica = + !tx && + this.options.readReplicaSnapshotsSinceEnabled === true && + this.readOnlyPrisma !== this.prisma; const prisma = tx ?? (useReplica ? this.readOnlyPrisma : this.prisma); const query = async (client: PrismaClientOrTransaction) => { @@ -1946,15 +1949,16 @@ export class RunEngine { if (useReplica && e instanceof ExecutionSnapshotNotFoundError) { // Expected during replica lag: the runner learned the snapshot id from the writer // before the replica caught up. Serve the read from the writer instead of failing - // the poll. - this.snapshotsSinceReplicaMissCounter.add(1); - this.logger.warn("getSnapshotsSince: snapshot not yet on replica, retrying on primary", { - runId, - snapshotId, - }); - + // the poll. Only count/warn when the writer actually has the snapshot - a permanent + // miss (bogus or pruned snapshot id) is a real error, not replica lag. try { - return await query(this.prisma); + const result = await query(this.prisma); + this.snapshotsSinceReplicaMissCounter.add(1); + this.logger.warn("getSnapshotsSince: snapshot not yet on replica, served from primary", { + runId, + snapshotId, + }); + return result; } catch (retryError) { this.logger.error("Failed to getSnapshotsSince", { message: retryError instanceof Error ? retryError.message : retryError, diff --git a/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts b/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts index e09ea1211a0..2e6202ec8ce 100644 --- a/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts +++ b/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts @@ -15,7 +15,6 @@ import { } from "./helpers/snapshotTestHelpers.js"; import { generateFriendlyId } from "@trigger.dev/core/v3/isomorphic"; - vi.setConfig({ testTimeout: 120_000 }); describe("RunEngine getSnapshotsSince", () => { From 66515686d7d6206f25711330082d4d3b3795fb45 Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Wed, 10 Jun 2026 13:10:47 +0100 Subject: [PATCH 07/21] chore: server-changes note for snapshots-since replica fallback Co-Authored-By: Claude Fable 5 --- .../snapshots-since-replica-primary-fallback.md | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 .server-changes/snapshots-since-replica-primary-fallback.md diff --git a/.server-changes/snapshots-since-replica-primary-fallback.md b/.server-changes/snapshots-since-replica-primary-fallback.md new file mode 100644 index 00000000000..c3d6d892056 --- /dev/null +++ b/.server-changes/snapshots-since-replica-primary-fallback.md @@ -0,0 +1,8 @@ +--- +area: webapp +type: improvement +--- + +Run snapshot polling no longer errors or pays extra latency when the database read replica +briefly lags behind the primary (`RUN_ENGINE_READ_REPLICA_SNAPSHOTS_SINCE_ENABLED`): the read +is served from the primary instead. From a868b3a5b2d8d558ad4fd48a76ad7a4d55d88a48 Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Wed, 10 Jun 2026 14:06:45 +0100 Subject: [PATCH 08/21] test(run-engine): cover replica-read, stale-tail, flag-off, tx bypass and counter semantics for getSnapshotsSince Co-Authored-By: Claude Fable 5 --- internal-packages/run-engine/package.json | 1 + .../engine/tests/getSnapshotsSince.test.ts | 322 ++++++++++++++++++ .../tests/helpers/replicaTestHelpers.ts | 95 ++++++ pnpm-lock.yaml | 3 + 4 files changed, 421 insertions(+) create mode 100644 internal-packages/run-engine/src/engine/tests/helpers/replicaTestHelpers.ts diff --git a/internal-packages/run-engine/package.json b/internal-packages/run-engine/package.json index 680d385ca4e..8b876a1aab6 100644 --- a/internal-packages/run-engine/package.json +++ b/internal-packages/run-engine/package.json @@ -35,6 +35,7 @@ }, "devDependencies": { "@internal/testcontainers": "workspace:*", + "@opentelemetry/sdk-metrics": "2.7.1", "@types/seedrandom": "^3.0.8", "rimraf": "6.0.1" }, diff --git a/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts b/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts index 2e6202ec8ce..bd19b5e5f49 100644 --- a/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts +++ b/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts @@ -13,6 +13,10 @@ import { setupTestScenario, generateLargeOutput, } from "./helpers/snapshotTestHelpers.js"; +import { + copySnapshotsToReplica, + createTestMetricsMeter, +} from "./helpers/replicaTestHelpers.js"; import { generateFriendlyId } from "@trigger.dev/core/v3/isomorphic"; vi.setConfig({ testTimeout: 120_000 }); @@ -684,6 +688,7 @@ describe("RunEngine getSnapshotsSince", () => { "falls back to the primary when the replica is missing the since snapshot", async ({ prisma, schemaOnlyPrisma, redisOptions }) => { const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const { meter, getCounterValue } = createTestMetricsMeter(); const engine = new RunEngine({ prisma, @@ -717,6 +722,7 @@ describe("RunEngine getSnapshotsSince", () => { baseCostInCents: 0.0001, }, tracer: trace.getTracer("test", "0.0.0"), + meter, }); try { @@ -771,6 +777,9 @@ describe("RunEngine getSnapshotsSince", () => { expect(expectedSnapshots.length).toBeGreaterThan(0); expect(result!.length).toBe(expectedSnapshots.length); expect(result!.map((s) => s.snapshot.id)).toEqual(expectedSnapshots.map((s) => s.id)); + + // The fallback fired exactly once - this counter is the prod rollout signal. + expect(await getCounterValue("run_engine.snapshots_since.replica_miss")).toBe(1); } finally { await engine.quit(); } @@ -781,6 +790,7 @@ describe("RunEngine getSnapshotsSince", () => { "returns null when the snapshot is missing on both replica and primary", async ({ prisma, schemaOnlyPrisma, redisOptions }) => { const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const { meter, getCounterValue } = createTestMetricsMeter(); const engine = new RunEngine({ prisma, @@ -811,6 +821,7 @@ describe("RunEngine getSnapshotsSince", () => { baseCostInCents: 0.0001, }, tracer: trace.getTracer("test", "0.0.0"), + meter, }); try { @@ -845,6 +856,317 @@ describe("RunEngine getSnapshotsSince", () => { }); expect(result).toBeNull(); + + // Permanent misses are deliberately NOT counted - the counter only tracks + // reads actually served by the primary fallback. + expect(await getCounterValue("run_engine.snapshots_since.replica_miss")).toBe(0); + } finally { + await engine.quit(); + } + } + ); + + containerTest( + "serves the replica's view when the replica has the since snapshot but lags behind the primary", + async ({ prisma, schemaOnlyPrisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const { meter, getCounterValue } = createTestMetricsMeter(); + + const engine = new RunEngine({ + prisma, + // The schema-only database stands in for a replica that has the since-snapshot + // but lags behind the primary by one snapshot (the newest one is excluded below). + readOnlyPrisma: schemaOnlyPrisma, + readReplicaSnapshotsSinceEnabled: true, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + meter, + }); + + try { + const taskIdentifier = "test-task"; + await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier); + + const runFriendlyId = generateFriendlyId("run"); + const run = await engine.trigger( + { + number: 1, + friendlyId: runFriendlyId, + environment: authenticatedEnvironment, + taskIdentifier, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t_replica_stale_tail", + spanId: "s_replica_stale_tail", + workerQueue: "main", + queue: "task/test-task", + isTest: false, + tags: [], + }, + prisma + ); + + await setTimeout(500); + await engine.dequeueFromWorkerQueue({ + consumerId: "test_replica_stale_tail", + workerQueue: "main", + }); + + const allSnapshots = await prisma.taskRunExecutionSnapshot.findMany({ + where: { runId: run.id, isValid: true }, + orderBy: { createdAt: "asc" }, + }); + expect(allSnapshots.length).toBeGreaterThanOrEqual(3); + + const since = allSnapshots[0]; + const tail = allSnapshots[allSnapshots.length - 1]; + + // Replica has everything EXCEPT the newest snapshot - a lagging-but-usable replica. + await copySnapshotsToReplica(prisma, schemaOnlyPrisma, run.id, { + excludeSnapshotIds: [tail.id], + }); + + const result = await engine.getSnapshotsSince({ + runId: run.id, + snapshotId: since.id, + }); + + expect(result).not.toBeNull(); + + // The replica's view: everything after the since snapshot, minus the tail + // it hasn't received yet. If reads were hitting the primary, the tail would + // be present and these assertions would fail. + const expectedSnapshots = allSnapshots.filter( + (s) => s.createdAt.getTime() > since.createdAt.getTime() && s.id !== tail.id + ); + expect(result!.map((s) => s.snapshot.id)).not.toContain(tail.id); + expect(result!.map((s) => s.snapshot.id)).toEqual(expectedSnapshots.map((s) => s.id)); + + // The since snapshot was on the replica - no fallback fired. + expect(await getCounterValue("run_engine.snapshots_since.replica_miss")).toBe(0); + } finally { + await engine.quit(); + } + } + ); + + containerTest( + "reads from the primary when the flag is off even with a replica configured", + async ({ prisma, schemaOnlyPrisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const { meter, getCounterValue } = createTestMetricsMeter(); + + const engine = new RunEngine({ + prisma, + // Replica configured but EMPTY, flag off: a correct result can only come + // from the primary. + readOnlyPrisma: schemaOnlyPrisma, + readReplicaSnapshotsSinceEnabled: false, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + meter, + }); + + try { + const taskIdentifier = "test-task"; + await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier); + + const runFriendlyId = generateFriendlyId("run"); + const run = await engine.trigger( + { + number: 1, + friendlyId: runFriendlyId, + environment: authenticatedEnvironment, + taskIdentifier, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t_replica_flag_off", + spanId: "s_replica_flag_off", + workerQueue: "main", + queue: "task/test-task", + isTest: false, + tags: [], + }, + prisma + ); + + await setTimeout(500); + await engine.dequeueFromWorkerQueue({ + consumerId: "test_replica_flag_off", + workerQueue: "main", + }); + + const allSnapshots = await prisma.taskRunExecutionSnapshot.findMany({ + where: { runId: run.id, isValid: true }, + orderBy: { createdAt: "asc" }, + }); + expect(allSnapshots.length).toBeGreaterThan(1); + + const since = allSnapshots[0]; + const result = await engine.getSnapshotsSince({ + runId: run.id, + snapshotId: since.id, + }); + + expect(result).not.toBeNull(); + const expectedSnapshots = allSnapshots.filter( + (s) => s.createdAt.getTime() > since.createdAt.getTime() + ); + expect(expectedSnapshots.length).toBeGreaterThan(0); + expect(result!.map((s) => s.snapshot.id)).toEqual(expectedSnapshots.map((s) => s.id)); + + // No replica read attempted, so no fallback could have fired. + expect(await getCounterValue("run_engine.snapshots_since.replica_miss")).toBe(0); + } finally { + await engine.quit(); + } + } + ); + + containerTest( + "uses the provided transaction client and never falls back", + async ({ prisma, schemaOnlyPrisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const { meter, getCounterValue } = createTestMetricsMeter(); + + const engine = new RunEngine({ + prisma, + // Flag ON with an EMPTY replica: if the provided tx didn't bypass the replica, + // the read would miss and (at best) be served by the fallback, incrementing + // the counter. + readOnlyPrisma: schemaOnlyPrisma, + readReplicaSnapshotsSinceEnabled: true, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + meter, + }); + + try { + const taskIdentifier = "test-task"; + await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier); + + const runFriendlyId = generateFriendlyId("run"); + const run = await engine.trigger( + { + number: 1, + friendlyId: runFriendlyId, + environment: authenticatedEnvironment, + taskIdentifier, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t_replica_tx_bypass", + spanId: "s_replica_tx_bypass", + workerQueue: "main", + queue: "task/test-task", + isTest: false, + tags: [], + }, + prisma + ); + + await setTimeout(500); + await engine.dequeueFromWorkerQueue({ + consumerId: "test_replica_tx_bypass", + workerQueue: "main", + }); + + const allSnapshots = await prisma.taskRunExecutionSnapshot.findMany({ + where: { runId: run.id, isValid: true }, + orderBy: { createdAt: "asc" }, + }); + expect(allSnapshots.length).toBeGreaterThan(1); + + const since = allSnapshots[0]; + const result = await engine.getSnapshotsSince({ + runId: run.id, + snapshotId: since.id, + tx: prisma, + }); + + expect(result).not.toBeNull(); + const expectedSnapshots = allSnapshots.filter( + (s) => s.createdAt.getTime() > since.createdAt.getTime() + ); + expect(expectedSnapshots.length).toBeGreaterThan(0); + expect(result!.map((s) => s.snapshot.id)).toEqual(expectedSnapshots.map((s) => s.id)); + + // The tx served the read directly - the fallback path never ran. + expect(await getCounterValue("run_engine.snapshots_since.replica_miss")).toBe(0); } finally { await engine.quit(); } diff --git a/internal-packages/run-engine/src/engine/tests/helpers/replicaTestHelpers.ts b/internal-packages/run-engine/src/engine/tests/helpers/replicaTestHelpers.ts new file mode 100644 index 00000000000..ba9aeb7eb43 --- /dev/null +++ b/internal-packages/run-engine/src/engine/tests/helpers/replicaTestHelpers.ts @@ -0,0 +1,95 @@ +import { + AggregationTemporality, + InMemoryMetricExporter, + MeterProvider, + PeriodicExportingMetricReader, +} from "@opentelemetry/sdk-metrics"; +import { Prisma, PrismaClient } from "@trigger.dev/database"; + +/** + * Copies a run's TaskRunExecutionSnapshot rows from the primary database into the + * replica database. The replica (a schema-only clone) has no parent rows (no TaskRun), + * so the rows are inserted with FK triggers disabled - exactly how a physical replica's + * data arrives, without FK re-checks. + */ +export async function copySnapshotsToReplica( + primary: PrismaClient, + replica: PrismaClient, + runId: string, + opts?: { excludeSnapshotIds?: string[] } +) { + const rows = await primary.taskRunExecutionSnapshot.findMany({ where: { runId } }); + const toCopy = rows.filter((r) => !(opts?.excludeSnapshotIds ?? []).includes(r.id)); + + await replica.$transaction(async (tx) => { + // SET LOCAL applies for the duration of this transaction (same connection), + // disabling FK triggers like a physical replica's apply process. + await tx.$executeRawUnsafe(`SET LOCAL session_replication_role = replica`); + + for (const row of toCopy) { + await tx.taskRunExecutionSnapshot.create({ + data: { + id: row.id, + engine: row.engine, + executionStatus: row.executionStatus, + description: row.description, + isValid: row.isValid, + error: row.error, + previousSnapshotId: row.previousSnapshotId, + runId: row.runId, + runStatus: row.runStatus, + batchId: row.batchId, + attemptNumber: row.attemptNumber, + environmentId: row.environmentId, + environmentType: row.environmentType, + projectId: row.projectId, + organizationId: row.organizationId, + completedWaitpointOrder: row.completedWaitpointOrder, + checkpointId: row.checkpointId, + workerId: row.workerId, + runnerId: row.runnerId, + // Preserve timestamps exactly - the snapshots-since window query depends on them. + createdAt: row.createdAt, + updatedAt: row.updatedAt, + lastHeartbeatAt: row.lastHeartbeatAt, + metadata: row.metadata === null ? Prisma.DbNull : row.metadata, + }, + }); + } + }); +} + +/** + * Creates a real OTel meter backed by an in-memory exporter, plus a helper to read + * a counter's current cumulative value. No mocks - this exercises the same metrics + * pipeline production uses. + */ +export function createTestMetricsMeter() { + const exporter = new InMemoryMetricExporter(AggregationTemporality.CUMULATIVE); + // Long interval: exports only happen via explicit forceFlush() below. + const reader = new PeriodicExportingMetricReader({ exporter, exportIntervalMillis: 3_600_000 }); + const meterProvider = new MeterProvider({ readers: [reader] }); + const meter = meterProvider.getMeter("test"); + + const getCounterValue = async (name: string): Promise => { + await reader.forceFlush(); + const resourceMetrics = exporter.getMetrics(); + + // Cumulative temporality: every export batch carries the full running total, + // so read the most recent batch that contains the metric. A counter that was + // never added to exports no data points - treat that as 0. + for (let i = resourceMetrics.length - 1; i >= 0; i--) { + for (const scopeMetrics of resourceMetrics[i].scopeMetrics) { + for (const metric of scopeMetrics.metrics) { + if (metric.descriptor.name === name && metric.dataPoints.length > 0) { + return metric.dataPoints.reduce((sum, dp) => sum + (dp.value as number), 0); + } + } + } + } + + return 0; + }; + + return { meter, getCounterValue }; +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 782b62cf7ff..807ae796582 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1329,6 +1329,9 @@ importers: '@internal/testcontainers': specifier: workspace:* version: link:../testcontainers + '@opentelemetry/sdk-metrics': + specifier: 2.7.1 + version: 2.7.1(@opentelemetry/api@1.9.1) '@types/seedrandom': specifier: ^3.0.8 version: 3.0.8 From 973706056060fea4e96f4dcd24ecb0961c2a8b69 Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Wed, 10 Jun 2026 14:13:24 +0100 Subject: [PATCH 09/21] fix(run-engine): mark replica-retry failures in logs, narrow release note, trim dead test setup Co-Authored-By: Claude Fable 5 --- ...napshots-since-replica-primary-fallback.md | 4 +-- .../run-engine/src/engine/index.ts | 1 + .../engine/tests/getSnapshotsSince.test.ts | 29 ++----------------- 3 files changed, 5 insertions(+), 29 deletions(-) diff --git a/.server-changes/snapshots-since-replica-primary-fallback.md b/.server-changes/snapshots-since-replica-primary-fallback.md index c3d6d892056..5bae0254b5e 100644 --- a/.server-changes/snapshots-since-replica-primary-fallback.md +++ b/.server-changes/snapshots-since-replica-primary-fallback.md @@ -4,5 +4,5 @@ type: improvement --- Run snapshot polling no longer errors or pays extra latency when the database read replica -briefly lags behind the primary (`RUN_ENGINE_READ_REPLICA_SNAPSHOTS_SINCE_ENABLED`): the read -is served from the primary instead. +hasn't yet replicated the snapshot the runner is polling from +(`RUN_ENGINE_READ_REPLICA_SNAPSHOTS_SINCE_ENABLED`): the read is served from the primary instead. diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index 7ad1dfe4e68..5e941d263f1 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -1964,6 +1964,7 @@ export class RunEngine { message: retryError instanceof Error ? retryError.message : retryError, runId, snapshotId, + retriedFromReplica: true, }); return null; } diff --git a/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts b/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts index bd19b5e5f49..44a33c5d68e 100644 --- a/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts +++ b/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts @@ -789,7 +789,6 @@ describe("RunEngine getSnapshotsSince", () => { containerTest( "returns null when the snapshot is missing on both replica and primary", async ({ prisma, schemaOnlyPrisma, redisOptions }) => { - const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); const { meter, getCounterValue } = createTestMetricsMeter(); const engine = new RunEngine({ @@ -825,33 +824,9 @@ describe("RunEngine getSnapshotsSince", () => { }); try { - const taskIdentifier = "test-task"; - await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier); - - const runFriendlyId = generateFriendlyId("run"); - const run = await engine.trigger( - { - number: 1, - friendlyId: runFriendlyId, - environment: authenticatedEnvironment, - taskIdentifier, - payload: "{}", - payloadType: "application/json", - context: {}, - traceContext: {}, - traceId: "t_replica_both_miss", - spanId: "s_replica_both_miss", - workerQueue: "main", - queue: "task/test-task", - isTest: false, - tags: [], - }, - prisma - ); - - // A snapshot id that exists nowhere - a genuine miss stays an error (null). + // runId is never consulted - the since-snapshot lookup throws on the bogus id first. const result = await engine.getSnapshotsSince({ - runId: run.id, + runId: "run_does_not_exist", snapshotId: "snapshot_does_not_exist", }); From a5d59a14a64651819b2024e7d5e99984921dc447 Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Wed, 10 Jun 2026 14:24:12 +0100 Subject: [PATCH 10/21] test(run-engine): getSnapshotsSince must reject a since snapshot from a different run (red) Co-Authored-By: Claude Fable 5 --- .../engine/tests/getSnapshotsSince.test.ts | 111 ++++++++++++++++++ 1 file changed, 111 insertions(+) diff --git a/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts b/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts index 44a33c5d68e..aa250e91ade 100644 --- a/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts +++ b/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts @@ -436,6 +436,117 @@ describe("RunEngine getSnapshotsSince", () => { } }); + containerTest( + "returns null when the since snapshot belongs to a different run", + async ({ prisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + try { + const taskIdentifier = "test-task"; + await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier); + + const runA = await engine.trigger( + { + number: 1, + friendlyId: generateFriendlyId("run"), + environment: authenticatedEnvironment, + taskIdentifier, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t_foreign_snapshot", + spanId: "s_foreign_snapshot_a", + workerQueue: "main", + queue: "task/test-task", + isTest: false, + tags: [], + }, + prisma + ); + + const runB = await engine.trigger( + { + number: 2, + friendlyId: generateFriendlyId("run"), + environment: authenticatedEnvironment, + taskIdentifier, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t_foreign_snapshot", + spanId: "s_foreign_snapshot_b", + workerQueue: "main", + queue: "task/test-task", + isTest: false, + tags: [], + }, + prisma + ); + + await setTimeout(500); + await engine.dequeueFromWorkerQueue({ + consumerId: "test_foreign_snapshot", + workerQueue: "main", + }); + + const runASnapshots = await prisma.taskRunExecutionSnapshot.findMany({ + where: { runId: runA.id, isValid: true }, + orderBy: { createdAt: "asc" }, + }); + const runBSnapshots = await prisma.taskRunExecutionSnapshot.findMany({ + where: { runId: runB.id, isValid: true }, + orderBy: { createdAt: "asc" }, + }); + + expect(runASnapshots.length).toBeGreaterThanOrEqual(1); + expect(runBSnapshots.length).toBeGreaterThanOrEqual(1); + + const runASnapshot = runASnapshots[0]; + + // Poll run B using a snapshot id that belongs to run A + const result = await engine.getSnapshotsSince({ + runId: runB.id, + snapshotId: runASnapshot.id, + }); + + expect(result).toBeNull(); + } finally { + await engine.quit(); + } + } + ); + // Direct database tests for the core function containerTest( "direct test: large waitpoint scenario - 100 waitpoints with 10KB outputs", From 1c128bd0eda74bad83577c0327fd39abb76191ac Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Wed, 10 Jun 2026 14:25:34 +0100 Subject: [PATCH 11/21] fix(run-engine): scope getSnapshotsSince anchor lookup to the polled run Co-Authored-By: Claude Fable 5 --- .server-changes/snapshots-since-scoped-to-run.md | 7 +++++++ .../src/engine/systems/executionSnapshotSystem.ts | 2 +- 2 files changed, 8 insertions(+), 1 deletion(-) create mode 100644 .server-changes/snapshots-since-scoped-to-run.md diff --git a/.server-changes/snapshots-since-scoped-to-run.md b/.server-changes/snapshots-since-scoped-to-run.md new file mode 100644 index 00000000000..89f6c406119 --- /dev/null +++ b/.server-changes/snapshots-since-scoped-to-run.md @@ -0,0 +1,7 @@ +--- +area: webapp +type: fix +--- + +Snapshot polling now rejects a since-snapshot id that doesn't belong to the run being polled, +instead of using its timestamp to return a too-wide window of the run's snapshots. diff --git a/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts b/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts index c8674a60530..d8e9656f395 100644 --- a/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts @@ -274,7 +274,7 @@ export async function getExecutionSnapshotsSince( ): Promise { // Step 1: Find the createdAt of the sinceSnapshotId const sinceSnapshot = await prisma.taskRunExecutionSnapshot.findFirst({ - where: { id: sinceSnapshotId }, + where: { id: sinceSnapshotId, runId }, select: { createdAt: true }, }); From 4f988285a0c8be2f17a3d0a2c4f34bbf5b1fb8dc Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Wed, 10 Jun 2026 14:58:04 +0100 Subject: [PATCH 12/21] chore(run-engine): trim assertion-echo comments, tighten fallback comment Co-Authored-By: Claude Fable 5 --- internal-packages/run-engine/src/engine/index.ts | 7 +++---- .../run-engine/src/engine/tests/getSnapshotsSince.test.ts | 4 ---- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index 5e941d263f1..1f0371733f7 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -1947,10 +1947,9 @@ export class RunEngine { return await query(prisma); } catch (e) { if (useReplica && e instanceof ExecutionSnapshotNotFoundError) { - // Expected during replica lag: the runner learned the snapshot id from the writer - // before the replica caught up. Serve the read from the writer instead of failing - // the poll. Only count/warn when the writer actually has the snapshot - a permanent - // miss (bogus or pruned snapshot id) is a real error, not replica lag. + // Replica lag: the runner learned this snapshot id from the writer before the + // replica caught up. Serve from the writer; only count/warn if the writer has it + // (a permanent miss is a real error, not lag). try { const result = await query(this.prisma); this.snapshotsSinceReplicaMissCounter.add(1); diff --git a/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts b/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts index aa250e91ade..5674b416967 100644 --- a/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts +++ b/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts @@ -889,7 +889,6 @@ describe("RunEngine getSnapshotsSince", () => { expect(result!.length).toBe(expectedSnapshots.length); expect(result!.map((s) => s.snapshot.id)).toEqual(expectedSnapshots.map((s) => s.id)); - // The fallback fired exactly once - this counter is the prod rollout signal. expect(await getCounterValue("run_engine.snapshots_since.replica_miss")).toBe(1); } finally { await engine.quit(); @@ -1053,7 +1052,6 @@ describe("RunEngine getSnapshotsSince", () => { expect(result!.map((s) => s.snapshot.id)).not.toContain(tail.id); expect(result!.map((s) => s.snapshot.id)).toEqual(expectedSnapshots.map((s) => s.id)); - // The since snapshot was on the replica - no fallback fired. expect(await getCounterValue("run_engine.snapshots_since.replica_miss")).toBe(0); } finally { await engine.quit(); @@ -1151,7 +1149,6 @@ describe("RunEngine getSnapshotsSince", () => { expect(expectedSnapshots.length).toBeGreaterThan(0); expect(result!.map((s) => s.snapshot.id)).toEqual(expectedSnapshots.map((s) => s.id)); - // No replica read attempted, so no fallback could have fired. expect(await getCounterValue("run_engine.snapshots_since.replica_miss")).toBe(0); } finally { await engine.quit(); @@ -1251,7 +1248,6 @@ describe("RunEngine getSnapshotsSince", () => { expect(expectedSnapshots.length).toBeGreaterThan(0); expect(result!.map((s) => s.snapshot.id)).toEqual(expectedSnapshots.map((s) => s.id)); - // The tx served the read directly - the fallback path never ran. expect(await getCounterValue("run_engine.snapshots_since.replica_miss")).toBe(0); } finally { await engine.quit(); From 01675a8e505186687cdc154b3e0b87e8dc5efc45 Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Wed, 10 Jun 2026 15:06:31 +0100 Subject: [PATCH 13/21] test(run-engine): replica catches up during jittered retry window (red) Co-Authored-By: Claude Fable 5 --- .../engine/tests/getSnapshotsSince.test.ts | 133 +++++++++++++++++- .../tests/helpers/replicaTestHelpers.ts | 16 ++- 2 files changed, 145 insertions(+), 4 deletions(-) diff --git a/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts b/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts index 5674b416967..eb2d253c4ea 100644 --- a/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts +++ b/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts @@ -808,6 +808,9 @@ describe("RunEngine getSnapshotsSince", () => { // primary instead of failing the poll. readOnlyPrisma: schemaOnlyPrisma, readReplicaSnapshotsSinceEnabled: true, + // Tiny jitter window: the replica is permanently empty here, so the retry + // always misses - no need to pay a realistic delay. + readReplicaSnapshotsSinceRetryDelay: { minMs: 1, maxMs: 2 }, worker: { redis: redisOptions, workers: 1, @@ -889,7 +892,131 @@ describe("RunEngine getSnapshotsSince", () => { expect(result!.length).toBe(expectedSnapshots.length); expect(result!.map((s) => s.snapshot.id)).toEqual(expectedSnapshots.map((s) => s.id)); - expect(await getCounterValue("run_engine.snapshots_since.replica_miss")).toBe(1); + expect( + await getCounterValue("run_engine.snapshots_since.replica_miss", { outcome: "primary" }) + ).toBe(1); + // The replica retry never succeeds against a permanently empty replica. + expect( + await getCounterValue("run_engine.snapshots_since.replica_miss", { + outcome: "replica_retry", + }) + ).toBe(0); + } finally { + await engine.quit(); + } + } + ); + + containerTest( + "serves the read from the replica after a jittered retry when it catches up", + async ({ prisma, schemaOnlyPrisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const { meter, getCounterValue } = createTestMetricsMeter(); + + const engine = new RunEngine({ + prisma, + // The schema-only database stands in for a lagging replica: empty when the + // poll first arrives, caught up by the time the jittered retry fires. + readOnlyPrisma: schemaOnlyPrisma, + readReplicaSnapshotsSinceEnabled: true, + // A near-deterministic ~400ms window: long enough to seed the replica + // mid-flight (below), short enough to keep the test fast. + readReplicaSnapshotsSinceRetryDelay: { minMs: 400, maxMs: 401 }, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + meter, + }); + + try { + const taskIdentifier = "test-task"; + await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier); + + const runFriendlyId = generateFriendlyId("run"); + const run = await engine.trigger( + { + number: 1, + friendlyId: runFriendlyId, + environment: authenticatedEnvironment, + taskIdentifier, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t_replica_retry", + spanId: "s_replica_retry", + workerQueue: "main", + queue: "task/test-task", + isTest: false, + tags: [], + }, + prisma + ); + + await setTimeout(500); + await engine.dequeueFromWorkerQueue({ + consumerId: "test_replica_retry", + workerQueue: "main", + }); + + const allSnapshots = await prisma.taskRunExecutionSnapshot.findMany({ + where: { runId: run.id, isValid: true }, + orderBy: { createdAt: "asc" }, + }); + expect(allSnapshots.length).toBeGreaterThan(1); + + const firstSnapshot = allSnapshots[0]; + + // Kick off the poll against the still-empty replica, then seed the replica + // well before the ~400ms jittered retry fires - simulating the replica + // catching up while the engine waits. + const resultPromise = engine.getSnapshotsSince({ + runId: run.id, + snapshotId: firstSnapshot.id, + }); + await setTimeout(100); + await copySnapshotsToReplica(prisma, schemaOnlyPrisma, run.id); + const result = await resultPromise; + + expect(result).not.toBeNull(); + const expectedSnapshots = allSnapshots.filter( + (s) => s.createdAt.getTime() > firstSnapshot.createdAt.getTime() + ); + expect(expectedSnapshots.length).toBeGreaterThan(0); + expect(result!.length).toBe(expectedSnapshots.length); + expect(result!.map((s) => s.snapshot.id)).toEqual(expectedSnapshots.map((s) => s.id)); + + // Recovered on the replica retry - the writer was never consulted. + expect( + await getCounterValue("run_engine.snapshots_since.replica_miss", { + outcome: "replica_retry", + }) + ).toBe(1); + expect( + await getCounterValue("run_engine.snapshots_since.replica_miss", { outcome: "primary" }) + ).toBe(0); } finally { await engine.quit(); } @@ -905,6 +1032,7 @@ describe("RunEngine getSnapshotsSince", () => { prisma, readOnlyPrisma: schemaOnlyPrisma, readReplicaSnapshotsSinceEnabled: true, + readReplicaSnapshotsSinceRetryDelay: { minMs: 1, maxMs: 2 }, worker: { redis: redisOptions, workers: 1, @@ -963,6 +1091,7 @@ describe("RunEngine getSnapshotsSince", () => { // but lags behind the primary by one snapshot (the newest one is excluded below). readOnlyPrisma: schemaOnlyPrisma, readReplicaSnapshotsSinceEnabled: true, + readReplicaSnapshotsSinceRetryDelay: { minMs: 1, maxMs: 2 }, worker: { redis: redisOptions, workers: 1, @@ -1071,6 +1200,7 @@ describe("RunEngine getSnapshotsSince", () => { // from the primary. readOnlyPrisma: schemaOnlyPrisma, readReplicaSnapshotsSinceEnabled: false, + readReplicaSnapshotsSinceRetryDelay: { minMs: 1, maxMs: 2 }, worker: { redis: redisOptions, workers: 1, @@ -1169,6 +1299,7 @@ describe("RunEngine getSnapshotsSince", () => { // the counter. readOnlyPrisma: schemaOnlyPrisma, readReplicaSnapshotsSinceEnabled: true, + readReplicaSnapshotsSinceRetryDelay: { minMs: 1, maxMs: 2 }, worker: { redis: redisOptions, workers: 1, diff --git a/internal-packages/run-engine/src/engine/tests/helpers/replicaTestHelpers.ts b/internal-packages/run-engine/src/engine/tests/helpers/replicaTestHelpers.ts index ba9aeb7eb43..3b94bae8887 100644 --- a/internal-packages/run-engine/src/engine/tests/helpers/replicaTestHelpers.ts +++ b/internal-packages/run-engine/src/engine/tests/helpers/replicaTestHelpers.ts @@ -71,18 +71,28 @@ export function createTestMetricsMeter() { const meterProvider = new MeterProvider({ readers: [reader] }); const meter = meterProvider.getMeter("test"); - const getCounterValue = async (name: string): Promise => { + const getCounterValue = async ( + name: string, + attributes?: Record + ): Promise => { await reader.forceFlush(); const resourceMetrics = exporter.getMetrics(); // Cumulative temporality: every export batch carries the full running total, // so read the most recent batch that contains the metric. A counter that was - // never added to exports no data points - treat that as 0. + // never added to exports no data points - treat that as 0. When `attributes` + // is provided, only data points whose attributes match are summed. for (let i = resourceMetrics.length - 1; i >= 0; i--) { for (const scopeMetrics of resourceMetrics[i].scopeMetrics) { for (const metric of scopeMetrics.metrics) { if (metric.descriptor.name === name && metric.dataPoints.length > 0) { - return metric.dataPoints.reduce((sum, dp) => sum + (dp.value as number), 0); + return metric.dataPoints + .filter( + (dp) => + !attributes || + Object.entries(attributes).every(([key, value]) => dp.attributes[key] === value) + ) + .reduce((sum, dp) => sum + (dp.value as number), 0); } } } From b4625be71584e006a85a8c1c617dbd08bafbe770 Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Wed, 10 Jun 2026 15:09:28 +0100 Subject: [PATCH 14/21] feat(run-engine): jittered replica retry before primary fallback in getSnapshotsSince Co-Authored-By: Claude Fable 5 --- ...napshots-since-replica-primary-fallback.md | 3 +- apps/webapp/app/env.server.ts | 2 + apps/webapp/app/v3/runEngine.server.ts | 4 ++ .../run-engine/src/engine/index.ts | 40 +++++++++++++++++-- .../run-engine/src/engine/types.ts | 4 ++ 5 files changed, 48 insertions(+), 5 deletions(-) diff --git a/.server-changes/snapshots-since-replica-primary-fallback.md b/.server-changes/snapshots-since-replica-primary-fallback.md index 5bae0254b5e..97ef4c1ad53 100644 --- a/.server-changes/snapshots-since-replica-primary-fallback.md +++ b/.server-changes/snapshots-since-replica-primary-fallback.md @@ -5,4 +5,5 @@ type: improvement Run snapshot polling no longer errors or pays extra latency when the database read replica hasn't yet replicated the snapshot the runner is polling from -(`RUN_ENGINE_READ_REPLICA_SNAPSHOTS_SINCE_ENABLED`): the read is served from the primary instead. +(`RUN_ENGINE_READ_REPLICA_SNAPSHOTS_SINCE_ENABLED`): the read is briefly retried on the +replica and served from the primary if it still hasn't caught up. diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index c55bb424001..d0e7e39471b 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -950,6 +950,8 @@ const EnvironmentSchema = z .default("info"), RUN_ENGINE_TREAT_PRODUCTION_EXECUTION_STALLS_AS_OOM: z.string().default("0"), RUN_ENGINE_READ_REPLICA_SNAPSHOTS_SINCE_ENABLED: z.string().default("0"), + RUN_ENGINE_SNAPSHOTS_SINCE_REPLICA_RETRY_MIN_MS: z.coerce.number().int().default(50), + RUN_ENGINE_SNAPSHOTS_SINCE_REPLICA_RETRY_MAX_MS: z.coerce.number().int().default(200), RUN_ENGINE_DEBOUNCE_USE_REPLICA_FOR_FAST_PATH_READ: z.string().default("0"), /** How long should the presence ttl last */ diff --git a/apps/webapp/app/v3/runEngine.server.ts b/apps/webapp/app/v3/runEngine.server.ts index 7e96c5184b2..e2c8ad85e94 100644 --- a/apps/webapp/app/v3/runEngine.server.ts +++ b/apps/webapp/app/v3/runEngine.server.ts @@ -22,6 +22,10 @@ function createRunEngine() { env.RUN_ENGINE_TREAT_PRODUCTION_EXECUTION_STALLS_AS_OOM === "1", readReplicaSnapshotsSinceEnabled: env.RUN_ENGINE_READ_REPLICA_SNAPSHOTS_SINCE_ENABLED === "1", + readReplicaSnapshotsSinceRetryDelay: { + minMs: env.RUN_ENGINE_SNAPSHOTS_SINCE_REPLICA_RETRY_MIN_MS, + maxMs: env.RUN_ENGINE_SNAPSHOTS_SINCE_REPLICA_RETRY_MAX_MS, + }, worker: { disabled: env.RUN_ENGINE_WORKER_ENABLED === "0", workers: env.RUN_ENGINE_WORKER_COUNT, diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index 1f0371733f7..fba2b521921 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -33,6 +33,7 @@ import { import { Worker } from "@trigger.dev/redis-worker"; import { assertNever } from "assert-never"; import { EventEmitter } from "node:events"; +import { setTimeout } from "node:timers/promises"; import { BatchQueue } from "../batch-queue/index.js"; import type { BatchItem, @@ -94,6 +95,7 @@ export class RunEngine { private tracer: Tracer; private meter: Meter; private snapshotsSinceReplicaMissCounter: Counter; + private snapshotsSinceReplicaRetryDelay: { minMs: number; maxMs: number }; private heartbeatTimeouts: HeartbeatTimeouts; private repairSnapshotTimeoutMs: number; private batchQueue: BatchQueue; @@ -282,10 +284,15 @@ export class RunEngine { "run_engine.snapshots_since.replica_miss", { description: - "getSnapshotsSince reads where the since snapshot was not yet on the read replica and was served from the primary", + "getSnapshotsSince reads where the since snapshot was not yet on the read replica, recovered via a replica retry or served from the primary", } ); + this.snapshotsSinceReplicaRetryDelay = options.readReplicaSnapshotsSinceRetryDelay ?? { + minMs: 50, + maxMs: 200, + }; + const defaultHeartbeatTimeouts: HeartbeatTimeouts = { PENDING_EXECUTING: 60_000, PENDING_CANCEL: 60_000, @@ -1948,11 +1955,36 @@ export class RunEngine { } catch (e) { if (useReplica && e instanceof ExecutionSnapshotNotFoundError) { // Replica lag: the runner learned this snapshot id from the writer before the - // replica caught up. Serve from the writer; only count/warn if the writer has it - // (a permanent miss is a real error, not lag). + // replica caught up. Give the replica one jittered retry, then serve from the + // writer; only count/warn if a retry succeeds (a permanent miss is a real error, + // not lag). + const { minMs, maxMs } = this.snapshotsSinceReplicaRetryDelay; + if (maxMs > 0) { + await setTimeout(minMs + Math.random() * Math.max(0, maxMs - minMs)); + try { + const result = await query(this.readOnlyPrisma); + this.snapshotsSinceReplicaMissCounter.add(1, { outcome: "replica_retry" }); + return result; + } catch (replicaRetryError) { + if (!(replicaRetryError instanceof ExecutionSnapshotNotFoundError)) { + this.logger.error("Failed to getSnapshotsSince", { + message: + replicaRetryError instanceof Error + ? replicaRetryError.message + : replicaRetryError, + runId, + snapshotId, + retriedFromReplica: true, + }); + return null; + } + // still not on the replica - fall through to the primary + } + } + try { const result = await query(this.prisma); - this.snapshotsSinceReplicaMissCounter.add(1); + this.snapshotsSinceReplicaMissCounter.add(1, { outcome: "primary" }); this.logger.warn("getSnapshotsSince: snapshot not yet on replica, served from primary", { runId, snapshotId, diff --git a/internal-packages/run-engine/src/engine/types.ts b/internal-packages/run-engine/src/engine/types.ts index e63b1c81f8b..60fe69949a8 100644 --- a/internal-packages/run-engine/src/engine/types.ts +++ b/internal-packages/run-engine/src/engine/types.ts @@ -207,6 +207,10 @@ export type RunEngineOptions = { * of the primary. Defaults to false. Callers passing an explicit `tx` always use * that client regardless of this flag. */ readReplicaSnapshotsSinceEnabled?: boolean; + /** Jittered delay bounds for the single replica retry `getSnapshotsSince` performs when + * the since snapshot is not yet on the replica, before falling back to the primary. + * Set maxMs to 0 to skip the replica retry and go straight to the primary. */ + readReplicaSnapshotsSinceRetryDelay?: { minMs: number; maxMs: number }; tracer: Tracer; meter?: Meter; logger?: Logger; From 6673a940d49c1961969d97fa781dec954328107f Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Wed, 10 Jun 2026 15:16:04 +0100 Subject: [PATCH 15/21] chore(run-engine): harden retry-window test against cold connects, distinguish fallback failure logs Co-Authored-By: Claude Fable 5 --- internal-packages/run-engine/src/engine/index.ts | 7 +++---- .../run-engine/src/engine/tests/getSnapshotsSince.test.ts | 4 ++++ 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index fba2b521921..813dedd768c 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -1956,8 +1956,7 @@ export class RunEngine { if (useReplica && e instanceof ExecutionSnapshotNotFoundError) { // Replica lag: the runner learned this snapshot id from the writer before the // replica caught up. Give the replica one jittered retry, then serve from the - // writer; only count/warn if a retry succeeds (a permanent miss is a real error, - // not lag). + // writer; a miss on the writer too is a real error, not lag. const { minMs, maxMs } = this.snapshotsSinceReplicaRetryDelay; if (maxMs > 0) { await setTimeout(minMs + Math.random() * Math.max(0, maxMs - minMs)); @@ -1974,7 +1973,7 @@ export class RunEngine { : replicaRetryError, runId, snapshotId, - retriedFromReplica: true, + failedDuring: "replica_retry", }); return null; } @@ -1995,7 +1994,7 @@ export class RunEngine { message: retryError instanceof Error ? retryError.message : retryError, runId, snapshotId, - retriedFromReplica: true, + failedDuring: "primary_fallback", }); return null; } diff --git a/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts b/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts index eb2d253c4ea..d2b252753e9 100644 --- a/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts +++ b/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts @@ -989,6 +989,10 @@ describe("RunEngine getSnapshotsSince", () => { const firstSnapshot = allSnapshots[0]; + // Warm the replica connection so the engine's first attempt is a fast point + // read - a cold Prisma connect could outlast the 100ms seeding window below. + await schemaOnlyPrisma.$queryRaw`SELECT 1`; + // Kick off the poll against the still-empty replica, then seed the replica // well before the ~400ms jittered retry fires - simulating the replica // catching up while the engine waits. From 1157227844b002f4aa24dda095e9296b47171a74 Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Wed, 10 Jun 2026 15:20:39 +0100 Subject: [PATCH 16/21] chore: unwrap server-changes notes to one line per paragraph Co-Authored-By: Claude Fable 5 --- .server-changes/snapshots-since-replica-primary-fallback.md | 5 +---- .server-changes/snapshots-since-scoped-to-run.md | 3 +-- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/.server-changes/snapshots-since-replica-primary-fallback.md b/.server-changes/snapshots-since-replica-primary-fallback.md index 97ef4c1ad53..e0589068470 100644 --- a/.server-changes/snapshots-since-replica-primary-fallback.md +++ b/.server-changes/snapshots-since-replica-primary-fallback.md @@ -3,7 +3,4 @@ area: webapp type: improvement --- -Run snapshot polling no longer errors or pays extra latency when the database read replica -hasn't yet replicated the snapshot the runner is polling from -(`RUN_ENGINE_READ_REPLICA_SNAPSHOTS_SINCE_ENABLED`): the read is briefly retried on the -replica and served from the primary if it still hasn't caught up. +Run snapshot polling no longer errors or pays extra latency when the database read replica hasn't yet replicated the snapshot the runner is polling from (`RUN_ENGINE_READ_REPLICA_SNAPSHOTS_SINCE_ENABLED`): the read is briefly retried on the replica and served from the primary if it still hasn't caught up. diff --git a/.server-changes/snapshots-since-scoped-to-run.md b/.server-changes/snapshots-since-scoped-to-run.md index 89f6c406119..0cb6ba7ef4c 100644 --- a/.server-changes/snapshots-since-scoped-to-run.md +++ b/.server-changes/snapshots-since-scoped-to-run.md @@ -3,5 +3,4 @@ area: webapp type: fix --- -Snapshot polling now rejects a since-snapshot id that doesn't belong to the run being polled, -instead of using its timestamp to return a too-wide window of the run's snapshots. +Snapshot polling now rejects a since-snapshot id that doesn't belong to the run being polled, instead of using its timestamp to return a too-wide window of the run's snapshots. From f4dac200d92601457e96eaa02d840f5290670a2c Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Wed, 10 Jun 2026 16:15:42 +0100 Subject: [PATCH 17/21] fix(run-engine,testcontainers): address review - normalize retry bounds, tie-stable test assertions, shared clone/drop helpers Co-Authored-By: Claude Fable 5 --- .../run-engine/src/engine/index.ts | 9 ++- .../engine/tests/getSnapshotsSince.test.ts | 30 ++++++-- internal-packages/testcontainers/src/index.ts | 68 +++++++++---------- 3 files changed, 64 insertions(+), 43 deletions(-) diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index 813dedd768c..62a894fb72a 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -288,9 +288,12 @@ export class RunEngine { } ); - this.snapshotsSinceReplicaRetryDelay = options.readReplicaSnapshotsSinceRetryDelay ?? { - minMs: 50, - maxMs: 200, + // Normalize the bounds, but keep maxMs <= 0 meaning "skip the replica retry". + const retryDelay = options.readReplicaSnapshotsSinceRetryDelay ?? { minMs: 50, maxMs: 200 }; + const retryMinMs = Math.max(0, retryDelay.minMs); + this.snapshotsSinceReplicaRetryDelay = { + minMs: retryDelay.maxMs > 0 ? Math.min(retryMinMs, retryDelay.maxMs) : retryMinMs, + maxMs: retryDelay.maxMs, }; const defaultHeartbeatTimeouts: HeartbeatTimeouts = { diff --git a/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts b/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts index d2b252753e9..f35732c635b 100644 --- a/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts +++ b/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts @@ -890,7 +890,11 @@ describe("RunEngine getSnapshotsSince", () => { ); expect(expectedSnapshots.length).toBeGreaterThan(0); expect(result!.length).toBe(expectedSnapshots.length); - expect(result!.map((s) => s.snapshot.id)).toEqual(expectedSnapshots.map((s) => s.id)); + // Compare as sorted lists: same-millisecond snapshots have unspecified relative + // order in both the engine query and this test's query. + expect(result!.map((s) => s.snapshot.id).sort()).toEqual( + expectedSnapshots.map((s) => s.id).sort() + ); expect( await getCounterValue("run_engine.snapshots_since.replica_miss", { outcome: "primary" }) @@ -1010,7 +1014,11 @@ describe("RunEngine getSnapshotsSince", () => { ); expect(expectedSnapshots.length).toBeGreaterThan(0); expect(result!.length).toBe(expectedSnapshots.length); - expect(result!.map((s) => s.snapshot.id)).toEqual(expectedSnapshots.map((s) => s.id)); + // Compare as sorted lists: same-millisecond snapshots have unspecified relative + // order in both the engine query and this test's query. + expect(result!.map((s) => s.snapshot.id).sort()).toEqual( + expectedSnapshots.map((s) => s.id).sort() + ); // Recovered on the replica retry - the writer was never consulted. expect( @@ -1183,7 +1191,11 @@ describe("RunEngine getSnapshotsSince", () => { (s) => s.createdAt.getTime() > since.createdAt.getTime() && s.id !== tail.id ); expect(result!.map((s) => s.snapshot.id)).not.toContain(tail.id); - expect(result!.map((s) => s.snapshot.id)).toEqual(expectedSnapshots.map((s) => s.id)); + // Compare as sorted lists: same-millisecond snapshots have unspecified relative + // order in both the engine query and this test's query. + expect(result!.map((s) => s.snapshot.id).sort()).toEqual( + expectedSnapshots.map((s) => s.id).sort() + ); expect(await getCounterValue("run_engine.snapshots_since.replica_miss")).toBe(0); } finally { @@ -1281,7 +1293,11 @@ describe("RunEngine getSnapshotsSince", () => { (s) => s.createdAt.getTime() > since.createdAt.getTime() ); expect(expectedSnapshots.length).toBeGreaterThan(0); - expect(result!.map((s) => s.snapshot.id)).toEqual(expectedSnapshots.map((s) => s.id)); + // Compare as sorted lists: same-millisecond snapshots have unspecified relative + // order in both the engine query and this test's query. + expect(result!.map((s) => s.snapshot.id).sort()).toEqual( + expectedSnapshots.map((s) => s.id).sort() + ); expect(await getCounterValue("run_engine.snapshots_since.replica_miss")).toBe(0); } finally { @@ -1381,7 +1397,11 @@ describe("RunEngine getSnapshotsSince", () => { (s) => s.createdAt.getTime() > since.createdAt.getTime() ); expect(expectedSnapshots.length).toBeGreaterThan(0); - expect(result!.map((s) => s.snapshot.id)).toEqual(expectedSnapshots.map((s) => s.id)); + // Compare as sorted lists: same-millisecond snapshots have unspecified relative + // order in both the engine query and this test's query. + expect(result!.map((s) => s.snapshot.id).sort()).toEqual( + expectedSnapshots.map((s) => s.id).sort() + ); expect(await getCounterValue("run_engine.snapshots_since.replica_miss")).toBe(0); } finally { diff --git a/internal-packages/testcontainers/src/index.ts b/internal-packages/testcontainers/src/index.ts index 1950c999bab..0047f996df9 100644 --- a/internal-packages/testcontainers/src/index.ts +++ b/internal-packages/testcontainers/src/index.ts @@ -181,11 +181,7 @@ const clonedPostgresContainer = async ({}, use: Use) const baseUri = container.getConnectionUri(); const cloneDb = `test_${pgCloneCounter++}`; - const admin = new PrismaClient({ - datasources: { db: { url: postgresUriWithDatabase(baseUri, "postgres") } }, - }); - await admin.$executeRawUnsafe(`CREATE DATABASE "${cloneDb}" TEMPLATE "${POSTGRES_TEMPLATE_DB}"`); - await admin.$disconnect(); + await createDatabaseFromTemplate(baseUri, cloneDb); const cloneUri = postgresUriWithDatabase(baseUri, cloneDb); const view = new Proxy(container, { @@ -200,19 +196,36 @@ const clonedPostgresContainer = async ({}, use: Use) try { await use(view); } finally { - // Best-effort drop so clones don't pile up in the worker's pg over a long suite. WITH (FORCE) - // terminates any lingering backends (pg 13+). A failed drop is harmless - the whole container is - // reaped on worker exit - so we never let cleanup fail the test. - const cleanup = new PrismaClient({ - datasources: { db: { url: postgresUriWithDatabase(baseUri, "postgres") } }, - }); - try { - await cleanup.$executeRawUnsafe(`DROP DATABASE IF EXISTS "${cloneDb}" WITH (FORCE)`); - } catch { - // ignore - reaped with the container anyway - } finally { - await cleanup.$disconnect(); - } + await dropCloneDatabase(baseUri, cloneDb); + } +}; + +const createDatabaseFromTemplate = async (baseUri: string, cloneDb: string) => { + const admin = new PrismaClient({ + datasources: { db: { url: postgresUriWithDatabase(baseUri, "postgres") } }, + }); + try { + await admin.$executeRawUnsafe( + `CREATE DATABASE "${cloneDb}" TEMPLATE "${POSTGRES_TEMPLATE_DB}"` + ); + } finally { + await admin.$disconnect(); + } +}; + +// Best-effort drop so clones don't pile up in the worker's pg over a long suite. WITH (FORCE) +// terminates any lingering backends (pg 13+). A failed drop is harmless - the whole container is +// reaped on worker exit - so we never let cleanup fail the test. +const dropCloneDatabase = async (baseUri: string, cloneDb: string) => { + const cleanup = new PrismaClient({ + datasources: { db: { url: postgresUriWithDatabase(baseUri, "postgres") } }, + }); + try { + await cleanup.$executeRawUnsafe(`DROP DATABASE IF EXISTS "${cloneDb}" WITH (FORCE)`); + } catch { + // ignore - reaped with the container anyway + } finally { + await cleanup.$disconnect(); } }; @@ -224,11 +237,7 @@ const schemaOnlyPrismaFixture = async ({}: {}, use: Use) => { const baseUri = container.getConnectionUri(); const cloneDb = `schema_only_${pgCloneCounter++}`; - const admin = new PrismaClient({ - datasources: { db: { url: postgresUriWithDatabase(baseUri, "postgres") } }, - }); - await admin.$executeRawUnsafe(`CREATE DATABASE "${cloneDb}" TEMPLATE "${POSTGRES_TEMPLATE_DB}"`); - await admin.$disconnect(); + await createDatabaseFromTemplate(baseUri, cloneDb); const prisma = new PrismaClient({ datasources: { db: { url: postgresUriWithDatabase(baseUri, cloneDb) } }, @@ -237,18 +246,7 @@ const schemaOnlyPrismaFixture = async ({}: {}, use: Use) => { await use(prisma); } finally { await logCleanup("schemaOnlyPrisma", prisma.$disconnect()); - // Best-effort drop, mirroring clonedPostgresContainer cleanup - the container is reaped on - // worker exit anyway, so never let cleanup fail the test. - const cleanup = new PrismaClient({ - datasources: { db: { url: postgresUriWithDatabase(baseUri, "postgres") } }, - }); - try { - await cleanup.$executeRawUnsafe(`DROP DATABASE IF EXISTS "${cloneDb}" WITH (FORCE)`); - } catch { - // ignore - reaped with the container anyway - } finally { - await cleanup.$disconnect(); - } + await dropCloneDatabase(baseUri, cloneDb); } }; From f26b6465893150b172882ac560c0bedcea45cca2 Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Wed, 10 Jun 2026 16:43:33 +0100 Subject: [PATCH 18/21] docs(run-engine): document maxMs <= 0 (not just 0) as disabling the replica retry Co-Authored-By: Claude Fable 5 --- internal-packages/run-engine/src/engine/types.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal-packages/run-engine/src/engine/types.ts b/internal-packages/run-engine/src/engine/types.ts index 60fe69949a8..0077478318b 100644 --- a/internal-packages/run-engine/src/engine/types.ts +++ b/internal-packages/run-engine/src/engine/types.ts @@ -209,7 +209,7 @@ export type RunEngineOptions = { readReplicaSnapshotsSinceEnabled?: boolean; /** Jittered delay bounds for the single replica retry `getSnapshotsSince` performs when * the since snapshot is not yet on the replica, before falling back to the primary. - * Set maxMs to 0 to skip the replica retry and go straight to the primary. */ + * Set maxMs to 0 (or any value <= 0) to skip the replica retry and go straight to the primary. */ readReplicaSnapshotsSinceRetryDelay?: { minMs: number; maxMs: number }; tracer: Tracer; meter?: Meter; From 51924bb706990c0b94c3ef0131a618c565eb03fa Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Wed, 10 Jun 2026 17:07:35 +0100 Subject: [PATCH 19/21] chore: combine the two snapshots-since server-changes notes into one entry Co-Authored-By: Claude Fable 5 --- .server-changes/snapshots-since-replica-primary-fallback.md | 2 +- .server-changes/snapshots-since-scoped-to-run.md | 6 ------ 2 files changed, 1 insertion(+), 7 deletions(-) delete mode 100644 .server-changes/snapshots-since-scoped-to-run.md diff --git a/.server-changes/snapshots-since-replica-primary-fallback.md b/.server-changes/snapshots-since-replica-primary-fallback.md index e0589068470..9b8257f6410 100644 --- a/.server-changes/snapshots-since-replica-primary-fallback.md +++ b/.server-changes/snapshots-since-replica-primary-fallback.md @@ -3,4 +3,4 @@ area: webapp type: improvement --- -Run snapshot polling no longer errors or pays extra latency when the database read replica hasn't yet replicated the snapshot the runner is polling from (`RUN_ENGINE_READ_REPLICA_SNAPSHOTS_SINCE_ENABLED`): the read is briefly retried on the replica and served from the primary if it still hasn't caught up. +Run snapshot polling no longer errors or pays extra latency when the database read replica hasn't yet replicated the snapshot the runner is polling from (`RUN_ENGINE_READ_REPLICA_SNAPSHOTS_SINCE_ENABLED`): the read is briefly retried on the replica and served from the primary if it still hasn't caught up. Polling also now rejects a since-snapshot id that doesn't belong to the run being polled. diff --git a/.server-changes/snapshots-since-scoped-to-run.md b/.server-changes/snapshots-since-scoped-to-run.md deleted file mode 100644 index 0cb6ba7ef4c..00000000000 --- a/.server-changes/snapshots-since-scoped-to-run.md +++ /dev/null @@ -1,6 +0,0 @@ ---- -area: webapp -type: fix ---- - -Snapshot polling now rejects a since-snapshot id that doesn't belong to the run being polled, instead of using its timestamp to return a too-wide window of the run's snapshots. From 3a5e88b927143bca05a77ba769d3954b4ec96c72 Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Wed, 10 Jun 2026 17:21:51 +0100 Subject: [PATCH 20/21] docs(run-engine): clarify that only not-found replica errors trigger the writer fallback Co-Authored-By: Claude Fable 5 --- internal-packages/run-engine/src/engine/index.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index 62a894fb72a..22c21cb2881 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -1958,8 +1958,10 @@ export class RunEngine { } catch (e) { if (useReplica && e instanceof ExecutionSnapshotNotFoundError) { // Replica lag: the runner learned this snapshot id from the writer before the - // replica caught up. Give the replica one jittered retry, then serve from the - // writer; a miss on the writer too is a real error, not lag. + // replica caught up. Give the replica one jittered retry; if it's still missing, + // serve from the writer. Only not-found errors get this treatment - any other + // replica failure stays an error rather than shifting read load to the writer. + // A miss on the writer too is a real error, not lag. const { minMs, maxMs } = this.snapshotsSinceReplicaRetryDelay; if (maxMs > 0) { await setTimeout(minMs + Math.random() * Math.max(0, maxMs - minMs)); From 8418c7b3f685e495e67eb040ac89f003f6fb82c1 Mon Sep 17 00:00:00 2001 From: Dan Sutton Date: Wed, 10 Jun 2026 17:41:24 +0100 Subject: [PATCH 21/21] chore: retrigger CI Co-Authored-By: Claude Fable 5 --- .../run-engine/src/engine/tests/getSnapshotsSince.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts b/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts index f35732c635b..15831d10839 100644 --- a/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts +++ b/internal-packages/run-engine/src/engine/tests/getSnapshotsSince.test.ts @@ -534,7 +534,7 @@ describe("RunEngine getSnapshotsSince", () => { const runASnapshot = runASnapshots[0]; - // Poll run B using a snapshot id that belongs to run A + // Poll run B using a snapshot id that belongs to run A. const result = await engine.getSnapshotsSince({ runId: runB.id, snapshotId: runASnapshot.id,