Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
d5b8cb8
refactor(run-engine): split completeWaitpoint into tx-able mutation a…
d-cs Jun 10, 2026
67f70bc
refactor(run-engine): carry output through CompletedWaitpointMutation…
d-cs Jun 10, 2026
d3bfa0c
test(run-engine): add failing tests for atomic single-commit run comp…
d-cs Jun 10, 2026
a8bd676
test(run-engine): make atomic-completion assertions race-free (xmin e…
d-cs Jun 10, 2026
9372ab6
feat(run-engine): complete run and waitpoint in a single transaction
d-cs Jun 10, 2026
5694397
feat(run-engine): unblock continued runs in a single transaction
d-cs Jun 10, 2026
b4ce360
fix(run-engine): keep snapshot side effects out of the unblock transa…
d-cs Jun 10, 2026
eee5d58
fix(run-engine): restore pre-split snapshot return and event payload …
d-cs Jun 10, 2026
9e1d3a6
fix(run-engine): widen unblock transaction limits for batch-scale wai…
d-cs Jun 10, 2026
9cffdce
chore: add server-changes entry for transactional run completion
d-cs Jun 10, 2026
36c6a3f
fix(run-engine): log snapshot ids instead of full objects on the unbl…
d-cs Jun 10, 2026
f9974dd
Revert "fix(run-engine): log snapshot ids instead of full objects on …
d-cs Jun 10, 2026
3a89556
docs(run-engine): explain why the SUSPENDED unblock path stays non-tr…
d-cs Jun 10, 2026
485147d
chore: drop implementation plan doc from the branch
d-cs Jun 10, 2026
92277a5
docs(run-engine): trim narration and history references from comments
d-cs Jun 10, 2026
fc20dc2
fix(run-engine): move snapshot side effects and ttl ack out of the st…
d-cs Jun 10, 2026
06abd0a
Merge branch 'main' into run-engine-transactional-completion
d-cs Jun 10, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .server-changes/transactional-run-completion.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: improvement
---

Reduce database load during traffic spikes by completing runs and resuming waiting runs in single atomic transactions
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,11 @@ export class ExecutionSnapshotSystem {
this.heartbeatTimeouts = options.heartbeatTimeouts;
}

public async createExecutionSnapshot(
/**
* Pure Postgres mutation — safe inside a transaction. Direct callers MUST call
* `scheduleSnapshotSideEffects()` with the result after the surrounding transaction commits.
*/
public async createExecutionSnapshotMutation(
prisma: PrismaClientOrTransaction,
{
run,
Expand Down Expand Up @@ -399,35 +403,93 @@ export class ExecutionSnapshotSystem {
},
});

return {
...newSnapshot,
friendlyId: SnapshotId.toFriendlyId(newSnapshot.id),
runFriendlyId: RunId.toFriendlyId(newSnapshot.runId),
};
}

/**
* Post-commit side effects for a newly created snapshot: arms the heartbeat job (if
* the execution status requires one) and emits the `executionSnapshotCreated` event.
* Never call this inside a transaction — these side effects must only run against a
* durable (committed) snapshot row.
*/
public async scheduleSnapshotSideEffects({
snapshot,
runId,
error,
completedWaitpoints,
}: {
snapshot: Awaited<ReturnType<ExecutionSnapshotSystem["createExecutionSnapshotMutation"]>>;
runId: string;
error?: string;
completedWaitpoints?: { id: string; index?: number }[];
}) {
if (!error) {
//set heartbeat (if relevant)
const intervalMs = this.#getHeartbeatIntervalMs(newSnapshot.executionStatus);
const intervalMs = this.#getHeartbeatIntervalMs(snapshot.executionStatus);
if (intervalMs !== null) {
await this.$.worker.enqueue({
id: `heartbeatSnapshot.${run.id}`,
id: `heartbeatSnapshot.${runId}`,
job: "heartbeatSnapshot",
payload: { snapshotId: newSnapshot.id, runId: run.id },
payload: { snapshotId: snapshot.id, runId },
availableAt: new Date(Date.now() + intervalMs),
});
}
}

// The emitted payload must contain only raw snapshot row fields (+ completedWaitpointIds).
const { friendlyId: _fid, runFriendlyId: _rfid, ...snapshotRow } = snapshot;

this.$.eventBus.emit("executionSnapshotCreated", {
time: newSnapshot.createdAt,
time: snapshot.createdAt,
run: {
id: newSnapshot.runId,
id: snapshot.runId,
},
snapshot: {
...newSnapshot,
...snapshotRow,
completedWaitpointIds: completedWaitpoints?.map((w) => w.id) ?? [],
},
});
}

return {
...newSnapshot,
friendlyId: SnapshotId.toFriendlyId(newSnapshot.id),
runFriendlyId: RunId.toFriendlyId(newSnapshot.runId),
};
/**
 * Creates an execution snapshot and then immediately runs its side effects.
 *
 * The database write is delegated to `createExecutionSnapshotMutation()`; the resulting
 * snapshot is then handed to `scheduleSnapshotSideEffects()`, which is awaited before
 * returning. Nothing in this method waits for a surrounding transaction to commit, so
 * when `prisma` is a transaction client, call those two steps separately instead.
 *
 * @param prisma - Client (or transaction client) used for the snapshot write.
 * @param args - Snapshot details; forwarded unchanged to the mutation.
 * @returns The snapshot produced by `createExecutionSnapshotMutation()`.
 */
public async createExecutionSnapshot(
  prisma: PrismaClientOrTransaction,
  args: {
    run: { id: string; status: TaskRunStatus; attemptNumber?: number | null };
    snapshot: {
      executionStatus: TaskRunExecutionStatus;
      description: string;
      metadata?: Prisma.JsonValue;
    };
    previousSnapshotId?: string;
    batchId?: string;
    environmentId: string;
    environmentType: RuntimeEnvironmentType;
    projectId: string;
    organizationId: string;
    checkpointId?: string;
    workerId?: string;
    runnerId?: string;
    completedWaitpoints?: { id: string; index?: number }[];
    error?: string;
  }
) {
  // Step 1: persist the snapshot row.
  const created = await this.createExecutionSnapshotMutation(prisma, args);

  // Step 2: heartbeat scheduling + event emission for the row we just wrote.
  const { run, error, completedWaitpoints } = args;
  await this.scheduleSnapshotSideEffects({
    snapshot: created,
    runId: run.id,
    error,
    completedWaitpoints,
  });

  return created;
}

public async heartbeatRun({
Expand Down
205 changes: 127 additions & 78 deletions internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts
Comment thread
d-cs marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -459,29 +459,27 @@ export class RunAttemptSystem {
},
});

const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot(tx, {
run,
snapshot: {
executionStatus: "EXECUTING",
description: `Attempt created, starting execution${
isWarmStart ? " (warm start)" : ""
}`,
},
previousSnapshotId: latestSnapshot.id,
environmentId: latestSnapshot.environmentId,
environmentType: latestSnapshot.environmentType,
projectId: latestSnapshot.projectId,
organizationId: latestSnapshot.organizationId,
batchId: latestSnapshot.batchId ?? undefined,
completedWaitpoints: latestSnapshot.completedWaitpoints,
workerId,
runnerId,
});

if (taskRun.ttl) {
//don't expire the run, it's going to execute
await this.$.worker.ack(`expireRun:${taskRun.id}`);
}
const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshotMutation(
tx,
{
run,
snapshot: {
executionStatus: "EXECUTING",
description: `Attempt created, starting execution${
isWarmStart ? " (warm start)" : ""
}`,
},
previousSnapshotId: latestSnapshot.id,
environmentId: latestSnapshot.environmentId,
environmentType: latestSnapshot.environmentType,
projectId: latestSnapshot.projectId,
organizationId: latestSnapshot.organizationId,
batchId: latestSnapshot.batchId ?? undefined,
completedWaitpoints: latestSnapshot.completedWaitpoints,
workerId,
runnerId,
}
);

return { updatedRun: run, snapshot: newSnapshot };
},
Expand Down Expand Up @@ -510,6 +508,18 @@ export class RunAttemptSystem {

const { updatedRun, snapshot } = result;

if (taskRun.ttl) {
//don't expire the run, it's going to execute
await this.$.worker.ack(`expireRun:${taskRun.id}`);
}

// Side effects must only run against a durably committed snapshot row.
await this.executionSnapshotSystem.scheduleSnapshotSideEffects({
snapshot,
runId: taskRun.id,
completedWaitpoints: latestSnapshot.completedWaitpoints,
});

this.$.eventBus.emit("runAttemptStarted", {
time: new Date(),
run: {
Expand Down Expand Up @@ -688,6 +698,9 @@ export class RunAttemptSystem {
runId: string;
snapshotId: string;
completion: TaskRunSuccessfulExecutionResult;
// Note: passing an open transaction as `tx` makes $transaction run inline in it,
// which would move this method's post-commit side effects inside the caller's
// transaction. Callers must pass a plain client.
tx: PrismaClientOrTransaction;
workerId?: string;
runnerId?: string;
Expand Down Expand Up @@ -738,58 +751,100 @@ export class RunAttemptSystem {
environmentType: latestSnapshot.environmentType,
});

const run = await prisma.taskRun.update({
where: { id: runId },
data: {
status: "COMPLETED_SUCCESSFULLY",
completedAt,
output: completion.output,
outputType: completion.outputType,
usageDurationMs: updatedUsage.usageDurationMs,
costInCents: updatedUsage.costInCents,
executionSnapshots: {
create: {
executionStatus: "FINISHED",
description: "Task completed successfully",
runStatus: "COMPLETED_SUCCESSFULLY",
attemptNumber: latestSnapshot.attemptNumber,
environmentId: latestSnapshot.environmentId,
environmentType: latestSnapshot.environmentType,
projectId: latestSnapshot.projectId,
organizationId: latestSnapshot.organizationId,
workerId,
runnerId,
const completedOutput = completion.output
? { value: completion.output, type: completion.outputType, isError: false as const }
: undefined;

const txResult = await $transaction(
prisma,
async (tx) => {
const run = await tx.taskRun.update({
where: { id: runId },
data: {
status: "COMPLETED_SUCCESSFULLY",
completedAt,
output: completion.output,
outputType: completion.outputType,
usageDurationMs: updatedUsage.usageDurationMs,
costInCents: updatedUsage.costInCents,
executionSnapshots: {
create: {
executionStatus: "FINISHED",
description: "Task completed successfully",
runStatus: "COMPLETED_SUCCESSFULLY",
attemptNumber: latestSnapshot.attemptNumber,
environmentId: latestSnapshot.environmentId,
environmentType: latestSnapshot.environmentType,
projectId: latestSnapshot.projectId,
organizationId: latestSnapshot.organizationId,
workerId,
runnerId,
},
},
},
},
},
select: {
id: true,
friendlyId: true,
status: true,
attemptNumber: true,
spanId: true,
updatedAt: true,
associatedWaitpoint: {
select: {
id: true,
friendlyId: true,
status: true,
attemptNumber: true,
spanId: true,
updatedAt: true,
associatedWaitpoint: {
select: {
id: true,
},
},
project: {
select: {
organizationId: true,
},
},
batchId: true,
createdAt: true,
completedAt: true,
taskEventStore: true,
parentTaskRunId: true,
usageDurationMs: true,
costInCents: true,
runtimeEnvironmentId: true,
projectId: true,
},
},
project: {
select: {
organizationId: true,
},
},
batchId: true,
createdAt: true,
completedAt: true,
taskEventStore: true,
parentTaskRunId: true,
usageDurationMs: true,
costInCents: true,
runtimeEnvironmentId: true,
projectId: true,
});

// Complete the waitpoint if it exists (runs without waiting parents
// have no waitpoint). Side effects (continuation jobs, events) are
// scheduled after this transaction commits.
const completedWaitpoint = run.associatedWaitpoint
? await this.waitpointSystem.completeWaitpointMutation({
id: run.associatedWaitpoint.id,
output: completedOutput,
tx,
})
: undefined;

return { run, completedWaitpoint };
},
});
(error) => {
this.$.logger.error("RunEngine.attemptSucceeded(): prisma.$transaction error", {
code: error.code,
meta: error.meta,
stack: error.stack,
message: error.message,
name: error.name,
});
throw new ServiceValidationError(
"Failed to complete task run and associated waitpoint",
500
);
}
);

if (!txResult) {
throw new ServiceValidationError("Failed to complete task run attempt", 500);
}

const { run, completedWaitpoint } = txResult;

const newSnapshot = await getLatestExecutionSnapshot(prisma, runId);

await this.$.runQueue.acknowledgeMessage(run.project.organizationId, runId);
Expand All @@ -806,14 +861,8 @@ export class RunAttemptSystem {
},
});

// Complete the waitpoint if it exists (runs without waiting parents have no waitpoint)
if (run.associatedWaitpoint) {
await this.waitpointSystem.completeWaitpoint({
id: run.associatedWaitpoint.id,
output: completion.output
? { value: completion.output, type: completion.outputType, isError: false }
: undefined,
});
if (completedWaitpoint) {
await this.waitpointSystem.scheduleWaitpointContinuations(completedWaitpoint);
}

this.$.eventBus.emit("runSucceeded", {
Expand Down
Loading
Loading