Skip to content

Commit daacde1

Browse files
committed
Further work
1 parent 065d049 commit daacde1

6 files changed

Lines changed: 146 additions & 23 deletions

File tree

internal/datastore/src/__test__/letter-queue-repository.test.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import {
77
} from "./db";
88
import LetterQueueRepository from "../letter-queue-repository";
99
import { InsertPendingLetter } from "../types";
10+
import { LetterAlreadyExistsError } from "../errors";
1011
import { createTestLogger } from "./logs";
1112

1213
function createLetter(letterId = "letter1"): InsertPendingLetter {
@@ -78,13 +79,12 @@ describe("LetterQueueRepository", () => {
7879
assertTtl(pendingLetter.ttl, before, after);
7980
});
8081

81-
it("throws an error when creating a letter which already exists", async () => {
82+
it("throws LetterAlreadyExistsError when creating a letter which already exists", async () => {
8283
await letterQueueRepository.putLetter(createLetter());
84+
8385
await expect(
8486
letterQueueRepository.putLetter(createLetter()),
85-
).rejects.toThrow(
86-
"Letter with id letter1 already exists for supplier supplier1",
87-
);
87+
).rejects.toThrow(LetterAlreadyExistsError);
8888
});
8989

9090
it("rethrows errors from DynamoDB when creating a letter", async () => {

internal/datastore/src/errors.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
/**
2+
* Error thrown when attempting to create a letter that already exists in the database.
3+
*/
4+
// eslint-disable-next-line import-x/prefer-default-export
5+
export class LetterAlreadyExistsError extends Error {
6+
constructor(
7+
public readonly supplierId: string,
8+
public readonly letterId: string,
9+
) {
10+
super(
11+
`Letter already exists: supplierId=${supplierId}, letterId=${letterId}`,
12+
);
13+
this.name = "LetterAlreadyExistsError";
14+
}
15+
}

internal/datastore/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
export * from "./types";
2+
export * from "./errors";
23
export * from "./mi-repository";
34
export * from "./letter-repository";
45
export * from "./supplier-repository";

internal/datastore/src/letter-queue-repository.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
PendingLetter,
77
PendingLetterSchema,
88
} from "./types";
9+
import { LetterAlreadyExistsError } from "./errors";
910

1011
type LetterQueueRepositoryConfig = {
1112
letterQueueTableName: string;
@@ -61,8 +62,9 @@ export default class LetterQueueRepository {
6162
error instanceof Error &&
6263
error.name === "ConditionalCheckFailedException"
6364
) {
64-
throw new Error(
65-
`Letter with id ${pendingLetter.letterId} already exists for supplier ${pendingLetter.supplierId}`,
65+
throw new LetterAlreadyExistsError(
66+
insertPendingLetter.supplierId,
67+
insertPendingLetter.letterId,
6668
);
6769
}
6870
throw error;

lambdas/update-letter-queue/src/__tests__/update-letter-queue.test.ts

Lines changed: 94 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
1-
import { Letter, LetterQueueRepository } from "internal/datastore/src";
1+
import {
2+
Letter,
3+
LetterAlreadyExistsError,
4+
LetterQueueRepository,
5+
} from "@internal/datastore";
26
import { mockDeep } from "jest-mock-extended";
37
import pino from "pino";
48
import {
@@ -29,7 +33,11 @@ const mockedDeps: jest.Mocked<Deps> = {
2933
letterQueueRepository: {
3034
putLetter: jest.fn(),
3135
} as unknown as LetterQueueRepository,
32-
logger: { info: jest.fn(), error: jest.fn() } as unknown as pino.Logger,
36+
logger: {
37+
info: jest.fn(),
38+
warn: jest.fn(),
39+
error: jest.fn(),
40+
} as unknown as pino.Logger,
3341
env: {} as unknown as EnvVars,
3442
} as Deps;
3543

@@ -61,6 +69,9 @@ describe("update-letter-queue Lambda", () => {
6169
it("processes new pending letters and persists them in the letter queue table", async () => {
6270
const handler = createHandler(mockedDeps);
6371
const newLetter = generateLetter("PENDING");
72+
(
73+
mockedDeps.letterQueueRepository.putLetter as jest.Mock
74+
).mockResolvedValue({ alreadyProcessed: false });
6475

6576
const testData = generateKinesisEvent([generateInsertRecord(newLetter)]);
6677
const result = await handler(testData, mockDeep<Context>(), jest.fn());
@@ -98,6 +109,16 @@ describe("update-letter-queue Lambda", () => {
98109
expect(mockedDeps.letterQueueRepository.putLetter).not.toHaveBeenCalled();
99110
expect(result.batchItemFailures).toEqual([]);
100111
});
112+
113+
it("handles empty Records array", async () => {
114+
const handler = createHandler(mockedDeps);
115+
const testData = { Records: [] } as unknown as KinesisStreamEvent;
116+
117+
const result = await handler(testData, mockDeep<Context>(), jest.fn());
118+
119+
expect(mockedDeps.letterQueueRepository.putLetter).not.toHaveBeenCalled();
120+
expect(result.batchItemFailures).toEqual([]);
121+
});
101122
});
102123

103124
describe("Error handling", () => {
@@ -132,6 +153,50 @@ describe("update-letter-queue Lambda", () => {
132153
);
133154
expect(result.batchItemFailures).toEqual([{ itemIdentifier: "1" }]);
134155
});
156+
157+
it("does not treat a replayed event as a failure", async () => {
158+
const handler = createHandler(mockedDeps);
159+
const newLetter1 = generateLetter("PENDING", "1");
160+
const newLetter2 = generateLetter("PENDING", "2");
161+
(mockedDeps.letterQueueRepository.putLetter as jest.Mock)
162+
.mockRejectedValueOnce(new LetterAlreadyExistsError("supplier1", "1"))
163+
.mockResolvedValueOnce({});
164+
165+
const testData = generateKinesisEvent([
166+
generateInsertRecord(newLetter1),
167+
generateInsertRecord(newLetter2),
168+
]);
169+
const result = await handler(testData, mockDeep<Context>(), jest.fn());
170+
171+
expect(result.batchItemFailures).toEqual([]);
172+
});
173+
174+
it("throws error when Kinesis payload cannot be parsed as JSON", async () => {
175+
const handler = createHandler(mockedDeps);
176+
const invalidJsonPayload = "not valid json {{{";
177+
const testData = {
178+
Records: [
179+
{
180+
kinesis: {
181+
sequenceNumber: "seq-123",
182+
data: Buffer.from(invalidJsonPayload).toString("base64"),
183+
},
184+
eventID: "event-123",
185+
},
186+
],
187+
} as unknown as KinesisStreamEvent;
188+
189+
await expect(
190+
handler(testData, mockDeep<Context>(), jest.fn()),
191+
).rejects.toThrow();
192+
193+
expect(mockedDeps.logger.error).toHaveBeenCalledWith(
194+
expect.objectContaining({
195+
description: "Error extracting payload",
196+
eventId: "event-123",
197+
}),
198+
);
199+
});
135200
});
136201
});
137202

@@ -187,6 +252,33 @@ describe("Metrics", () => {
187252
);
188253
});
189254

255+
it("does not count a reprocessed event as a success or failure", async () => {
256+
const handler = createHandler(mockedDeps);
257+
const newLetter1 = generateLetter("PENDING", "1");
258+
const newLetter2 = generateLetter("PENDING", "2");
259+
(mockedDeps.letterQueueRepository.putLetter as jest.Mock)
260+
.mockRejectedValueOnce(new LetterAlreadyExistsError("supplier1", "1"))
261+
.mockResolvedValueOnce({});
262+
263+
const testData = generateKinesisEvent([
264+
generateInsertRecord(newLetter1),
265+
generateInsertRecord(newLetter2),
266+
]);
267+
await handler(testData, mockDeep<Context>(), jest.fn());
268+
269+
expect(mockSetNamespace).toHaveBeenCalledWith("update-letter-queue");
270+
expect(mockPutMetric).toHaveBeenCalledWith(
271+
"letters queued successfully",
272+
1,
273+
"Count",
274+
);
275+
expect(mockPutMetric).toHaveBeenCalledWith(
276+
"letters queued failed",
277+
0,
278+
"Count",
279+
);
280+
});
281+
190282
it("emits zero success metrics when no pending letters are in the batch", async () => {
191283
const handler = createHandler(mockedDeps);
192284
const newLetter = generateLetter("PRINTED");

lambdas/update-letter-queue/src/update-letter-queue.ts

Lines changed: 28 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,12 @@ import {
66
} from "aws-lambda";
77
import { unmarshall } from "@aws-sdk/util-dynamodb";
88
import { MetricsLogger, Unit, metricScope } from "aws-embedded-metrics";
9-
import { InsertPendingLetter, Letter, LetterSchema } from "@internal/datastore";
9+
import {
10+
InsertPendingLetter,
11+
Letter,
12+
LetterAlreadyExistsError,
13+
LetterSchema,
14+
} from "@internal/datastore";
1015
import { Deps } from "./deps";
1116

1217
export default function createHandler(deps: Deps): Handler<KinesisStreamEvent> {
@@ -38,18 +43,26 @@ export default function createHandler(deps: Deps): Handler<KinesisStreamEvent> {
3843
await deps.letterQueueRepository.putLetter(pendingLetter);
3944
successCount += 1;
4045
} catch (error) {
41-
deps.logger.error({
42-
description: "Error persisting pending letter",
43-
error,
44-
pendingLetter,
45-
});
46-
recordProcessing(deps, successCount, 1, metrics);
47-
// If we get a failure, return immediately without processing the remaining records. Since we are
48-
// working with a Kinesis stream, AWS will retry from the point of failure and no records will be lost.
49-
// See https://docs.aws.amazon.com/lambda/latest/dg/example_serverless_Kinesis_Lambda_batch_item_failures_section.html
50-
return {
51-
batchItemFailures: [{ itemIdentifier: pendingLetter.letterId }],
52-
};
46+
if (error instanceof LetterAlreadyExistsError) {
47+
deps.logger.warn({
48+
description: "Letter already exists",
49+
supplierId: pendingLetter.supplierId,
50+
letterId: pendingLetter.letterId,
51+
});
52+
} else {
53+
deps.logger.error({
54+
description: "Error persisting pending letter",
55+
error,
56+
pendingLetter,
57+
});
58+
recordProcessing(deps, successCount, 1, metrics);
59+
// If we get a failure, return immediately without processing the remaining records. Since we are
60+
// working with a Kinesis stream, AWS will retry from the point of failure and no records will be lost.
61+
// See https://docs.aws.amazon.com/lambda/latest/dg/example_serverless_Kinesis_Lambda_batch_item_failures_section.html
62+
return {
63+
batchItemFailures: [{ itemIdentifier: pendingLetter.letterId }],
64+
};
65+
}
5366
}
5467
}
5568

@@ -125,8 +138,8 @@ function extractPayload(
125138
} catch (error) {
126139
deps.logger.error({
127140
description: "Error extracting payload",
128-
error,
129-
record,
141+
err: error,
142+
eventId: record.eventID,
130143
});
131144
throw error;
132145
}

0 commit comments

Comments
 (0)