Skip to content

Commit ab13894

Browse files
CCM-13555: Improve enqueue performance (#336)
1 parent af76307 commit ab13894

2 files changed

Lines changed: 138 additions & 110 deletions

File tree

lambdas/api-handler/src/services/__tests__/letter-operations.test.ts

Lines changed: 87 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -192,26 +192,19 @@ describe("getLetterDataUrl function", () => {
192192
});
193193
});
194194

195+
function makeUpdateLetterCommand(n: number): UpdateLetterCommand {
196+
return {
197+
id: `letter${n}`,
198+
status: "PENDING",
199+
supplierId: `testSupplier}`,
200+
};
201+
}
202+
195203
describe("enqueueLetterUpdateRequests function", () => {
196204
beforeEach(() => {
197205
jest.clearAllMocks();
198206
});
199207

200-
const updateLetterCommands: UpdateLetterCommand[] = [
201-
{
202-
id: "id1",
203-
status: "REJECTED",
204-
supplierId: "s1",
205-
reasonCode: "123",
206-
reasonText: "Reason text",
207-
},
208-
{
209-
id: "id2",
210-
status: "ACCEPTED",
211-
supplierId: "s1",
212-
},
213-
];
214-
215208
it("should update the letter status successfully", async () => {
216209
const sqsClient = { send: jest.fn() } as unknown as SQSClient;
217210
const logger = { error: jest.fn() } as unknown as pino.Logger;
@@ -220,6 +213,13 @@ describe("enqueueLetterUpdateRequests function", () => {
220213
};
221214
const deps: Deps = { sqsClient, logger, env } as Deps;
222215

216+
const updateLetterCommands = Array.from({ length: 25 }, (_, i) =>
217+
makeUpdateLetterCommand(i),
218+
);
219+
220+
const sqsClientSendMock = sqsClient.send as jest.Mock;
221+
sqsClientSendMock.mockResolvedValue({ Failed: [] });
222+
223223
const result = await enqueueLetterUpdateRequests(
224224
updateLetterCommands,
225225
"correlationId1",
@@ -228,63 +228,55 @@ describe("enqueueLetterUpdateRequests function", () => {
228228

229229
expect(result).toBeUndefined();
230230

231-
expect(deps.sqsClient.send).toHaveBeenNthCalledWith(
232-
1,
233-
expect.objectContaining({
234-
input: {
235-
QueueUrl: deps.env.QUEUE_URL,
236-
MessageAttributes: {
237-
CorrelationId: {
238-
DataType: "String",
239-
StringValue: "correlationId1",
240-
},
241-
},
242-
MessageBody: JSON.stringify({
243-
id: updateLetterCommands[0].id,
244-
status: updateLetterCommands[0].status,
245-
supplierId: updateLetterCommands[0].supplierId,
246-
reasonCode: updateLetterCommands[0].reasonCode,
247-
reasonText: updateLetterCommands[0].reasonText,
248-
}),
249-
},
250-
}),
251-
);
231+
// processes 10 at a time (25 -> 10+10+5)
232+
expect(sqsClientSendMock).toHaveBeenCalledTimes(3);
252233

253-
expect(deps.sqsClient.send).toHaveBeenNthCalledWith(
254-
2,
255-
expect.objectContaining({
256-
input: {
257-
QueueUrl: deps.env.QUEUE_URL,
258-
MessageAttributes: {
259-
CorrelationId: {
260-
DataType: "String",
261-
StringValue: "correlationId1",
262-
},
263-
},
264-
MessageBody: JSON.stringify({
265-
id: updateLetterCommands[1].id,
266-
status: updateLetterCommands[1].status,
267-
supplierId: updateLetterCommands[1].supplierId,
268-
}),
269-
},
270-
}),
271-
);
234+
const firstCallArg = sqsClientSendMock.mock.calls[0][0];
235+
const firstInput = firstCallArg.input;
236+
237+
expect(firstInput.QueueUrl).toBe(deps.env.QUEUE_URL);
238+
expect(Array.isArray(firstInput.Entries)).toBe(true);
239+
expect(firstInput.Entries.length).toBe(10);
240+
241+
expect(firstInput.Entries[0].Id).toBe("0-0");
242+
expect(firstInput.Entries[9].Id).toBe("0-9");
243+
244+
expect(
245+
firstInput.Entries[0].MessageAttributes.CorrelationId.StringValue,
246+
).toBe("correlationId1");
247+
248+
const parsed = JSON.parse(firstInput.Entries[0].MessageBody);
249+
expect(parsed.id).toBe("letter0");
250+
251+
// check last batch had 5 entries
252+
const thirdCallArg = sqsClientSendMock.mock.calls[2][0];
253+
const thirdInput = thirdCallArg.input;
254+
expect(thirdInput.Entries.length).toBe(5);
255+
// ids in third batch should start "2-0"
256+
expect(thirdInput.Entries[0].Id).toBe("2-0");
272257
});
273258

274-
it("should log error if enqueueing fails", async () => {
275-
const mockError = new Error("error");
259+
it("should log error when SendMessageBatch returns Failed entries", async () => {
276260
const sqsClient = {
277261
send: jest
278262
.fn()
279-
.mockRejectedValueOnce(mockError)
280-
.mockResolvedValueOnce({ MessageId: "m1" }),
263+
.mockResolvedValueOnce({ Failed: [] }) // first batch succeeds
264+
.mockResolvedValueOnce({
265+
Failed: [
266+
{ Id: "1-1", SenderFault: false, Code: "Err", Message: "failed" },
267+
],
268+
}),
281269
} as unknown as SQSClient;
282270
const logger = { error: jest.fn() } as unknown as pino.Logger;
283271
const env = {
284272
QUEUE_URL: "sqsUrl",
285273
};
286274
const deps: Deps = { sqsClient, logger, env } as Deps;
287275

276+
const updateLetterCommands = Array.from({ length: 12 }, (_, i) =>
277+
makeUpdateLetterCommand(i),
278+
);
279+
288280
const result = await enqueueLetterUpdateRequests(
289281
updateLetterCommands,
290282
"correlationId1",
@@ -293,36 +285,44 @@ describe("enqueueLetterUpdateRequests function", () => {
293285

294286
expect(result).toBeUndefined();
295287

296-
expect(deps.sqsClient.send).toHaveBeenNthCalledWith(
297-
2,
298-
expect.objectContaining({
299-
input: {
300-
QueueUrl: deps.env.QUEUE_URL,
301-
MessageAttributes: {
302-
CorrelationId: {
303-
DataType: "String",
304-
StringValue: "correlationId1",
305-
},
306-
},
307-
MessageBody: JSON.stringify({
308-
id: updateLetterCommands[1].id,
309-
status: updateLetterCommands[1].status,
310-
supplierId: updateLetterCommands[1].supplierId,
311-
}),
312-
},
313-
}),
314-
);
288+
// 12 = 10 + 2
289+
expect(deps.sqsClient.send).toHaveBeenCalledTimes(2);
315290

316291
expect(deps.logger.error).toHaveBeenCalledTimes(1);
317-
expect(deps.logger.error).toHaveBeenCalledWith(
318-
{
319-
err: mockError,
320-
correlationId: "correlationId1",
321-
letterId: updateLetterCommands[0].id,
322-
letterStatus: updateLetterCommands[0].status,
323-
supplierId: updateLetterCommands[0].supplierId,
324-
},
325-
"Error enqueuing letter status update",
292+
const errorArgs = (deps.logger.error as jest.Mock).mock.calls[0][0];
293+
expect(errorArgs.failed).toBeDefined();
294+
expect(Array.isArray(errorArgs.failed)).toBe(true);
295+
expect(errorArgs.failed[0].Id).toBe("1-1");
296+
});
297+
298+
it("should log error if enqueueing fails", async () => {
299+
const sqsClient = {
300+
send: jest
301+
.fn()
302+
.mockResolvedValueOnce({ Failed: [] }) // batch 0
303+
.mockImplementationOnce(() => {
304+
throw new Error("some failure");
305+
}) // batch 1
306+
.mockResolvedValueOnce({ Failed: [] }), // batch 2
307+
} as unknown as SQSClient;
308+
const logger = { error: jest.fn() } as unknown as pino.Logger;
309+
const env = {
310+
QUEUE_URL: "sqsUrl",
311+
};
312+
const deps: Deps = { sqsClient, logger, env } as Deps;
313+
314+
const lettersToUpdate = Array.from({ length: 21 }, (_, i) =>
315+
makeUpdateLetterCommand(i),
326316
);
317+
318+
await enqueueLetterUpdateRequests(lettersToUpdate, "correlationId1", deps);
319+
320+
// all 3 attempted
321+
expect(deps.sqsClient.send).toHaveBeenCalledTimes(3);
322+
323+
expect(deps.logger.error).toHaveBeenCalledTimes(1);
324+
const logged = (deps.logger.error as jest.Mock).mock.calls[0][0];
325+
expect(logged.correlationId).toBe("correlationId1");
326+
expect(logged.err).toBeInstanceOf(Error);
327327
});
328328
});

lambdas/api-handler/src/services/letter-operations.ts

Lines changed: 51 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { LetterBase, LetterRepository } from "@internal/datastore";
22
import { getSignedUrl } from "@aws-sdk/s3-request-presigner";
33
import { GetObjectCommand, S3Client } from "@aws-sdk/client-s3";
4-
import { SendMessageCommand } from "@aws-sdk/client-sqs";
4+
import { SendMessageBatchCommand } from "@aws-sdk/client-sqs";
55
import NotFoundError from "../errors/not-found-error";
66
import { UpdateLetterCommand } from "../contracts/letters";
77
import { ApiErrorDetail } from "../contracts/errors";
@@ -81,36 +81,64 @@ export const getLetterDataUrl = async (
8181
}
8282
};
8383

84+
function chunk(
85+
arr: UpdateLetterCommand[],
86+
size: number,
87+
): UpdateLetterCommand[][] {
88+
const chunks: UpdateLetterCommand[][] = [];
89+
for (let i = 0; i < arr.length; i += size)
90+
chunks.push(arr.slice(i, i + size));
91+
return chunks;
92+
}
93+
8494
export async function enqueueLetterUpdateRequests(
8595
updateLetterCommands: UpdateLetterCommand[],
8696
correlationId: string,
8797
deps: Deps,
8898
) {
89-
const tasks = updateLetterCommands.map(
90-
async (request: UpdateLetterCommand) => {
91-
try {
92-
const command = new SendMessageCommand({
93-
QueueUrl: deps.env.QUEUE_URL,
99+
const BATCH_SIZE = 10; // SQS SendMessageBatch max
100+
const CONCURRENCY = 5; // number of parallel batch API calls
101+
102+
const batches = chunk(updateLetterCommands, BATCH_SIZE);
103+
104+
// send batches in groups with limited concurrency
105+
// BATCH_SIZE * CONCURRENCY is the number of total updates / db calls in-flight
106+
for (let i = 0; i < batches.length; i += CONCURRENCY) {
107+
const window = batches.slice(i, i + CONCURRENCY);
108+
109+
await Promise.all(
110+
window.map(async (batch, batchIdx) => {
111+
const entries = batch.map((request, idx) => ({
112+
Id: `${i + batchIdx}-${idx}`, // unique per batch entry
113+
MessageBody: JSON.stringify(request),
94114
MessageAttributes: {
95115
CorrelationId: { DataType: "String", StringValue: correlationId },
96116
},
97-
MessageBody: JSON.stringify(request),
117+
}));
118+
119+
const cmd = new SendMessageBatchCommand({
120+
QueueUrl: deps.env.QUEUE_URL,
121+
Entries: entries,
98122
});
99-
await deps.sqsClient.send(command);
100-
} catch (error) {
101-
deps.logger.error(
102-
{
103-
err: error,
104-
correlationId,
105-
letterId: request.id,
106-
letterStatus: request.status,
107-
supplierId: request.supplierId,
108-
},
109-
"Error enqueuing letter status update",
110-
);
111-
}
112-
},
113-
);
114123

115-
await Promise.all(tasks);
124+
try {
125+
const result = await deps.sqsClient.send(cmd);
126+
if (result.Failed && result.Failed.length > 0) {
127+
deps.logger.error(
128+
{ failed: result.Failed },
129+
"Some batch entries failed",
130+
);
131+
}
132+
} catch (error) {
133+
deps.logger.error(
134+
{
135+
err: error,
136+
correlationId,
137+
},
138+
"Error enqueuing letter status updates",
139+
);
140+
}
141+
}),
142+
);
143+
}
116144
}

0 commit comments

Comments
 (0)