Skip to content

Commit dc897c5

Browse files
masl2m-houston
andauthored
CCM-13615: Status Update Transformer Source (#309)
* source include acct and environment * Update lambdas/letter-updates-transformer/src/letter-updates-transformer.ts Co-authored-by: Mike Houston <60653100+m-houston@users.noreply.github.com> * use const * trivy node setup * allow insert record * different logging approach for extract * extra records count log * improve coverage * event schema bump for sourcing attributes from original event * lint pass * event mapper version * lock * regex version check in test --------- Co-authored-by: Mike Houston <60653100+m-houston@users.noreply.github.com>
1 parent b585191 commit dc897c5

13 files changed

Lines changed: 168 additions & 46 deletions

File tree

docs/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
"debug": "JEKYLL_ENV=development BUNDLE_GEMFILE=Gemfile bundle exec jekyll serve --config _config.yml,_config.dev.yml,_config.version.yml --limit_posts 100 --trace",
1616
"generate-includes": "./generate-includes.sh",
1717
"lint": "echo \"Documentation module has no code to lint\"",
18+
"lint:fix": "echo \"Documentation module has no code to lint\"",
1819
"test:unit": "echo \"Documentation module has no unit tests\"",
1920
"typecheck": "echo \"Documentation module has no typescript to typecheck\""
2021
},

infrastructure/terraform/components/api/module_lambda_letter_updates_transformer.tf

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ module "letter_updates_transformer" {
3535
log_subscription_role_arn = local.acct.log_subscription_role_arn
3636

3737
lambda_env_vars = merge(local.common_lambda_env_vars, {
38-
EVENTPUB_SNS_TOPIC_ARN = "${module.eventpub.sns_topic.arn}"
38+
EVENTPUB_SNS_TOPIC_ARN = "${module.eventpub.sns_topic.arn}",
39+
EVENT_SOURCE = "/data-plane/supplier-api/${var.group}/${var.environment}/letters"
3940
})
4041
}
4142

internal/events/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,5 +50,5 @@
5050
"typecheck": "tsc --noEmit"
5151
},
5252
"types": "dist/index.d.ts",
53-
"version": "1.0.8"
53+
"version": "1.0.9"
5454
}

internal/events/schemas/examples/letter.ACCEPTED.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
"recordedtime": "2025-08-28T08:45:00.000Z",
1919
"severitynumber": 2,
2020
"severitytext": "INFO",
21-
"source": "/data-plane/supplier-api/prod/update-status",
21+
"source": "/data-plane/supplier-api/nhs-supplier-api-prod/main/update-status",
2222
"specversion": "1.0",
2323
"subject": "letter-origin/letter-rendering/letter/f47ac10b-58cc-4372-a567-0e02b2c3d479",
2424
"time": "2025-08-28T08:45:00.000Z",

internal/events/schemas/examples/letter.FORWARDED.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
"recordedtime": "2025-08-28T08:45:00.000Z",
2121
"severitynumber": 2,
2222
"severitytext": "INFO",
23-
"source": "/data-plane/supplier-api/prod/update-status",
23+
"source": "/data-plane/supplier-api/nhs-supplier-api-prod/main/update-status",
2424
"specversion": "1.0",
2525
"subject": "letter-origin/letter-rendering/letter/f47ac10b-58cc-4372-a567-0e02b2c3d479",
2626
"time": "2025-08-28T08:45:00.000Z",

internal/events/schemas/examples/letter.RETURNED.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
"recordedtime": "2025-08-28T08:45:00.000Z",
2121
"severitynumber": 2,
2222
"severitytext": "INFO",
23-
"source": "/data-plane/supplier-api/prod/update-status",
23+
"source": "/data-plane/supplier-api/nhs-supplier-api-prod/main/update-status",
2424
"specversion": "1.0",
2525
"subject": "letter-origin/letter-rendering/letter/f47ac10b-58cc-4372-a567-0e02b2c3d479",
2626
"time": "2025-08-28T08:45:00.000Z",

lambdas/letter-updates-transformer/src/__tests__/letter-updates-transformer.test.ts

Lines changed: 89 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,18 @@ jest.mock("crypto", () => ({
2626
randomBytes: (size: number) => randomBytes[String(size)],
2727
}));
2828

29-
describe("letter-updates-transformer Lambda", () => {
30-
const mockedDeps: jest.Mocked<Deps> = {
31-
snsClient: { send: jest.fn() } as unknown as SNSClient,
32-
logger: { info: jest.fn(), error: jest.fn() } as unknown as pino.Logger,
33-
env: {
34-
EVENTPUB_SNS_TOPIC_ARN: "arn:aws:sns:region:account:topic",
35-
} as unknown as EnvVars,
36-
} as Deps;
29+
const eventSource =
30+
"/data-plane/supplier-api/nhs-supplier-api-dev/main/letters";
31+
const mockedDeps: jest.Mocked<Deps> = {
32+
snsClient: { send: jest.fn() } as unknown as SNSClient,
33+
logger: { info: jest.fn(), error: jest.fn() } as unknown as pino.Logger,
34+
env: {
35+
EVENTPUB_SNS_TOPIC_ARN: "arn:aws:sns:region:account:topic",
36+
EVENT_SOURCE: eventSource,
37+
} as unknown as EnvVars,
38+
} as Deps;
3739

40+
describe("letter-updates-transformer Lambda", () => {
3841
beforeEach(() => {
3942
jest.useFakeTimers();
4043
});
@@ -50,7 +53,9 @@ describe("letter-updates-transformer Lambda", () => {
5053
const newLetter = generateLetter("PRINTED");
5154
const expectedEntries = [
5255
expect.objectContaining({
53-
Message: JSON.stringify(mapLetterToCloudEvent(newLetter)),
56+
Message: JSON.stringify(
57+
mapLetterToCloudEvent(newLetter, eventSource),
58+
),
5459
}),
5560
];
5661

@@ -76,7 +81,9 @@ describe("letter-updates-transformer Lambda", () => {
7681
newLetter.reasonCode = "R1";
7782
const expectedEntries = [
7883
expect.objectContaining({
79-
Message: JSON.stringify(mapLetterToCloudEvent(newLetter)),
84+
Message: JSON.stringify(
85+
mapLetterToCloudEvent(newLetter, eventSource),
86+
),
8087
}),
8188
];
8289

@@ -103,7 +110,9 @@ describe("letter-updates-transformer Lambda", () => {
103110
newLetter.reasonCode = "R2";
104111
const expectedEntries = [
105112
expect.objectContaining({
106-
Message: JSON.stringify(mapLetterToCloudEvent(newLetter)),
113+
Message: JSON.stringify(
114+
mapLetterToCloudEvent(newLetter, eventSource),
115+
),
107116
}),
108117
];
109118

@@ -135,14 +144,28 @@ describe("letter-updates-transformer Lambda", () => {
135144
expect(mockedDeps.snsClient.send).not.toHaveBeenCalled();
136145
});
137146

138-
it("does not publish non-modify events", async () => {
147+
it("publishes INSERT events", async () => {
139148
const handler = createHandler(mockedDeps);
140149
const newLetter = generateLetter("ACCEPTED");
150+
const expectedEntries = [
151+
expect.objectContaining({
152+
Message: JSON.stringify(
153+
mapLetterToCloudEvent(newLetter, eventSource),
154+
),
155+
}),
156+
];
141157

142158
const testData = generateKinesisEvent([generateInsertRecord(newLetter)]);
143159
await handler(testData, mockDeep<Context>(), jest.fn());
144160

145-
expect(mockedDeps.snsClient.send).not.toHaveBeenCalled();
161+
expect(mockedDeps.snsClient.send).toHaveBeenCalledWith(
162+
expect.objectContaining({
163+
input: expect.objectContaining({
164+
TopicArn: "arn:aws:sns:region:account:topic",
165+
PublishBatchRequestEntries: expectedEntries,
166+
}),
167+
}),
168+
);
146169
});
147170

148171
it("does not publish invalid letter data", async () => {
@@ -159,6 +182,55 @@ describe("letter-updates-transformer Lambda", () => {
159182

160183
expect(mockedDeps.snsClient.send).not.toHaveBeenCalled();
161184
});
185+
186+
it("throws error when kinesis data contains malformed JSON", async () => {
187+
const handler = createHandler(mockedDeps);
188+
189+
// Create a Kinesis event with malformed JSON data
190+
const malformedKinesisEvent: KinesisStreamEvent = {
191+
Records: [
192+
{
193+
kinesis: {
194+
data: Buffer.from("invalid-json-data").toString("base64"),
195+
sequenceNumber: "12345",
196+
},
197+
} as any,
198+
],
199+
};
200+
201+
await expect(
202+
handler(malformedKinesisEvent, mockDeep<Context>(), jest.fn()),
203+
).rejects.toThrow();
204+
205+
expect(mockedDeps.logger.error).toHaveBeenCalledWith(
206+
expect.objectContaining({
207+
description: "Error extracting payload",
208+
error: expect.any(Error),
209+
record: expect.objectContaining({
210+
kinesis: expect.objectContaining({
211+
data: Buffer.from("invalid-json-data").toString("base64"),
212+
}),
213+
}),
214+
}),
215+
);
216+
});
217+
218+
it("handles events with no records", async () => {
219+
const handler = createHandler(mockedDeps);
220+
221+
// Create a Kinesis event with empty Records array
222+
const emptyKinesisEvent: KinesisStreamEvent = { Records: [] };
223+
224+
await handler(emptyKinesisEvent, mockDeep<Context>(), jest.fn());
225+
226+
expect(mockedDeps.logger.info).toHaveBeenCalledWith(
227+
expect.objectContaining({
228+
description: "Number of records",
229+
count: 0,
230+
}),
231+
);
232+
expect(mockedDeps.snsClient.send).not.toHaveBeenCalled();
233+
});
162234
});
163235

164236
describe("Batching", () => {
@@ -168,7 +240,7 @@ describe("letter-updates-transformer Lambda", () => {
168240
const newLetters = generateLetters(10, "PRINTED");
169241
const expectedEntries = newLetters.map((letter) =>
170242
expect.objectContaining({
171-
Message: JSON.stringify(mapLetterToCloudEvent(letter)),
243+
Message: JSON.stringify(mapLetterToCloudEvent(letter, eventSource)),
172244
}),
173245
);
174246

@@ -197,19 +269,19 @@ describe("letter-updates-transformer Lambda", () => {
197269
newLetters.slice(0, 10).map((letter, index) =>
198270
expect.objectContaining({
199271
Id: expect.stringMatching(new RegExp(`-${index}$`)),
200-
Message: JSON.stringify(mapLetterToCloudEvent(letter)),
272+
Message: JSON.stringify(mapLetterToCloudEvent(letter, eventSource)),
201273
}),
202274
),
203275
newLetters.slice(10, 20).map((letter, index) =>
204276
expect.objectContaining({
205277
Id: expect.stringMatching(new RegExp(`-${index}$`)),
206-
Message: JSON.stringify(mapLetterToCloudEvent(letter)),
278+
Message: JSON.stringify(mapLetterToCloudEvent(letter, eventSource)),
207279
}),
208280
),
209281
newLetters.slice(20).map((letter, index) =>
210282
expect.objectContaining({
211283
Id: expect.stringMatching(new RegExp(`-${index}$`)),
212-
Message: JSON.stringify(mapLetterToCloudEvent(letter)),
284+
Message: JSON.stringify(mapLetterToCloudEvent(letter, eventSource)),
213285
}),
214286
),
215287
];

lambdas/letter-updates-transformer/src/env.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { z } from "zod";
22

33
const EnvVarsSchema = z.object({
44
EVENTPUB_SNS_TOPIC_ARN: z.string(),
5+
EVENT_SOURCE: z.string(),
56
});
67

78
export type EnvVars = z.infer<typeof EnvVarsSchema>;

lambdas/letter-updates-transformer/src/letter-updates-transformer.ts

Lines changed: 55 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,20 @@ const BATCH_SIZE = 10;
2020
export default function createHandler(deps: Deps): Handler<KinesisStreamEvent> {
2121
return async (streamEvent: KinesisStreamEvent) => {
2222
deps.logger.info({ description: "Received event", streamEvent });
23+
deps.logger.info({
24+
description: "Number of records",
25+
count: streamEvent.Records?.length || 0,
26+
});
2327

24-
const cloudEvents: LetterEvent[] = streamEvent.Records.map((record) =>
28+
// Ensure logging by extracting all records first
29+
const ddbRecords: DynamoDBRecord[] = streamEvent.Records.map((record) =>
2530
extractPayload(record, deps),
26-
)
27-
.filter((record) => record.eventName === "MODIFY")
28-
.filter(
29-
(record) =>
30-
isChanged(record, "status") || isChanged(record, "reasonCode"),
31-
)
31+
);
32+
33+
const cloudEvents: LetterEvent[] = ddbRecords
34+
.filter((record) => filterRecord(record, deps))
3235
.map((element) => extractNewLetter(element))
33-
.map((element) => mapLetterToCloudEvent(element));
36+
.map((element) => mapLetterToCloudEvent(element, deps.env.EVENT_SOURCE));
3437

3538
for (const batch of generateBatches(cloudEvents)) {
3639
deps.logger.info({
@@ -50,14 +53,54 @@ export default function createHandler(deps: Deps): Handler<KinesisStreamEvent> {
5053
};
5154
}
5255

56+
function filterRecord(record: DynamoDBRecord, deps: Deps): boolean {
57+
let allowEvent = false;
58+
if (record.eventName === "INSERT") {
59+
allowEvent = true;
60+
}
61+
62+
if (
63+
record.eventName === "MODIFY" &&
64+
(isChanged(record, "status") || isChanged(record, "reasonCode"))
65+
) {
66+
allowEvent = true;
67+
}
68+
69+
deps.logger.info({
70+
description: "Filtering record",
71+
eventName: record.eventName,
72+
eventId: record.eventID,
73+
allowEvent,
74+
});
75+
76+
return allowEvent;
77+
}
78+
5379
function extractPayload(
5480
record: KinesisStreamRecord,
5581
deps: Deps,
5682
): DynamoDBRecord {
57-
// Kinesis data is base64 encoded
58-
const payload = Buffer.from(record.kinesis.data, "base64").toString("utf8");
59-
deps.logger.info({ description: "Extracted dynamoDBRecord", payload });
60-
return JSON.parse(payload);
83+
try {
84+
deps.logger.info({
85+
description: "Processing Kinesis record",
86+
recordId: record.kinesis.sequenceNumber,
87+
});
88+
89+
// Kinesis data is base64 encoded
90+
const payload = Buffer.from(record.kinesis.data, "base64").toString("utf8");
91+
deps.logger.info({ description: "Decoded payload", payload });
92+
93+
const jsonParsed = JSON.parse(payload);
94+
deps.logger.info({ description: "Extracted dynamoDBRecord", jsonParsed });
95+
return jsonParsed;
96+
} catch (error) {
97+
deps.logger.error({
98+
description: "Error extracting payload",
99+
error,
100+
record,
101+
});
102+
throw error;
103+
}
61104
}
62105

63106
function isChanged(record: DynamoDBRecord, property: string): boolean {

lambdas/letter-updates-transformer/src/mappers/__tests__/letter-mapper.test.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,16 @@ describe("letter-mapper", () => {
1717
source: "letter-rendering/source/test",
1818
subject: "letter-rendering/source/letter/letter-id",
1919
} as Letter;
20-
const event = mapLetterToCloudEvent(letter);
20+
const source = "/data-plane/supplier-api/nhs-supplier-api-dev/main/letters";
21+
const event = mapLetterToCloudEvent(letter, source);
2122

2223
// Check it conforms to the letter event schema - parse will throw an error if not
2324
$LetterEvent.parse(event);
2425
expect(event.type).toBe("uk.nhs.notify.supplier-api.letter.PRINTED.v1");
2526
expect(event.dataschema).toBe(
2627
`https://notify.nhs.uk/cloudevents/schemas/supplier-api/letter.PRINTED.${event.dataschemaversion}.schema.json`,
2728
);
28-
expect(event.dataschemaversion).toBe("1.0.8");
29+
expect(event.dataschemaversion).toMatch(/1\.\d+\.\d+/);
2930
expect(event.subject).toBe("letter-origin/supplier-api/letter/id1");
3031
expect(event.time).toBe("2025-11-24T15:55:18.000Z");
3132
expect(event.recordedtime).toBe("2025-11-24T15:55:18.000Z");
@@ -45,5 +46,6 @@ describe("letter-mapper", () => {
4546
event: event.id,
4647
},
4748
});
49+
expect(event.source).toBe(source);
4850
});
4951
});

0 commit comments

Comments
 (0)