Skip to content

Commit d117db3

Browse files
committed
CCM-13519 Add idempotency layer in front of upsert
1 parent b95b17f commit d117db3

11 files changed

Lines changed: 215 additions & 34 deletions

File tree

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
resource "aws_dynamodb_table" "upsert_idempotency" {
2+
name = "${local.csi}-upsert-idempotency"
3+
billing_mode = "PAY_PER_REQUEST"
4+
hash_key = "id"
5+
attribute {
6+
name = "id"
7+
type = "S"
8+
}
9+
ttl {
10+
attribute_name = "expiration"
11+
enabled = true
12+
}
13+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
resource "aws_iam_role" "upsert_idempotency_role" {
2+
name = "${local.csi}-upsert-idempotency-role"
3+
4+
assume_role_policy = jsonencode({
5+
Version = "2012-10-17"
6+
Statement = [
7+
{
8+
Sid = ""
9+
Effect = "Allow"
10+
Principal = {
11+
Service = "lambda.amazonaws.com"
12+
}
13+
Action = "sts:AssumeRole"
14+
},
15+
]
16+
})
17+
}
18+
19+
resource "aws_iam_policy" "upsert_idempotency_policy" {
20+
name = "${local.csi}-upsert-idempotency-policy"
21+
description = "IAM policy for Lambda function to access DynamoDB"
22+
policy = jsonencode({
23+
Version = "2012-10-17"
24+
Statement = [
25+
{
26+
Sid = "AllowDynamodbReadWrite"
27+
Effect = "Allow"
28+
Action = [
29+
"dynamodb:PutItem",
30+
"dynamodb:GetItem",
31+
"dynamodb:UpdateItem",
32+
"dynamodb:DeleteItem",
33+
]
34+
Resource = aws_dynamodb_table.upsert_idempotency.arn
35+
},
36+
]
37+
})
38+
}
39+
40+
resource "aws_iam_role_policy_attachment" "upsert_idempotency_role_attachment" {
41+
role = aws_iam_role.upsert_idempotency_role.name
42+
policy_arn = aws_iam_policy.upsert_idempotency_policy.arn
43+
}

infrastructure/terraform/components/api/module_lambda_upsert_letter.tf

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ module "upsert_letter" {
3535
log_subscription_role_arn = local.acct.log_subscription_role_arn
3636

3737
lambda_env_vars = merge(local.common_lambda_env_vars, {
38-
VARIANT_MAP = jsonencode(var.letter_variant_map)
38+
VARIANT_MAP = jsonencode(var.letter_variant_map),
39+
UPSERT_IDEMPOTENCY_TABLE_NAME = aws_dynamodb_table.upsert_idempotency.name
3940
})
4041
}
4142

lambdas/upsert-letter/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
{
22
"dependencies": {
3+
"@aws-lambda-powertools/idempotency": "^2.33.0",
34
"@aws-sdk/client-dynamodb": "^3.984.0",
45
"@aws-sdk/lib-dynamodb": "^3.1008.0",
56
"@internal/datastore": "*",

lambdas/upsert-letter/src/config/__tests__/env.test.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@ describe("lambdaEnv", () => {
1616

1717
it("should load all environment variables successfully", () => {
1818
process.env.LETTERS_TABLE_NAME = "letters-table";
19+
process.env.UPSERT_IDEMPOTENCY_TABLE_NAME = "idempotency-table";
1920
process.env.LETTER_TTL_HOURS = "12960";
2021

2122
const { envVars } = require("../env");
2223

2324
expect(envVars).toEqual({
2425
LETTERS_TABLE_NAME: "letters-table",
26+
UPSERT_IDEMPOTENCY_TABLE_NAME: "idempotency-table",
2527
LETTER_TTL_HOURS: 12_960,
2628
});
2729
});

lambdas/upsert-letter/src/config/deps.ts

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
import { DynamoDBClient } from "@aws-sdk/client-dynamodb";
22
import { DynamoDBDocumentClient } from "@aws-sdk/lib-dynamodb";
3+
import { DynamoDBPersistenceLayer } from "@aws-lambda-powertools/idempotency/dynamodb";
34
import { Logger } from "pino";
45
import { LetterRepository } from "@internal/datastore";
56
import { createLogger } from "@internal/helpers";
67
import { EnvVars, envVars } from "./env";
78

89
export type Deps = {
910
letterRepo: LetterRepository;
11+
idempotencyLayer: DynamoDBPersistenceLayer;
1012
logger: Logger;
1113
env: EnvVars;
1214
};
@@ -16,11 +18,7 @@ function createDocumentClient(): DynamoDBDocumentClient {
1618
return DynamoDBDocumentClient.from(ddbClient);
1719
}
1820

19-
function createLetterRepository(
20-
log: Logger,
21-
// eslint-disable-next-line @typescript-eslint/no-shadow
22-
envVars: EnvVars,
23-
): LetterRepository {
21+
function createLetterRepository(log: Logger): LetterRepository {
2422
const config = {
2523
lettersTableName: envVars.LETTERS_TABLE_NAME,
2624
lettersTtlHours: envVars.LETTER_TTL_HOURS,
@@ -29,11 +27,18 @@ function createLetterRepository(
2927
return new LetterRepository(createDocumentClient(), log, config);
3028
}
3129

30+
function createIdempotencyLayer(): DynamoDBPersistenceLayer {
31+
return new DynamoDBPersistenceLayer({
32+
tableName: envVars.UPSERT_IDEMPOTENCY_TABLE_NAME,
33+
});
34+
}
35+
3236
export function createDependenciesContainer(): Deps {
3337
const log = createLogger({ logLevel: envVars.PINO_LOG_LEVEL });
3438

3539
return {
36-
letterRepo: createLetterRepository(log, envVars),
40+
letterRepo: createLetterRepository(log),
41+
idempotencyLayer: createIdempotencyLayer(),
3742
logger: log,
3843
env: envVars,
3944
};

lambdas/upsert-letter/src/config/env.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ const EnvVarsSchema = z.object({
44
LETTERS_TABLE_NAME: z.string(),
55
LETTER_TTL_HOURS: z.coerce.number().int(),
66
PINO_LOG_LEVEL: z.coerce.string().optional(),
7+
UPSERT_IDEMPOTENCY_TABLE_NAME: z.string(),
78
});
89

910
export type EnvVars = z.infer<typeof EnvVarsSchema>;

lambdas/upsert-letter/src/handler/__tests__/upsert-handler.test.ts

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
} from "@internal/datastore";
77
import { LetterRequestPreparedEventV2 } from "@nhsdigital/nhs-notify-event-schemas-letter-rendering";
88
import { LetterRequestPreparedEvent } from "@nhsdigital/nhs-notify-event-schemas-letter-rendering-v1";
9+
import { makeIdempotent } from "@aws-lambda-powertools/idempotency";
910
import {
1011
$LetterStatusChangeEvent,
1112
LetterStatusChangeEvent,
@@ -15,6 +16,14 @@ import { Deps } from "../../config/deps";
1516
import { EnvVars } from "../../config/env";
1617
import packageJson from "../../../package.json";
1718

19+
jest.mock("@aws-lambda-powertools/idempotency", () => {
20+
const original = jest.requireActual("@aws-lambda-powertools/idempotency");
21+
return {
22+
...original,
23+
makeIdempotent: jest.fn((fn, _) => fn),
24+
};
25+
});
26+
1827
const renderingSchemaVersion: string =
1928
packageJson.dependencies[
2029
"@nhsdigital/nhs-notify-event-schemas-letter-rendering"
@@ -313,7 +322,7 @@ describe("createUpsertLetterHandler", () => {
313322
);
314323
});
315324

316-
it("does not treat a replayed insert as a failure", async () => {
325+
it("does not treat a second insert for the same letter as a failure", async () => {
317326
const v1message = {
318327
letterEvent: createPreparedV1Event(),
319328
supplierSpec: {
@@ -338,6 +347,26 @@ describe("createUpsertLetterHandler", () => {
338347
expect(result!.batchItemFailures).toEqual([]);
339348
});
340349

350+
it("does not insert a letter if the same message is replayed", async () => {
351+
const v1message = {
352+
letterEvent: createPreparedV1Event(),
353+
supplierSpec: {
354+
supplierId: "supplier1",
355+
specId: "spec1",
356+
priority: 10,
357+
billingId: "billing1",
358+
},
359+
};
360+
const evt: SQSEvent = createSQSEvent([
361+
createSqsRecord("msg2", JSON.stringify(v1message)),
362+
]);
363+
(makeIdempotent as jest.Mock).mockImplementationOnce((_fn) => {});
364+
365+
await createUpsertLetterHandler(mockedDeps)(evt, {} as any, {} as any);
366+
367+
expect(mockedDeps.letterRepo.putLetter).not.toHaveBeenCalled();
368+
});
369+
341370
test("unknown supplier has metric emitted with 'unknown' supplier dimension", async () => {
342371
const letterEvent = createSupplierStatusChangeEventWithoutSupplier();
343372

lambdas/upsert-letter/src/handler/upsert-handler.ts

Lines changed: 49 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,11 @@ import {
1111
} from "@nhsdigital/nhs-notify-event-schemas-supplier-api/src/events/letter-events";
1212
import { $LetterRequestPreparedEventV2 } from "@nhsdigital/nhs-notify-event-schemas-letter-rendering";
1313
import { MetricsLogger, Unit, metricScope } from "aws-embedded-metrics";
14+
import {
15+
IdempotencyConfig,
16+
makeIdempotent,
17+
} from "@aws-lambda-powertools/idempotency";
18+
import { AnyFunction } from "@aws-lambda-powertools/idempotency/lib/cjs/types/IdempotencyOptions";
1419
import { Deps } from "../config/deps";
1520
import {
1621
PreparedEvents,
@@ -180,6 +185,7 @@ function parseQueueMessage(queueMessage: string): QueueMessage {
180185
}
181186

182187
export default function createUpsertLetterHandler(deps: Deps): SQSHandler {
188+
const processRecordIdempotently = makeIdempotentOnId(processRecord, deps);
183189
return metricScope((metrics: MetricsLogger) => {
184190
return async (event: SQSEvent) => {
185191
const batchItemFailures: SQSBatchItemFailure[] = [];
@@ -216,28 +222,11 @@ export default function createUpsertLetterHandler(deps: Deps): SQSHandler {
216222
supplier: supplierSpec,
217223
});
218224

219-
supplier =
220-
!supplierSpec || !supplierSpec.supplierId
221-
? getSupplierIdFromEvent(letterEvent)
222-
: supplierSpec.supplierId;
223-
224-
const operation = getOperationFromType(letterEvent.type);
225-
226-
await runUpsert(
227-
operation,
228-
letterEvent,
229-
supplierSpec ?? {
230-
supplierId: "unknown",
231-
specId: "unknown",
232-
priority: 10,
233-
billingId: "unknown",
234-
},
225+
supplier = await processRecordIdempotently(
235226
deps,
236-
);
237-
238-
perSupplierSuccess.set(
239-
supplier,
240-
(perSupplierSuccess.get(supplier) || 0) + 1,
227+
letterEvent,
228+
supplierSpec,
229+
perSupplierSuccess,
241230
);
242231
} catch (error) {
243232
deps.logger.error({
@@ -261,3 +250,42 @@ export default function createUpsertLetterHandler(deps: Deps): SQSHandler {
261250
};
262251
});
263252
}
253+
254+
async function processRecord(
255+
deps: Deps,
256+
letterEvent: LetterStatusChangeEvent | PreparedEvents,
257+
supplierSpec: SupplierSpec | undefined,
258+
perSupplierSuccess: Map<string, number>,
259+
) {
260+
const supplier =
261+
!supplierSpec || !supplierSpec.supplierId
262+
? getSupplierIdFromEvent(letterEvent)
263+
: supplierSpec.supplierId;
264+
265+
const operation = getOperationFromType(letterEvent.type);
266+
267+
await runUpsert(
268+
operation,
269+
letterEvent,
270+
supplierSpec ?? {
271+
supplierId: "unknown",
272+
specId: "unknown",
273+
priority: 10,
274+
billingId: "unknown",
275+
},
276+
deps,
277+
);
278+
279+
perSupplierSuccess.set(supplier, (perSupplierSuccess.get(supplier) || 0) + 1);
280+
return supplier;
281+
}
282+
283+
function makeIdempotentOnId(fn: AnyFunction, deps: Deps) {
284+
return makeIdempotent(fn, {
285+
persistenceStore: deps.idempotencyLayer,
286+
dataIndexArgument: 1,
287+
config: new IdempotencyConfig({
288+
eventKeyJmesPath: "id",
289+
}),
290+
});
291+
}

package-lock.json

Lines changed: 58 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)