Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 60 additions & 13 deletions workers/paymaster/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,33 @@ const DAYS_LEFT_ALERT = [3, 2, 1];
// eslint-disable-next-line @typescript-eslint/no-magic-numbers
const DAYS_AFTER_BLOCK_TO_REMIND = [1, 2, 3, 5, 7, 30];

/**
* Bounds concurrent updateOne / addTask calls per subscription check tick.
*/
const WORKSPACE_PROCESSING_CONCURRENCY = 25;

const WORKSPACE_CURSOR_BATCH_SIZE = 200;

/**
* Keep in sync with fields read by `processWorkspaceSubscriptionCheck` and its helpers.
*/
const WORKSPACE_SUBSCRIPTION_PROJECTION = {
_id: 1,
name: 1,
tariffPlanId: 1,
lastChargeDate: 1,
paidUntil: 1,
isDebug: 1,
isBlocked: 1,
blockedDate: 1,
subscriptionId: 1,
} as const;

const PLAN_PROJECTION = {
_id: 1,
monthlyCharge: 1,
} as const;

/**
* Worker to check workspaces subscription status and ban workspaces without actual subscription
*/
Expand Down Expand Up @@ -151,7 +178,10 @@ export default class PaymasterWorker extends Worker {
throw new Error('Plans collection is not initialized');
}

this.plans = await this.plansCollection.find({}).toArray();
this.plans = await this.plansCollection
.find({})
.project<PlanDBScheme>(PLAN_PROJECTION)
.toArray();

if (this.plans.length === 0) {
throw new Error('Please add tariff plans to the database');
Expand Down Expand Up @@ -195,28 +225,45 @@ export default class PaymasterWorker extends Worker {
* Called periodically, enumerate through workspaces and check if today is a payday for workspace subscription
*/
private async handleWorkspaceSubscriptionCheckEvent(): Promise<void> {
const workspaces = await this.workspaces.find({}).toArray();
const cursor = this.workspaces
.find({})
.project<WorkspaceDBScheme>(WORKSPACE_SUBSCRIPTION_PROJECTION)
.batchSize(WORKSPACE_CURSOR_BATCH_SIZE);

let batch: WorkspaceDBScheme[] = [];

await Promise.all(workspaces
.filter(workspace => {
const flush = async (currentBatch: WorkspaceDBScheme[]): Promise<void> => {
if (currentBatch.length === 0) {
return;
}

await Promise.all(currentBatch.map((workspace) => this.processWorkspaceSubscriptionCheck(workspace)));
};

try {
for await (const workspace of cursor) {
/**
* Skip workspace without lastChargeDate
*/
if (!workspace.lastChargeDate) {
const error = new Error('[Paymaster] Workspace without lastChargeDate detected');

HawkCatcher.send(error, {
HawkCatcher.send(new Error('[Paymaster] Workspace without lastChargeDate detected'), {
workspaceId: workspace._id.toString(),
});
continue;
}

batch.push(workspace);

return false;
if (batch.length >= WORKSPACE_PROCESSING_CONCURRENCY) {
await flush(batch);
batch = [];
}
}

return true;
})
.map(
(workspace) => this.processWorkspaceSubscriptionCheck(workspace)
));
await flush(batch);
} finally {
await cursor.close();
}
}

/**
Expand Down
48 changes: 48 additions & 0 deletions workers/paymaster/tests/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,54 @@ describe('PaymasterWorker', () => {
MockDate.reset();
});

test('Should process every workspace when there are several batches', async () => {
/**
* 50 > WORKSPACE_PROCESSING_CONCURRENCY (25), so the subscription check
* has to flush more than one batch.
*/
const WORKSPACES_COUNT = 50;
const currentDate = new Date('2005-12-22');
const plan = createPlanMock({
monthlyCharge: 0,
isDefault: true,
});

const workspaces = Array.from({ length: WORKSPACES_COUNT }, () =>
createWorkspaceMock({
plan,
subscriptionId: null,
lastChargeDate: new Date('2005-11-22'),
isBlocked: false,
billingPeriodEventsCount: 0,
})
);

await tariffCollection.insertOne(plan);
await workspacesCollection.insertMany(workspaces);

MockDate.set(currentDate);

const worker = new PaymasterWorker();
const processSpy = jest
.spyOn(worker as any, 'processWorkspaceSubscriptionCheck')
.mockResolvedValue([null, false]);

await worker.start();
await worker.handle(WORKSPACE_SUBSCRIPTION_CHECK);
await worker.finish();

expect(processSpy).toHaveBeenCalledTimes(WORKSPACES_COUNT);

const calledIds = processSpy.mock.calls
.map((call) => (call[0] as WorkspaceDBScheme)._id.toString())
.sort();
const expectedIds = workspaces.map((w) => w._id.toString()).sort();

expect(calledIds).toEqual(expectedIds);

MockDate.reset();
});

afterAll(async () => {
await connection.close();
MockDate.reset();
Expand Down
Loading