diff --git a/adobe-autotag-container/adobe_autotag_processor.py b/adobe-autotag-container/adobe_autotag_processor.py index 0afd7185..2c0fafb4 100644 --- a/adobe-autotag-container/adobe_autotag_processor.py +++ b/adobe-autotag-container/adobe_autotag_processor.py @@ -91,6 +91,59 @@ s3 = boto3.client('s3') +def _chunk_page_range(chunk_key): + """Return (page_start, page_end) for a chunk key like temp//_chunk_N.pdf. + + Uses the splitter's fixed 200-pages-per-chunk constant to compute the range. + Returns (None, None) if the chunk number cannot be parsed. + """ + import re + PAGES_PER_CHUNK = 200 + if not chunk_key: + return None, None + m = re.search(r'_chunk_(\d+)\.pdf$', chunk_key) + if not m: + return None, None + n = int(m.group(1)) + page_start = (n - 1) * PAGES_PER_CHUNK + 1 + page_end = n * PAGES_PER_CHUNK + return page_start, page_end + + +def report_failure(bucket_name, file_base_name, chunk_key, reason_category, message): + """Write a structured failure-detail file the Step Functions failure-handler + aggregates into the user-facing result/FAILED_.json marker. + + Station: 'adobe' (Adobe AutoTag/Extract). Best-effort and exception-proof: + reporting a failure must never throw a second failure that masks the original. + """ + page_start, page_end = _chunk_page_range(chunk_key) + logger.error( + f"File: {file_base_name}, Status: FAILED | station=adobe | " + f"reason={reason_category} | page={page_start}-{page_end} | {message}" + ) + + if not bucket_name or not file_base_name: + return + detail = { + "station": "adobe", + "reason_category": reason_category, + "message": str(message)[:2000], + } + if page_start is not None: + detail["page_start"] = page_start + detail["page_end"] = page_end + try: + s3.put_object( + Bucket=bucket_name, + Key=f"temp/{file_base_name}/_errors/adobe_failure.json", + Body=json.dumps(detail).encode("utf-8"), + ContentType="application/json", + ) + except Exception as e: + logger.error(f"Filename : {file_base_name} | Could not write failure detail: {e}") + + def download_file_from_s3(bucket_name,file_base_name, file_key, local_path): """ Download a file from an S3 bucket. @@ -430,7 +483,8 @@ def create_sqlite_db(by_page, filename, images_output_dir, object_ids, image_pat prev TEXT, current TEXT, next TEXT, - context TEXT + context TEXT, + page_num INTEGER ) """) @@ -524,12 +578,13 @@ def create_sqlite_db(by_page, filename, images_output_dir, object_ids, image_pat print(" ======================") # Insert the data into the SQLite database. cursor.execute(""" - INSERT INTO image_data (objid, img_path, context) - VALUES (?, ?, ?) + INSERT INTO image_data (objid, img_path, context, page_num) + VALUES (?, ?, ?, ?) """, ( current_candidate["objid"], current_candidate["filePaths"][0].split("/")[-1], - context + context, + pg_num + 1 )) print("Added in the database: ", current_candidate["objid"], current_candidate["filePaths"][0].split("/")[-1]) @@ -620,7 +675,8 @@ def extract_images_from_excel(filename, figure_path, autotag_report_path, images prev TEXT, current TEXT, next TEXT, - context TEXT + context TEXT, + page_num INTEGER ) """) @@ -640,11 +696,12 @@ def main(): """ file_key = None file_base_name = None - - try: - bucket_name = os.getenv('S3_BUCKET_NAME') + s3_file_key = None + bucket_name = os.getenv('S3_BUCKET_NAME') + + try: s3_file_key = os.getenv('S3_FILE_KEY') - + if not bucket_name or not s3_file_key: logging.error("Error: S3_BUCKET_NAME and S3_FILE_KEY environment variables are required.") sys.exit(1) @@ -714,21 +771,36 @@ def main(): logging.info(f'Filename : {file_key} | Processing completed successfully') logger.info(f"File: {file_base_name}, Status: Succeeded in First ECS task") - except (ServiceApiException, ServiceUsageException, SdkException) as e: + except ServiceUsageException as e: + # Quota / credits exhausted — not a document problem, retrying later may work. + logger.error(f"File: {file_base_name}, Status: Failed in First ECS task - Adobe API Quota") + logger.error(f"Filename : {file_key} | Adobe API Quota Error: {e}") + report_failure(bucket_name, file_base_name, s3_file_key, "ADOBE_API", + f"Adobe API quota or credits exhausted. Please try again later. Adobe error: {e}") + sys.exit(1) + except (ServiceApiException, SdkException) as e: logger.error(f"File: {file_base_name}, Status: Failed in First ECS task - Adobe API Error") logger.error(f"Filename : {file_key} | Adobe API Error: {e}") + report_failure(bucket_name, file_base_name, s3_file_key, "ADOBE_API", + f"Adobe API failed for this document. This document may be too complex for our Adobe API to handle. Adobe error: {e}") sys.exit(1) except ClientError as e: logger.error(f"File: {file_base_name}, Status: Failed in First ECS task - AWS Error") logger.error(f"Filename : {file_key} | AWS Error: {e}") + report_failure(bucket_name, file_base_name, s3_file_key, "INFRA", + f"AWS infrastructure error: {e}") sys.exit(1) except FileNotFoundError as e: logger.error(f"File: {file_base_name}, Status: Failed in First ECS task - File Not Found") logger.error(f"Filename : {file_key} | File Not Found Error: {e}") + report_failure(bucket_name, file_base_name, s3_file_key, "ADOBE_API", + "Adobe API failed for this document. This document may be too complex for our Adobe API to handle.") sys.exit(1) except Exception as e: logger.error(f"File: {file_base_name}, Status: Failed in First ECS task") logger.error(f"Filename : {file_key} | Unexpected Error: {e}") + report_failure(bucket_name, file_base_name, s3_file_key, "UNKNOWN", + f"Unexpected error processing this document: {e}") sys.exit(1) if __name__ == "__main__": diff --git a/alt-text-generator-container/alt_text_generator.js b/alt-text-generator-container/alt_text_generator.js index 05e39a98..2212a716 100644 --- a/alt-text-generator-container/alt_text_generator.js +++ b/alt-text-generator-container/alt_text_generator.js @@ -69,6 +69,73 @@ function sleep(ms) { return new Promise(resolve => setTimeout(resolve, ms)); } +const MAX_ASPECT_RATIO = 20; + +function getImageDimensions(buffer) { + try { + if (buffer[0] === 0x89 && buffer[1] === 0x50) { + // PNG: width at offset 16, height at offset 20 (4 bytes each, big-endian) + const width = buffer.readUInt32BE(16); + const height = buffer.readUInt32BE(20); + return { width, height }; + } + if (buffer[0] === 0xFF && buffer[1] === 0xD8) { + // JPEG: scan for SOF0 marker (0xFFC0) + let offset = 2; + while (offset < buffer.length - 9) { + if (buffer[offset] === 0xFF) { + const marker = buffer[offset + 1]; + if (marker >= 0xC0 && marker <= 0xCF && marker !== 0xC4 && marker !== 0xC8 && marker !== 0xCC) { + const height = buffer.readUInt16BE(offset + 5); + const width = buffer.readUInt16BE(offset + 7); + return { width, height }; + } + const segLen = buffer.readUInt16BE(offset + 2); + offset += 2 + segLen; + } else { + offset++; + } + } + } + } catch (e) { + return null; + } + return null; +} + +function isAspectRatioValid(dimensions) { + if (!dimensions || !dimensions.width || !dimensions.height) return true; + const { width, height } = dimensions; + const ratio = Math.max(width / height, height / width); + return ratio <= MAX_ASPECT_RATIO; +} + +/** + * Writes a structured failure-detail file that the Step Functions failure-handler + * aggregates into the user-facing result/FAILED_.json marker. + */ +async function reportFailure(bucketName, fileBaseName, reasonCategory, message) { + logger.error(`File: ${fileBaseName}, Status: FAILED | station=alttext | reason=${reasonCategory} | ${message}`); + + if (!bucketName || !fileBaseName) return; + const detail = { + station: "alttext", + reason_category: reasonCategory, + message: String(message).slice(0, 2000), + }; + try { + await s3Client.send(new PutObjectCommand({ + Bucket: bucketName, + Key: `temp/${fileBaseName}/_errors/alttext_failure.json`, + Body: JSON.stringify(detail), + ContentType: "application/json", + })); + } catch (e) { + logger.error(`Filename: ${fileBaseName} | Could not write failure detail: ${e.message || e}`); + } +} + + /** * Invokes the Bedrock AI model to generate alt text for a given image. @@ -471,6 +538,7 @@ async function startProcess() { context_json: { context: row.context, }, + page_num: row.page_num || null, }; }); } catch (err) { @@ -492,6 +560,7 @@ async function startProcess() { logger.info(`Filename: ${filebasename} | imageObjects: ${imageObjects}`); logger.info(`Filename: ${filebasename} | Total images to process: ${imageObjects.length}`); + let skippedCount = 0; for (const imageObject of imageObjects) { try { const getObjectParams = { @@ -502,7 +571,7 @@ async function startProcess() { logger.info(`Filename: ${filebasename} | Image Object Bucketname: ${bucketName}`); const command = new GetObjectCommand(getObjectParams); const { Body } = await s3Client.send(command); - + // Stream the body contents to a buffer const chunks = []; await pipeline(Body, async function* (source) { @@ -511,6 +580,18 @@ async function startProcess() { } }); const fileBuffer = Buffer.concat(chunks); + + // Aspect-ratio pre-check: skip images with extreme ratios + const dimensions = getImageDimensions(fileBuffer); + if (dimensions && !isAspectRatioValid(dimensions)) { + const ratio = Math.max(dimensions.width / dimensions.height, dimensions.height / dimensions.width).toFixed(1); + const pageDesc = imageObject.page_num ? ` | page=${imageObject.page_num}` : ""; + logger.info(`File: ${filebasename}, Status: INFO | station=alttext | image=${imageObject.id}${pageDesc} | action=decorative | reason=complex_image_aspect_ratio_${ratio}:1`); + skippedCount++; + combinedResults[imageObject.id] = "Decorative element"; + continue; + } + const localFilePath = path.join(__dirname, `${imageObject.path.split('/').pop()}`); logger.info(`Filename: ${filebasename} | Local File Path: ${localFilePath}`); fs_1.writeFileSync(localFilePath, fileBuffer); @@ -522,26 +603,33 @@ async function startProcess() { logger.info(`Filename: ${filebasename} | Alt text generation succeeded for image ${imageObject.id} (${successCount} succeeded, ${failureCount} failed)`); } catch (error) { failureCount++; - logger.error(`Filename: ${filebasename} | Alt text generation failed for image ${imageObject.id}: ${error.message || error}`); + const pageDesc = imageObject.page_num ? ` | page=${imageObject.page_num}` : ""; + logger.error(`File: ${filebasename}, Status: FAILED | station=alttext | image=${imageObject.id}${pageDesc} | reason=BEDROCK_API | ${error.message || error}`); logger.info(`Filename: ${filebasename} | Progress: ${successCount} succeeded, ${failureCount} failed`); } await sleep(2000); } - // Check if we have any images and if all of them failed - if (imageObjects.length > 0 && successCount === 0) { + const processedImages = imageObjects.length - skippedCount; + if (processedImages > 0 && successCount === 0) { logger.error(`Filename: ${filebasename} | All ${failureCount} alt text generation requests failed - likely due to throttling or Bedrock API issues`); logger.error(`File: ${filebasename}, Status: Failed in second ECS task - All Bedrock requests failed`); + const failedPages = [...new Set( + imageObjects + .filter(img => img.page_num != null && !combinedResults.hasOwnProperty(img.id)) + .map(img => img.page_num) + )].sort((a, b) => a - b); + const pageInfo = failedPages.length > 0 ? ` Failed on pages: ${failedPages.join(', ')}.` : ''; + await reportFailure(bucketName, filebasename, "BEDROCK_API", + `All ${failureCount} Bedrock alt-text requests failed (throttling or Bedrock API issues).${pageInfo}`); process.exit(1); } - - logger.info(`Filename: ${filebasename} | Alt text generation complete: ${successCount} succeeded, ${failureCount} failed out of ${imageObjects.length} images`); - let defaultText = "No text available"; + logger.info(`Filename: ${filebasename} | Alt text generation complete: ${successCount} succeeded, ${failureCount} failed, ${skippedCount} decorative out of ${imageObjects.length} images`); for (const imageObject of imageObjects) { if (!combinedResults.hasOwnProperty(imageObject.id)) { - combinedResults[imageObject.id] = defaultText; + combinedResults[imageObject.id] = "Decorative element"; } } @@ -558,6 +646,8 @@ async function startProcess() { } catch (error) { logger.info(`File: ${filebasename}, Status: Error in second ECS task`); logger.error(`Filename: ${filebasename} | Error processing images: ${error}`); + await reportFailure(bucketName, filebasename, "UNKNOWN", + `Error processing images: ${error.message || error}`); process.exit(1); } } diff --git a/app.py b/app.py index 1148d0bd..2950dcf7 100644 --- a/app.py +++ b/app.py @@ -174,7 +174,8 @@ def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None: cluster=pdf_remediation_cluster, task_definition=adobe_autotag_task_def, assign_public_ip=False, - + result_path="$.ecsResult", + container_overrides=[tasks.ContainerOverride( container_definition = adobe_autotag_container_def, environment=[ @@ -213,11 +214,11 @@ def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None: environment=[ tasks.TaskEnvironmentVariable( name="S3_BUCKET_NAME", - value=sfn.JsonPath.string_at("$.Overrides.ContainerOverrides[0].Environment[0].Value") + value=sfn.JsonPath.string_at("$.s3_bucket") ), tasks.TaskEnvironmentVariable( name="S3_FILE_KEY", - value=sfn.JsonPath.string_at("$.Overrides.ContainerOverrides[0].Environment[1].Value") + value=sfn.JsonPath.string_at("$.s3_key") ), tasks.TaskEnvironmentVariable( name="AWS_REGION", @@ -366,6 +367,57 @@ def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None: parallel_accessibility_workflow.branch(remediation_chain) parallel_accessibility_workflow.branch(pre_remediation_accessibility_checker_task) + # --------------------------------------------------------------------- + # Failure-handler: the single, exhaustive safety net for the workflow. + # + # The frontend detects a finished job by polling the S3 result/ folder. + # Previously, any failure left the workflow FAILED with nothing written + # to result/, so the UI polled forever ("silent failure"). This Lambda + # is wired to a Catch that fires on EVERY error -- in-code (Adobe/Bedrock/ + # complexity) and infrastructure (container can't start, OOM, timeout) -- + # and writes result/FAILED_.json carrying the reason category and + # the failing chunk/page range, so the UI can show the user what happened. + # It is invoked only on failure, so its cost is effectively zero. + # --------------------------------------------------------------------- + failure_handler_lambda = lambda_.Function( + self, 'WorkflowFailureHandlerLambda', + runtime=lambda_.Runtime.PYTHON_3_12, + handler='main.lambda_handler', + code=lambda_.Code.from_asset('lambda/failure-handler'), + timeout=Duration.seconds(120), + memory_size=256, + environment={ + 'BUCKET_NAME': pdf_processing_bucket.bucket_name, + } + ) + pdf_processing_bucket.grant_read_write(failure_handler_lambda) + failure_handler_lambda.add_to_role_policy(cloudwatch_metrics_policy) + + # Pass the original state ($) plus the execution ARN from the Step Functions + # context object ($$). The ARN lets the failure marker reference the exact + # failed execution for support/tracing. chunks/s3_bucket/failureInfo are + # carried through from the state so the handler can identify the file and + # the failure reason. + failure_handler_task = tasks.LambdaInvoke(self, "HandleWorkflowFailure", + lambda_function=failure_handler_lambda, + payload=sfn.TaskInput.from_object({ + "chunks": sfn.JsonPath.string_at("$.chunks"), + "s3_bucket": sfn.JsonPath.string_at("$.s3_bucket"), + "failureInfo": sfn.JsonPath.string_at("$.failureInfo"), + "executionArn": sfn.JsonPath.string_at("$$.Execution.Id"), + }), + output_path="$.Payload") + + # Route every failure of the parallel workflow to the failure handler. + # result_path keeps the original input (chunks, s3_bucket) intact and + # nests the Step Functions Error/Cause under $.failureInfo so the handler + # can recover both the file identity and the failure reason. + parallel_accessibility_workflow.add_catch( + failure_handler_task, + errors=["States.ALL"], + result_path="$.failureInfo" + ) + pdf_remediation_workflow_log_group = logs.LogGroup(self, "PdfRemediationWorkflowLogs", log_group_name="/aws/states/pdf-accessibility-remediation-workflow", retention=logs.RetentionDays.ONE_MONTH, diff --git a/docs/ERROR_HANDLING.md b/docs/ERROR_HANDLING.md new file mode 100644 index 00000000..02f37814 --- /dev/null +++ b/docs/ERROR_HANDLING.md @@ -0,0 +1,133 @@ +# Error Handling & Failure Reporting + +This document describes how the PDF-to-PDF remediation pipeline reports +failures so that **no failure is ever silent**, and the contract the frontend +uses to detect and display those failures. + +## Background: why this exists + +The frontend detects a finished job by **polling the S3 `result/` folder**: + +- On success the pipeline writes `result/COMPLIANT_.pdf`. +- Previously, on **any** failure the workflow ended with nothing written to + `result/`. The UI kept polling indefinitely and the user saw the job "spin" + for hours with no explanation. + +The error-handling layer closes that gap by guaranteeing that every failed job +produces a **failure marker** in the same `result/` folder the frontend already +polls, carrying a human-readable reason. + +## The two-layer design + +``` +Each station (on failure) Step Functions Catch (catches EVERYTHING) + writes a detail file ────────► routes any error to the failure-handler + temp//_errors/ Lambda, which aggregates the detail + .json files and writes the user-facing marker + (station, reason) result/FAILED_.json + │ + └──► structured CloudWatch line: + "File: , Status: FAILED | station=adobe | reason=ADOBE_API | " +``` + +1. **Station detail files** (`temp//_errors/*.json`) — each station writes + rich detail only it knows (the exact reason). +2. **Step Functions `Catch` → failure-handler Lambda** — the exhaustive safety + net. It fires on *every* failure, including infrastructure failures where a + container dies before it can write a detail file (image-pull failure, OOM, + task timeout, IAM error). It aggregates any detail files and always writes the + marker. If no detail file exists, it derives the reason from the Step + Functions error `Cause`. + +> The **PDF splitter** is a special case: it runs *before* the Step Functions +> execution starts, so the Catch cannot cover it. The splitter therefore writes +> the `result/FAILED_.json` marker directly. + +## Frontend contract + +When polling for results for an uploaded file `.pdf` (base name ``), +the frontend should check for **both** of these keys: + +| Key | Meaning | +|---|---| +| `result/COMPLIANT_.pdf` | Success — the remediated PDF is ready. | +| `result/FAILED_.json` | Failure — stop polling and show the reason. | + +### `FAILED_.json` schema + +```json +{ + "status": "FAILED", + "filename": "", + "reason_category": "ADOBE_API", + "summary": "Adobe API failed for this document. This document may be too complex for our Adobe API to handle.", + "failed_chunks": [ + { + "station": "adobe", + "reason_category": "ADOBE_API", + "message": "Adobe API failed for this document..." + } + ], + "execution_arn": "arn:aws:states:...:execution:...", + "timestamp": "2026-06-08T17:42:11.123456+00:00" +} +``` + +- `reason_category` and `summary` are safe to show directly to the user. +- `message` is the raw technical detail — useful for support, not necessarily + for end users. + +### Reason categories + +| `reason_category` | Meaning | +|---|---| +| `SPLIT` | The document could not be split into pages. | +| `ADOBE_API` | Adobe API failed for this document. Too complex for our Adobe API to handle. | +| `BEDROCK_API` | Amazon Bedrock (alt text / title) failed. | +| `COMPLEXITY` | The document exceeded processing complexity limits. | +| `MERGE` | The processed pages could not be merged. | +| `TITLE` | The title/metadata step failed. | +| `INFRA` | A task failed to run (infrastructure-level failure). | +| `UNKNOWN` | Unexpected failure; see `message`. | + +## CloudWatch visibility + +Each station emits structured lines for the CloudWatch dashboard: + +**Adobe failures:** +``` +File: , Status: FAILED | station=adobe | reason=ADOBE_API | Adobe API failed for this document... +``` + +**Bedrock per-image failures (with exact page number):** +``` +File: , Status: FAILED | station=alttext | image= | page=6 | reason=BEDROCK_API | +``` + +**Images tagged decorative due to aspect ratio (with exact page number):** +``` +File: , Status: INFO | station=alttext | image= | page=6 | action=decorative | reason=complex_image_aspect_ratio_25:1 +``` + +This lets operators see, per file, exactly which station failed and on which +page — without opening individual log streams. + +## Bedrock graceful handling + +Images that pass Adobe but cannot be processed by Bedrock are handled as follows: + +- **Aspect ratio > 20:1**: Tagged as "Decorative element" (known problematic images + like separator lines, thin borders). Logged to CloudWatch with exact page number. +- **Individual Bedrock API failure**: The specific image failure is logged with its + exact page number. If not all images fail, the pipeline continues. +- **All images fail Bedrock**: The container reports total failure and exits. The user + sees the error on UI. + +## Notes for maintainers + +- The failure-handler is invoked **only on failure**, so its cost is effectively + zero. No always-on services or new datastores are introduced. +- Stations report failures on a **best-effort** basis: a failure while writing a + detail file is logged but never masks the original error. +- The `page_num` column in the SQLite image database (written by the Adobe + container) enables exact page-level reporting in the alt-text station. diff --git a/lambda/failure-handler/main.py b/lambda/failure-handler/main.py new file mode 100644 index 00000000..36a6a0bd --- /dev/null +++ b/lambda/failure-handler/main.py @@ -0,0 +1,222 @@ +""" +Failure-handler Lambda for the PDF Accessibility remediation workflow. + +This function is the single, exhaustive safety net for the Step Functions +state machine. It is wired to a `Catch` block that captures EVERY failure in +the pipeline -- both in-code errors (Adobe/Bedrock API failures, complexity +issues) and infrastructure failures (a container that cannot start, an +out-of-memory kill, a task timeout, an IAM error). Whenever the workflow +fails, Step Functions routes here instead of ending silently. + +The frontend detects a finished job by polling the S3 `result/` folder. On +success the pipeline writes `result/COMPLIANT_`. This function provides +the missing failure signal in the SAME place the frontend already looks: + + result/FAILED_.json + +so the UI can stop polling and show the user WHY the job failed and WHERE +(which chunk / page range), instead of spinning indefinitely. + +Detail sources (in priority order): + 1. Per-station detail files at `temp//_errors/*.json`, written by + each station right before it exits on failure. These carry the rich + reason + chunk + page-range context that only the station knows. + 2. The Step Functions error `Cause`/`Error`, used as a fallback when a + station died before it could write a detail file (pure infra failure). + +The function NEVER raises: a safety net that can itself fail is not a safety +net. Any internal problem is logged and a best-effort marker is still written. +""" + +import json +import os +import boto3 +from datetime import datetime, timezone + +s3_client = boto3.client("s3") + +# Human-readable summaries for each reason category. The category itself is +# emitted by the stations; this map is only for the user-facing message. +REASON_SUMMARIES = { + "SPLIT": "The document could not be split into pages for processing.", + "ADOBE_API": "Adobe API failed for this document. This document may be too complex for our Adobe API to handle.", + "BEDROCK_API": "The AI model (Amazon Bedrock) failed while generating alt text.", + "COMPLEXITY": "The document exceeded processing complexity limits.", + "MERGE": "The processed pages could not be merged back into one document.", + "TITLE": "The document title/metadata step failed.", + "INFRA": "A processing task failed to run (infrastructure-level failure).", + "UNKNOWN": "Processing failed for an unexpected reason.", +} + + +def _derive_basename(event): + """Best-effort extraction of the original file's base name from the event. + + The state machine input is `{"chunks": [...], "s3_bucket": ...}` where each + chunk key looks like `temp//_chunk_.pdf`. We use that + to recover `` so we can both locate the `_errors/` folder and name + the marker `result/FAILED_.json`. + """ + chunks = event.get("chunks") or [] + if chunks and isinstance(chunks, list): + key = chunks[0].get("s3_key") or chunks[0].get("chunk_key") or "" + # key = temp//_chunk_1.pdf + parts = key.split("/") + if len(parts) >= 2 and parts[0] == "temp": + return parts[1] + return None + + +def _resolve_bucket(event): + """Find the processing bucket name from the event, with an env fallback.""" + return ( + event.get("s3_bucket") + or event.get("bucket") + or os.environ.get("BUCKET_NAME") + ) + + +def _collect_station_errors(bucket, basename): + """Read every `temp//_errors/*.json` detail file. + + Returns a list of dicts. Tolerates malformed/partial files -- a corrupt + detail file must never prevent the marker from being written. + """ + if not bucket or not basename: + return [] + + prefix = f"temp/{basename}/_errors/" + errors = [] + try: + paginator = s3_client.get_paginator("list_objects_v2") + for page in paginator.paginate(Bucket=bucket, Prefix=prefix): + for obj in page.get("Contents", []): + key = obj["Key"] + if not key.endswith(".json"): + continue + try: + body = s3_client.get_object(Bucket=bucket, Key=key)["Body"].read() + errors.append(json.loads(body)) + except Exception as e: # noqa: BLE001 - tolerate any bad file + print(f"[failure-handler] Could not read detail file {key}: {e}") + except Exception as e: # noqa: BLE001 + print(f"[failure-handler] Could not list {prefix}: {e}") + return errors + + +def _parse_sfn_cause(event): + """Extract a reason from the Step Functions Catch error payload. + + When a station dies before writing a detail file (e.g. the container could + not start), Step Functions still hands us `Error` and `Cause`. We map that + to the INFRA category so the user always gets *something* actionable. + + The Catch in app.py nests this under `$.failureInfo` (result_path), so we + look there first and fall back to the top level for robustness. + """ + failure_info = event.get("failureInfo") or event + err = failure_info.get("Error") + cause_raw = failure_info.get("Cause") + cause = cause_raw + if cause_raw: + try: + # ECS/Lambda causes are often JSON strings + parsed = json.loads(cause_raw) + cause = parsed.get("errorMessage") or parsed.get("Cause") or cause_raw + except (ValueError, TypeError): + cause = cause_raw + if err or cause: + return { + "station": "workflow", + "reason_category": "INFRA", + "message": cause or err or "Unknown infrastructure failure.", + } + return None + + +def lambda_handler(event, context): + """Aggregate failure detail and write the user-facing FAILED_ marker. + + `event` is the state-machine execution input augmented by the Catch block + with `Error`/`Cause`. This function is intentionally exception-proof. + """ + print(f"[failure-handler] Invoked with event: {json.dumps(event, default=str)}") + + bucket = _resolve_bucket(event) + basename = _derive_basename(event) + + station_errors = _collect_station_errors(bucket, basename) + + # Fall back to the Step Functions cause if no station detail was written. + if not station_errors: + sfn_error = _parse_sfn_cause(event) + if sfn_error: + station_errors = [sfn_error] + + # Build the failure detail from whatever station errors exist. + failed_chunks = [] + categories = [] + for err in station_errors: + category = err.get("reason_category", "UNKNOWN") + categories.append(category) + entry = { + "station": err.get("station", "unknown"), + "reason_category": category, + "message": err.get("message", ""), + } + if err.get("page_start") is not None: + entry["page_start"] = err["page_start"] + entry["page_end"] = err.get("page_end", err["page_start"]) + failed_chunks.append(entry) + + # Choose the primary category (first non-UNKNOWN, else UNKNOWN). + primary_category = next((c for c in categories if c != "UNKNOWN"), None) or ( + categories[0] if categories else "UNKNOWN" + ) + # Use the station's message as summary if it provides more detail than the + # generic category description (e.g. quota exhaustion vs. document complexity). + primary_error = next((e for e in station_errors if e.get("reason_category") == primary_category), None) + station_message = primary_error.get("message", "") if primary_error else "" + summary = station_message if station_message else REASON_SUMMARIES.get(primary_category, REASON_SUMMARIES["UNKNOWN"]) + + marker = { + "status": "FAILED", + "filename": basename, + "reason_category": primary_category, + "summary": summary, + "failed_chunks": failed_chunks, + "execution_arn": event.get("executionArn"), + "timestamp": datetime.now(timezone.utc).isoformat(), + } + + # Structured CloudWatch line for the dashboard "File status" widget. + detail_desc = "; ".join( + f"{c['station']}: {c['message'][:100]}" for c in failed_chunks if c.get("message") + ) + print( + f"File: {basename}, Status: FAILED | reason={primary_category}" + + (f" | {detail_desc}" if detail_desc else "") + ) + + # Write the marker where the frontend already polls. Best-effort: if even + # this fails, we log loudly but do not raise (raising would re-fail the + # state machine and produce a confusing double-failure). + if bucket and basename: + marker_key = f"result/FAILED_{basename}.json" + try: + s3_client.put_object( + Bucket=bucket, + Key=marker_key, + Body=json.dumps(marker, indent=2).encode("utf-8"), + ContentType="application/json", + ) + print(f"[failure-handler] Wrote failure marker to s3://{bucket}/{marker_key}") + except Exception as e: # noqa: BLE001 + print(f"[failure-handler] CRITICAL: could not write marker {marker_key}: {e}") + else: + print( + "[failure-handler] CRITICAL: missing bucket/basename; " + f"bucket={bucket} basename={basename}. Marker NOT written." + ) + + return marker diff --git a/lambda/pdf-merger-lambda/PDFMergerLambda/src/main/java/com/example/App.java b/lambda/pdf-merger-lambda/PDFMergerLambda/src/main/java/com/example/App.java index 7b73ad16..c35d60f8 100644 --- a/lambda/pdf-merger-lambda/PDFMergerLambda/src/main/java/com/example/App.java +++ b/lambda/pdf-merger-lambda/PDFMergerLambda/src/main/java/com/example/App.java @@ -6,9 +6,12 @@ import com.amazonaws.services.s3.AmazonS3ClientBuilder; import com.amazonaws.services.s3.model.GetObjectRequest; import com.amazonaws.services.s3.model.PutObjectRequest; +import com.amazonaws.services.s3.model.ObjectMetadata; import org.apache.pdfbox.multipdf.PDFMergerUtility; +import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -79,10 +82,51 @@ public String handleRequest(Map input, Context context) { return String.format("PDFs merged successfully.\nBucket: %s\nMerged File Key: %s\nMerged File Name: %s", bucketName, outputKey, baseFileName); } catch (Exception e) { - baseFileName = baseFileName.replace(".pdf", ""); - System.out.println("File: " + baseFileName + ", Status: Failed in Merging the PDF"); - System.out.println(String.format("Filename: %s, File not found: %s", baseFileName, e.getMessage())); - return "Failed to merge PDFs."; + String reportName = baseFileName.replace(".pdf", ""); + System.out.println("File: " + reportName + ", Status: FAILED | station=merge | reason=MERGE | " + e.getMessage()); + // Write a station detail file the Step Functions failure-handler aggregates + // into the user-facing result/FAILED_.json marker. + reportFailure(bucketName, reportName, "MERGE", e.getMessage()); + // Re-throw so the state machine's Catch fires. Returning a string here + // would be treated as SUCCESS by Step Functions and continue silently. + throw new RuntimeException("Failed to merge PDFs for " + reportName, e); + } + } + + /** + * Writes a structured failure-detail file the Step Functions failure-handler + * aggregates into result/FAILED_.json. Station: 'merge'. + * Best-effort: a failure while reporting must not mask the original error. + * + * @param bucketName The S3 bucket. + * @param fileBaseName The base file name (no extension) matching temp//. + * @param reasonCategory The failure category (e.g. "MERGE"). + * @param message The error message. + */ + private void reportFailure(String bucketName, String fileBaseName, String reasonCategory, String message) { + if (bucketName == null || fileBaseName == null) { + return; + } + try { + // Escape for safe embedding in a JSON string literal: backslashes + // first, then quotes, then control chars that would break parsing. + String safeMessage = message == null ? "" : message + .replace("\\", "\\\\") + .replace("\"", "\\\"") + .replace("\n", " ") + .replace("\r", " ") + .replace("\t", " "); + String json = String.format( + "{\"station\":\"merge\",\"reason_category\":\"%s\",\"message\":\"%s\"}", + reasonCategory, safeMessage); + byte[] bytes = json.getBytes(StandardCharsets.UTF_8); + ObjectMetadata metadata = new ObjectMetadata(); + metadata.setContentLength(bytes.length); + metadata.setContentType("application/json"); + String key = String.format("temp/%s/_errors/merge.json", fileBaseName); + s3Client.putObject(new PutObjectRequest(bucketName, key, new ByteArrayInputStream(bytes), metadata)); + } catch (Exception ex) { + System.out.println(String.format("Filename: %s, Could not write failure detail: %s", fileBaseName, ex.getMessage())); } } diff --git a/lambda/pdf-merger-lambda/PDFMergerLambda/target/PDFMergerLambda-1.0-SNAPSHOT.jar b/lambda/pdf-merger-lambda/PDFMergerLambda/target/PDFMergerLambda-1.0-SNAPSHOT.jar index 99f5a0d5..d9e93f0e 100644 Binary files a/lambda/pdf-merger-lambda/PDFMergerLambda/target/PDFMergerLambda-1.0-SNAPSHOT.jar and b/lambda/pdf-merger-lambda/PDFMergerLambda/target/PDFMergerLambda-1.0-SNAPSHOT.jar differ diff --git a/lambda/pdf-merger-lambda/PDFMergerLambda/target/classes/com/example/App.class b/lambda/pdf-merger-lambda/PDFMergerLambda/target/classes/com/example/App.class index 68125709..86272587 100644 Binary files a/lambda/pdf-merger-lambda/PDFMergerLambda/target/classes/com/example/App.class and b/lambda/pdf-merger-lambda/PDFMergerLambda/target/classes/com/example/App.class differ diff --git a/lambda/pdf-merger-lambda/PDFMergerLambda/target/original-PDFMergerLambda-1.0-SNAPSHOT.jar b/lambda/pdf-merger-lambda/PDFMergerLambda/target/original-PDFMergerLambda-1.0-SNAPSHOT.jar index 175c54d9..76c540af 100644 Binary files a/lambda/pdf-merger-lambda/PDFMergerLambda/target/original-PDFMergerLambda-1.0-SNAPSHOT.jar and b/lambda/pdf-merger-lambda/PDFMergerLambda/target/original-PDFMergerLambda-1.0-SNAPSHOT.jar differ diff --git a/lambda/pdf-splitter-lambda/main.py b/lambda/pdf-splitter-lambda/main.py index b9b0fcab..07c41e2a 100644 --- a/lambda/pdf-splitter-lambda/main.py +++ b/lambda/pdf-splitter-lambda/main.py @@ -14,6 +14,7 @@ import urllib.parse import io import os +import datetime # Initialize AWS clients cloudwatch = boto3.client('cloudwatch') @@ -22,6 +23,41 @@ state_machine_arn = os.environ['STATE_MACHINE_ARN'] + +def report_failure(bucket_name, file_basename, reason_category, message): + """Write the user-facing result/FAILED_.json marker directly. + + The splitter is the FIRST station and it is what STARTS the Step Functions + execution. A failure here therefore happens before any state machine exists, + so the Step Functions Catch safety net cannot cover it. To keep the "no + silent failure" guarantee, the splitter writes the failure marker itself, in + the same result/ folder the frontend polls. Station: 'split'. + + Best-effort and exception-proof: a failure while reporting a failure must not + crash the handler. + """ + # Structured CloudWatch line for the dashboard "File status" widget. + print(f"File: {file_basename}, Status: FAILED | station=split | reason={reason_category} | {message}") + if not bucket_name or not file_basename: + return + marker = { + "status": "FAILED", + "filename": file_basename, + "reason_category": reason_category, + "summary": "The document could not be split into pages for processing.", + "failed_chunks": [{"station": "split", "reason_category": reason_category, "message": str(message)[:2000]}], + "timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(), + } + try: + s3_client.put_object( + Bucket=bucket_name, + Key=f"result/FAILED_{file_basename}.json", + Body=json.dumps(marker, indent=2).encode("utf-8"), + ContentType="application/json", + ) + except Exception as e: + print(f"Filename - {file_basename} | CRITICAL: could not write failure marker: {e}") + def log_chunk_created(filename): """ Logs the creation of a PDF chunk. @@ -98,7 +134,9 @@ def split_pdf_into_pages(source_content, original_key, s3_client, bucket_name, p chunks.append({ "s3_bucket": bucket_name, "s3_key": s3_key, - "chunk_key": s3_key # Key for the chunk + "chunk_key": s3_key, + "total_pages": num_pages, + "pages_in_chunk": min(pages_per_chunk, num_pages - start), }) return chunks @@ -120,8 +158,11 @@ def lambda_handler(event, context): Returns: dict: HTTP response indicating the success or failure of the Lambda function execution. """ + bucket_name = None + pdf_file_key = None + file_basename = None try: - + print("Received event: " + json.dumps(event, indent=2)) # Access the S3 event structure @@ -155,17 +196,19 @@ def lambda_handler(event, context): print(f"Filename - {pdf_file_key} | Step Function started: {response['executionArn']}") except KeyError as e: - + print(f"File: {file_basename}, Status: Failed in split lambda function") print(f"Filename - {pdf_file_key} | KeyError: {str(e)}") + report_failure(bucket_name, file_basename, "SPLIT", f"Missing key in event: {str(e)}") return { 'statusCode': 500, 'body': json.dumps(f"Error: Missing key in event: {str(e)}") } except ValueError as e: - + print(f"File: {file_basename}, Status: Failed in split lambda function") print(f"Filename - {pdf_file_key} | ValueError: {str(e)}") + report_failure(bucket_name, file_basename, "SPLIT", f"Invalid input: {str(e)}") return { 'statusCode': 500, 'body': json.dumps(f"Error: {str(e)}") @@ -173,7 +216,8 @@ def lambda_handler(event, context): except Exception as e: print(f"File: {file_basename}, Status: Failed in split lambda function") - print(f"Filename - {pdf_file_key} | Error occurred: {str(e)}", exc_info=True) + print(f"Filename - {pdf_file_key} | Error occurred: {str(e)}") + report_failure(bucket_name, file_basename, "SPLIT", f"Error processing event: {str(e)}") return { 'statusCode': 500, 'body': json.dumps(f"Error processing event: {str(e)}") diff --git a/lambda/title-generator-lambda/title_generator.py b/lambda/title-generator-lambda/title_generator.py index 9aa9ac0c..5a68e3c8 100644 --- a/lambda/title-generator-lambda/title_generator.py +++ b/lambda/title-generator-lambda/title_generator.py @@ -5,6 +5,33 @@ import random import fitz # PyMuPDF + +def report_failure(bucket, file_basename, reason_category, message): + """Write a station failure-detail file that the Step Functions failure-handler + aggregates into the user-facing result/FAILED_.json marker. Station: 'title'. + + The title generator runs inside the state machine, so the Step Functions Catch + is the ultimate safety net; this detail file simply gives the user the precise + reason. Best-effort and exception-proof. + """ + print(f"File: {file_basename}, Status: FAILED | station=title | reason={reason_category} | {message}") + if not bucket or not file_basename: + return + detail = { + "station": "title", + "reason_category": reason_category, + "message": str(message)[:2000], + } + try: + boto3.client('s3').put_object( + Bucket=bucket, + Key=f"temp/{file_basename}/_errors/title.json", + Body=json.dumps(detail).encode("utf-8"), + ContentType="application/json", + ) + except Exception as e: + print(f"Filename: {file_basename} | Could not write failure detail: {e}") + # Helper function for exponential backoff and retry def exponential_backoff_retry( func, @@ -199,12 +226,16 @@ def generate_title(extracted_text, current_title): def lambda_handler(event, context): + file_info = {} + file_basename = None try: payload = event.get("Payload") file_info = parse_payload(payload) print(f"(lambda_handler | Parsed file information: {file_info})") file_name = file_info['merged_file_name'] + # Folder basename matches temp// created upstream (no extension). + file_basename = file_name.rsplit('.', 1)[0] local_path = f'/tmp/{file_name}' download_file_from_s3(file_info['bucket'], file_info['merged_file_key'], local_path, file_info['merged_file_name']) @@ -212,13 +243,8 @@ def lambda_handler(event, context): pdf_document = fitz.open(local_path) except Exception as e: print(f"(lambda_handler | Failed to open PDF file {file_name}: {e})") - return { - "statusCode": 500, - "body": { - "error": f"Failed to open PDF file {file_name}.", - "details": f"{file_name} - {str(e)}" - } - } + report_failure(file_info.get('bucket'), file_basename, "TITLE", f"Failed to open merged PDF: {e}") + raise try: extracted_text = extract_text_from_pdf(pdf_document) @@ -226,13 +252,8 @@ def lambda_handler(event, context): except Exception as e: print(f"(lambda_handler | Failed to extract text from PDF: {e})") pdf_document.close() - return { - "statusCode": 500, - "body": { - "error": "Failed to extract text from PDF.", - "details": f"{file_name} - {str(e)}" - } - } + report_failure(file_info.get('bucket'), file_basename, "TITLE", f"Failed to extract text: {e}") + raise try: title = generate_title(extracted_text, file_name) @@ -240,13 +261,8 @@ def lambda_handler(event, context): except Exception as e: print(f"(lambda_handler | Failed to generate title: {e})") pdf_document.close() - return { - "statusCode": 500, - "body": { - "error": "Failed to generate title.", - "details": f"{file_name} - {str(e)}" - } - } + report_failure(file_info.get('bucket'), file_basename, "BEDROCK_API", f"Failed to generate title via Bedrock: {e}") + raise try: set_custom_metadata(pdf_document, file_name, title) @@ -255,26 +271,16 @@ def lambda_handler(event, context): except Exception as e: print(f"(lambda_handler | Failed to set metadata or save PDF: {e})") pdf_document.close() - return { - "statusCode": 500, - "body": { - "error": "Failed to set metadata or save PDF.", - "details": f"{file_name} - {str(e)}" - } - } + report_failure(file_info.get('bucket'), file_basename, "TITLE", f"Failed to set metadata or save PDF: {e}") + raise try: save_path = save_to_s3(local_path, file_info['bucket'], file_name) print(f"(lambda_handler | Saved file to S3 at: {save_path})") except Exception as e: print(f"(lambda_handler | Failed to save file to S3: {e})") - return { - "statusCode": 500, - "body": { - "error": "Failed to save file to S3.", - "details": f"{file_name} - {str(e)}" - } - } + report_failure(file_info.get('bucket'), file_basename, "TITLE", f"Failed to save final PDF to S3: {e}") + raise return { "statusCode": 200, @@ -286,10 +292,8 @@ def lambda_handler(event, context): } except Exception as e: print(f"(lambda_handler | General error in lambda_handler: {e})") - return { - "statusCode": 500, - "body": { - "error": "An unexpected error occurred.", - "details": f"Filename: {file_info.get('merged_file_name','Unknown')} - {str(e)}" - } - } + # Report and re-raise: returning a 500 dict would be treated as SUCCESS by + # the Step Functions LambdaInvoke and let the workflow continue silently. + # Raising ensures the state machine's Catch fires and the user is notified. + report_failure(file_info.get('bucket'), file_basename, "TITLE", f"Unexpected error: {e}") + raise