Skip to content

Commit 19023d9

Browse files
improve enqueue performance
1 parent b867ae2 commit 19023d9

2 files changed

Lines changed: 138 additions & 116 deletions

File tree

lambdas/api-handler/src/services/__tests__/letter-operations.test.ts

Lines changed: 87 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -189,28 +189,19 @@ describe("getLetterDataUrl function", () => {
189189
});
190190
});
191191

192+
function makeLetterDto(n: number): LetterDto {
193+
return {
194+
id: `letter${n}`,
195+
status: "PENDING",
196+
supplierId: `testSupplier}`,
197+
};
198+
}
199+
192200
describe("enqueueLetterUpdateRequests function", () => {
193201
beforeEach(() => {
194202
jest.clearAllMocks();
195203
});
196204

197-
const lettersToUpdate: LetterDto[] = [
198-
{
199-
id: "id1",
200-
status: "REJECTED",
201-
supplierId: "s1",
202-
specificationId: "spec1",
203-
groupId: "g1",
204-
reasonCode: "123",
205-
reasonText: "Reason text",
206-
},
207-
{
208-
id: "id2",
209-
status: "ACCEPTED",
210-
supplierId: "s1",
211-
},
212-
];
213-
214205
it("should update the letter status successfully", async () => {
215206
const sqsClient = { send: jest.fn() } as unknown as SQSClient;
216207
const logger = { error: jest.fn() } as unknown as pino.Logger;
@@ -219,6 +210,13 @@ describe("enqueueLetterUpdateRequests function", () => {
219210
};
220211
const deps: Deps = { sqsClient, logger, env } as Deps;
221212

213+
const lettersToUpdate = Array.from({ length: 25 }, (_, i) =>
214+
makeLetterDto(i),
215+
);
216+
217+
const sqsClientSendMock = sqsClient.send as jest.Mock;
218+
sqsClientSendMock.mockResolvedValue({ Failed: [] });
219+
222220
const result = await enqueueLetterUpdateRequests(
223221
lettersToUpdate,
224222
"correlationId1",
@@ -227,65 +225,55 @@ describe("enqueueLetterUpdateRequests function", () => {
227225

228226
expect(result).toBeUndefined();
229227

230-
expect(deps.sqsClient.send).toHaveBeenNthCalledWith(
231-
1,
232-
expect.objectContaining({
233-
input: {
234-
QueueUrl: deps.env.QUEUE_URL,
235-
MessageAttributes: {
236-
CorrelationId: {
237-
DataType: "String",
238-
StringValue: "correlationId1",
239-
},
240-
},
241-
MessageBody: JSON.stringify({
242-
id: lettersToUpdate[0].id,
243-
status: lettersToUpdate[0].status,
244-
supplierId: lettersToUpdate[0].supplierId,
245-
specificationId: lettersToUpdate[0].specificationId,
246-
groupId: lettersToUpdate[0].groupId,
247-
reasonCode: lettersToUpdate[0].reasonCode,
248-
reasonText: lettersToUpdate[0].reasonText,
249-
}),
250-
},
251-
}),
252-
);
228+
// processes 10 at a time (25 -> 10+10+5)
229+
expect(sqsClientSendMock).toHaveBeenCalledTimes(3);
253230

254-
expect(deps.sqsClient.send).toHaveBeenNthCalledWith(
255-
2,
256-
expect.objectContaining({
257-
input: {
258-
QueueUrl: deps.env.QUEUE_URL,
259-
MessageAttributes: {
260-
CorrelationId: {
261-
DataType: "String",
262-
StringValue: "correlationId1",
263-
},
264-
},
265-
MessageBody: JSON.stringify({
266-
id: lettersToUpdate[1].id,
267-
status: lettersToUpdate[1].status,
268-
supplierId: lettersToUpdate[1].supplierId,
269-
}),
270-
},
271-
}),
272-
);
231+
const firstCallArg = sqsClientSendMock.mock.calls[0][0];
232+
const firstInput = firstCallArg.input;
233+
234+
expect(firstInput.QueueUrl).toBe(deps.env.QUEUE_URL);
235+
expect(Array.isArray(firstInput.Entries)).toBe(true);
236+
expect(firstInput.Entries.length).toBe(10);
237+
238+
expect(firstInput.Entries[0].Id).toBe("0-0");
239+
expect(firstInput.Entries[9].Id).toBe("0-9");
240+
241+
expect(
242+
firstInput.Entries[0].MessageAttributes.CorrelationId.StringValue,
243+
).toBe("correlationId1");
244+
245+
const parsed = JSON.parse(firstInput.Entries[0].MessageBody);
246+
expect(parsed.id).toBe("letter0");
247+
248+
// check last batch had 5 entries
249+
const thirdCallArg = sqsClientSendMock.mock.calls[2][0];
250+
const thirdInput = thirdCallArg.input;
251+
expect(thirdInput.Entries.length).toBe(5);
252+
// ids in third batch should start "2-0"
253+
expect(thirdInput.Entries[0].Id).toBe("2-0");
273254
});
274255

275-
it("should log error if enqueueing fails", async () => {
276-
const mockError = new Error("error");
256+
it("should log error when SendMessageBatch returns Failed entries", async () => {
277257
const sqsClient = {
278258
send: jest
279259
.fn()
280-
.mockRejectedValueOnce(mockError)
281-
.mockResolvedValueOnce({ MessageId: "m1" }),
260+
.mockResolvedValueOnce({ Failed: [] }) // first batch succeeds
261+
.mockResolvedValueOnce({
262+
Failed: [
263+
{ Id: "1-1", SenderFault: false, Code: "Err", Message: "failed" },
264+
],
265+
}),
282266
} as unknown as SQSClient;
283267
const logger = { error: jest.fn() } as unknown as pino.Logger;
284268
const env = {
285269
QUEUE_URL: "sqsUrl",
286270
};
287271
const deps: Deps = { sqsClient, logger, env } as Deps;
288272

273+
const lettersToUpdate = Array.from({ length: 12 }, (_, i) =>
274+
makeLetterDto(i),
275+
);
276+
289277
const result = await enqueueLetterUpdateRequests(
290278
lettersToUpdate,
291279
"correlationId1",
@@ -294,36 +282,44 @@ describe("enqueueLetterUpdateRequests function", () => {
294282

295283
expect(result).toBeUndefined();
296284

297-
expect(deps.sqsClient.send).toHaveBeenNthCalledWith(
298-
2,
299-
expect.objectContaining({
300-
input: {
301-
QueueUrl: deps.env.QUEUE_URL,
302-
MessageAttributes: {
303-
CorrelationId: {
304-
DataType: "String",
305-
StringValue: "correlationId1",
306-
},
307-
},
308-
MessageBody: JSON.stringify({
309-
id: lettersToUpdate[1].id,
310-
status: lettersToUpdate[1].status,
311-
supplierId: lettersToUpdate[1].supplierId,
312-
}),
313-
},
314-
}),
315-
);
285+
// 12 = 10 + 2
286+
expect(deps.sqsClient.send).toHaveBeenCalledTimes(2);
316287

317288
expect(deps.logger.error).toHaveBeenCalledTimes(1);
318-
expect(deps.logger.error).toHaveBeenCalledWith(
319-
{
320-
err: mockError,
321-
correlationId: "correlationId1",
322-
letterId: lettersToUpdate[0].id,
323-
letterStatus: lettersToUpdate[0].status,
324-
supplierId: lettersToUpdate[0].supplierId,
325-
},
326-
"Error enqueuing letter status update",
289+
const errorArgs = (deps.logger.error as jest.Mock).mock.calls[0][0];
290+
expect(errorArgs.failed).toBeDefined();
291+
expect(Array.isArray(errorArgs.failed)).toBe(true);
292+
expect(errorArgs.failed[0].Id).toBe("1-1");
293+
});
294+
295+
it("should log error if enqueueing fails", async () => {
296+
const sqsClient = {
297+
send: jest
298+
.fn()
299+
.mockResolvedValueOnce({ Failed: [] }) // batch 0
300+
.mockImplementationOnce(() => {
301+
throw new Error("some failure");
302+
}) // batch 1
303+
.mockResolvedValueOnce({ Failed: [] }), // batch 2
304+
} as unknown as SQSClient;
305+
const logger = { error: jest.fn() } as unknown as pino.Logger;
306+
const env = {
307+
QUEUE_URL: "sqsUrl",
308+
};
309+
const deps: Deps = { sqsClient, logger, env } as Deps;
310+
311+
const lettersToUpdate = Array.from({ length: 21 }, (_, i) =>
312+
makeLetterDto(i),
327313
);
314+
315+
await enqueueLetterUpdateRequests(lettersToUpdate, "correlationId1", deps);
316+
317+
// all 3 attempted
318+
expect(deps.sqsClient.send).toHaveBeenCalledTimes(3);
319+
320+
expect(deps.logger.error).toHaveBeenCalledTimes(1);
321+
const logged = (deps.logger.error as jest.Mock).mock.calls[0][0];
322+
expect(logged.correlationId).toBe("correlationId1");
323+
expect(logged.err).toBeInstanceOf(Error);
328324
});
329325
});

lambdas/api-handler/src/services/letter-operations.ts

Lines changed: 51 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { LetterBase, LetterRepository } from "@internal/datastore";
22
import { getSignedUrl } from "@aws-sdk/s3-request-presigner";
33
import { GetObjectCommand, S3Client } from "@aws-sdk/client-s3";
4-
import { SendMessageCommand } from "@aws-sdk/client-sqs";
4+
import { SendMessageBatchCommand } from "@aws-sdk/client-sqs";
55
import NotFoundError from "../errors/not-found-error";
66
import { LetterDto } from "../contracts/letters";
77
import { ApiErrorDetail } from "../contracts/errors";
@@ -81,34 +81,60 @@ export const getLetterDataUrl = async (
8181
}
8282
};
8383

84+
function chunk(arr: LetterDto[], size: number): LetterDto[][] {
85+
const out: LetterDto[][] = [];
86+
for (let i = 0; i < arr.length; i += size) out.push(arr.slice(i, i + size));
87+
return out;
88+
}
89+
8490
export async function enqueueLetterUpdateRequests(
8591
updateRequests: LetterDto[],
8692
correlationId: string,
8793
deps: Deps,
8894
) {
89-
const tasks = updateRequests.map(async (request: LetterDto) => {
90-
try {
91-
const command = new SendMessageCommand({
92-
QueueUrl: deps.env.QUEUE_URL,
93-
MessageAttributes: {
94-
CorrelationId: { DataType: "String", StringValue: correlationId },
95-
},
96-
MessageBody: JSON.stringify(request),
97-
});
98-
await deps.sqsClient.send(command);
99-
} catch (error) {
100-
deps.logger.error(
101-
{
102-
err: error,
103-
correlationId,
104-
letterId: request.id,
105-
letterStatus: request.status,
106-
supplierId: request.supplierId,
107-
},
108-
"Error enqueuing letter status update",
109-
);
110-
}
111-
});
95+
const BATCH_SIZE = 10; // SQS SendMessageBatch max
96+
const CONCURRENCY = 5; // number of parallel batch API calls
97+
98+
const batches = chunk(updateRequests, BATCH_SIZE);
11299

113-
await Promise.all(tasks);
100+
// send batches in groups with limited concurrency
101+
// BATCH_SIZE * CONCURRENCY is the number of total updates / db calls in-flight
102+
for (let i = 0; i < batches.length; i += CONCURRENCY) {
103+
const window = batches.slice(i, i + CONCURRENCY);
104+
105+
await Promise.all(
106+
window.map(async (batch, batchIdx) => {
107+
const entries = batch.map((request, idx) => ({
108+
Id: `${i + batchIdx}-${idx}`, // unique per batch entry
109+
MessageBody: JSON.stringify(request),
110+
MessageAttributes: {
111+
CorrelationId: { DataType: "String", StringValue: correlationId },
112+
},
113+
}));
114+
115+
const cmd = new SendMessageBatchCommand({
116+
QueueUrl: deps.env.QUEUE_URL,
117+
Entries: entries,
118+
});
119+
120+
try {
121+
const result = await deps.sqsClient.send(cmd);
122+
if (result.Failed && result.Failed.length > 0) {
123+
deps.logger.error(
124+
{ failed: result.Failed },
125+
"Some batch entries failed",
126+
);
127+
}
128+
} catch (error) {
129+
deps.logger.error(
130+
{
131+
err: error,
132+
correlationId,
133+
},
134+
"Error enqueuing letter status updates",
135+
);
136+
}
137+
}),
138+
);
139+
}
114140
}

0 commit comments

Comments
 (0)