Skip to content

Commit 210ca88

Browse files
committed
Put SQS queue between SNS topic and lambda
1 parent f20c282 commit 210ca88

6 files changed

Lines changed: 199 additions & 105 deletions

File tree

infrastructure/terraform/components/api/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ No requirements.
5858
| <a name="module_post_mi"></a> [post\_mi](#module\_post\_mi) | https://github.com/NHSDigital/nhs-notify-shared-modules/releases/download/v2.0.29/terraform-lambda.zip | n/a |
5959
| <a name="module_s3bucket_test_letters"></a> [s3bucket\_test\_letters](#module\_s3bucket\_test\_letters) | https://github.com/NHSDigital/nhs-notify-shared-modules/releases/download/v2.0.26/terraform-s3bucket.zip | n/a |
6060
| <a name="module_supplier_events_forwarder_lambda"></a> [supplier\_events\_forwarder\_lambda](#module\_supplier\_events\_forwarder\_lambda) | https://github.com/NHSDigital/nhs-notify-shared-modules/releases/download/v2.0.26/terraform-lambda.zip | n/a |
61+
| <a name="module_supplier_events_queue"></a> [supplier\_events\_queue](#module\_supplier\_events\_queue) | https://github.com/NHSDigital/nhs-notify-shared-modules/releases/download/v2.0.26/terraform-sqs.zip | n/a |
6162
| <a name="module_supplier_requests_queue"></a> [supplier\_requests\_queue](#module\_supplier\_requests\_queue) | https://github.com/NHSDigital/nhs-notify-shared-modules/releases/download/v2.0.24/terraform-sqs.zip | n/a |
6263
| <a name="module_supplier_ssl"></a> [supplier\_ssl](#module\_supplier\_ssl) | https://github.com/NHSDigital/nhs-notify-shared-modules/releases/download/v2.0.26/terraform-ssl.zip | n/a |
6364
| <a name="module_upsert_letter"></a> [upsert\_letter](#module\_upsert\_letter) | https://github.com/NHSDigital/nhs-notify-shared-modules/releases/download/v2.0.29/terraform-lambda.zip | n/a |

infrastructure/terraform/components/api/module_lambda_supplier_events_forwarder.tf

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,4 +68,19 @@ data "aws_iam_policy_document" "supplier_events_forwarder_lambda" {
6868
module.eventsub.firehose_delivery_stream.arn,
6969
]
7070
}
71+
72+
statement {
73+
sid = "SQSPermissions"
74+
effect = "Allow"
75+
76+
actions = [
77+
"sqs:ReceiveMessage",
78+
"sqs:DeleteMessage",
79+
"sqs:GetQueueAttributes",
80+
]
81+
82+
resources = [
83+
module.supplier_events_queue.sqs_queue_arn,
84+
]
85+
}
7186
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
module "supplier_events_queue" {
2+
source = "https://github.com/NHSDigital/nhs-notify-shared-modules/releases/download/v2.0.26/terraform-sqs.zip"
3+
4+
aws_account_id = var.aws_account_id
5+
component = var.component
6+
environment = var.environment
7+
project = var.project
8+
region = var.region
9+
name = "${local.csi}-supplier-events-queue"
10+
11+
fifo_queue = true
12+
content_based_deduplication = true
13+
14+
sqs_kms_key_arn = module.kms.key_arn
15+
16+
visibility_timeout_seconds = 60
17+
18+
create_dlq = true
19+
sqs_policy_overload = data.aws_iam_policy_document.supplier_events_queue_policy.json
20+
}
21+
22+
data "aws_iam_policy_document" "supplier_events_queue_policy" {
23+
version = "2012-10-17"
24+
statement {
25+
sid = "AllowSNSToSendMessage"
26+
effect = "Allow"
27+
28+
principals {
29+
type = "Service"
30+
identifiers = ["sns.amazonaws.com"]
31+
}
32+
33+
actions = [
34+
"sqs:SendMessage"
35+
]
36+
37+
resources = [
38+
"arn:aws:sqs:${var.region}:${var.aws_account_id}:${local.csi}-supplier-events-queue.fifo"
39+
]
40+
41+
condition {
42+
test = "ArnEquals"
43+
variable = "aws:SourceArn"
44+
values = [module.eventsub.sns_topic_supplier.arn]
45+
}
46+
}
47+
48+
statement {
49+
sid = "AllowSNSPermissions"
50+
effect = "Allow"
51+
52+
principals {
53+
type = "Service"
54+
identifiers = ["sns.amazonaws.com"]
55+
}
56+
57+
actions = [
58+
"sqs:SendMessage",
59+
"sqs:ListQueueTags",
60+
"sqs:GetQueueUrl",
61+
"sqs:GetQueueAttributes",
62+
]
63+
64+
resources = [
65+
"arn:aws:sqs:${var.region}:${var.aws_account_id}:${local.csi}-supplier-events-queue.fifo"
66+
]
67+
68+
condition {
69+
test = "ArnEquals"
70+
variable = "aws:SourceArn"
71+
values = [module.eventsub.sns_topic_supplier.arn]
72+
}
73+
}
74+
}
Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,19 @@
1-
resource "aws_sns_topic_subscription" "supplier_events_forwarder_lambda" {
2-
topic_arn = module.eventsub.sns_topic_supplier.arn
3-
protocol = "lambda"
4-
endpoint = module.supplier_events_forwarder_lambda.function_arn
1+
resource "aws_sns_topic_subscription" "supplier_events_queue" {
2+
topic_arn = module.eventsub.sns_topic_supplier.arn
3+
protocol = "sqs"
4+
endpoint = module.supplier_events_queue.sqs_queue_arn
5+
raw_message_delivery = false
56
}
67

7-
resource "aws_lambda_permission" "supplier_events_forwarder_lambda_sns" {
8-
statement_id = "AllowExecutionFromSNS"
9-
action = "lambda:InvokeFunction"
10-
function_name = module.supplier_events_forwarder_lambda.function_name
11-
principal = "sns.amazonaws.com"
12-
source_arn = module.eventsub.sns_topic_supplier.arn
8+
resource "aws_lambda_event_source_mapping" "supplier_events_forwarder" {
9+
event_source_arn = module.supplier_events_queue.sqs_queue_arn
10+
function_name = module.supplier_events_forwarder_lambda.function_arn
11+
batch_size = 10
12+
maximum_batching_window_in_seconds = 1
13+
scaling_config { maximum_concurrency = 10 }
14+
15+
depends_on = [
16+
module.supplier_events_queue,
17+
module.supplier_events_forwarder_lambda
18+
]
1319
}

lambdas/supplier-events-forwarder/src/__tests__/forwarder.test.ts

Lines changed: 84 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -1,56 +1,69 @@
1-
import { Context, SNSEvent, SNSEventRecord } from "aws-lambda";
1+
import { Context, SQSEvent, SQSRecord } from "aws-lambda";
22
import { FirehoseClient, PutRecordCommand } from "@aws-sdk/client-firehose";
33
import { mockDeep } from "jest-mock-extended";
44
import pino from "pino";
55
import createForwarder from "../forwarder";
66
import { Deps } from "../deps";
77

8-
function createSNSEvent(records: SNSEventRecord[]): SNSEvent {
8+
function createSQSEvent(records: SQSRecord[]): SQSEvent {
99
return {
1010
Records: records,
1111
};
1212
}
1313

14-
function createSNSEventRecord(
15-
message: string,
16-
overrides: Partial<SNSEventRecord["Sns"]> = {},
17-
): SNSEventRecord {
14+
/**
15+
* Creates an SQS record with a body containing the SNS notification wrapper.
16+
* This simulates what SNS delivers to SQS when raw_message_delivery is false.
17+
*/
18+
function createSQSRecord(
19+
body: string,
20+
messageId = "test-sqs-msg-id",
21+
): SQSRecord {
1822
return {
19-
EventVersion: "1.0",
20-
EventSubscriptionArn:
21-
"arn:aws:sns:eu-west-2:123456789012:topic:subscription",
22-
EventSource: "aws:sns",
23-
Sns: {
24-
SignatureVersion: "1",
25-
Timestamp: "2026-01-09T12:00:00.000Z",
26-
Signature: "test-signature",
27-
SigningCertUrl: "https://sns.eu-west-2.amazonaws.com/cert.pem",
28-
MessageId: "test-message-id",
29-
Message: message,
30-
MessageAttributes: {},
31-
Type: "Notification",
32-
UnsubscribeUrl: "https://sns.eu-west-2.amazonaws.com/unsubscribe",
33-
TopicArn: "arn:aws:sns:eu-west-2:123456789012:test-topic",
34-
Subject: "Test Subject",
35-
...overrides,
23+
messageId,
24+
receiptHandle: "test-receipt-handle",
25+
body,
26+
attributes: {
27+
ApproximateReceiveCount: "1",
28+
SentTimestamp: "1704801600000",
29+
SenderId: "123456789012",
30+
ApproximateFirstReceiveTimestamp: "1704801600000",
3631
},
32+
messageAttributes: {},
33+
md5OfBody: "test-md5",
34+
eventSource: "aws:sqs",
35+
eventSourceARN: "arn:aws:sqs:eu-west-2:123456789012:test-queue.fifo",
36+
awsRegion: "eu-west-2",
3737
};
3838
}
3939

40-
function buildExpectedSnsWrapper(record: SNSEventRecord): object {
41-
return {
40+
/**
41+
* Creates an SNS notification wrapper as it would appear in the SQS message body
42+
* when raw_message_delivery is false.
43+
*/
44+
function createSnsNotificationWrapper(
45+
message: string,
46+
overrides: Partial<{
47+
MessageId: string;
48+
TopicArn: string;
49+
Subject: string;
50+
}> = {},
51+
): string {
52+
return JSON.stringify({
4253
Type: "Notification",
43-
MessageId: record.Sns.MessageId,
44-
TopicArn: record.Sns.TopicArn,
45-
Subject: record.Sns.Subject,
46-
Message: record.Sns.Message,
47-
Timestamp: record.Sns.Timestamp,
48-
SignatureVersion: record.Sns.SignatureVersion,
49-
Signature: record.Sns.Signature,
50-
SigningCertUrl: record.Sns.SigningCertUrl,
51-
UnsubscribeUrl: record.Sns.UnsubscribeUrl,
52-
MessageAttributes: record.Sns.MessageAttributes,
53-
};
54+
MessageId: overrides.MessageId ?? "test-sns-message-id",
55+
TopicArn:
56+
overrides.TopicArn ??
57+
"arn:aws:sns:eu-west-2:123456789012:test-topic.fifo",
58+
Subject: overrides.Subject ?? "Test Subject",
59+
Message: message,
60+
Timestamp: "2026-01-09T12:00:00.000Z",
61+
SignatureVersion: "1",
62+
Signature: "test-signature",
63+
SigningCertUrl: "https://sns.eu-west-2.amazonaws.com/cert.pem",
64+
UnsubscribeUrl: "https://sns.eu-west-2.amazonaws.com/unsubscribe",
65+
MessageAttributes: {},
66+
});
5467
}
5568

5669
describe("forwarder", () => {
@@ -74,13 +87,14 @@ describe("forwarder", () => {
7487
});
7588

7689
describe("createForwarder", () => {
77-
it("should process a single SNS record and send to Firehose", async () => {
90+
it("should process a single SQS record and send to Firehose", async () => {
7891
const message = JSON.stringify({ eventType: "test", data: "value" });
79-
const snsRecord = createSNSEventRecord(message);
80-
const snsEvent = createSNSEvent([snsRecord]);
92+
const snsWrapper = createSnsNotificationWrapper(message);
93+
const sqsRecord = createSQSRecord(snsWrapper);
94+
const sqsEvent = createSQSEvent([sqsRecord]);
8195

8296
const handler = createForwarder(mockDeps);
83-
await handler(snsEvent, mockDeep<Context>(), jest.fn());
97+
await handler(sqsEvent, mockDeep<Context>(), jest.fn());
8498

8599
expect(mockFirehoseClient.send).toHaveBeenCalledTimes(1);
86100
expect(mockFirehoseClient.send).toHaveBeenCalledWith(
@@ -89,89 +103,87 @@ describe("forwarder", () => {
89103

90104
const sentCommand = mockFirehoseClient.send.mock
91105
.calls[0][0] as PutRecordCommand;
92-
const expectedWrapper = buildExpectedSnsWrapper(snsRecord);
93106
expect(sentCommand.input).toEqual({
94107
DeliveryStreamName: mockDeliveryStreamName,
95108
Record: {
96-
Data: Buffer.from(`${JSON.stringify(expectedWrapper)}\n`),
109+
Data: Buffer.from(`${snsWrapper}\n`),
97110
},
98111
});
99112
});
100113

101-
it("should process multiple SNS records and send to Firehose", async () => {
114+
it("should process multiple SQS records and send to Firehose", async () => {
102115
const message1 = JSON.stringify({ eventType: "test1" });
103116
const message2 = JSON.stringify({ eventType: "test2" });
104117
const message3 = JSON.stringify({ eventType: "test3" });
105118

106-
const snsRecord1 = createSNSEventRecord(message1, {
119+
const snsWrapper1 = createSnsNotificationWrapper(message1, {
107120
MessageId: "msg-1",
108121
});
109-
const snsRecord2 = createSNSEventRecord(message2, {
122+
const snsWrapper2 = createSnsNotificationWrapper(message2, {
110123
MessageId: "msg-2",
111124
});
112-
const snsRecord3 = createSNSEventRecord(message3, {
125+
const snsWrapper3 = createSnsNotificationWrapper(message3, {
113126
MessageId: "msg-3",
114127
});
115128

116-
const snsEvent = createSNSEvent([snsRecord1, snsRecord2, snsRecord3]);
129+
const sqsEvent = createSQSEvent([
130+
createSQSRecord(snsWrapper1, "sqs-1"),
131+
createSQSRecord(snsWrapper2, "sqs-2"),
132+
createSQSRecord(snsWrapper3, "sqs-3"),
133+
]);
117134

118135
const handler = createForwarder(mockDeps);
119-
await handler(snsEvent, mockDeep<Context>(), jest.fn());
136+
await handler(sqsEvent, mockDeep<Context>(), jest.fn());
120137

121138
expect(mockFirehoseClient.send).toHaveBeenCalledTimes(3);
122139

123140
const sentCommands = mockFirehoseClient.send.mock.calls.map(
124-
(call: [PutRecordCommand]) => call[0],
141+
(call) => call[0] as PutRecordCommand,
125142
);
126143

127144
expect(sentCommands[0].input).toEqual({
128145
DeliveryStreamName: mockDeliveryStreamName,
129146
Record: {
130-
Data: Buffer.from(
131-
`${JSON.stringify(buildExpectedSnsWrapper(snsRecord1))}\n`,
132-
),
147+
Data: Buffer.from(`${snsWrapper1}\n`),
133148
},
134149
});
135150

136151
expect(sentCommands[1].input).toEqual({
137152
DeliveryStreamName: mockDeliveryStreamName,
138153
Record: {
139-
Data: Buffer.from(
140-
`${JSON.stringify(buildExpectedSnsWrapper(snsRecord2))}\n`,
141-
),
154+
Data: Buffer.from(`${snsWrapper2}\n`),
142155
},
143156
});
144157

145158
expect(sentCommands[2].input).toEqual({
146159
DeliveryStreamName: mockDeliveryStreamName,
147160
Record: {
148-
Data: Buffer.from(
149-
`${JSON.stringify(buildExpectedSnsWrapper(snsRecord3))}\n`,
150-
),
161+
Data: Buffer.from(`${snsWrapper3}\n`),
151162
},
152163
});
153164
});
154165

155-
it("should handle empty SNS event with no records", async () => {
156-
const snsEvent = createSNSEvent([]);
166+
it("should handle empty SQS event with no records", async () => {
167+
const sqsEvent = createSQSEvent([]);
157168

158169
const handler = createForwarder(mockDeps);
159-
await handler(snsEvent, mockDeep<Context>(), jest.fn());
170+
await handler(sqsEvent, mockDeep<Context>(), jest.fn());
160171

161172
expect(mockFirehoseClient.send).not.toHaveBeenCalled();
162173
});
163174

164-
it("should wrap message in SNS notification format", async () => {
175+
it("should forward the SNS notification wrapper from SQS body to Firehose", async () => {
165176
const message = JSON.stringify({ key: "value" });
166-
const snsRecord = createSNSEventRecord(message, {
177+
const snsWrapper = createSnsNotificationWrapper(message, {
167178
MessageId: "unique-msg-id",
168-
TopicArn: "arn:aws:sns:eu-west-2:123456789012:my-topic",
179+
TopicArn: "arn:aws:sns:eu-west-2:123456789012:my-topic.fifo",
169180
Subject: "My Subject",
170181
});
171-
const snsEvent = createSNSEvent([snsRecord]);
182+
const sqsRecord = createSQSRecord(snsWrapper);
183+
const sqsEvent = createSQSEvent([sqsRecord]);
172184

173185
const handler = createForwarder(mockDeps);
174-
await handler(snsEvent, mockDeep<Context>(), jest.fn());
186+
await handler(sqsEvent, mockDeep<Context>(), jest.fn());
175187

176188
const sentCommand = mockFirehoseClient.send.mock
177189
.calls[0][0] as PutRecordCommand;
@@ -181,7 +193,7 @@ describe("forwarder", () => {
181193
expect(parsedData).toEqual({
182194
Type: "Notification",
183195
MessageId: "unique-msg-id",
184-
TopicArn: "arn:aws:sns:eu-west-2:123456789012:my-topic",
196+
TopicArn: "arn:aws:sns:eu-west-2:123456789012:my-topic.fifo",
185197
Subject: "My Subject",
186198
Message: message,
187199
Timestamp: "2026-01-09T12:00:00.000Z",
@@ -193,13 +205,14 @@ describe("forwarder", () => {
193205
});
194206
});
195207

196-
it("should append newline to wrapped message for JSON Lines format", async () => {
208+
it("should append newline to message for JSON Lines format", async () => {
197209
const message = JSON.stringify({ key: "value" });
198-
const snsRecord = createSNSEventRecord(message);
199-
const snsEvent = createSNSEvent([snsRecord]);
210+
const snsWrapper = createSnsNotificationWrapper(message);
211+
const sqsRecord = createSQSRecord(snsWrapper);
212+
const sqsEvent = createSQSEvent([sqsRecord]);
200213

201214
const handler = createForwarder(mockDeps);
202-
await handler(snsEvent, mockDeep<Context>(), jest.fn());
215+
await handler(sqsEvent, mockDeep<Context>(), jest.fn());
203216

204217
const sentCommand = mockFirehoseClient.send.mock
205218
.calls[0][0] as PutRecordCommand;

0 commit comments

Comments
 (0)