diff --git a/workers/paymaster/src/index.ts b/workers/paymaster/src/index.ts index 94be50d9..7d588be5 100644 --- a/workers/paymaster/src/index.ts +++ b/workers/paymaster/src/index.ts @@ -34,6 +34,33 @@ const DAYS_LEFT_ALERT = [3, 2, 1]; // eslint-disable-next-line @typescript-eslint/no-magic-numbers const DAYS_AFTER_BLOCK_TO_REMIND = [1, 2, 3, 5, 7, 30]; +/** + * Bounds concurrent updateOne / addTask calls per subscription check tick. + */ +const WORKSPACE_PROCESSING_CONCURRENCY = 25; + +const WORKSPACE_CURSOR_BATCH_SIZE = 200; + +/** + * Keep in sync with fields read by `processWorkspaceSubscriptionCheck` and its helpers. + */ +const WORKSPACE_SUBSCRIPTION_PROJECTION = { + _id: 1, + name: 1, + tariffPlanId: 1, + lastChargeDate: 1, + paidUntil: 1, + isDebug: 1, + isBlocked: 1, + blockedDate: 1, + subscriptionId: 1, +} as const; + +const PLAN_PROJECTION = { + _id: 1, + monthlyCharge: 1, +} as const; + /** * Worker to check workspaces subscription status and ban workspaces without actual subscription */ @@ -151,7 +178,10 @@ export default class PaymasterWorker extends Worker { throw new Error('Plans collection is not initialized'); } - this.plans = await this.plansCollection.find({}).toArray(); + this.plans = await this.plansCollection + .find({}) + .project(PLAN_PROJECTION) + .toArray(); if (this.plans.length === 0) { throw new Error('Please add tariff plans to the database'); @@ -195,28 +225,45 @@ export default class PaymasterWorker extends Worker { * Called periodically, enumerate through workspaces and check if today is a payday for workspace subscription */ private async handleWorkspaceSubscriptionCheckEvent(): Promise { - const workspaces = await this.workspaces.find({}).toArray(); + const cursor = this.workspaces + .find({}) + .project(WORKSPACE_SUBSCRIPTION_PROJECTION) + .batchSize(WORKSPACE_CURSOR_BATCH_SIZE); + + let batch: WorkspaceDBScheme[] = []; - await Promise.all(workspaces - .filter(workspace => { + const flush = async (currentBatch: WorkspaceDBScheme[]): Promise => { + if (currentBatch.length === 0) { + return; + } + + await Promise.all(currentBatch.map((workspace) => this.processWorkspaceSubscriptionCheck(workspace))); + }; + + try { + for await (const workspace of cursor) { /** * Skip workspace without lastChargeDate */ if (!workspace.lastChargeDate) { - const error = new Error('[Paymaster] Workspace without lastChargeDate detected'); - - HawkCatcher.send(error, { + HawkCatcher.send(new Error('[Paymaster] Workspace without lastChargeDate detected'), { workspaceId: workspace._id.toString(), }); + continue; + } + + batch.push(workspace); - return false; + if (batch.length >= WORKSPACE_PROCESSING_CONCURRENCY) { + await flush(batch); + batch = []; } + } - return true; - }) - .map( - (workspace) => this.processWorkspaceSubscriptionCheck(workspace) - )); + await flush(batch); + } finally { + await cursor.close(); + } } /** diff --git a/workers/paymaster/tests/index.test.ts b/workers/paymaster/tests/index.test.ts index 51ff31fd..23a1bd69 100644 --- a/workers/paymaster/tests/index.test.ts +++ b/workers/paymaster/tests/index.test.ts @@ -794,6 +794,54 @@ describe('PaymasterWorker', () => { MockDate.reset(); }); + test('Should process every workspace when there are several batches', async () => { + /** + * 50 > WORKSPACE_PROCESSING_CONCURRENCY (25), so the subscription check + * has to flush more than one batch. + */ + const WORKSPACES_COUNT = 50; + const currentDate = new Date('2005-12-22'); + const plan = createPlanMock({ + monthlyCharge: 0, + isDefault: true, + }); + + const workspaces = Array.from({ length: WORKSPACES_COUNT }, () => + createWorkspaceMock({ + plan, + subscriptionId: null, + lastChargeDate: new Date('2005-11-22'), + isBlocked: false, + billingPeriodEventsCount: 0, + }) + ); + + await tariffCollection.insertOne(plan); + await workspacesCollection.insertMany(workspaces); + + MockDate.set(currentDate); + + const worker = new PaymasterWorker(); + const processSpy = jest + .spyOn(worker as any, 'processWorkspaceSubscriptionCheck') + .mockResolvedValue([null, false]); + + await worker.start(); + await worker.handle(WORKSPACE_SUBSCRIPTION_CHECK); + await worker.finish(); + + expect(processSpy).toHaveBeenCalledTimes(WORKSPACES_COUNT); + + const calledIds = processSpy.mock.calls + .map((call) => (call[0] as WorkspaceDBScheme)._id.toString()) + .sort(); + const expectedIds = workspaces.map((w) => w._id.toString()).sort(); + + expect(calledIds).toEqual(expectedIds); + + MockDate.reset(); + }); + afterAll(async () => { await connection.close(); MockDate.reset();