From 2f5b8db3379f264063f0fd32e7c15cdc4a072952 Mon Sep 17 00:00:00 2001 From: Saadi Myftija Date: Wed, 10 Jun 2026 18:01:42 +0200 Subject: [PATCH 1/4] fix: cancel pending delayed snapshots when the run completes or disconnects The compute suspend flow delays snapshots by snapshotDelayMs to avoid wasted work on short-lived waitpoints, with the intent that a run which continues before the delay expires cancels the pending snapshot. But the only cancel() call site is the /continue workload action, which runners only invoke when restoring from an already-taken snapshot - so a pending snapshot is never actually cancelled (zero snapshot.canceled events in prod). When a run resumes and completes within the delay window, the stale snapshot fires anyway and fcrun pauses the VM for ~6-13s while its controller is mid warm-start long-poll. The frozen guest can't fire its abort timer or send a FIN, so firestarter keeps the connection claimable past the client deadline and dispatches runs into it - each one a ~300s stall (TRI-10293). Cancel the pending snapshot when the attempt completes and when the run socket disconnects. Genuine waitpoint suspensions keep the runner socket connected and the attempt incomplete, so neither hook cancels a snapshot that is still wanted. Cancellation is guarded by runnerId so a stale duplicate runner for a reassigned run can't cancel the new runner's pending snapshot. --- .../services/computeSnapshotService.test.ts | 130 ++++++++++++++++++ .../src/services/computeSnapshotService.ts | 15 +- .../src/services/timerWheel.test.ts | 17 +++ apps/supervisor/src/services/timerWheel.ts | 6 + apps/supervisor/src/workloadServer/index.ts | 23 ++++ 5 files changed, 189 insertions(+), 2 deletions(-) create mode 100644 apps/supervisor/src/services/computeSnapshotService.test.ts diff --git a/apps/supervisor/src/services/computeSnapshotService.test.ts b/apps/supervisor/src/services/computeSnapshotService.test.ts new file mode 100644 index 00000000000..b039b63bd4d --- /dev/null +++ b/apps/supervisor/src/services/computeSnapshotService.test.ts @@ -0,0 +1,130 @@ +import { describe, expect, it, vi } from "vitest"; +import { setTimeout as sleep } from "node:timers/promises"; +import { ComputeSnapshotService } from "./computeSnapshotService.js"; +import type { ComputeWorkloadManager } from "../workloadManager/compute.js"; +import type { SupervisorHttpClient } from "@trigger.dev/core/v3/workers"; + +// The TimerWheel ticks every 100ms, so a 200ms delay dispatches within ~300ms. +const DELAY_MS = 200; +// Long enough that a pending snapshot would certainly have dispatched. +const SETTLE_MS = 600; + +function createService() { + const snapshot = vi.fn(async (_opts: { runnerId: string; metadata: Record }) => true); + + const computeManager = { + snapshotDelayMs: DELAY_MS, + snapshotDispatchLimit: 1, + snapshot, + } as unknown as ComputeWorkloadManager; + + const service = new ComputeSnapshotService({ + computeManager, + workerClient: {} as SupervisorHttpClient, + wideEventOpts: { service: "supervisor-test", env: {}, enabled: false }, + }); + + return { service, snapshot }; +} + +function delayedSnapshot(runnerId = "runner-1") { + return { + runnerId, + runFriendlyId: "run_1", + snapshotFriendlyId: "snapshot_1", + }; +} + +describe("ComputeSnapshotService", () => { + it("dispatches a scheduled snapshot after the delay", async () => { + const { service, snapshot } = createService(); + try { + service.schedule("run_1", delayedSnapshot()); + + await vi.waitFor(() => expect(snapshot).toHaveBeenCalledTimes(1), { timeout: 2_000 }); + expect(snapshot).toHaveBeenCalledWith({ + runnerId: "runner-1", + metadata: { runId: "run_1", snapshotFriendlyId: "snapshot_1" }, + }); + } finally { + service.stop(); + } + }); + + it("cancel before the delay expires prevents the dispatch", async () => { + const { service, snapshot } = createService(); + try { + service.schedule("run_1", delayedSnapshot()); + + expect(service.cancel("run_1")).toBe(true); + + await sleep(SETTLE_MS); + expect(snapshot).not.toHaveBeenCalled(); + } finally { + service.stop(); + } + }); + + it("cancel returns false when nothing is pending", () => { + const { service } = createService(); + try { + expect(service.cancel("run_1")).toBe(false); + } finally { + service.stop(); + } + }); + + it("cancel with a matching runnerId cancels the pending snapshot", async () => { + const { service, snapshot } = createService(); + try { + service.schedule("run_1", delayedSnapshot("runner-a")); + + expect(service.cancel("run_1", "runner-a")).toBe(true); + + await sleep(SETTLE_MS); + expect(snapshot).not.toHaveBeenCalled(); + } finally { + service.stop(); + } + }); + + it("cancel with a different runnerId leaves the pending snapshot alone", async () => { + const { service, snapshot } = createService(); + try { + service.schedule("run_1", delayedSnapshot("runner-a")); + + // A stale runner for a reassigned run must not cancel the new runner's snapshot. + expect(service.cancel("run_1", "runner-b")).toBe(false); + + await vi.waitFor(() => expect(snapshot).toHaveBeenCalledTimes(1), { timeout: 2_000 }); + expect(snapshot).toHaveBeenCalledWith( + expect.objectContaining({ runnerId: "runner-a" }) + ); + } finally { + service.stop(); + } + }); + + it("re-scheduling the same run replaces the pending snapshot", async () => { + const { service, snapshot } = createService(); + try { + service.schedule("run_1", delayedSnapshot()); + service.schedule("run_1", { + runnerId: "runner-1", + runFriendlyId: "run_1", + snapshotFriendlyId: "snapshot_2", + }); + + await vi.waitFor(() => expect(snapshot).toHaveBeenCalledTimes(1), { timeout: 2_000 }); + await sleep(SETTLE_MS); + + expect(snapshot).toHaveBeenCalledTimes(1); + expect(snapshot).toHaveBeenCalledWith({ + runnerId: "runner-1", + metadata: { runId: "run_1", snapshotFriendlyId: "snapshot_2" }, + }); + } finally { + service.stop(); + } + }); +}); diff --git a/apps/supervisor/src/services/computeSnapshotService.ts b/apps/supervisor/src/services/computeSnapshotService.ts index 35ac6acecab..216753fc12d 100644 --- a/apps/supervisor/src/services/computeSnapshotService.ts +++ b/apps/supervisor/src/services/computeSnapshotService.ts @@ -92,8 +92,19 @@ export class ComputeSnapshotService { }); } - /** Cancel a pending delayed snapshot. Returns true if one was cancelled. */ - cancel(runFriendlyId: string): boolean { + /** + * Cancel a pending delayed snapshot. Returns true if one was cancelled. + * When `runnerId` is given, only a snapshot scheduled for that same runner + * is cancelled - a stale runner for a run that has since been reassigned + * must not cancel the new runner's pending snapshot. + */ + cancel(runFriendlyId: string, runnerId?: string): boolean { + if (runnerId) { + const pending = this.timerWheel.peek(runFriendlyId); + if (pending && pending.data.runnerId !== runnerId) { + return false; + } + } const cancelled = this.timerWheel.cancel(runFriendlyId); if (cancelled) { emitOneShot({ diff --git a/apps/supervisor/src/services/timerWheel.test.ts b/apps/supervisor/src/services/timerWheel.test.ts index 3f6bb9aa19b..e685a26b1b4 100644 --- a/apps/supervisor/src/services/timerWheel.test.ts +++ b/apps/supervisor/src/services/timerWheel.test.ts @@ -51,6 +51,23 @@ describe("TimerWheel", () => { wheel.stop(); }); + it("peek returns the pending item without removing it", () => { + const wheel = new TimerWheel({ delayMs: 3000, onExpire: () => {} }); + + wheel.start(); + wheel.submit("run-1", "data"); + + expect(wheel.peek("run-1")).toEqual({ key: "run-1", data: "data" }); + expect(wheel.size).toBe(1); + expect(wheel.peek("run-2")).toBeUndefined(); + + // Dispatched items are no longer peekable + vi.advanceTimersByTime(3100); + expect(wheel.peek("run-1")).toBeUndefined(); + + wheel.stop(); + }); + it("cancel returns false for unknown key", () => { const wheel = new TimerWheel({ delayMs: 3000, diff --git a/apps/supervisor/src/services/timerWheel.ts b/apps/supervisor/src/services/timerWheel.ts index 9584423824d..cab5a5d7a25 100644 --- a/apps/supervisor/src/services/timerWheel.ts +++ b/apps/supervisor/src/services/timerWheel.ts @@ -121,6 +121,12 @@ export class TimerWheel { return true; } + /** Look up a pending item without removing it. */ + peek(key: string): TimerWheelItem | undefined { + const entry = this.entries.get(key); + return entry ? { key, data: entry.data } : undefined; + } + /** Number of pending items in the wheel. */ get size(): number { return this.entries.size; diff --git a/apps/supervisor/src/workloadServer/index.ts b/apps/supervisor/src/workloadServer/index.ts index ba933477976..979034058d0 100644 --- a/apps/supervisor/src/workloadServer/index.ts +++ b/apps/supervisor/src/workloadServer/index.ts @@ -303,6 +303,16 @@ export class WorkloadServer extends EventEmitter { return; } + // A completed attempt invalidates any pending delayed snapshot: the + // suspended execution state it was scheduled to capture no longer + // exists. Without this, the snapshot fires up to snapshotDelayMs + // later and pauses a VM that has long moved on, e.g. mid warm-start + // long-poll or already executing the next run. + this.snapshotService?.cancel( + params.runFriendlyId, + this.runnerIdFromRequest(req) + ); + reply.json( completeResponse.data satisfies WorkloadRunAttemptCompleteResponseBody ); @@ -728,6 +738,19 @@ export class WorkloadServer extends EventEmitter { const runDisconnected = (friendlyId: string, reason: string) => { socketLogger.debug("runDisconnected", { ...getSocketMetadata() }); + // The run is gone from this runner (crash, exit, or replaced by a new + // run), so a pending delayed snapshot for it is stale. Genuine + // waitpoint suspensions keep the socket connected, so this doesn't + // cancel a snapshot that's still wanted; the runnerId match guards + // against a stale duplicate runner cancelling a fresh runner's + // snapshot after the run was reassigned. Caveat: socket.data.runnerId + // is frozen at the websocket handshake, so after a same-supervisor + // restore (new runner id, socket not recreated) this guard refuses + // the cancel - a missed cancel, never a wrong one. The + // attempt.complete cancel uses the runner's current HTTP header id + // and is unaffected. + this.snapshotService?.cancel(friendlyId, socket.data.runnerId); + this.runSockets.delete(friendlyId); this.emit("runDisconnected", { run: { friendlyId } }); socket.data.runFriendlyId = undefined; From e7a6e76d8d1a702eaeea26c7bfbe5dae84f646f7 Mon Sep 17 00:00:00 2001 From: Saadi Myftija Date: Thu, 11 Jun 2026 16:40:34 +0200 Subject: [PATCH 2/4] fix: cancel pending delayed snapshots regardless of completion outcome A runner calling attempts/complete has finished executing either way - on a transient completion failure it retries the call, so cancelling only on success leaves the stale snapshot armed in the meantime. The runnerId guard already covers the stale-duplicate-runner case whose completion fails server-side validation. Addresses CodeRabbit review feedback on the PR. --- apps/supervisor/src/workloadServer/index.ts | 24 +++++++++++---------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/apps/supervisor/src/workloadServer/index.ts b/apps/supervisor/src/workloadServer/index.ts index 979034058d0..6c76810e2ef 100644 --- a/apps/supervisor/src/workloadServer/index.ts +++ b/apps/supervisor/src/workloadServer/index.ts @@ -287,13 +287,25 @@ export class WorkloadServer extends EventEmitter { "POST", async () => { const { req, reply, params, body } = ctx; + const runnerId = this.runnerIdFromRequest(req); const completeResponse = await this.workerClient.completeRunAttempt( params.runFriendlyId, params.snapshotFriendlyId, body, - this.runnerIdFromRequest(req) + runnerId ); + // A completion attempt invalidates any pending delayed snapshot + // regardless of outcome: the runner has finished executing, so the + // suspended state the snapshot was scheduled to capture no longer + // exists. Without this, the snapshot fires up to snapshotDelayMs + // later and pauses a VM that has long moved on - and on a transient + // completion failure the runner retries, so waiting for success + // would leave the stale snapshot armed in the meantime. The + // runnerId guard keeps a stale duplicate runner's failed completion + // from cancelling a fresh runner's snapshot. + this.snapshotService?.cancel(params.runFriendlyId, runnerId); + if (!completeResponse.success) { this.logger.error("Failed to complete run", { params, @@ -303,16 +315,6 @@ export class WorkloadServer extends EventEmitter { return; } - // A completed attempt invalidates any pending delayed snapshot: the - // suspended execution state it was scheduled to capture no longer - // exists. Without this, the snapshot fires up to snapshotDelayMs - // later and pauses a VM that has long moved on, e.g. mid warm-start - // long-poll or already executing the next run. - this.snapshotService?.cancel( - params.runFriendlyId, - this.runnerIdFromRequest(req) - ); - reply.json( completeResponse.data satisfies WorkloadRunAttemptCompleteResponseBody ); From 3b4c9db4de0ee10b3b4a4e11de1c425e5fe9a320 Mon Sep 17 00:00:00 2001 From: Saadi Myftija Date: Thu, 11 Jun 2026 17:04:59 +0200 Subject: [PATCH 3/4] chore: add server-changes entry --- .server-changes/cancel-stale-delayed-snapshots.md | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .server-changes/cancel-stale-delayed-snapshots.md diff --git a/.server-changes/cancel-stale-delayed-snapshots.md b/.server-changes/cancel-stale-delayed-snapshots.md new file mode 100644 index 00000000000..9a167c613b1 --- /dev/null +++ b/.server-changes/cancel-stale-delayed-snapshots.md @@ -0,0 +1,6 @@ +--- +area: supervisor +type: fix +--- + +Cancel pending delayed snapshots when a run completes or disconnects, preventing stale snapshots from pausing microVMs that have moved on to new work. From 60d270fdcb106dd07e1d22565419f5a9c5c090d5 Mon Sep 17 00:00:00 2001 From: Saadi Myftija Date: Thu, 11 Jun 2026 17:25:28 +0200 Subject: [PATCH 4/4] fix: cancel pending delayed snapshot before the async completion call The timer wheel can tick during the awaited completeRunAttempt HTTP round trip, so cancelling after it leaves a window where a due snapshot dispatches anyway and pauses a VM that has moved on. Cancel first - the runnerId guard is ordering-independent, and the runner can't schedule a new suspend until it receives this route's reply, so nothing legitimate can be cancelled early. Matches the existing /continue ordering. Addresses Devin review feedback. --- apps/supervisor/src/workloadServer/index.ts | 24 +++++++++++---------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/apps/supervisor/src/workloadServer/index.ts b/apps/supervisor/src/workloadServer/index.ts index 6c76810e2ef..bf4b38012d5 100644 --- a/apps/supervisor/src/workloadServer/index.ts +++ b/apps/supervisor/src/workloadServer/index.ts @@ -288,6 +288,19 @@ export class WorkloadServer extends EventEmitter { async () => { const { req, reply, params, body } = ctx; const runnerId = this.runnerIdFromRequest(req); + + // A completion attempt invalidates any pending delayed snapshot + // regardless of outcome: the runner has finished executing, so the + // suspended state the snapshot was scheduled to capture no longer + // exists. Cancel BEFORE the async completion call - the timer + // wheel can tick during the await, so cancelling after it leaves + // a real window for a due snapshot to dispatch and pause a VM + // that has moved on. The runnerId guard keeps a stale duplicate + // runner's completion from cancelling a fresh runner's snapshot, + // and the runner can't schedule a new suspend until it receives + // this route's reply, so nothing legitimate can be cancelled here. + this.snapshotService?.cancel(params.runFriendlyId, runnerId); + const completeResponse = await this.workerClient.completeRunAttempt( params.runFriendlyId, params.snapshotFriendlyId, @@ -295,17 +308,6 @@ export class WorkloadServer extends EventEmitter { runnerId ); - // A completion attempt invalidates any pending delayed snapshot - // regardless of outcome: the runner has finished executing, so the - // suspended state the snapshot was scheduled to capture no longer - // exists. Without this, the snapshot fires up to snapshotDelayMs - // later and pauses a VM that has long moved on - and on a transient - // completion failure the runner retries, so waiting for success - // would leave the stale snapshot armed in the meantime. The - // runnerId guard keeps a stale duplicate runner's failed completion - // from cancelling a fresh runner's snapshot. - this.snapshotService?.cancel(params.runFriendlyId, runnerId); - if (!completeResponse.success) { this.logger.error("Failed to complete run", { params,