Skip to content

Commit eaebcc6

Browse files
committed
Fix metrics
1 parent 687c038 commit eaebcc6

2 files changed

Lines changed: 166 additions & 171 deletions

File tree

lambdas/update-letter-queue/src/__tests__/update-letter-queue.test.ts

Lines changed: 95 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,6 @@ import createHandler from "../update-letter-queue";
1616
import { EnvVars } from "../env";
1717
import { LetterStatus } from "../../../api-handler/src/contracts/letters";
1818

19-
const mockPutMetric = jest.fn();
20-
const mockSetNamespace = jest.fn();
21-
22-
jest.mock("aws-embedded-metrics", () => ({
23-
// eslint-disable-next-line @typescript-eslint/no-unsafe-function-type
24-
metricScope: (fn: Function) =>
25-
fn({
26-
putMetric: mockPutMetric,
27-
setNamespace: mockSetNamespace,
28-
}),
29-
Unit: { Count: "Count" },
30-
}));
31-
3219
const mockedDeps: jest.Mocked<Deps> = {
3320
letterQueueRepository: {
3421
putLetter: jest.fn(),
@@ -198,105 +185,69 @@ describe("update-letter-queue Lambda", () => {
198185
);
199186
});
200187
});
201-
});
202188

203-
describe("Metrics", () => {
204-
it("emits success metrics when all letters are processed successfully", async () => {
205-
const handler = createHandler(mockedDeps);
206-
const newLetter1 = generateLetter("PENDING", "1");
207-
const newLetter2 = generateLetter("PENDING", "2");
208-
209-
const testData = generateKinesisEvent([
210-
generateInsertRecord(newLetter1),
211-
generateInsertRecord(newLetter2),
212-
]);
213-
await handler(testData, mockDeep<Context>(), jest.fn());
214-
215-
expect(mockSetNamespace).toHaveBeenCalledWith("update-letter-queue");
216-
expect(mockPutMetric).toHaveBeenCalledWith(
217-
"letters queued successfully",
218-
2,
219-
"Count",
220-
);
221-
expect(mockPutMetric).toHaveBeenCalledWith(
222-
"letters queued failed",
223-
0,
224-
"Count",
225-
);
226-
});
189+
describe("Metrics", () => {
190+
it("emits success metrics when all letters are processed successfully", async () => {
191+
const handler = createHandler(mockedDeps);
192+
const newLetter1 = generateLetter("PENDING", "1");
193+
const newLetter2 = generateLetter("PENDING", "2");
227194

228-
it("emits failure metrics when a letter fails to process", async () => {
229-
const handler = createHandler(mockedDeps);
230-
const newLetter1 = generateLetter("PENDING", "1");
231-
const newLetter2 = generateLetter("PENDING", "2");
232-
(mockedDeps.letterQueueRepository.putLetter as jest.Mock)
233-
.mockResolvedValueOnce({})
234-
.mockRejectedValueOnce(new Error("DynamoDB error"));
235-
236-
const testData = generateKinesisEvent([
237-
generateInsertRecord(newLetter1),
238-
generateInsertRecord(newLetter2),
239-
]);
240-
await handler(testData, mockDeep<Context>(), jest.fn());
241-
242-
expect(mockSetNamespace).toHaveBeenCalledWith("update-letter-queue");
243-
expect(mockPutMetric).toHaveBeenCalledWith(
244-
"letters queued successfully",
245-
1,
246-
"Count",
247-
);
248-
expect(mockPutMetric).toHaveBeenCalledWith(
249-
"letters queued failed",
250-
1,
251-
"Count",
252-
);
253-
});
195+
const testData = generateKinesisEvent([
196+
generateInsertRecord(newLetter1),
197+
generateInsertRecord(newLetter2),
198+
]);
199+
await handler(testData, mockDeep<Context>(), jest.fn());
254200

255-
it("does not count a reprocessed event as a success or failure", async () => {
256-
const handler = createHandler(mockedDeps);
257-
const newLetter1 = generateLetter("PENDING", "1");
258-
const newLetter2 = generateLetter("PENDING", "2");
259-
(mockedDeps.letterQueueRepository.putLetter as jest.Mock)
260-
.mockRejectedValueOnce(new LetterAlreadyExistsError("supplier1", "1"))
261-
.mockResolvedValueOnce({});
262-
263-
const testData = generateKinesisEvent([
264-
generateInsertRecord(newLetter1),
265-
generateInsertRecord(newLetter2),
266-
]);
267-
await handler(testData, mockDeep<Context>(), jest.fn());
268-
269-
expect(mockSetNamespace).toHaveBeenCalledWith("update-letter-queue");
270-
expect(mockPutMetric).toHaveBeenCalledWith(
271-
"letters queued successfully",
272-
1,
273-
"Count",
274-
);
275-
expect(mockPutMetric).toHaveBeenCalledWith(
276-
"letters queued failed",
277-
0,
278-
"Count",
279-
);
280-
});
201+
assertSuccessMetricLogged(2);
202+
assertFailureMetricLogged(0);
203+
});
281204

282-
it("emits zero success metrics when no pending letters are in the batch", async () => {
283-
const handler = createHandler(mockedDeps);
284-
const newLetter = generateLetter("PRINTED");
205+
it("emits failure metrics when a letter fails to process", async () => {
206+
const handler = createHandler(mockedDeps);
207+
const newLetter1 = generateLetter("PENDING", "1");
208+
const newLetter2 = generateLetter("PENDING", "2");
209+
(mockedDeps.letterQueueRepository.putLetter as jest.Mock)
210+
.mockResolvedValueOnce({})
211+
.mockRejectedValueOnce(new Error("DynamoDB error"));
285212

286-
const testData = generateKinesisEvent([generateInsertRecord(newLetter)]);
287-
await handler(testData, mockDeep<Context>(), jest.fn());
213+
const testData = generateKinesisEvent([
214+
generateInsertRecord(newLetter1),
215+
generateInsertRecord(newLetter2),
216+
]);
217+
await handler(testData, mockDeep<Context>(), jest.fn());
288218

289-
expect(mockSetNamespace).toHaveBeenCalledWith("update-letter-queue");
290-
expect(mockPutMetric).toHaveBeenCalledWith(
291-
"letters queued successfully",
292-
0,
293-
"Count",
294-
);
295-
expect(mockPutMetric).toHaveBeenCalledWith(
296-
"letters queued failed",
297-
0,
298-
"Count",
299-
);
219+
assertSuccessMetricLogged(1);
220+
assertFailureMetricLogged(1);
221+
});
222+
223+
it("does not count a reprocessed event as a success or failure", async () => {
224+
const handler = createHandler(mockedDeps);
225+
const newLetter1 = generateLetter("PENDING", "1");
226+
const newLetter2 = generateLetter("PENDING", "2");
227+
(mockedDeps.letterQueueRepository.putLetter as jest.Mock)
228+
.mockRejectedValueOnce(new LetterAlreadyExistsError("supplier1", "1"))
229+
.mockResolvedValueOnce({});
230+
231+
const testData = generateKinesisEvent([
232+
generateInsertRecord(newLetter1),
233+
generateInsertRecord(newLetter2),
234+
]);
235+
await handler(testData, mockDeep<Context>(), jest.fn());
236+
237+
assertSuccessMetricLogged(1);
238+
assertFailureMetricLogged(0);
239+
});
240+
241+
it("emits zero success metrics when no pending letters are in the batch", async () => {
242+
const handler = createHandler(mockedDeps);
243+
const newLetter = generateLetter("PRINTED");
244+
245+
const testData = generateKinesisEvent([generateInsertRecord(newLetter)]);
246+
await handler(testData, mockDeep<Context>(), jest.fn());
247+
248+
assertSuccessMetricLogged(0);
249+
assertFailureMetricLogged(0);
250+
});
300251
});
301252
});
302253

@@ -339,3 +290,41 @@ function mapToImage(oldLetter: Letter) {
339290
]),
340291
);
341292
}
293+
294+
function assertSuccessMetricLogged(count: number) {
295+
expect(mockedDeps.logger.info).toHaveBeenCalledWith(
296+
expect.objectContaining({
297+
_aws: expect.objectContaining({
298+
CloudWatchMetrics: expect.arrayContaining([
299+
expect.objectContaining({
300+
Metrics: [
301+
expect.objectContaining({
302+
Name: "letters queued successfully",
303+
Value: count,
304+
}),
305+
],
306+
}),
307+
]),
308+
}),
309+
}),
310+
);
311+
}
312+
313+
function assertFailureMetricLogged(count: number) {
314+
expect(mockedDeps.logger.info).toHaveBeenCalledWith(
315+
expect.objectContaining({
316+
_aws: expect.objectContaining({
317+
CloudWatchMetrics: expect.arrayContaining([
318+
expect.objectContaining({
319+
Metrics: [
320+
expect.objectContaining({
321+
Name: "letters queued failed",
322+
Value: count,
323+
}),
324+
],
325+
}),
326+
]),
327+
}),
328+
}),
329+
);
330+
}

lambdas/update-letter-queue/src/update-letter-queue.ts

Lines changed: 71 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import {
55
KinesisStreamRecord,
66
} from "aws-lambda";
77
import { unmarshall } from "@aws-sdk/util-dynamodb";
8-
import { MetricsLogger, Unit, metricScope } from "aws-embedded-metrics";
8+
import { Unit } from "aws-embedded-metrics";
99
import {
1010
InsertPendingLetter,
1111
Letter,
@@ -15,68 +15,65 @@ import {
1515
import { Deps } from "./deps";
1616

1717
export default function createHandler(deps: Deps): Handler<KinesisStreamEvent> {
18-
return metricScope((metrics: MetricsLogger) => {
19-
return async (streamEvent: KinesisStreamEvent) => {
20-
let successCount = 0;
21-
22-
deps.logger.info({ description: "Received event", streamEvent });
23-
deps.logger.info({
24-
description: "Number of records",
25-
count: streamEvent.Records?.length || 0,
26-
});
27-
28-
const ddbRecords: DynamoDBRecord[] = streamEvent.Records.map((record) =>
29-
extractPayload(record, deps),
30-
);
31-
32-
const newPendingLetters = ddbRecords
33-
.filter((record) => filterRecord(record, deps))
34-
.map((element) => extractNewLetter(element))
35-
.map((element) => mapLetterToPendingLetter(element));
36-
37-
for (const pendingLetter of newPendingLetters) {
38-
try {
39-
deps.logger.info({
40-
description: "Persisting pending letter",
18+
return async (streamEvent: KinesisStreamEvent) => {
19+
let successCount = 0;
20+
21+
deps.logger.info({ description: "Received event", streamEvent });
22+
deps.logger.info({
23+
description: "Number of records",
24+
count: streamEvent.Records?.length || 0,
25+
});
26+
27+
const ddbRecords: DynamoDBRecord[] = streamEvent.Records.map((record) =>
28+
extractPayload(record, deps),
29+
);
30+
31+
const newPendingLetters = ddbRecords
32+
.filter((record) => filterRecord(record, deps))
33+
.map((element) => extractNewLetter(element))
34+
.map((element) => mapLetterToPendingLetter(element));
35+
36+
for (const pendingLetter of newPendingLetters) {
37+
try {
38+
deps.logger.info({
39+
description: "Persisting pending letter",
40+
pendingLetter,
41+
});
42+
await deps.letterQueueRepository.putLetter(pendingLetter);
43+
successCount += 1;
44+
} catch (error) {
45+
if (error instanceof LetterAlreadyExistsError) {
46+
deps.logger.warn({
47+
description: "Letter already exists",
48+
supplierId: pendingLetter.supplierId,
49+
letterId: pendingLetter.letterId,
50+
});
51+
} else {
52+
deps.logger.error({
53+
description: "Error persisting pending letter",
54+
error,
4155
pendingLetter,
4256
});
43-
await deps.letterQueueRepository.putLetter(pendingLetter);
44-
successCount += 1;
45-
} catch (error) {
46-
if (error instanceof LetterAlreadyExistsError) {
47-
deps.logger.warn({
48-
description: "Letter already exists",
49-
supplierId: pendingLetter.supplierId,
50-
letterId: pendingLetter.letterId,
51-
});
52-
} else {
53-
deps.logger.error({
54-
description: "Error persisting pending letter",
55-
error,
56-
pendingLetter,
57-
});
58-
recordProcessing(deps, successCount, 1, metrics);
59-
// If we get a failure, return immediately without processing the remaining records. Since we are
60-
// working with a Kinesis stream, AWS will retry from the point of failure and no records will be lost.
61-
// See https://docs.aws.amazon.com/lambda/latest/dg/example_serverless_Kinesis_Lambda_batch_item_failures_section.html
62-
return {
63-
batchItemFailures: [{ itemIdentifier: pendingLetter.letterId }],
64-
};
65-
}
57+
recordProcessing(deps, successCount, 1);
58+
// If we get a failure, return immediately without processing the remaining records. Since we are
59+
// working with a Kinesis stream, AWS will retry from the point of failure and no records will be lost.
60+
// See https://docs.aws.amazon.com/lambda/latest/dg/example_serverless_Kinesis_Lambda_batch_item_failures_section.html
61+
return {
62+
batchItemFailures: [{ itemIdentifier: pendingLetter.letterId }],
63+
};
6664
}
6765
}
66+
}
6867

69-
recordProcessing(deps, successCount, 0, metrics);
70-
return { batchItemFailures: [] };
71-
};
72-
});
68+
recordProcessing(deps, successCount, 0);
69+
return { batchItemFailures: [] };
70+
};
7371
}
7472

7573
function recordProcessing(
7674
deps: Deps,
7775
successCount: number,
7876
failureCount: number,
79-
metrics: MetricsLogger,
8077
) {
8178
deps.logger.info({
8279
description: "Processing complete",
@@ -85,19 +82,8 @@ function recordProcessing(
8582
totalProcessed: successCount + failureCount,
8683
});
8784

88-
emitMetrics(metrics, successCount, failureCount);
89-
}
90-
91-
function emitMetrics(
92-
metrics: MetricsLogger,
93-
successCount: number,
94-
failureCount: number,
95-
) {
96-
metrics.setNamespace(
97-
process.env.AWS_LAMBDA_FUNCTION_NAME || "update-letter-queue",
98-
);
99-
metrics.putMetric("letters queued successfully", successCount, Unit.Count);
100-
metrics.putMetric("letters queued failed", failureCount, Unit.Count);
85+
deps.logger.info(buildMetric("letters queued successfully", successCount));
86+
deps.logger.info(buildMetric("letters queued failed", failureCount));
10187
}
10288

10389
function filterRecord(record: DynamoDBRecord, deps: Deps): boolean {
@@ -158,3 +144,23 @@ function mapLetterToPendingLetter(letter: Letter): InsertPendingLetter {
158144
groupId: letter.groupId,
159145
};
160146
}
147+
148+
function buildMetric(name: string, count: number) {
149+
const namespace =
150+
process.env.AWS_LAMBDA_FUNCTION_NAME || "update-letter-queue";
151+
return {
152+
LogGroup: namespace,
153+
ServiceName: namespace,
154+
_aws: {
155+
Timestamp: Date.now(),
156+
CloudWatchMetrics: [
157+
{
158+
Namespace: namespace,
159+
Dimensions: [["ServiceName", "LogGroup"]],
160+
Metrics: [{ Name: name, Value: count, Unit: Unit.Count }],
161+
},
162+
],
163+
},
164+
"events published": count,
165+
};
166+
}

0 commit comments

Comments
 (0)