Skip to content

Commit 8b920dc

Browse files
committed
fix(hitl): cancel paused executions directly in DB
Paused HITL executions are idle in the DB — they don't poll Redis or run in-process, so the existing cancel signals had no effect. The DB status stayed 'pending', causing the optimistic 'cancelling' update to revert on refetch. - Add PauseResumeManager.cancelPausedExecution: atomically sets paused_executions.status and workflow_execution_logs.status to 'cancelled' inside a FOR UPDATE transaction - Guard enqueueOrStartResume against resuming a cancelled execution - Include pausedCancelled in the cancel route success check
1 parent 316b6ac commit 8b920dc

2 files changed

Lines changed: 50 additions & 2 deletions

File tree

apps/sim/app/api/workflows/[id]/executions/[executionId]/cancel/route.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { checkHybridAuth } from '@/lib/auth/hybrid'
44
import { markExecutionCancelled } from '@/lib/execution/cancellation'
55
import { abortManualExecution } from '@/lib/execution/manual-cancellation'
66
import { captureServerEvent } from '@/lib/posthog/server'
7+
import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager'
78
import { authorizeWorkflowByWorkspacePermission } from '@/lib/workflows/utils'
89

910
const logger = createLogger('CancelExecutionAPI')
@@ -49,19 +50,24 @@ export async function POST(
4950

5051
const cancellation = await markExecutionCancelled(executionId)
5152
const locallyAborted = abortManualExecution(executionId)
53+
const pausedCancelled = await PauseResumeManager.cancelPausedExecution(executionId)
5254

5355
if (cancellation.durablyRecorded) {
5456
logger.info('Execution marked as cancelled in Redis', { executionId })
5557
} else if (locallyAborted) {
5658
logger.info('Execution cancelled via local in-process fallback', { executionId })
59+
} else if (pausedCancelled) {
60+
logger.info('Paused execution cancelled directly in database', { executionId })
5761
} else {
5862
logger.warn('Execution cancellation was not durably recorded', {
5963
executionId,
6064
reason: cancellation.reason,
6165
})
6266
}
6367

64-
if (cancellation.durablyRecorded || locallyAborted) {
68+
const success = cancellation.durablyRecorded || locallyAborted || pausedCancelled
69+
70+
if (success) {
6571
const workspaceId = workflowAuthorization.workflow?.workspaceId
6672
captureServerEvent(
6773
auth.userId,
@@ -72,11 +78,12 @@ export async function POST(
7278
}
7379

7480
return NextResponse.json({
75-
success: cancellation.durablyRecorded || locallyAborted,
81+
success,
7682
executionId,
7783
redisAvailable: cancellation.reason !== 'redis_unavailable',
7884
durablyRecorded: cancellation.durablyRecorded,
7985
locallyAborted,
86+
pausedCancelled,
8087
reason: cancellation.reason,
8188
})
8289
} catch (error: any) {

apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,10 @@ export class PauseResumeManager {
193193
throw new Error('Paused execution not found or already resumed')
194194
}
195195

196+
if (pausedExecution.status === 'cancelled') {
197+
throw new Error('Execution has been cancelled')
198+
}
199+
196200
const pausePoints = pausedExecution.pausePoints as Record<string, any>
197201
const pausePoint = pausePoints?.[contextId]
198202
if (!pausePoint) {
@@ -1253,6 +1257,43 @@ export class PauseResumeManager {
12531257
})
12541258
}
12551259

1260+
/**
1261+
* Cancels a paused execution by updating both the paused execution record and the
1262+
* workflow execution log status to 'cancelled'. Returns true if a paused execution
1263+
* was found and cancelled, false if no paused execution exists for this executionId.
1264+
*/
1265+
static async cancelPausedExecution(executionId: string): Promise<boolean> {
1266+
const now = new Date()
1267+
1268+
return await db.transaction(async (tx) => {
1269+
const pausedExecution = await tx
1270+
.select({ id: pausedExecutions.id })
1271+
.from(pausedExecutions)
1272+
.where(
1273+
and(eq(pausedExecutions.executionId, executionId), eq(pausedExecutions.status, 'paused'))
1274+
)
1275+
.for('update')
1276+
.limit(1)
1277+
.then((rows) => rows[0])
1278+
1279+
if (!pausedExecution) {
1280+
return false
1281+
}
1282+
1283+
await tx
1284+
.update(pausedExecutions)
1285+
.set({ status: 'cancelled', updatedAt: now })
1286+
.where(eq(pausedExecutions.id, pausedExecution.id))
1287+
1288+
await tx
1289+
.update(workflowExecutionLogs)
1290+
.set({ status: 'cancelled' })
1291+
.where(eq(workflowExecutionLogs.executionId, executionId))
1292+
1293+
return true
1294+
})
1295+
}
1296+
12561297
static async listPausedExecutions(options: {
12571298
workflowId: string
12581299
status?: string | string[]

0 commit comments

Comments
 (0)