diff --git a/pkgs/edge-worker/src/core/Worker.ts b/pkgs/edge-worker/src/core/Worker.ts index 35888ccb3..40fb05626 100644 --- a/pkgs/edge-worker/src/core/Worker.ts +++ b/pkgs/edge-worker/src/core/Worker.ts @@ -1,30 +1,33 @@ -import type postgres from 'postgres'; import type { IBatchProcessor, ILifecycle, WorkerBootstrap } from './types.js'; import type { Logger } from '../platform/types.js'; +export interface WorkerOptions { + requestShutdown?: () => void; + cleanup?: () => Promise; +} + export class Worker { private lifecycle: ILifecycle; private logger: Logger; private abortController = new AbortController(); private readonly requestShutdown?: () => void; + private readonly cleanup?: () => Promise; private batchProcessor: IBatchProcessor; - private sql: postgres.Sql; private mainLoopPromise: Promise | undefined; private deprecationLogged = false; constructor( batchProcessor: IBatchProcessor, lifecycle: ILifecycle, - sql: postgres.Sql, logger: Logger, - requestShutdown?: () => void + options: WorkerOptions = {} ) { - this.sql = sql; this.lifecycle = lifecycle; this.batchProcessor = batchProcessor; this.logger = logger; - this.requestShutdown = requestShutdown; + this.requestShutdown = options.requestShutdown; + this.cleanup = options.cleanup; } startOnlyOnce(workerBootstrap: WorkerBootstrap) { @@ -96,8 +99,10 @@ export class Worker { this.lifecycle.acknowledgeStop(); - this.logger.debug('-> Closing SQL connection...'); - await this.sql.end(); + if (this.cleanup) { + this.logger.debug('-> Running worker cleanup...'); + await this.cleanup(); + } // Signal graceful stop complete this.logger.shutdown('stopped'); diff --git a/pkgs/edge-worker/src/flow/StepTaskPoller.ts b/pkgs/edge-worker/src/flow/StepTaskPoller.ts index dedf13846..8cb0ddb76 100644 --- a/pkgs/edge-worker/src/flow/StepTaskPoller.ts +++ b/pkgs/edge-worker/src/flow/StepTaskPoller.ts @@ -43,7 +43,9 @@ export class StepTaskPoller } const workerId = this.getWorkerId(); - const batchSize = limit ?? this.config.batchSize; + const batchSize = limit === undefined + ? this.config.batchSize + : Math.min(this.config.batchSize, limit); this.logger.debug( `Two-phase polling for flow tasks with batch size ${batchSize}, maxPollSeconds: ${this.config.maxPollSeconds}, pollIntervalMs: ${this.config.pollIntervalMs}` ); diff --git a/pkgs/edge-worker/src/flow/createFlowWorker.ts b/pkgs/edge-worker/src/flow/createFlowWorker.ts index b024fd1b6..2c6459aef 100644 --- a/pkgs/edge-worker/src/flow/createFlowWorker.ts +++ b/pkgs/edge-worker/src/flow/createFlowWorker.ts @@ -81,6 +81,7 @@ export function createFlowWorker< ); } + const ownsSql = !config.sql; const sql = config.sql || postgres(config.connectionString as string, { @@ -207,8 +208,10 @@ export function createFlowWorker< return new Worker( batchProcessor, lifecycle, - sql, createLogger('Worker'), - () => platformAdapter.requestShutdown() + { + requestShutdown: platformAdapter.requestShutdown?.bind(platformAdapter), + cleanup: ownsSql ? () => sql.end() : undefined, + } ); } diff --git a/pkgs/edge-worker/src/platform/SupabasePlatformAdapter.ts b/pkgs/edge-worker/src/platform/SupabasePlatformAdapter.ts index 92d999fd1..74c76b3d3 100644 --- a/pkgs/edge-worker/src/platform/SupabasePlatformAdapter.ts +++ b/pkgs/edge-worker/src/platform/SupabasePlatformAdapter.ts @@ -106,11 +106,13 @@ export class SupabasePlatformAdapter implements PlatformAdapter { this.requestShutdown(); - if (this.worker) { - await this.worker.stop(); + try { + if (this.worker) { + await this.worker.stop(); + } + } finally { + await this._platformResources.sql.end(); } - - await this._platformResources.sql.end(); } requestShutdown(): void { diff --git a/pkgs/edge-worker/src/platform/types.ts b/pkgs/edge-worker/src/platform/types.ts index 972a4eff0..292cbb5f1 100644 --- a/pkgs/edge-worker/src/platform/types.ts +++ b/pkgs/edge-worker/src/platform/types.ts @@ -80,7 +80,7 @@ export interface PlatformAdapter = Re /** * Trigger the shared shutdown signal used by pollers, executors, and contexts. */ - requestShutdown(): void; + requestShutdown?(): void; /** * Get the connection string for the database diff --git a/pkgs/edge-worker/src/queue/ReadWithPollPoller.ts b/pkgs/edge-worker/src/queue/ReadWithPollPoller.ts index 121ad417f..23e1dea81 100644 --- a/pkgs/edge-worker/src/queue/ReadWithPollPoller.ts +++ b/pkgs/edge-worker/src/queue/ReadWithPollPoller.ts @@ -28,7 +28,9 @@ export class ReadWithPollPoller { return []; } - const batchSize = limit ?? this.config.batchSize; + const batchSize = limit === undefined + ? this.config.batchSize + : Math.min(this.config.batchSize, limit); this.logger.debug(`Polling queue '${this.queue.queueName}' with batch size ${batchSize}`); const messages = await this.queue.readWithPoll( diff --git a/pkgs/edge-worker/src/queue/createQueueWorker.ts b/pkgs/edge-worker/src/queue/createQueueWorker.ts index 78d659008..2527fdc85 100644 --- a/pkgs/edge-worker/src/queue/createQueueWorker.ts +++ b/pkgs/edge-worker/src/queue/createQueueWorker.ts @@ -135,6 +135,7 @@ export function createQueueWorker platformAdapter.requestShutdown() + { + requestShutdown: platformAdapter.requestShutdown?.bind(platformAdapter), + cleanup: ownsSql ? () => sql.end() : undefined, + } ); } diff --git a/pkgs/edge-worker/tests/types/platform-adapter.test-d.ts b/pkgs/edge-worker/tests/types/platform-adapter.test-d.ts new file mode 100644 index 000000000..42f4d2759 --- /dev/null +++ b/pkgs/edge-worker/tests/types/platform-adapter.test-d.ts @@ -0,0 +1,23 @@ +import type { PlatformAdapter } from '../../src/platform/types.ts'; + +const adapterWithoutRequestShutdown: PlatformAdapter = { + async startWorker() {}, + async stopWorker() {}, + get connectionString() { + return undefined; + }, + get env() { + return {}; + }, + get shutdownSignal() { + return new AbortController().signal; + }, + get platformResources() { + return {}; + }, + get isLocalEnvironment() { + return false; + }, +}; + +void adapterWithoutRequestShutdown; diff --git a/pkgs/edge-worker/tests/unit/Poller.batchSize.test.ts b/pkgs/edge-worker/tests/unit/Poller.batchSize.test.ts new file mode 100644 index 000000000..c1aeec6ef --- /dev/null +++ b/pkgs/edge-worker/tests/unit/Poller.batchSize.test.ts @@ -0,0 +1,184 @@ +import { assertEquals } from '@std/assert'; +import { ReadWithPollPoller } from '../../src/queue/ReadWithPollPoller.ts'; +import { StepTaskPoller } from '../../src/flow/StepTaskPoller.ts'; +import { fakeLogger } from '../fakes.ts'; + +Deno.test('ReadWithPollPoller caps limit at configured batchSize', async () => { + let readQty: number | undefined; + const queue = { + queueName: 'test_queue', + readWithPoll: (qty: number) => { + readQty = qty; + return Promise.resolve([]); + }, + }; + + const poller = new ReadWithPollPoller( + queue as never, + new AbortController().signal, + { + batchSize: 5, + maxPollSeconds: 1, + pollIntervalMs: 100, + visibilityTimeout: 10, + }, + fakeLogger + ); + + await poller.poll(20); + + assertEquals(readQty, 5); +}); + +Deno.test('ReadWithPollPoller uses smaller available slot limit', async () => { + let readQty: number | undefined; + const queue = { + queueName: 'test_queue', + readWithPoll: (qty: number) => { + readQty = qty; + return Promise.resolve([]); + }, + }; + + const poller = new ReadWithPollPoller( + queue as never, + new AbortController().signal, + { + batchSize: 5, + maxPollSeconds: 1, + pollIntervalMs: 100, + visibilityTimeout: 10, + }, + fakeLogger + ); + + await poller.poll(2); + + assertEquals(readQty, 2); +}); + +Deno.test('ReadWithPollPoller uses configured batchSize without limit', async () => { + let readQty: number | undefined; + const queue = { + queueName: 'test_queue', + readWithPoll: (qty: number) => { + readQty = qty; + return Promise.resolve([]); + }, + }; + + const poller = new ReadWithPollPoller( + queue as never, + new AbortController().signal, + { + batchSize: 5, + maxPollSeconds: 1, + pollIntervalMs: 100, + visibilityTimeout: 10, + }, + fakeLogger + ); + + await poller.poll(); + + assertEquals(readQty, 5); +}); + +Deno.test('StepTaskPoller caps limit at configured batchSize', async () => { + let readQty: number | undefined; + const adapter = { + readMessages: ( + _queueName: string, + _visibilityTimeout: number, + qty: number + ) => { + readQty = qty; + return Promise.resolve([]); + }, + startTasks: () => Promise.resolve([]), + }; + + const poller = new StepTaskPoller( + adapter as never, + new AbortController().signal, + { + batchSize: 5, + queueName: 'test_flow', + visibilityTimeout: 10, + maxPollSeconds: 1, + pollIntervalMs: 100, + }, + () => 'worker-id', + fakeLogger + ); + + await poller.poll(20); + + assertEquals(readQty, 5); +}); + +Deno.test('StepTaskPoller uses smaller available slot limit', async () => { + let readQty: number | undefined; + const adapter = { + readMessages: ( + _queueName: string, + _visibilityTimeout: number, + qty: number + ) => { + readQty = qty; + return Promise.resolve([]); + }, + startTasks: () => Promise.resolve([]), + }; + + const poller = new StepTaskPoller( + adapter as never, + new AbortController().signal, + { + batchSize: 5, + queueName: 'test_flow', + visibilityTimeout: 10, + maxPollSeconds: 1, + pollIntervalMs: 100, + }, + () => 'worker-id', + fakeLogger + ); + + await poller.poll(2); + + assertEquals(readQty, 2); +}); + +Deno.test('StepTaskPoller uses configured batchSize without limit', async () => { + let readQty: number | undefined; + const adapter = { + readMessages: ( + _queueName: string, + _visibilityTimeout: number, + qty: number + ) => { + readQty = qty; + return Promise.resolve([]); + }, + startTasks: () => Promise.resolve([]), + }; + + const poller = new StepTaskPoller( + adapter as never, + new AbortController().signal, + { + batchSize: 5, + queueName: 'test_flow', + visibilityTimeout: 10, + maxPollSeconds: 1, + pollIntervalMs: 100, + }, + () => 'worker-id', + fakeLogger + ); + + await poller.poll(); + + assertEquals(readQty, 5); +}); diff --git a/pkgs/edge-worker/tests/unit/Worker.startOnlyOnce.test.ts b/pkgs/edge-worker/tests/unit/Worker.startOnlyOnce.test.ts index 9140a515e..5ea4c46e4 100644 --- a/pkgs/edge-worker/tests/unit/Worker.startOnlyOnce.test.ts +++ b/pkgs/edge-worker/tests/unit/Worker.startOnlyOnce.test.ts @@ -41,11 +41,6 @@ function createMockBatchProcessor(): IBatchProcessor { }; } -// Mock SQL connection -const mockSql = { - end: async () => {}, -} as never; - const workerBootstrap: WorkerBootstrap = { edgeFunctionName: 'test-function', workerId: 'test-worker-id', @@ -54,7 +49,7 @@ const workerBootstrap: WorkerBootstrap = { Deno.test('Worker.startOnlyOnce - starts worker when in Created state', async () => { const lifecycle = createMockLifecycle('created'); const batchProcessor = createMockBatchProcessor(); - const worker = new Worker(batchProcessor, lifecycle, mockSql as never, logger); + const worker = new Worker(batchProcessor, lifecycle, logger); worker.startOnlyOnce(workerBootstrap); @@ -71,7 +66,7 @@ Deno.test('Worker.startOnlyOnce - starts worker when in Created state', async () Deno.test('Worker.startOnlyOnce - ignores request when in Starting state', async () => { const lifecycle = createMockLifecycle('starting'); const batchProcessor = createMockBatchProcessor(); - const worker = new Worker(batchProcessor, lifecycle, mockSql as never, logger); + const worker = new Worker(batchProcessor, lifecycle, logger); worker.startOnlyOnce(workerBootstrap); @@ -88,7 +83,7 @@ Deno.test('Worker.startOnlyOnce - ignores request when in Starting state', async Deno.test('Worker.startOnlyOnce - ignores request when in Running state', async () => { const lifecycle = createMockLifecycle('running'); const batchProcessor = createMockBatchProcessor(); - const worker = new Worker(batchProcessor, lifecycle, mockSql as never, logger); + const worker = new Worker(batchProcessor, lifecycle, logger); worker.startOnlyOnce(workerBootstrap); @@ -105,7 +100,7 @@ Deno.test('Worker.startOnlyOnce - ignores request when in Running state', async Deno.test('Worker.startOnlyOnce - ignores request when in Stopping state', async () => { const lifecycle = createMockLifecycle('stopping'); const batchProcessor = createMockBatchProcessor(); - const worker = new Worker(batchProcessor, lifecycle, mockSql as never, logger); + const worker = new Worker(batchProcessor, lifecycle, logger); worker.startOnlyOnce(workerBootstrap); @@ -122,7 +117,7 @@ Deno.test('Worker.startOnlyOnce - ignores request when in Stopping state', async Deno.test('Worker.startOnlyOnce - ignores request when in Stopped state', async () => { const lifecycle = createMockLifecycle('stopped'); const batchProcessor = createMockBatchProcessor(); - const worker = new Worker(batchProcessor, lifecycle, mockSql as never, logger); + const worker = new Worker(batchProcessor, lifecycle, logger); worker.startOnlyOnce(workerBootstrap); diff --git a/pkgs/edge-worker/tests/unit/Worker.stop.test.ts b/pkgs/edge-worker/tests/unit/Worker.stop.test.ts new file mode 100644 index 000000000..247bef7ae --- /dev/null +++ b/pkgs/edge-worker/tests/unit/Worker.stop.test.ts @@ -0,0 +1,64 @@ +import { assertEquals } from '@std/assert'; +import { Worker } from '../../src/core/Worker.ts'; +import type { IBatchProcessor, ILifecycle } from '../../src/core/types.ts'; +import { fakeLogger } from '../fakes.ts'; + +function createRunningLifecycle(): ILifecycle { + let stopping = false; + let stopped = false; + + return { + acknowledgeStart: () => Promise.resolve(), + acknowledgeStop: () => { + stopped = true; + }, + sendHeartbeat: () => Promise.resolve(), + get edgeFunctionName() { + return 'test-function'; + }, + get queueName() { + return 'test-queue'; + }, + get isCreated() { + return false; + }, + get isRunning() { + return !stopping && !stopped; + }, + get isStopping() { + return stopping; + }, + get isStopped() { + return stopped; + }, + transitionToStopping: () => { + stopping = true; + }, + }; +} + +function createBatchProcessor(): IBatchProcessor { + return { + processBatch: () => Promise.resolve(), + awaitCompletion: () => Promise.resolve(), + }; +} + +Deno.test('Worker.stop calls provided cleanup callback', async () => { + let cleanupCalled = false; + const worker = new Worker( + createBatchProcessor(), + createRunningLifecycle(), + fakeLogger, + { + cleanup: () => { + cleanupCalled = true; + return Promise.resolve(); + }, + } + ); + + await worker.stop(); + + assertEquals(cleanupCalled, true); +}); diff --git a/pkgs/edge-worker/tests/unit/platform/SupabasePlatformAdapter.test.ts b/pkgs/edge-worker/tests/unit/platform/SupabasePlatformAdapter.test.ts index a7bad2bdf..0c0506cab 100644 --- a/pkgs/edge-worker/tests/unit/platform/SupabasePlatformAdapter.test.ts +++ b/pkgs/edge-worker/tests/unit/platform/SupabasePlatformAdapter.test.ts @@ -369,3 +369,35 @@ Deno.test({ assertEquals(callOrder, ['abort', 'worker.stop', 'sql.end']); }, }); + +Deno.test({ + name: 'stopWorker closes sql when worker stop rejects', + sanitizeResources: false, + fn: async () => { + const callOrder: string[] = []; + const deps = createMockDeps(); + const adapter = new SupabasePlatformAdapter(undefined, deps); + + (adapter as unknown as { worker: Worker | null }).worker = { + startOnlyOnce: () => {}, + stop: () => { + callOrder.push('worker.stop'); + return Promise.reject(new Error('stop failed')); + }, + } as unknown as Worker; + + const sql = adapter.sql as unknown as { end: () => Promise }; + sql.end = () => { + callOrder.push('sql.end'); + return Promise.resolve(); + }; + + try { + await adapter.stopWorker(); + } catch { + // expected: the original worker.stop error should still reject + } + + assertEquals(callOrder, ['worker.stop', 'sql.end']); + }, +});