@@ -11,6 +11,11 @@ import {
1111} from "@nhsdigital/nhs-notify-event-schemas-supplier-api/src/events/letter-events" ;
1212import { $LetterRequestPreparedEventV2 } from "@nhsdigital/nhs-notify-event-schemas-letter-rendering" ;
1313import { MetricsLogger , Unit , metricScope } from "aws-embedded-metrics" ;
14+ import {
15+ IdempotencyConfig ,
16+ makeIdempotent ,
17+ } from "@aws-lambda-powertools/idempotency" ;
18+ import { AnyFunction } from "@aws-lambda-powertools/idempotency/lib/cjs/types/IdempotencyOptions" ;
1419import { Deps } from "../config/deps" ;
1520import {
1621 PreparedEvents ,
@@ -180,6 +185,7 @@ function parseQueueMessage(queueMessage: string): QueueMessage {
180185}
181186
182187export default function createUpsertLetterHandler ( deps : Deps ) : SQSHandler {
188+ const processRecordIdempotently = makeIdempotentOnId ( processRecord , deps ) ;
183189 return metricScope ( ( metrics : MetricsLogger ) => {
184190 return async ( event : SQSEvent ) => {
185191 const batchItemFailures : SQSBatchItemFailure [ ] = [ ] ;
@@ -216,28 +222,11 @@ export default function createUpsertLetterHandler(deps: Deps): SQSHandler {
216222 supplier : supplierSpec ,
217223 } ) ;
218224
219- supplier =
220- ! supplierSpec || ! supplierSpec . supplierId
221- ? getSupplierIdFromEvent ( letterEvent )
222- : supplierSpec . supplierId ;
223-
224- const operation = getOperationFromType ( letterEvent . type ) ;
225-
226- await runUpsert (
227- operation ,
228- letterEvent ,
229- supplierSpec ?? {
230- supplierId : "unknown" ,
231- specId : "unknown" ,
232- priority : 10 ,
233- billingId : "unknown" ,
234- } ,
225+ supplier = await processRecordIdempotently (
235226 deps ,
236- ) ;
237-
238- perSupplierSuccess . set (
239- supplier ,
240- ( perSupplierSuccess . get ( supplier ) || 0 ) + 1 ,
227+ letterEvent ,
228+ supplierSpec ,
229+ perSupplierSuccess ,
241230 ) ;
242231 } catch ( error ) {
243232 deps . logger . error ( {
@@ -261,3 +250,42 @@ export default function createUpsertLetterHandler(deps: Deps): SQSHandler {
261250 } ;
262251 } ) ;
263252}
253+
254+ async function processRecord (
255+ deps : Deps ,
256+ letterEvent : LetterStatusChangeEvent | PreparedEvents ,
257+ supplierSpec : SupplierSpec | undefined ,
258+ perSupplierSuccess : Map < string , number > ,
259+ ) {
260+ const supplier =
261+ ! supplierSpec || ! supplierSpec . supplierId
262+ ? getSupplierIdFromEvent ( letterEvent )
263+ : supplierSpec . supplierId ;
264+
265+ const operation = getOperationFromType ( letterEvent . type ) ;
266+
267+ await runUpsert (
268+ operation ,
269+ letterEvent ,
270+ supplierSpec ?? {
271+ supplierId : "unknown" ,
272+ specId : "unknown" ,
273+ priority : 10 ,
274+ billingId : "unknown" ,
275+ } ,
276+ deps ,
277+ ) ;
278+
279+ perSupplierSuccess . set ( supplier , ( perSupplierSuccess . get ( supplier ) || 0 ) + 1 ) ;
280+ return supplier ;
281+ }
282+
283+ function makeIdempotentOnId ( fn : AnyFunction , deps : Deps ) {
284+ return makeIdempotent ( fn , {
285+ persistenceStore : deps . idempotencyLayer ,
286+ dataIndexArgument : 1 ,
287+ config : new IdempotencyConfig ( {
288+ eventKeyJmesPath : "id" ,
289+ } ) ,
290+ } ) ;
291+ }
0 commit comments