Commit b1cbbbe

VED-105 delta lambda updated to include higher precision (#1233)
* delta lambda updated to include higher precision
* refactored lambda and added tests
* updated test to validate update on same time stamp
* updated to composite sort key in GSI and refactor
* updated to composite sort key in GSI and refactor
* lint
* lint
* added logs to verify shape
* added logs to verify shape
* updated logs
* updated tests and readme, removed additional logs
* added lambda-powertools and its logger - updated tests
* added basic typing for handler from powertools
* added basic typing for handler from powertools
* added basic typing for handler from powertools
* updated append context key method for logger
* replaced DateTimestampWithSequence with SequenceNumber in e2e test
* update readme
* update PR comments
* update PR comments - added TODOs
* update PR comments - added TODOs
* debug for e2e tests
* removed logs

---------

Co-authored-by: FimranNHS <fatima.imran3@nhs.net>
1 parent ad2f47c commit b1cbbbe

14 files changed

Lines changed: 1259 additions & 306 deletions

infrastructure/instance/delta.tf

Lines changed: 2 additions & 1 deletion
@@ -136,6 +136,7 @@ resource "aws_lambda_function" "delta_sync_lambda" {
       AWS_SQS_QUEUE_URL    = aws_sqs_queue.dlq.id
       SOURCE               = "IEDS"
       SPLUNK_FIREHOSE_NAME = module.splunk.firehose_stream_name
+      LOG_LEVEL            = "INFO"
     }
   }
 

@@ -172,7 +173,7 @@ resource "aws_cloudwatch_log_metric_filter" "delta_error_logs" {
   count = var.error_alarm_notifications_enabled ? 1 : 0
 
   name           = "${local.short_prefix}-DeltaErrorLogsFilter"
-  pattern        = "%\\[ERROR\\]%"
+  pattern        = "{ $.level = \"ERROR\" }"
   log_group_name = aws_cloudwatch_log_group.delta_lambda.name
 
   metric_transformation {
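
The filter pattern change in this hunk goes together with the move to structured JSON logging: `{ $.level = "ERROR" }` matches on the `level` field of a JSON log event instead of on a `[ERROR]` substring. The sketch below imitates that match locally using only the standard library. `matches_error_filter` is a hypothetical helper for illustration, not CloudWatch's actual evaluation logic, and the sample log lines are made up.

```python
import json


def matches_error_filter(log_line: str) -> bool:
    """Rough local stand-in for the JSON filter pattern { $.level = "ERROR" }.

    Hypothetical helper: CloudWatch evaluates the pattern server-side; this
    only mimics the idea that the match is made on the parsed `level` field.
    """
    try:
        event = json.loads(log_line)
    except json.JSONDecodeError:
        # A plain-text line is not JSON, so a JSON pattern has nothing to match.
        return False
    return isinstance(event, dict) and event.get("level") == "ERROR"


# Made-up sample lines: two structured events and one old-style text line.
structured_error = json.dumps({"level": "ERROR", "service": "delta", "message": "write failed"})
structured_info = json.dumps({"level": "INFO", "service": "delta", "message": "ok"})
plain_text_error = "[ERROR] write failed"

for line in (structured_error, structured_info, plain_text_error):
    print(matches_error_filter(line), line)
```

Under this reading, old-style `[ERROR]` text lines would no longer count towards the metric once the new pattern is deployed, which is worth checking against any code path that still logs plain text.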

infrastructure/instance/dynamodb.tf

Lines changed: 25 additions & 0 deletions
@@ -100,6 +100,11 @@ resource "aws_dynamodb_table" "delta-dynamodb-table" {
     type = "S"
   }
 
+  attribute {
+    name = "SequenceNumber"
+    type = "S"
+  }
+
   ttl {
     attribute_name = "ExpiresAt"
     enabled        = true
@@ -120,6 +125,26 @@ resource "aws_dynamodb_table" "delta-dynamodb-table" {
     }
   }
 
+  global_secondary_index {
+    name            = "OperationSequenceIndex"
+    projection_type = "ALL"
+
+    key_schema {
+      attribute_name = "Operation"
+      key_type       = "HASH"
+    }
+
+    key_schema {
+      attribute_name = "DateTimeStamp"
+      key_type       = "RANGE"
+    }
+
+    key_schema {
+      attribute_name = "SequenceNumber"
+      key_type       = "RANGE"
+    }
+  }
+
   global_secondary_index {
     name            = "SecondarySearchIndex"
     projection_type = "ALL"
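
The new `OperationSequenceIndex` uses `Operation` as the hash key and a composite sort key of `DateTimeStamp` followed by `SequenceNumber`. As a rough illustration of the intent (records that share a timestamp still get a deterministic order), the sketch below sorts in-memory items the same way. It assumes both sort attributes are compared as strings, first attribute then second; the records and the `in_index_order` helper are made up for this example and do not query DynamoDB.

```python
from operator import itemgetter

# Made-up delta records: r2 and r3 share a DateTimeStamp and differ only
# in SequenceNumber, which is the case the composite sort key addresses.
RECORDS = [
    {"PK": "r3", "Operation": "UPDATE",
     "DateTimeStamp": "2026-03-02T07:49:04+00:00",
     "SequenceNumber": "300000000000000000000003"},
    {"PK": "r1", "Operation": "UPDATE",
     "DateTimeStamp": "2026-03-02T07:49:03+00:00",
     "SequenceNumber": "300000000000000000000001"},
    {"PK": "r2", "Operation": "UPDATE",
     "DateTimeStamp": "2026-03-02T07:49:04+00:00",
     "SequenceNumber": "300000000000000000000002"},
    {"PK": "r0", "Operation": "CREATE",
     "DateTimeStamp": "2026-03-02T07:49:02+00:00",
     "SequenceNumber": "300000000000000000000000"},
]


def in_index_order(records, operation):
    """Hypothetical helper: filter on the hash key, then order by the
    composite sort key (DateTimeStamp first, SequenceNumber as tie-breaker)."""
    matching = [r for r in records if r["Operation"] == operation]
    return sorted(matching, key=itemgetter("DateTimeStamp", "SequenceNumber"))


ordered = in_index_order(RECORDS, "UPDATE")
print([r["PK"] for r in ordered])
```

Because `SequenceNumber` is declared with type `"S"`, the comparison here is lexicographic; sequence numbers of different lengths would not sort numerically under that assumption.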

lambdas/delta_backend/README.md

Lines changed: 114 additions & 0 deletions
@@ -40,6 +40,99 @@ Note: Paths are relative to this directory, `delta_backend`.
 
 4. Run `make test` to run unit tests or `make coverage-run`. To see the unit test coverage, run `make coverage-run` first and then `make coverage-report`.
 
+## Delta Stream Input Contract
+
+This lambda consumes DynamoDB Stream records from `imms-<env>-imms-events` (`NEW_IMAGE`).
+
+### Expected shape
+
+```json
+{
+  "Records": [
+    {
+      "eventID": "bcd6c0cb0ba0ad872b6340ce4f500340",
+      "eventName": "INSERT",
+      "eventVersion": "1.1",
+      "eventSource": "aws:dynamodb",
+      "awsRegion": "eu-west-2",
+      "dynamodb": {
+        "ApproximateCreationDateTime": 1772437744,
+        "Keys": {
+          "PK": { "S": "Immunization#<uuid>" }
+        },
+        "NewImage": {
+          "Version": { "N": "1" },
+          "PK": { "S": "Immunization#<uuid>" },
+          "PatientPK": { "S": "Patient#<nhs-number>" },
+          "PatientSK": { "S": "<vaccine-type>#<uuid>" },
+          "IdentifierPK": {
+            "S": "<identifier-system>#<identifier-value>"
+          },
+          "Operation": { "S": "CREATE | UPDATE | DELETE | REMOVE" },
+          "SupplierSystem": { "S": "EMIS | RAVS | ..." },
+          "Resource": {
+            "S": "{\"resourceType\":\"Immunization\", ... }"
+          }
+        },
+        "SequenceNumber": "30993300003322088696887818",
+        "SizeBytes": 4324,
+        "StreamViewType": "NEW_IMAGE"
+      },
+      "eventSourceARN": "arn:aws:dynamodb:eu-west-2:<account-id>:table/imms-<env>-imms-events/stream/<timestamp>"
+    }
+  ]
+}
+```
+
+### Compatibility rules
+
+The processor accepts the following legacy/fallback inputs:
+
+1. **PK key format**
+   - Current: `Immunization#<uuid>` (e.g. `Immunization#42c55e00-03b5-46fd-bbef-678199f5777c`)
+   - `ImmsID` is extracted as the segment after the first `#`
+
+2. **PatientSK / VaccineType format**
+   - Current: `<vaccine-type>#<uuid>` (e.g. `RSV#42c55e00-03b5-46fd-bbef-678199f5777c`)
+   - `VaccineType` is extracted as the segment before `#`, lowercased and stripped
+
+3. **Operation fallback**
+   - Preferred: `NewImage.Operation`
+   - Fallback: mapped from `eventName`:
+     - `INSERT → CREATE`
+     - `MODIFY → UPDATE`
+     - `REMOVE → DELETE_PHYSICAL`
+   - If `Operation` is absent and `eventName` is not `REMOVE`, record is routed to DLQ
+
+4. **Payload fallback**
+   - Preferred: `NewImage.Resource` (FHIR JSON string, converted via `Converter`)
+   - Fallback: `NewImage.Imms` (already-flat JSON string), with `ACTION_FLAG` added when missing
+   - `REMOVE` events have no payload — `Imms` is written as `""` to delta table
+
+5. **Sequence fallback**
+   - Preferred: `dynamodb.SequenceNumber`
+   - Final fallback: `"0"`
+
+6. **Skip rules**
+   - If `SupplierSystem` is `DPSFULL` or `DPSREDUCED`, record is skipped (logged, no DDB write)
+
+### REMOVE event shape
+
+`REMOVE` stream events have no `NewImage`. `PK` is only available in `Keys`:
+
+```json
+{
+  "eventName": "REMOVE",
+  "dynamodb": {
+    "Keys": {
+      "PK": { "S": "Immunization#<uuid>" }
+    },
+    "SequenceNumber": "...",
+    "StreamViewType": "NEW_IMAGE"
+  }
+}
+```
+
 ## 🛠️ Key Features
 
 - Schema-driven field extraction and formatting
@@ -92,3 +185,24 @@ You can now:
 
 - Open `output.csv` in Excel or Google Sheets to view cleanly structured records
 - Inspect `output.json` to validate the flat key-value output programmatically
+
+## Observability
+
+### Logging
+
+The Delta Lambda uses [AWS Lambda Powertools for Python](https://docs.powertools.aws.dev/lambda/python/latest/) `3.24.0` for structured JSON logging.
+
+| Setting           | Value                                                                       |
+| ----------------- | --------------------------------------------------------------------------- |
+| `service`         | `delta`                                                                     |
+| Default log level | `INFO`                                                                      |
+| Log format        | Structured JSON (CloudWatch Insights compatible)                            |
+| Location logging  | Off by default — set `POWERTOOLS_LOGGER_LOG_CALLABLE_LOCATION=true` locally |
+
+#### Changing log level at runtime
+
+Set the Lambda environment variable:
+
+```
+LOG_LEVEL=DEBUG
+```
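
The README's compatibility rules for `PK`, `PatientSK`, `SequenceNumber` and `SupplierSystem` can be sketched as a small pure function over one stream record. This is a minimal illustration under stated assumptions, not the lambda's actual code: `summarise_record` and `_s` are hypothetical names, the sample records are made up, the operation and payload fallbacks are left out, and falling back to the whole `PK` when it contains no `#` is a guess.

```python
from typing import Optional

# Supplier systems that the README says are skipped (logged, no DDB write).
SKIPPED_SUPPLIERS = {"DPSFULL", "DPSREDUCED"}


def _s(image: dict, name: str) -> Optional[str]:
    """Read a DynamoDB string attribute of the form {"S": "..."} if present."""
    return image.get(name, {}).get("S")


def summarise_record(record: dict) -> Optional[dict]:
    """Hypothetical helper: apply the PK, PatientSK, sequence and skip rules."""
    ddb = record.get("dynamodb", {})
    new_image = ddb.get("NewImage", {})  # absent on REMOVE events

    # Skip rule: DPSFULL / DPSREDUCED records are not written.
    if _s(new_image, "SupplierSystem") in SKIPPED_SUPPLIERS:
        return None

    # PK rule: ImmsID is the part after the first '#'. REMOVE events only
    # carry PK in Keys. Using the raw PK when no '#' exists is an assumption.
    pk = _s(new_image, "PK") or _s(ddb.get("Keys", {}), "PK") or ""
    imms_id = pk.split("#", 1)[1] if "#" in pk else pk

    # PatientSK rule: VaccineType is the part before '#', stripped, lowercased.
    patient_sk = _s(new_image, "PatientSK") or ""
    vaccine_type = patient_sk.split("#", 1)[0].strip().lower()

    return {
        "ImmsID": imms_id,
        "VaccineType": vaccine_type,
        # Sequence rule: prefer dynamodb.SequenceNumber, final fallback "0".
        "SequenceNumber": ddb.get("SequenceNumber") or "0",
    }


# Made-up sample records following the shapes documented in the README.
insert_record = {
    "eventName": "INSERT",
    "dynamodb": {
        "Keys": {"PK": {"S": "Immunization#abc-123"}},
        "NewImage": {
            "PK": {"S": "Immunization#abc-123"},
            "PatientSK": {"S": "RSV#abc-123"},
            "SupplierSystem": {"S": "EMIS"},
        },
        "SequenceNumber": "30993300003322088696887818",
    },
}
remove_record = {
    "eventName": "REMOVE",
    "dynamodb": {"Keys": {"PK": {"S": "Immunization#abc-123"}}},
}
skipped_record = {
    "eventName": "INSERT",
    "dynamodb": {"NewImage": {"SupplierSystem": {"S": "DPSFULL"}}},
}

print(summarise_record(insert_record))
print(summarise_record(remove_record))
print(summarise_record(skipped_record))
```

Keeping the rules in a function with no AWS calls makes each fallback easy to cover with plain unit tests before wiring it into the handler.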

lambdas/delta_backend/poetry.lock

Lines changed: 33 additions & 4 deletions
Generated file; diff not rendered by default.

lambdas/delta_backend/pyproject.toml

Lines changed: 1 addition & 0 deletions
@@ -16,6 +16,7 @@ mypy-boto3-dynamodb = "^1.42.33"
 moto = "~5.1.20"
 python-stdnum = "^2.1"
 coverage = "^7.13.2"
+aws-lambda-powertools = {version = "3.24.0"}
 
 [tool.poetry.group.dev.dependencies]
 coverage = "^7.13.2"
