@@ -24,43 +24,43 @@ export default function createHandler(deps: Deps): Handler<KinesisStreamEvent> {
2424 count : streamEvent . Records ?. length || 0 ,
2525 } ) ;
2626
27- const ddbRecords : DynamoDBRecord [ ] = streamEvent . Records . map ( ( record ) =>
28- extractPayload ( record , deps ) ,
29- ) ;
30-
31- const newPendingLetters = ddbRecords
32- . filter ( ( record ) => filterRecord ( record , deps ) )
33- . map ( ( element ) => extractNewLetter ( element ) )
34- . map ( ( element ) => mapLetterToPendingLetter ( element ) ) ;
35-
36- for ( const pendingLetter of newPendingLetters ) {
37- try {
38- deps . logger . info ( {
39- description : "Persisting pending letter" ,
40- pendingLetter,
41- } ) ;
42- await deps . letterQueueRepository . putLetter ( pendingLetter ) ;
43- successCount += 1 ;
44- } catch ( error ) {
45- if ( error instanceof LetterAlreadyExistsError ) {
46- deps . logger . warn ( {
47- description : "Letter already exists" ,
48- supplierId : pendingLetter . supplierId ,
49- letterId : pendingLetter . letterId ,
50- } ) ;
51- } else {
52- deps . logger . error ( {
53- description : "Error persisting pending letter" ,
54- error,
27+ for ( const record of streamEvent . Records ) {
28+ const ddbRecord = extractPayload ( record , deps ) ;
29+
30+ if ( isNewPendingLetter ( ddbRecord ) ) {
31+ const letter = extractNewLetter ( ddbRecord ) ;
32+ const pendingLetter = mapLetterToPendingLetter ( letter ) ;
33+
34+ try {
35+ deps . logger . info ( {
36+ description : "Persisting pending letter" ,
5537 pendingLetter,
5638 } ) ;
57- recordProcessing ( deps , successCount , 1 ) ;
58- // If we get a failure, return immediately without processing the remaining records. Since we are
59- // working with a Kinesis stream, AWS will retry from the point of failure and no records will be lost.
60- // See https://docs.aws.amazon.com/lambda/latest/dg/example_serverless_Kinesis_Lambda_batch_item_failures_section.html
61- return {
62- batchItemFailures : [ { itemIdentifier : pendingLetter . letterId } ] ,
63- } ;
39+ await deps . letterQueueRepository . putLetter ( pendingLetter ) ;
40+ successCount += 1 ;
41+ } catch ( error ) {
42+ if ( error instanceof LetterAlreadyExistsError ) {
43+ deps . logger . warn ( {
44+ description : "Letter already exists" ,
45+ supplierId : pendingLetter . supplierId ,
46+ letterId : pendingLetter . letterId ,
47+ } ) ;
48+ } else {
49+ deps . logger . error ( {
50+ description : "Error persisting pending letter" ,
51+ error,
52+ pendingLetter,
53+ } ) ;
54+ recordProcessing ( deps , successCount , 1 ) ;
55+ // If we get a failure, return immediately without processing the remaining records. Since we are
56+ // working with a Kinesis stream, AWS will retry from the point of failure and no records will be lost.
57+ // See https://docs.aws.amazon.com/lambda/latest/dg/example_serverless_Kinesis_Lambda_batch_item_failures_section.html
58+ return {
59+ batchItemFailures : [
60+ { itemIdentifier : record . kinesis . sequenceNumber } ,
61+ ] ,
62+ } ;
63+ }
6464 }
6565 }
6666 }
@@ -86,22 +86,12 @@ function recordProcessing(
8686 deps . logger . info ( buildMetric ( "letters queued failed" , failureCount ) ) ;
8787}
8888
89- function filterRecord ( record : DynamoDBRecord , deps : Deps ) : boolean {
89+ function isNewPendingLetter ( record : DynamoDBRecord ) : boolean {
9090 const isInsert = record . eventName === "INSERT" ;
9191 const newImage = record . dynamodb ?. NewImage ;
9292 const isPending = newImage ?. status ?. S === "PENDING" ;
9393
94- const allowEvent = isInsert && isPending ;
95-
96- deps . logger . info ( {
97- description : "Filtering record" ,
98- eventName : record . eventName ,
99- eventId : record . eventID ,
100- status : newImage ?. status ?. S ,
101- allowEvent,
102- } ) ;
103-
104- return allowEvent ;
94+ return isInsert && isPending ;
10595}
10696
10797function extractPayload (
0 commit comments