Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions pkgs/edge-worker/src/core/Worker.ts
Original file line number Diff line number Diff line change
@@ -1,30 +1,33 @@
import type postgres from 'postgres';
import type { IBatchProcessor, ILifecycle, WorkerBootstrap } from './types.js';
import type { Logger } from '../platform/types.js';

export interface WorkerOptions {
requestShutdown?: () => void;
cleanup?: () => Promise<void>;
}

export class Worker {
private lifecycle: ILifecycle;
private logger: Logger;
private abortController = new AbortController();
private readonly requestShutdown?: () => void;
private readonly cleanup?: () => Promise<void>;

private batchProcessor: IBatchProcessor;
private sql: postgres.Sql;
private mainLoopPromise: Promise<void> | undefined;
private deprecationLogged = false;

constructor(
batchProcessor: IBatchProcessor,
lifecycle: ILifecycle,
sql: postgres.Sql,
logger: Logger,
requestShutdown?: () => void
options: WorkerOptions = {}
) {
this.sql = sql;
this.lifecycle = lifecycle;
this.batchProcessor = batchProcessor;
this.logger = logger;
this.requestShutdown = requestShutdown;
this.requestShutdown = options.requestShutdown;
this.cleanup = options.cleanup;
}

startOnlyOnce(workerBootstrap: WorkerBootstrap) {
Expand Down Expand Up @@ -96,8 +99,10 @@ export class Worker {

this.lifecycle.acknowledgeStop();

this.logger.debug('-> Closing SQL connection...');
await this.sql.end();
if (this.cleanup) {
this.logger.debug('-> Running worker cleanup...');
await this.cleanup();
}

// Signal graceful stop complete
this.logger.shutdown('stopped');
Expand Down
4 changes: 3 additions & 1 deletion pkgs/edge-worker/src/flow/StepTaskPoller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ export class StepTaskPoller<TFlow extends AnyFlow>
}

const workerId = this.getWorkerId();
const batchSize = limit ?? this.config.batchSize;
const batchSize = limit === undefined
? this.config.batchSize
: Math.min(this.config.batchSize, limit);
this.logger.debug(
`Two-phase polling for flow tasks with batch size ${batchSize}, maxPollSeconds: ${this.config.maxPollSeconds}, pollIntervalMs: ${this.config.pollIntervalMs}`
);
Expand Down
7 changes: 5 additions & 2 deletions pkgs/edge-worker/src/flow/createFlowWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ export function createFlowWorker<
);
}

const ownsSql = !config.sql;
const sql =
config.sql ||
postgres(config.connectionString as string, {
Expand Down Expand Up @@ -207,8 +208,10 @@ export function createFlowWorker<
return new Worker(
batchProcessor,
lifecycle,
sql,
createLogger('Worker'),
() => platformAdapter.requestShutdown()
{
requestShutdown: platformAdapter.requestShutdown?.bind(platformAdapter),
cleanup: ownsSql ? () => sql.end() : undefined,
}
);
}
10 changes: 6 additions & 4 deletions pkgs/edge-worker/src/platform/SupabasePlatformAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,13 @@ export class SupabasePlatformAdapter implements PlatformAdapter<SupabaseResource
async stopWorker(): Promise<void> {
this.requestShutdown();

if (this.worker) {
await this.worker.stop();
try {
if (this.worker) {
await this.worker.stop();
}
} finally {
await this._platformResources.sql.end();
}

await this._platformResources.sql.end();
}

requestShutdown(): void {
Expand Down
2 changes: 1 addition & 1 deletion pkgs/edge-worker/src/platform/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ export interface PlatformAdapter<TResources extends Record<string, unknown> = Re
/**
* Trigger the shared shutdown signal used by pollers, executors, and contexts.
*/
requestShutdown(): void;
requestShutdown?(): void;

/**
* Get the connection string for the database
Expand Down
4 changes: 3 additions & 1 deletion pkgs/edge-worker/src/queue/ReadWithPollPoller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ export class ReadWithPollPoller<TPayload extends Json> {
return [];
}

const batchSize = limit ?? this.config.batchSize;
const batchSize = limit === undefined
? this.config.batchSize
: Math.min(this.config.batchSize, limit);

this.logger.debug(`Polling queue '${this.queue.queueName}' with batch size ${batchSize}`);
const messages = await this.queue.readWithPoll(
Expand Down
7 changes: 5 additions & 2 deletions pkgs/edge-worker/src/queue/createQueueWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ export function createQueueWorker<TPayload extends Json, TResources extends Reco
const abortSignal = platformAdapter.shutdownSignal;

// Use provided SQL connection if available, otherwise create one from connection string
const ownsSql = !config.sql;
const sql =
config.sql ||
postgres(config.connectionString || '', {
Expand Down Expand Up @@ -221,8 +222,10 @@ export function createQueueWorker<TPayload extends Json, TResources extends Reco
return new Worker(
batchProcessor,
lifecycle,
sql,
createLogger('Worker'),
() => platformAdapter.requestShutdown()
{
requestShutdown: platformAdapter.requestShutdown?.bind(platformAdapter),
cleanup: ownsSql ? () => sql.end() : undefined,
}
);
}
23 changes: 23 additions & 0 deletions pkgs/edge-worker/tests/types/platform-adapter.test-d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import type { PlatformAdapter } from '../../src/platform/types.ts';

const adapterWithoutRequestShutdown: PlatformAdapter = {
async startWorker() {},
async stopWorker() {},
get connectionString() {
return undefined;
},
get env() {
return {};
},
get shutdownSignal() {
return new AbortController().signal;
},
get platformResources() {
return {};
},
get isLocalEnvironment() {
return false;
},
};

void adapterWithoutRequestShutdown;
184 changes: 184 additions & 0 deletions pkgs/edge-worker/tests/unit/Poller.batchSize.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
import { assertEquals } from '@std/assert';
import { ReadWithPollPoller } from '../../src/queue/ReadWithPollPoller.ts';
import { StepTaskPoller } from '../../src/flow/StepTaskPoller.ts';
import { fakeLogger } from '../fakes.ts';

Deno.test('ReadWithPollPoller caps limit at configured batchSize', async () => {
let readQty: number | undefined;
const queue = {
queueName: 'test_queue',
readWithPoll: (qty: number) => {
readQty = qty;
return Promise.resolve([]);
},
};

const poller = new ReadWithPollPoller(
queue as never,
new AbortController().signal,
{
batchSize: 5,
maxPollSeconds: 1,
pollIntervalMs: 100,
visibilityTimeout: 10,
},
fakeLogger
);

await poller.poll(20);

assertEquals(readQty, 5);
});

Deno.test('ReadWithPollPoller uses smaller available slot limit', async () => {
let readQty: number | undefined;
const queue = {
queueName: 'test_queue',
readWithPoll: (qty: number) => {
readQty = qty;
return Promise.resolve([]);
},
};

const poller = new ReadWithPollPoller(
queue as never,
new AbortController().signal,
{
batchSize: 5,
maxPollSeconds: 1,
pollIntervalMs: 100,
visibilityTimeout: 10,
},
fakeLogger
);

await poller.poll(2);

assertEquals(readQty, 2);
});

Deno.test('ReadWithPollPoller uses configured batchSize without limit', async () => {
let readQty: number | undefined;
const queue = {
queueName: 'test_queue',
readWithPoll: (qty: number) => {
readQty = qty;
return Promise.resolve([]);
},
};

const poller = new ReadWithPollPoller(
queue as never,
new AbortController().signal,
{
batchSize: 5,
maxPollSeconds: 1,
pollIntervalMs: 100,
visibilityTimeout: 10,
},
fakeLogger
);

await poller.poll();

assertEquals(readQty, 5);
});

Deno.test('StepTaskPoller caps limit at configured batchSize', async () => {
let readQty: number | undefined;
const adapter = {
readMessages: (
_queueName: string,
_visibilityTimeout: number,
qty: number
) => {
readQty = qty;
return Promise.resolve([]);
},
startTasks: () => Promise.resolve([]),
};

const poller = new StepTaskPoller(
adapter as never,
new AbortController().signal,
{
batchSize: 5,
queueName: 'test_flow',
visibilityTimeout: 10,
maxPollSeconds: 1,
pollIntervalMs: 100,
},
() => 'worker-id',
fakeLogger
);

await poller.poll(20);

assertEquals(readQty, 5);
});

Deno.test('StepTaskPoller uses smaller available slot limit', async () => {
let readQty: number | undefined;
const adapter = {
readMessages: (
_queueName: string,
_visibilityTimeout: number,
qty: number
) => {
readQty = qty;
return Promise.resolve([]);
},
startTasks: () => Promise.resolve([]),
};

const poller = new StepTaskPoller(
adapter as never,
new AbortController().signal,
{
batchSize: 5,
queueName: 'test_flow',
visibilityTimeout: 10,
maxPollSeconds: 1,
pollIntervalMs: 100,
},
() => 'worker-id',
fakeLogger
);

await poller.poll(2);

assertEquals(readQty, 2);
});

Deno.test('StepTaskPoller uses configured batchSize without limit', async () => {
let readQty: number | undefined;
const adapter = {
readMessages: (
_queueName: string,
_visibilityTimeout: number,
qty: number
) => {
readQty = qty;
return Promise.resolve([]);
},
startTasks: () => Promise.resolve([]),
};

const poller = new StepTaskPoller(
adapter as never,
new AbortController().signal,
{
batchSize: 5,
queueName: 'test_flow',
visibilityTimeout: 10,
maxPollSeconds: 1,
pollIntervalMs: 100,
},
() => 'worker-id',
fakeLogger
);

await poller.poll();

assertEquals(readQty, 5);
});
Loading
Loading