Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .server-changes/cancel-stale-delayed-snapshots.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: supervisor
type: fix
---

Cancel pending delayed snapshots when a run completes or disconnects, preventing stale snapshots from pausing microVMs that have moved on to new work.
130 changes: 130 additions & 0 deletions apps/supervisor/src/services/computeSnapshotService.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import { describe, expect, it, vi } from "vitest";
import { setTimeout as sleep } from "node:timers/promises";
import { ComputeSnapshotService } from "./computeSnapshotService.js";
import type { ComputeWorkloadManager } from "../workloadManager/compute.js";
import type { SupervisorHttpClient } from "@trigger.dev/core/v3/workers";

// The TimerWheel ticks every 100ms, so a 200ms delay dispatches within ~300ms.
const DELAY_MS = 200;
// Long enough that a pending snapshot would certainly have dispatched.
const SETTLE_MS = 600;

function createService() {
const snapshot = vi.fn(async (_opts: { runnerId: string; metadata: Record<string, string> }) => true);

const computeManager = {
snapshotDelayMs: DELAY_MS,
snapshotDispatchLimit: 1,
snapshot,
} as unknown as ComputeWorkloadManager;

const service = new ComputeSnapshotService({
computeManager,
workerClient: {} as SupervisorHttpClient,
wideEventOpts: { service: "supervisor-test", env: {}, enabled: false },
});

return { service, snapshot };
}

function delayedSnapshot(runnerId = "runner-1") {
return {
runnerId,
runFriendlyId: "run_1",
snapshotFriendlyId: "snapshot_1",
};
}

describe("ComputeSnapshotService", () => {
it("dispatches a scheduled snapshot after the delay", async () => {
const { service, snapshot } = createService();
try {
service.schedule("run_1", delayedSnapshot());

await vi.waitFor(() => expect(snapshot).toHaveBeenCalledTimes(1), { timeout: 2_000 });
expect(snapshot).toHaveBeenCalledWith({
runnerId: "runner-1",
metadata: { runId: "run_1", snapshotFriendlyId: "snapshot_1" },
});
} finally {
service.stop();
}
});

it("cancel before the delay expires prevents the dispatch", async () => {
const { service, snapshot } = createService();
try {
service.schedule("run_1", delayedSnapshot());

expect(service.cancel("run_1")).toBe(true);

await sleep(SETTLE_MS);
expect(snapshot).not.toHaveBeenCalled();
} finally {
service.stop();
}
});

it("cancel returns false when nothing is pending", () => {
const { service } = createService();
try {
expect(service.cancel("run_1")).toBe(false);
} finally {
service.stop();
}
});

it("cancel with a matching runnerId cancels the pending snapshot", async () => {
const { service, snapshot } = createService();
try {
service.schedule("run_1", delayedSnapshot("runner-a"));

expect(service.cancel("run_1", "runner-a")).toBe(true);

await sleep(SETTLE_MS);
expect(snapshot).not.toHaveBeenCalled();
} finally {
service.stop();
}
});

it("cancel with a different runnerId leaves the pending snapshot alone", async () => {
const { service, snapshot } = createService();
try {
service.schedule("run_1", delayedSnapshot("runner-a"));

// A stale runner for a reassigned run must not cancel the new runner's snapshot.
expect(service.cancel("run_1", "runner-b")).toBe(false);

await vi.waitFor(() => expect(snapshot).toHaveBeenCalledTimes(1), { timeout: 2_000 });
expect(snapshot).toHaveBeenCalledWith(
expect.objectContaining({ runnerId: "runner-a" })
);
} finally {
service.stop();
}
});

it("re-scheduling the same run replaces the pending snapshot", async () => {
const { service, snapshot } = createService();
try {
service.schedule("run_1", delayedSnapshot());
service.schedule("run_1", {
runnerId: "runner-1",
runFriendlyId: "run_1",
snapshotFriendlyId: "snapshot_2",
});

await vi.waitFor(() => expect(snapshot).toHaveBeenCalledTimes(1), { timeout: 2_000 });
await sleep(SETTLE_MS);

expect(snapshot).toHaveBeenCalledTimes(1);
expect(snapshot).toHaveBeenCalledWith({
runnerId: "runner-1",
metadata: { runId: "run_1", snapshotFriendlyId: "snapshot_2" },
});
} finally {
service.stop();
}
});
});
15 changes: 13 additions & 2 deletions apps/supervisor/src/services/computeSnapshotService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,19 @@ export class ComputeSnapshotService {
});
}

/** Cancel a pending delayed snapshot. Returns true if one was cancelled. */
cancel(runFriendlyId: string): boolean {
/**
* Cancel a pending delayed snapshot. Returns true if one was cancelled.
* When `runnerId` is given, only a snapshot scheduled for that same runner
* is cancelled - a stale runner for a run that has since been reassigned
* must not cancel the new runner's pending snapshot.
*/
cancel(runFriendlyId: string, runnerId?: string): boolean {
if (runnerId) {
const pending = this.timerWheel.peek(runFriendlyId);
if (pending && pending.data.runnerId !== runnerId) {
return false;
}
}
const cancelled = this.timerWheel.cancel(runFriendlyId);
if (cancelled) {
emitOneShot({
Expand Down
17 changes: 17 additions & 0 deletions apps/supervisor/src/services/timerWheel.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,23 @@ describe("TimerWheel", () => {
wheel.stop();
});

it("peek returns the pending item without removing it", () => {
const wheel = new TimerWheel<string>({ delayMs: 3000, onExpire: () => {} });

wheel.start();
wheel.submit("run-1", "data");

expect(wheel.peek("run-1")).toEqual({ key: "run-1", data: "data" });
expect(wheel.size).toBe(1);
expect(wheel.peek("run-2")).toBeUndefined();

// Dispatched items are no longer peekable
vi.advanceTimersByTime(3100);
expect(wheel.peek("run-1")).toBeUndefined();

wheel.stop();
});

it("cancel returns false for unknown key", () => {
const wheel = new TimerWheel<string>({
delayMs: 3000,
Expand Down
6 changes: 6 additions & 0 deletions apps/supervisor/src/services/timerWheel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,12 @@ export class TimerWheel<T> {
return true;
}

/** Look up a pending item without removing it. */
peek(key: string): TimerWheelItem<T> | undefined {
const entry = this.entries.get(key);
return entry ? { key, data: entry.data } : undefined;
}

/** Number of pending items in the wheel. */
get size(): number {
return this.entries.size;
Expand Down
29 changes: 28 additions & 1 deletion apps/supervisor/src/workloadServer/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -287,11 +287,25 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
"POST",
async () => {
const { req, reply, params, body } = ctx;
const runnerId = this.runnerIdFromRequest(req);

// A completion attempt invalidates any pending delayed snapshot
// regardless of outcome: the runner has finished executing, so the
// suspended state the snapshot was scheduled to capture no longer
// exists. Cancel BEFORE the async completion call - the timer
// wheel can tick during the await, so cancelling after it leaves
// a real window for a due snapshot to dispatch and pause a VM
// that has moved on. The runnerId guard keeps a stale duplicate
// runner's completion from cancelling a fresh runner's snapshot,
// and the runner can't schedule a new suspend until it receives
// this route's reply, so nothing legitimate can be cancelled here.
this.snapshotService?.cancel(params.runFriendlyId, runnerId);

const completeResponse = await this.workerClient.completeRunAttempt(
params.runFriendlyId,
params.snapshotFriendlyId,
body,
this.runnerIdFromRequest(req)
runnerId
);

if (!completeResponse.success) {
Expand Down Expand Up @@ -728,6 +742,19 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
const runDisconnected = (friendlyId: string, reason: string) => {
socketLogger.debug("runDisconnected", { ...getSocketMetadata() });

// The run is gone from this runner (crash, exit, or replaced by a new
// run), so a pending delayed snapshot for it is stale. Genuine
// waitpoint suspensions keep the socket connected, so this doesn't
// cancel a snapshot that's still wanted; the runnerId match guards
// against a stale duplicate runner cancelling a fresh runner's
// snapshot after the run was reassigned. Caveat: socket.data.runnerId
// is frozen at the websocket handshake, so after a same-supervisor
// restore (new runner id, socket not recreated) this guard refuses
// the cancel - a missed cancel, never a wrong one. The
// attempt.complete cancel uses the runner's current HTTP header id
// and is unaffected.
this.snapshotService?.cancel(friendlyId, socket.data.runnerId);

this.runSockets.delete(friendlyId);
this.emit("runDisconnected", { run: { friendlyId } });
socket.data.runFriendlyId = undefined;
Expand Down