Skip to content

Commit 6620888

Browse files
committed
CCM-13519 Add idempotency layer in front of upsert
1 parent b95b17f commit 6620888

13 files changed

Lines changed: 195 additions & 45 deletions

File tree

infrastructure/terraform/components/api/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ No requirements.
5858
| <a name="module_ddb_alarms_letters"></a> [ddb\_alarms\_letters](#module\_ddb\_alarms\_letters) | ../../modules/alarms-ddb | n/a |
5959
| <a name="module_ddb_alarms_mi"></a> [ddb\_alarms\_mi](#module\_ddb\_alarms\_mi) | ../../modules/alarms-ddb | n/a |
6060
| <a name="module_ddb_alarms_suppliers"></a> [ddb\_alarms\_suppliers](#module\_ddb\_alarms\_suppliers) | ../../modules/alarms-ddb | n/a |
61+
| <a name="module_ddb_alarms_upsert_idempotency"></a> [ddb\_alarms\_upsert\_idempotency](#module\_ddb\_alarms\_upsert\_idempotency) | ../../modules/alarms-ddb | n/a |
6162
| <a name="module_domain_truststore"></a> [domain\_truststore](#module\_domain\_truststore) | https://github.com/NHSDigital/nhs-notify-shared-modules/releases/download/3.0.6/terraform-s3bucket.zip | n/a |
6263
| <a name="module_eventpub"></a> [eventpub](#module\_eventpub) | https://github.com/NHSDigital/nhs-notify-shared-modules/releases/download/3.0.6/terraform-eventpub.zip | n/a |
6364
| <a name="module_eventsub"></a> [eventsub](#module\_eventsub) | ../../modules/eventsub | n/a |
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
resource "aws_dynamodb_table" "upsert_idempotency" {
2+
name = "${local.csi}-upsert-idempotency"
3+
billing_mode = "PAY_PER_REQUEST"
4+
hash_key = "id"
5+
attribute {
6+
name = "id"
7+
type = "S"
8+
}
9+
ttl {
10+
attribute_name = "expiration"
11+
enabled = true
12+
}
13+
14+
point_in_time_recovery {
15+
enabled = true
16+
}
17+
18+
tags = merge(
19+
local.default_tags,
20+
{
21+
NHSE-Enable-Dynamo-Backup-Acct = "True"
22+
}
23+
)
24+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
module "ddb_alarms_upsert_idempotency" {
2+
count = local.alarms_enabled ? 1 : 0
3+
source = "../../modules/alarms-ddb"
4+
alarm_prefix = local.csi
5+
table_name = aws_dynamodb_table.upsert_idempotency.name
6+
tags = local.default_tags
7+
}

infrastructure/terraform/components/api/module_lambda_upsert_letter.tf

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ module "upsert_letter" {
3535
log_subscription_role_arn = local.acct.log_subscription_role_arn
3636

3737
lambda_env_vars = merge(local.common_lambda_env_vars, {
38-
VARIANT_MAP = jsonencode(var.letter_variant_map)
38+
VARIANT_MAP = jsonencode(var.letter_variant_map),
39+
UPSERT_IDEMPOTENCY_TABLE_NAME = aws_dynamodb_table.upsert_idempotency.name
3940
})
4041
}
4142

@@ -67,6 +68,7 @@ data "aws_iam_policy_document" "upsert_letter_lambda" {
6768

6869
resources = [
6970
aws_dynamodb_table.letters.arn,
71+
aws_dynamodb_table.upsert_idempotency.arn,
7072
"${aws_dynamodb_table.letters.arn}/index/supplierStatus-index"
7173
]
7274
}

lambdas/upsert-letter/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
{
22
"dependencies": {
3+
"@aws-lambda-powertools/idempotency": "^2.33.0",
34
"@aws-sdk/client-dynamodb": "^3.984.0",
45
"@aws-sdk/lib-dynamodb": "^3.1008.0",
56
"@internal/datastore": "*",

lambdas/upsert-letter/src/config/__tests__/env.test.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@ describe("lambdaEnv", () => {
1616

1717
it("should load all environment variables successfully", () => {
1818
process.env.LETTERS_TABLE_NAME = "letters-table";
19+
process.env.UPSERT_IDEMPOTENCY_TABLE_NAME = "idempotency-table";
1920
process.env.LETTER_TTL_HOURS = "12960";
2021

2122
const { envVars } = require("../env");
2223

2324
expect(envVars).toEqual({
2425
LETTERS_TABLE_NAME: "letters-table",
26+
UPSERT_IDEMPOTENCY_TABLE_NAME: "idempotency-table",
2527
LETTER_TTL_HOURS: 12_960,
2628
});
2729
});

lambdas/upsert-letter/src/config/deps.ts

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
import { DynamoDBClient } from "@aws-sdk/client-dynamodb";
22
import { DynamoDBDocumentClient } from "@aws-sdk/lib-dynamodb";
3+
import { DynamoDBPersistenceLayer } from "@aws-lambda-powertools/idempotency/dynamodb";
34
import { Logger } from "pino";
45
import { LetterRepository } from "@internal/datastore";
56
import { createLogger } from "@internal/helpers";
67
import { EnvVars, envVars } from "./env";
78

89
export type Deps = {
910
letterRepo: LetterRepository;
11+
idempotencyLayer: DynamoDBPersistenceLayer;
1012
logger: Logger;
1113
env: EnvVars;
1214
};
@@ -16,11 +18,7 @@ function createDocumentClient(): DynamoDBDocumentClient {
1618
return DynamoDBDocumentClient.from(ddbClient);
1719
}
1820

19-
function createLetterRepository(
20-
log: Logger,
21-
// eslint-disable-next-line @typescript-eslint/no-shadow
22-
envVars: EnvVars,
23-
): LetterRepository {
21+
function createLetterRepository(log: Logger): LetterRepository {
2422
const config = {
2523
lettersTableName: envVars.LETTERS_TABLE_NAME,
2624
lettersTtlHours: envVars.LETTER_TTL_HOURS,
@@ -29,11 +27,18 @@ function createLetterRepository(
2927
return new LetterRepository(createDocumentClient(), log, config);
3028
}
3129

30+
function createIdempotencyLayer(): DynamoDBPersistenceLayer {
31+
return new DynamoDBPersistenceLayer({
32+
tableName: envVars.UPSERT_IDEMPOTENCY_TABLE_NAME,
33+
});
34+
}
35+
3236
export function createDependenciesContainer(): Deps {
3337
const log = createLogger({ logLevel: envVars.PINO_LOG_LEVEL });
3438

3539
return {
36-
letterRepo: createLetterRepository(log, envVars),
40+
letterRepo: createLetterRepository(log),
41+
idempotencyLayer: createIdempotencyLayer(),
3742
logger: log,
3843
env: envVars,
3944
};

lambdas/upsert-letter/src/config/env.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ const EnvVarsSchema = z.object({
44
LETTERS_TABLE_NAME: z.string(),
55
LETTER_TTL_HOURS: z.coerce.number().int(),
66
PINO_LOG_LEVEL: z.coerce.string().optional(),
7+
UPSERT_IDEMPOTENCY_TABLE_NAME: z.string(),
78
});
89

910
export type EnvVars = z.infer<typeof EnvVarsSchema>;

lambdas/upsert-letter/src/handler/__tests__/upsert-handler.test.ts

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
} from "@internal/datastore";
77
import { LetterRequestPreparedEventV2 } from "@nhsdigital/nhs-notify-event-schemas-letter-rendering";
88
import { LetterRequestPreparedEvent } from "@nhsdigital/nhs-notify-event-schemas-letter-rendering-v1";
9+
import { makeIdempotent } from "@aws-lambda-powertools/idempotency";
910
import {
1011
$LetterStatusChangeEvent,
1112
LetterStatusChangeEvent,
@@ -15,6 +16,14 @@ import { Deps } from "../../config/deps";
1516
import { EnvVars } from "../../config/env";
1617
import packageJson from "../../../package.json";
1718

19+
jest.mock("@aws-lambda-powertools/idempotency", () => {
20+
const original = jest.requireActual("@aws-lambda-powertools/idempotency");
21+
return {
22+
...original,
23+
makeIdempotent: jest.fn((fn, _) => fn),
24+
};
25+
});
26+
1827
const renderingSchemaVersion: string =
1928
packageJson.dependencies[
2029
"@nhsdigital/nhs-notify-event-schemas-letter-rendering"
@@ -313,7 +322,7 @@ describe("createUpsertLetterHandler", () => {
313322
);
314323
});
315324

316-
it("does not treat a replayed insert as a failure", async () => {
325+
it("does not treat a second insert for the same letter as a failure", async () => {
317326
const v1message = {
318327
letterEvent: createPreparedV1Event(),
319328
supplierSpec: {
@@ -338,6 +347,26 @@ describe("createUpsertLetterHandler", () => {
338347
expect(result!.batchItemFailures).toEqual([]);
339348
});
340349

350+
it("does not insert a letter if the same message is replayed", async () => {
351+
const v1message = {
352+
letterEvent: createPreparedV1Event(),
353+
supplierSpec: {
354+
supplierId: "supplier1",
355+
specId: "spec1",
356+
priority: 10,
357+
billingId: "billing1",
358+
},
359+
};
360+
const evt: SQSEvent = createSQSEvent([
361+
createSqsRecord("msg2", JSON.stringify(v1message)),
362+
]);
363+
(makeIdempotent as jest.Mock).mockImplementationOnce((_fn) => "supplier1");
364+
365+
await createUpsertLetterHandler(mockedDeps)(evt, {} as any, {} as any);
366+
367+
expect(mockedDeps.letterRepo.putLetter).not.toHaveBeenCalled();
368+
});
369+
341370
test("unknown supplier has metric emitted with 'unknown' supplier dimension", async () => {
342371
const letterEvent = createSupplierStatusChangeEventWithoutSupplier();
343372

lambdas/upsert-letter/src/handler/upsert-handler.ts

Lines changed: 48 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { SQSBatchItemFailure, SQSEvent, SQSHandler } from "aws-lambda";
1+
import { Context, SQSBatchItemFailure, SQSEvent, SQSHandler } from "aws-lambda";
22
import { $LetterRequestPreparedEvent } from "@nhsdigital/nhs-notify-event-schemas-letter-rendering-v1";
33
import {
44
InsertLetter,
@@ -11,6 +11,10 @@ import {
1111
} from "@nhsdigital/nhs-notify-event-schemas-supplier-api/src/events/letter-events";
1212
import { $LetterRequestPreparedEventV2 } from "@nhsdigital/nhs-notify-event-schemas-letter-rendering";
1313
import { MetricsLogger, Unit, metricScope } from "aws-embedded-metrics";
14+
import {
15+
IdempotencyConfig,
16+
makeIdempotent,
17+
} from "@aws-lambda-powertools/idempotency";
1418
import { Deps } from "../config/deps";
1519
import {
1620
PreparedEvents,
@@ -20,6 +24,10 @@ import {
2024
UpsertOperation,
2125
} from "./schemas";
2226

27+
const idempotencyConfig = new IdempotencyConfig({
28+
eventKeyJmesPath: "id",
29+
});
30+
2331
function getOperationFromType(type: string): UpsertOperation {
2432
if (
2533
type.startsWith("uk.nhs.notify.letter-rendering.letter-request.prepared")
@@ -180,8 +188,13 @@ function parseQueueMessage(queueMessage: string): QueueMessage {
180188
}
181189

182190
export default function createUpsertLetterHandler(deps: Deps): SQSHandler {
191+
const processRecordIdempotently = makeIdempotent(processRecord, {
192+
persistenceStore: deps.idempotencyLayer,
193+
config: idempotencyConfig,
194+
});
195+
183196
return metricScope((metrics: MetricsLogger) => {
184-
return async (event: SQSEvent) => {
197+
return async (event: SQSEvent, context: Context) => {
185198
const batchItemFailures: SQSBatchItemFailure[] = [];
186199
const perSupplierSuccess: Map<string, number> = new Map<string, number>();
187200
const perSupplierFailure: Map<string, number> = new Map<string, number>();
@@ -216,29 +229,13 @@ export default function createUpsertLetterHandler(deps: Deps): SQSHandler {
216229
supplier: supplierSpec,
217230
});
218231

219-
supplier =
220-
!supplierSpec || !supplierSpec.supplierId
221-
? getSupplierIdFromEvent(letterEvent)
222-
: supplierSpec.supplierId;
223-
224-
const operation = getOperationFromType(letterEvent.type);
225-
226-
await runUpsert(
227-
operation,
232+
idempotencyConfig.registerLambdaContext(context);
233+
supplier = await processRecordIdempotently(
228234
letterEvent,
229-
supplierSpec ?? {
230-
supplierId: "unknown",
231-
specId: "unknown",
232-
priority: 10,
233-
billingId: "unknown",
234-
},
235+
supplierSpec,
236+
perSupplierSuccess,
235237
deps,
236238
);
237-
238-
perSupplierSuccess.set(
239-
supplier,
240-
(perSupplierSuccess.get(supplier) || 0) + 1,
241-
);
242239
} catch (error) {
243240
deps.logger.error({
244241
description: "Error processing upsert of record",
@@ -261,3 +258,32 @@ export default function createUpsertLetterHandler(deps: Deps): SQSHandler {
261258
};
262259
});
263260
}
261+
262+
async function processRecord(
263+
letterEvent: LetterStatusChangeEvent | PreparedEvents,
264+
supplierSpec: SupplierSpec | undefined,
265+
perSupplierSuccess: Map<string, number>,
266+
deps: Deps,
267+
) {
268+
const supplier =
269+
!supplierSpec || !supplierSpec.supplierId
270+
? getSupplierIdFromEvent(letterEvent)
271+
: supplierSpec.supplierId;
272+
273+
const operation = getOperationFromType(letterEvent.type);
274+
275+
await runUpsert(
276+
operation,
277+
letterEvent,
278+
supplierSpec ?? {
279+
supplierId: "unknown",
280+
specId: "unknown",
281+
priority: 10,
282+
billingId: "unknown",
283+
},
284+
deps,
285+
);
286+
287+
perSupplierSuccess.set(supplier, (perSupplierSuccess.get(supplier) || 0) + 1);
288+
return supplier;
289+
}

0 commit comments

Comments
 (0)