-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathamendment-event-transformer.ts
More file actions
116 lines (109 loc) · 3.46 KB
/
amendment-event-transformer.ts
File metadata and controls
116 lines (109 loc) · 3.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import { SQSBatchItemFailure, SQSEvent, SQSHandler } from "aws-lambda";
import { PublishCommand } from "@aws-sdk/client-sns";
import { LetterEvent } from "@nhsdigital/nhs-notify-event-schemas-supplier-api/src/events/letter-events";
import { mapLetterToCloudEvent } from "@nhsdigital/nhs-notify-event-schemas-supplier-api/src/events/letter-mapper";
import { Unit } from "aws-embedded-metrics";
import pino from "pino";
import { MetricEntry, MetricStatus, buildEMFObject } from "@internal/helpers";
import {
UpdateLetterCommand,
UpdateLetterCommandSchema,
} from "../contracts/letters";
import { Deps } from "../config/deps";
/**
 * Builds the SQS handler that turns letter amendment commands into letter
 * events and publishes them to SNS.
 *
 * For every record in the incoming batch the handler:
 *  1. parses the body as JSON and validates it with `UpdateLetterCommandSchema`;
 *  2. loads the letter via `deps.letterRepo.getLetterById(supplierId, id)`;
 *  3. overwrites the letter's `status`, `reasonCode` and `reasonText` with the
 *     values carried by the command;
 *  4. maps the letter to a cloud event with `mapLetterToCloudEvent` and
 *     publishes it to `deps.env.SNS_TOPIC_ARN`;
 *  5. logs the outcome and emits a success metric.
 *
 * Records are processed concurrently. A record that throws at any step is
 * logged and its message id is added to `batchItemFailures`; the other records
 * in the batch are unaffected.
 *
 * @param deps - Repository, SNS client, logger and environment settings used
 *   by the handler.
 * @returns An SQS handler resolving to `{ batchItemFailures }`.
 */
export default function createTransformAmendmentEventHandler(
  deps: Deps,
): SQSHandler {
  return async (event: SQSEvent) => {
    const batchItemFailures: SQSBatchItemFailure[] = [];

    const tasks = event.Records.map(async (message) => {
      // Read the correlation id defensively and only once. A message without
      // a CorrelationId attribute must not make the logging calls throw:
      // a throw inside the catch block below would reject this task, reject
      // Promise.all and fail the whole batch instead of just this record.
      const correlationId =
        message.messageAttributes?.CorrelationId?.stringValue;

      try {
        const updateLetterCommand: UpdateLetterCommand =
          UpdateLetterCommandSchema.parse(JSON.parse(message.body));

        const letter = await deps.letterRepo.getLetterById(
          updateLetterCommand.supplierId,
          updateLetterCommand.id,
        );

        // Apply the amendment to the stored letter before mapping it.
        letter.status = updateLetterCommand.status;
        letter.reasonCode = updateLetterCommand.reasonCode;
        letter.reasonText = updateLetterCommand.reasonText;

        const letterEvent = mapLetterToCloudEvent(
          letter,
          deps.env.EVENT_SOURCE,
        );

        await deps.snsClient.send(
          buildSnsCommand(letterEvent, deps.env.SNS_TOPIC_ARN),
        );

        deps.logger.info({
          description: "Sent letter status update via topic",
          letterId: updateLetterCommand.id,
          messageId: message.messageId,
          correlationId,
        });

        emitSuccessMetrics(
          updateLetterCommand.supplierId,
          updateLetterCommand.status,
          deps.logger,
        );
      } catch (error) {
        deps.logger.error({
          description: "Error processing letter status update",
          err: error,
          messageId: message.messageId,
          correlationId,
          messageBody: message.body,
        });
        batchItemFailures.push({ itemIdentifier: message.messageId });
      }
    });

    await Promise.all(tasks);

    emitFailedItems(batchItemFailures, deps.logger);
    return { batchItemFailures };
  };
}
/**
 * Wraps a letter event in an SNS publish command.
 *
 * @param letterEvent - Event to publish; it is sent as its JSON serialisation.
 * @param topicArn - ARN of the topic the command targets.
 * @returns A `PublishCommand` carrying the serialised event as its message.
 */
function buildSnsCommand(
  letterEvent: LetterEvent,
  topicArn: string,
): PublishCommand {
  const serialisedEvent = JSON.stringify(letterEvent);
  const publishInput = { TopicArn: topicArn, Message: serialisedEvent };
  return new PublishCommand(publishInput);
}
/**
 * Logs a single success-count metric for a processed letter update.
 *
 * The metric object is produced by `buildEMFObject` under the
 * "amendment-event-transformer" name and written through `logger.info`.
 *
 * @param supplierId - Value of the `supplier` dimension.
 * @param status - Value of the `status` dimension.
 * @param logger - Logger the metric object is written to.
 */
function emitSuccessMetrics(
  supplierId: string,
  status: string,
  logger: pino.Logger,
) {
  const metricDimensions: Record<string, string> = {
    supplier: supplierId,
    status,
  };
  const successCount: MetricEntry = {
    key: MetricStatus.Success,
    value: 1,
    unit: Unit.Count,
  };

  logger.info(
    buildEMFObject(
      "amendment-event-transformer",
      metricDimensions,
      successCount,
    ),
  );
}
/**
 * Logs one failure-count metric per failed batch item.
 *
 * Each metric object is produced by `buildEMFObject` under the
 * "amendment-event-transformer" name, with the item's identifier as its only
 * dimension, and is written through `logger.info`. Nothing is logged when the
 * list is empty.
 *
 * @param batchFailures - Failed items collected while processing the batch.
 * @param logger - Logger the metric objects are written to.
 */
function emitFailedItems(
  batchFailures: SQSBatchItemFailure[],
  logger: pino.Logger,
) {
  batchFailures.forEach(({ itemIdentifier }) => {
    // NOTE(review): the dimension value is unique per message id - confirm
    // that this metric cardinality is intended.
    const failureDimensions: Record<string, string> = {
      identifier: itemIdentifier,
    };
    const failureCount: MetricEntry = {
      key: MetricStatus.Failure,
      value: 1,
      unit: Unit.Count,
    };

    logger.info(
      buildEMFObject(
        "amendment-event-transformer",
        failureDimensions,
        failureCount,
      ),
    );
  });
}