Skip to content

Commit a9810b0

Browse files
authored
Stop heartbeats when creating andWait checkpoints, fix for concurrency tracker (#1268)
* RESUME messages were missing some data that is needed for the concurrency tracker * Cancel heartbeats when checkpoints are created for triggerAndWait and batchTriggerAndWait * Added some logs when canceling heartbeats * Improved the nested dependencies test task
1 parent f9ec66c commit a9810b0

5 files changed

Lines changed: 80 additions & 25 deletions

File tree

apps/webapp/app/v3/marqs/index.server.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -521,6 +521,27 @@ export class MarQS {
521521
);
522522
}
523523

524+
public async cancelHeartbeat(messageId: string) {
525+
return this.#trace(
526+
"cancelHeartbeat",
527+
async (span) => {
528+
span.setAttributes({
529+
[SemanticAttributes.MESSAGE_ID]: messageId,
530+
});
531+
532+
await this.options.visibilityTimeoutStrategy.cancelHeartbeat(messageId);
533+
},
534+
{
535+
kind: SpanKind.CONSUMER,
536+
attributes: {
537+
[SEMATTRS_MESSAGING_OPERATION]: "cancelHeartbeat",
538+
[SEMATTRS_MESSAGE_ID]: messageId,
539+
[SEMATTRS_MESSAGING_SYSTEM]: "marqs",
540+
},
541+
}
542+
);
543+
}
544+
524545
async #trace<T>(
525546
name: string,
526547
fn: (span: Span) => Promise<T>,

apps/webapp/app/v3/services/createCheckpoint.server.ts

Lines changed: 37 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import { logger } from "~/services/logger.server";
55
import { marqs } from "~/v3/marqs/index.server";
66
import { generateFriendlyId } from "../friendlyIdentifiers";
77
import {
8-
FINAL_ATTEMPT_STATUSES,
98
isFinalAttemptStatus,
109
isFinalRunStatus,
1110
isFreezableAttemptStatus,
@@ -135,14 +134,32 @@ export class CreateCheckpointService extends BaseService {
135134
const { reason } = params;
136135

137136
let checkpointEvent: CheckpointRestoreEvent | undefined;
138-
let keepRunAlive = false;
139137

140138
switch (reason.type) {
141139
case "WAIT_FOR_DURATION": {
142140
checkpointEvent = await eventService.checkpoint({
143141
checkpointId: checkpoint.id,
144142
});
145143

144+
if (checkpointEvent) {
145+
await marqs?.replaceMessage(
146+
attempt.taskRunId,
147+
{
148+
type: "RESUME_AFTER_DURATION",
149+
resumableAttemptId: attempt.id,
150+
checkpointEventId: checkpointEvent.id,
151+
},
152+
reason.now + reason.ms
153+
);
154+
155+
return {
156+
success: true,
157+
checkpoint,
158+
event: checkpointEvent,
159+
keepRunAlive: false,
160+
};
161+
}
162+
146163
break;
147164
}
148165
case "WAIT_FOR_TASK": {
@@ -152,6 +169,14 @@ export class CreateCheckpointService extends BaseService {
152169
});
153170

154171
if (checkpointEvent) {
172+
//heartbeats will start again when the run resumes
173+
logger.log("CreateCheckpointService: Canceling heartbeat", {
174+
attemptId: attempt.id,
175+
taskRunId: attempt.taskRunId,
176+
type: "WAIT_FOR_TASK",
177+
});
178+
await marqs?.cancelHeartbeat(attempt.taskRunId);
179+
155180
const dependency = await this._prisma.taskRunDependency.findFirst({
156181
select: {
157182
id: true,
@@ -270,6 +295,7 @@ export class CreateCheckpointService extends BaseService {
270295
};
271296
}
272297

298+
//resume the dependent task
273299
await ResumeTaskDependencyService.enqueue(dependency.id, lastAttempt.id, this._prisma);
274300

275301
return {
@@ -289,6 +315,14 @@ export class CreateCheckpointService extends BaseService {
289315
});
290316

291317
if (checkpointEvent) {
318+
//heartbeats will start again when the run resumes
319+
logger.log("CreateCheckpointService: Canceling heartbeat", {
320+
attemptId: attempt.id,
321+
taskRunId: attempt.taskRunId,
322+
type: "WAIT_FOR_BATCH",
323+
});
324+
await marqs?.cancelHeartbeat(attempt.taskRunId);
325+
292326
const batchRun = await this._prisma.batchTaskRun.findFirst({
293327
select: {
294328
id: true,
@@ -348,23 +382,11 @@ export class CreateCheckpointService extends BaseService {
348382
};
349383
}
350384

351-
if (reason.type === "WAIT_FOR_DURATION") {
352-
await marqs?.replaceMessage(
353-
attempt.taskRunId,
354-
{
355-
type: "RESUME_AFTER_DURATION",
356-
resumableAttemptId: attempt.id,
357-
checkpointEventId: checkpointEvent.id,
358-
},
359-
reason.now + reason.ms
360-
);
361-
}
362-
363385
return {
364386
success: true,
365387
checkpoint,
366388
event: checkpointEvent,
367-
keepRunAlive,
389+
keepRunAlive: false,
368390
};
369391
}
370392
}

apps/webapp/app/v3/services/resumeBatchRun.server.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ export class ResumeBatchRunService extends BaseService {
8080
completedAttemptIds: [],
8181
resumableAttemptId: batchRun.dependentTaskAttempt.id,
8282
checkpointEventId: batchRun.checkpointEventId,
83+
taskIdentifier: batchRun.dependentTaskAttempt.taskRun.taskIdentifier,
8384
projectId: batchRun.dependentTaskAttempt.runtimeEnvironment.projectId,
8485
environmentId: batchRun.dependentTaskAttempt.runtimeEnvironment.id,
8586
environmentType: batchRun.dependentTaskAttempt.runtimeEnvironment.type,
@@ -109,6 +110,10 @@ export class ResumeBatchRunService extends BaseService {
109110
completedAttemptIds: batchRun.items.map((item) => item.taskRunAttemptId).filter(Boolean),
110111
resumableAttemptId: batchRun.dependentTaskAttempt.id,
111112
checkpointEventId: batchRun.checkpointEventId ?? undefined,
113+
taskIdentifier: batchRun.dependentTaskAttempt.taskRun.taskIdentifier,
114+
projectId: batchRun.dependentTaskAttempt.runtimeEnvironment.projectId,
115+
environmentId: batchRun.dependentTaskAttempt.runtimeEnvironment.id,
116+
environmentType: batchRun.dependentTaskAttempt.runtimeEnvironment.type,
112117
});
113118
}
114119
}

apps/webapp/app/v3/services/resumeTaskDependency.server.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ export class ResumeTaskDependencyService extends BaseService {
4848
completedAttemptIds: [sourceTaskAttemptId],
4949
resumableAttemptId: dependency.dependentAttempt.id,
5050
checkpointEventId: dependency.checkpointEventId,
51+
taskIdentifier: dependency.taskRun.taskIdentifier,
5152
projectId: dependency.taskRun.runtimeEnvironment.projectId,
5253
environmentId: dependency.taskRun.runtimeEnvironment.id,
5354
environmentType: dependency.taskRun.runtimeEnvironment.type,
@@ -77,6 +78,10 @@ export class ResumeTaskDependencyService extends BaseService {
7778
completedAttemptIds: [sourceTaskAttemptId],
7879
resumableAttemptId: dependency.dependentAttempt.id,
7980
checkpointEventId: dependency.checkpointEventId ?? undefined,
81+
taskIdentifier: dependency.taskRun.taskIdentifier,
82+
projectId: dependency.taskRun.runtimeEnvironment.projectId,
83+
environmentId: dependency.taskRun.runtimeEnvironment.id,
84+
environmentType: dependency.taskRun.runtimeEnvironment.type,
8085
});
8186
}
8287
}

references/v3-catalog/src/trigger/checkpoints.ts

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -61,16 +61,18 @@ export const nestedDependencies = task({
6161
const triggerOrBatch = depth % 2 === 0;
6262

6363
if (triggerOrBatch) {
64-
const result = await nestedDependencies.triggerAndWait({
65-
depth: depth + 1,
66-
maxDepth,
67-
waitSeconds,
68-
failAttemptChance,
69-
});
70-
logger.log(`Triggered complete`);
71-
72-
if (!result.ok && failParents) {
73-
throw new Error(`Failed at ${depth}/${maxDepth} depth`);
64+
for (let i = 0; i < batchSize; i++) {
65+
const result = await nestedDependencies.triggerAndWait({
66+
depth: depth + 1,
67+
maxDepth,
68+
waitSeconds,
69+
failAttemptChance,
70+
});
71+
logger.log(`Triggered complete ${i + 1}/${batchSize}`);
72+
73+
if (!result.ok && failParents) {
74+
throw new Error(`Failed at ${depth}/${maxDepth} depth`);
75+
}
7476
}
7577
} else {
7678
const results = await nestedDependencies.batchTriggerAndWait(

0 commit comments

Comments
 (0)