Skip to content

Commit 87fa522

Browse files
committed
fix(webapp): stop locked-version triggers failing on stale replica reads
A locked-version trigger such as triggerAndWait resolved the task's metadata from the read replica and, on a miss, threw a non-retryable "task not found on locked version" even though the task was registered. A read replica can return an empty result for a row that already exists on the primary, so this surfaced as intermittent, self-recovering trigger failures. The locked worker is already resolved on the primary in the same request, so the resolver now re-checks the primary when the replica returns no row, and only reports the task missing when the primary genuinely lacks it. This runs on the cache-miss path only and leaves the hot path unchanged.
1 parent: 954ee5c · commit: 87fa522

3 files changed

Lines changed: 170 additions & 9 deletions

File tree

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: fix
4+
---
5+
6+
Fix locked-version triggers such as triggerAndWait occasionally failing with "task not found on locked version" for a task that is actually registered, by confirming against the primary database when the read replica returns no row.

apps/webapp/app/runEngine/concerns/queues.server.ts

Lines changed: 37 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -268,14 +268,27 @@ export class DefaultQueueManager implements QueueManager {
268268
const cached = await this.taskMetaCache.getByWorker(workerId, slug);
269269
if (cached) return cached;
270270

271-
const row = await this.replicaPrisma.backgroundWorkerTask.findFirst({
272-
where: { workerId, runtimeEnvironmentId: environmentId, slug },
273-
select: {
274-
ttl: true,
275-
triggerSource: true,
276-
queue: { select: { id: true, name: true } },
277-
},
278-
});
271+
// Cache miss. Read the row from the replica first; if the replica comes
272+
// back empty, re-check the writer before concluding the task is missing.
273+
// The locked worker itself was just resolved on the writer (see
274+
// triggerTask.server.ts), so a replica that returns no row here is stale,
275+
// not authoritative. Trusting a stale-replica negative throws a
276+
// non-retryable "not found on locked version" for a task that is in fact
277+
// registered. The writer read only runs on this rare miss-then-empty path,
278+
// never on the hot path.
279+
let row = await this.findLockedTaskRow(this.replicaPrisma, workerId, environmentId, slug);
280+
281+
if (!row && this.replicaPrisma !== this.prisma) {
282+
row = await this.findLockedTaskRow(this.prisma, workerId, environmentId, slug);
283+
284+
if (row) {
285+
logger.warn("Locked task metadata missing on replica but found on writer", {
286+
workerId,
287+
environmentId,
288+
slug,
289+
});
290+
}
291+
}
279292

280293
if (!row) return null;
281294

@@ -294,6 +307,22 @@ export class DefaultQueueManager implements QueueManager {
294307
return entry;
295308
}
296309

310+
private findLockedTaskRow(
311+
client: PrismaClientOrTransaction,
312+
workerId: string,
313+
environmentId: string,
314+
slug: string
315+
) {
316+
return client.backgroundWorkerTask.findFirst({
317+
where: { workerId, runtimeEnvironmentId: environmentId, slug },
318+
select: {
319+
ttl: true,
320+
triggerSource: true,
321+
queue: { select: { id: true, name: true } },
322+
},
323+
});
324+
}
325+
297326
/**
298327
* Resolve task metadata for a non-locked trigger. Reads from the
299328
* `task-meta:env:{envId}` Redis hash; falls back to

apps/webapp/test/engine/triggerTask.test.ts

Lines changed: 127 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -23,7 +23,10 @@ import { TaskRun } from "@trigger.dev/database";
2323
import { Redis } from "ioredis";
2424
import { IdempotencyKeyConcern } from "~/runEngine/concerns/idempotencyKeys.server";
2525
import { DefaultQueueManager } from "~/runEngine/concerns/queues.server";
26-
import { RedisTaskMetadataCache } from "~/services/taskMetadataCache.server";
26+
import {
27+
NoopTaskMetadataCache,
28+
RedisTaskMetadataCache,
29+
} from "~/services/taskMetadataCache.server";
2730
import {
2831
EntitlementValidationParams,
2932
MaxAttemptsValidationParams,
@@ -949,6 +952,129 @@ describe("RunEngineTriggerTaskService", () => {
949952
}
950953
);
951954

955+
containerTest(
956+
"should fall back to the writer when a stale replica returns no row for a locked task",
957+
async ({ prisma, redisOptions }) => {
958+
const engine = new RunEngine({
959+
prisma,
960+
worker: {
961+
redis: redisOptions,
962+
workers: 1,
963+
tasksPerWorker: 10,
964+
pollIntervalMs: 100,
965+
},
966+
queue: {
967+
redis: redisOptions,
968+
},
969+
runLock: {
970+
redis: redisOptions,
971+
},
972+
machines: {
973+
defaultMachine: "small-1x",
974+
machines: {
975+
"small-1x": {
976+
name: "small-1x" as const,
977+
cpu: 0.5,
978+
memory: 0.5,
979+
centsPerMs: 0.0001,
980+
},
981+
},
982+
baseCostInCents: 0.0005,
983+
},
984+
tracer: trace.getTracer("test", "0.0.0"),
985+
});
986+
987+
const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
988+
const taskIdentifier = "test-task";
989+
990+
const worker = await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier);
991+
992+
// A read replica that has not yet caught up to the BackgroundWorkerTask
993+
// row: it is the real database for every query except the locked-task
994+
// lookup, which comes back empty (the TRI-10868 false-negative window).
995+
const staleReplica = new Proxy(prisma, {
996+
get(target, prop, receiver) {
997+
if (prop === "backgroundWorkerTask") {
998+
const delegate = Reflect.get(target, prop, receiver);
999+
return new Proxy(delegate, {
1000+
get(taskTarget, taskProp, taskReceiver) {
1001+
if (taskProp === "findFirst") {
1002+
return async () => null;
1003+
}
1004+
const value = Reflect.get(taskTarget, taskProp, taskReceiver);
1005+
return typeof value === "function" ? value.bind(taskTarget) : value;
1006+
},
1007+
});
1008+
}
1009+
const value = Reflect.get(target, prop, receiver);
1010+
return typeof value === "function" ? value.bind(target) : value;
1011+
},
1012+
}) as typeof prisma;
1013+
1014+
// Noop cache so every resolve misses the cache and exercises the
1015+
// replica -> writer fallback. The writer is the real `prisma`.
1016+
const queuesManager = new DefaultQueueManager(
1017+
prisma,
1018+
engine,
1019+
staleReplica,
1020+
new NoopTaskMetadataCache()
1021+
);
1022+
1023+
const triggerTaskService = new RunEngineTriggerTaskService({
1024+
engine,
1025+
prisma,
1026+
payloadProcessor: new MockPayloadProcessor(),
1027+
queueConcern: queuesManager,
1028+
idempotencyKeyConcern: new IdempotencyKeyConcern(
1029+
prisma,
1030+
engine,
1031+
new MockTraceEventConcern()
1032+
),
1033+
validator: new MockTriggerTaskValidator(),
1034+
traceEventConcern: new MockTraceEventConcern(),
1035+
tracer: trace.getTracer("test", "0.0.0"),
1036+
metadataMaximumSize: 1024 * 1024 * 1,
1037+
});
1038+
1039+
// The task IS registered on the locked worker, but the replica returns
1040+
// nothing. Before the fix this threw "not found on locked version"; now
1041+
// the writer fallback resolves the registered row.
1042+
const result = await triggerTaskService.call({
1043+
taskId: taskIdentifier,
1044+
environment: authenticatedEnvironment,
1045+
body: {
1046+
payload: { test: "test" },
1047+
options: {
1048+
lockToVersion: worker.worker.version,
1049+
},
1050+
},
1051+
});
1052+
1053+
expect(result).toBeDefined();
1054+
expect(result?.run.status).toBe("PENDING");
1055+
expect(result?.run.queue).toBe(`task/${taskIdentifier}`);
1056+
1057+
// A genuinely unregistered task must still throw, even with the writer
1058+
// fallback — the writer has no row either, so the 422 is correct.
1059+
await expect(
1060+
triggerTaskService.call({
1061+
taskId: "not-a-registered-task",
1062+
environment: authenticatedEnvironment,
1063+
body: {
1064+
payload: { test: "test" },
1065+
options: {
1066+
lockToVersion: worker.worker.version,
1067+
},
1068+
},
1069+
})
1070+
).rejects.toThrow(
1071+
`Task 'not-a-registered-task' not found on locked version '${worker.worker.version}'`
1072+
);
1073+
1074+
await engine.quit();
1075+
}
1076+
);
1077+
9521078
containerTest(
9531079
"should preserve runFriendlyId across retries when RunDuplicateIdempotencyKeyError is thrown",
9541080
async ({ prisma, redisOptions }) => {

0 commit comments

Comments
 (0)