diff --git a/adobe-autotag-container/adobe_autotag_processor.py b/adobe-autotag-container/adobe_autotag_processor.py
index 0afd7185..2c0fafb4 100644
--- a/adobe-autotag-container/adobe_autotag_processor.py
+++ b/adobe-autotag-container/adobe_autotag_processor.py
@@ -91,6 +91,59 @@
s3 = boto3.client('s3')
+def _chunk_page_range(chunk_key, pages_per_chunk=200):
+    """Return the 1-based, inclusive (page_start, page_end) covered by a chunk.
+
+    The chunk number N is parsed from a key that ends in ``_chunk_N.pdf``
+    (for example ``temp/<basename>/<basename>_chunk_N.pdf``) and mapped onto
+    the original document assuming every chunk holds ``pages_per_chunk``
+    pages. The default of 200 mirrors the splitter's fixed chunk size, so the
+    two must be kept in sync.
+
+    ``page_end`` is an upper bound: the final chunk of a document may contain
+    fewer pages than ``pages_per_chunk``.
+
+    Returns (None, None) when the key is empty, does not end in
+    ``_chunk_N.pdf``, or carries a chunk number below 1 (which would
+    otherwise produce a zero or negative page range).
+    """
+    import re
+
+    if not chunk_key:
+        return None, None
+    match = re.search(r'_chunk_(\d+)\.pdf$', chunk_key)
+    if not match:
+        return None, None
+    chunk_number = int(match.group(1))
+    if chunk_number < 1:
+        # Chunk numbering is 1-based; chunk 0 has no meaningful page range.
+        return None, None
+    page_start = (chunk_number - 1) * pages_per_chunk + 1
+    page_end = chunk_number * pages_per_chunk
+    return page_start, page_end
+
+
+def report_failure(bucket_name, file_base_name, chunk_key, reason_category, message):
+    """Log a failure and write a detail file for the failure-handler Lambda.
+
+    The Step Functions failure-handler aggregates every ``*.json`` file under
+    ``temp/<file_base_name>/_errors/`` into the user-facing
+    ``result/FAILED_<basename>.json`` marker.
+
+    Station: 'adobe' (Adobe AutoTag/Extract). Best-effort and exception-proof:
+    reporting a failure must never throw a second failure that masks the
+    original.
+
+    Args:
+        bucket_name: Bucket that receives the detail file. Nothing is written
+            when it is empty.
+        file_base_name: Document base name used in the detail-file prefix.
+            Nothing is written when it is empty.
+        chunk_key: S3 key of the chunk being processed. When it ends in
+            ``_chunk_N.pdf`` the page range is added to the detail and N is
+            added to the detail-file name.
+        reason_category: Failure category, e.g. "ADOBE_API", "INFRA", "UNKNOWN".
+        message: Human-readable description; truncated to 2000 characters.
+    """
+    import re
+
+    try:
+        page_start, page_end = _chunk_page_range(chunk_key)
+        chunk_match = re.search(r'_chunk_(\d+)\.pdf$', chunk_key or "")
+    except Exception:
+        # A malformed chunk key must not stop the failure from being reported.
+        page_start = page_end = chunk_match = None
+
+    # Only mention the page range when it is known (avoids "page=None-None").
+    page_desc = f" | page={page_start}-{page_end}" if page_start is not None else ""
+    logger.error(
+        f"File: {file_base_name}, Status: FAILED | station=adobe | "
+        f"reason={reason_category}{page_desc} | {message}"
+    )
+
+    if not bucket_name or not file_base_name:
+        return
+    detail = {
+        "station": "adobe",
+        "reason_category": reason_category,
+        "message": str(message)[:2000],
+    }
+    if page_start is not None:
+        detail["page_start"] = page_start
+        detail["page_end"] = page_end
+    # One detail file per chunk: several chunks of the same document can fail,
+    # and a single shared key would let the last writer overwrite the others.
+    chunk_suffix = f"_chunk_{chunk_match.group(1)}" if chunk_match else ""
+    try:
+        s3.put_object(
+            Bucket=bucket_name,
+            Key=f"temp/{file_base_name}/_errors/adobe_failure{chunk_suffix}.json",
+            Body=json.dumps(detail).encode("utf-8"),
+            ContentType="application/json",
+        )
+    except Exception as e:
+        logger.error(f"Filename : {file_base_name} | Could not write failure detail: {e}")
+
+
def download_file_from_s3(bucket_name,file_base_name, file_key, local_path):
"""
Download a file from an S3 bucket.
@@ -430,7 +483,8 @@ def create_sqlite_db(by_page, filename, images_output_dir, object_ids, image_pat
prev TEXT,
current TEXT,
next TEXT,
- context TEXT
+ context TEXT,
+ page_num INTEGER
)
""")
@@ -524,12 +578,13 @@ def create_sqlite_db(by_page, filename, images_output_dir, object_ids, image_pat
print(" ======================")
# Insert the data into the SQLite database.
cursor.execute("""
- INSERT INTO image_data (objid, img_path, context)
- VALUES (?, ?, ?)
+ INSERT INTO image_data (objid, img_path, context, page_num)
+ VALUES (?, ?, ?, ?)
""", (
current_candidate["objid"],
current_candidate["filePaths"][0].split("/")[-1],
- context
+ context,
+ pg_num + 1
))
print("Added in the database: ", current_candidate["objid"],
current_candidate["filePaths"][0].split("/")[-1])
@@ -620,7 +675,8 @@ def extract_images_from_excel(filename, figure_path, autotag_report_path, images
prev TEXT,
current TEXT,
next TEXT,
- context TEXT
+ context TEXT,
+ page_num INTEGER
)
""")
@@ -640,11 +696,12 @@ def main():
"""
file_key = None
file_base_name = None
-
- try:
- bucket_name = os.getenv('S3_BUCKET_NAME')
+ s3_file_key = None
+ bucket_name = os.getenv('S3_BUCKET_NAME')
+
+ try:
s3_file_key = os.getenv('S3_FILE_KEY')
-
+
if not bucket_name or not s3_file_key:
logging.error("Error: S3_BUCKET_NAME and S3_FILE_KEY environment variables are required.")
sys.exit(1)
@@ -714,21 +771,36 @@ def main():
logging.info(f'Filename : {file_key} | Processing completed successfully')
logger.info(f"File: {file_base_name}, Status: Succeeded in First ECS task")
- except (ServiceApiException, ServiceUsageException, SdkException) as e:
+ except ServiceUsageException as e:
+ # Quota / credits exhausted — not a document problem, retrying later may work.
+ logger.error(f"File: {file_base_name}, Status: Failed in First ECS task - Adobe API Quota")
+ logger.error(f"Filename : {file_key} | Adobe API Quota Error: {e}")
+ report_failure(bucket_name, file_base_name, s3_file_key, "ADOBE_API",
+ f"Adobe API quota or credits exhausted. Please try again later. Adobe error: {e}")
+ sys.exit(1)
+ except (ServiceApiException, SdkException) as e:
logger.error(f"File: {file_base_name}, Status: Failed in First ECS task - Adobe API Error")
logger.error(f"Filename : {file_key} | Adobe API Error: {e}")
+ report_failure(bucket_name, file_base_name, s3_file_key, "ADOBE_API",
+ f"Adobe API failed for this document. This document may be too complex for our Adobe API to handle. Adobe error: {e}")
sys.exit(1)
except ClientError as e:
logger.error(f"File: {file_base_name}, Status: Failed in First ECS task - AWS Error")
logger.error(f"Filename : {file_key} | AWS Error: {e}")
+ report_failure(bucket_name, file_base_name, s3_file_key, "INFRA",
+ f"AWS infrastructure error: {e}")
sys.exit(1)
except FileNotFoundError as e:
logger.error(f"File: {file_base_name}, Status: Failed in First ECS task - File Not Found")
logger.error(f"Filename : {file_key} | File Not Found Error: {e}")
+ report_failure(bucket_name, file_base_name, s3_file_key, "ADOBE_API",
+ "Adobe API failed for this document. This document may be too complex for our Adobe API to handle.")
sys.exit(1)
except Exception as e:
logger.error(f"File: {file_base_name}, Status: Failed in First ECS task")
logger.error(f"Filename : {file_key} | Unexpected Error: {e}")
+ report_failure(bucket_name, file_base_name, s3_file_key, "UNKNOWN",
+ f"Unexpected error processing this document: {e}")
sys.exit(1)
if __name__ == "__main__":
diff --git a/alt-text-generator-container/alt_text_generator.js b/alt-text-generator-container/alt_text_generator.js
index 05e39a98..2212a716 100644
--- a/alt-text-generator-container/alt_text_generator.js
+++ b/alt-text-generator-container/alt_text_generator.js
@@ -69,6 +69,73 @@ function sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
+// Largest long-side : short-side ratio an image may have and still be sent for
+// alt-text generation; isAspectRatioValid() rejects anything above it.
+const MAX_ASPECT_RATIO = 20;
+
+/**
+ * Reads the pixel dimensions of a PNG or JPEG directly from its header bytes.
+ *
+ * @param {Buffer} buffer - Raw image file contents.
+ * @returns {{width: number, height: number} | null} The dimensions, or null
+ *   when the format is not recognised or the header is truncated/malformed.
+ */
+function getImageDimensions(buffer) {
+    try {
+        // PNG: signature starts 0x89 'P' 'N' 'G'; the IHDR chunk that follows
+        // stores width at offset 16 and height at offset 20 (big-endian uint32).
+        const isPng = buffer.length >= 24 &&
+            buffer[0] === 0x89 && buffer[1] === 0x50 &&
+            buffer[2] === 0x4E && buffer[3] === 0x47;
+        if (isPng) {
+            return { width: buffer.readUInt32BE(16), height: buffer.readUInt32BE(20) };
+        }
+
+        // JPEG: starts with SOI (0xFFD8); walk the marker segments until a
+        // start-of-frame (SOFn) segment, which carries height then width.
+        if (buffer[0] === 0xFF && buffer[1] === 0xD8) {
+            let offset = 2;
+            while (offset + 3 < buffer.length) {
+                if (buffer[offset] !== 0xFF) {
+                    // Not on a marker boundary: resynchronise byte by byte.
+                    offset++;
+                    continue;
+                }
+                const marker = buffer[offset + 1];
+                if (marker === 0xFF) {
+                    // Fill byte: any number of 0xFF may precede a marker.
+                    offset++;
+                    continue;
+                }
+                if (marker === 0x01 || (marker >= 0xD0 && marker <= 0xD8)) {
+                    // TEM, RSTn and SOI are standalone markers with no length field.
+                    offset += 2;
+                    continue;
+                }
+                if (marker === 0xD9 || marker === 0xDA) {
+                    // EOI or SOS: the frame header would have come before this,
+                    // and what follows SOS is entropy-coded data, not segments.
+                    break;
+                }
+                const isStartOfFrame = marker >= 0xC0 && marker <= 0xCF &&
+                    marker !== 0xC4 && marker !== 0xC8 && marker !== 0xCC;
+                if (isStartOfFrame) {
+                    if (offset + 9 > buffer.length) {
+                        break; // truncated frame header
+                    }
+                    const height = buffer.readUInt16BE(offset + 5);
+                    const width = buffer.readUInt16BE(offset + 7);
+                    return { width, height };
+                }
+                const segLen = buffer.readUInt16BE(offset + 2);
+                if (segLen < 2) {
+                    break; // the length field counts itself, so < 2 is malformed
+                }
+                offset += 2 + segLen;
+            }
+        }
+    } catch (e) {
+        // Out-of-range reads on truncated data: dimensions are unknown.
+        return null;
+    }
+    return null;
+}
+
+/**
+ * Tells whether an image's aspect ratio is within MAX_ASPECT_RATIO.
+ * Missing or zero dimensions cannot be judged and are treated as valid.
+ */
+function isAspectRatioValid(dimensions) {
+    const width = dimensions ? dimensions.width : 0;
+    const height = dimensions ? dimensions.height : 0;
+    if (!width || !height) {
+        return true;
+    }
+    // Take the larger of the two orientations so wide and tall images are
+    // measured the same way.
+    const wideRatio = width / height;
+    const tallRatio = height / width;
+    return Math.max(wideRatio, tallRatio) <= MAX_ASPECT_RATIO;
+}
+
+/**
+ * Records a failure of the alt-text station.
+ *
+ * Always emits a structured error log line. When both the bucket and the file
+ * base name are known, it also uploads a JSON detail file to
+ * temp/<fileBaseName>/_errors/alttext_failure.json, which the Step Functions
+ * failure-handler aggregates into the user-facing result/FAILED_<basename>.json
+ * marker. Upload errors are logged and swallowed so they cannot mask the
+ * original failure.
+ *
+ * @param {string} bucketName - Destination S3 bucket.
+ * @param {string} fileBaseName - Base name of the document being processed.
+ * @param {string} reasonCategory - Failure category, e.g. "BEDROCK_API".
+ * @param {*} message - Failure description; stored truncated to 2000 characters.
+ */
+async function reportFailure(bucketName, fileBaseName, reasonCategory, message) {
+    logger.error(`File: ${fileBaseName}, Status: FAILED | station=alttext | reason=${reasonCategory} | ${message}`);
+
+    const canUpload = Boolean(bucketName) && Boolean(fileBaseName);
+    if (!canUpload) {
+        return;
+    }
+
+    const failureDetail = {
+        station: "alttext",
+        reason_category: reasonCategory,
+        message: String(message).slice(0, 2000),
+    };
+    const detailKey = `temp/${fileBaseName}/_errors/alttext_failure.json`;
+
+    try {
+        const putCommand = new PutObjectCommand({
+            Bucket: bucketName,
+            Key: detailKey,
+            Body: JSON.stringify(failureDetail),
+            ContentType: "application/json",
+        });
+        await s3Client.send(putCommand);
+    } catch (e) {
+        logger.error(`Filename: ${fileBaseName} | Could not write failure detail: ${e.message || e}`);
+    }
+}
+
+
/**
* Invokes the Bedrock AI model to generate alt text for a given image.
@@ -471,6 +538,7 @@ async function startProcess() {
context_json: {
context: row.context,
},
+ page_num: row.page_num || null,
};
});
} catch (err) {
@@ -492,6 +560,7 @@ async function startProcess() {
logger.info(`Filename: ${filebasename} | imageObjects: ${imageObjects}`);
logger.info(`Filename: ${filebasename} | Total images to process: ${imageObjects.length}`);
+ let skippedCount = 0;
for (const imageObject of imageObjects) {
try {
const getObjectParams = {
@@ -502,7 +571,7 @@ async function startProcess() {
logger.info(`Filename: ${filebasename} | Image Object Bucketname: ${bucketName}`);
const command = new GetObjectCommand(getObjectParams);
const { Body } = await s3Client.send(command);
-
+
// Stream the body contents to a buffer
const chunks = [];
await pipeline(Body, async function* (source) {
@@ -511,6 +580,18 @@ async function startProcess() {
}
});
const fileBuffer = Buffer.concat(chunks);
+
+ // Aspect-ratio pre-check: skip images with extreme ratios
+ const dimensions = getImageDimensions(fileBuffer);
+ if (dimensions && !isAspectRatioValid(dimensions)) {
+ const ratio = Math.max(dimensions.width / dimensions.height, dimensions.height / dimensions.width).toFixed(1);
+ const pageDesc = imageObject.page_num ? ` | page=${imageObject.page_num}` : "";
+ logger.info(`File: ${filebasename}, Status: INFO | station=alttext | image=${imageObject.id}${pageDesc} | action=decorative | reason=complex_image_aspect_ratio_${ratio}:1`);
+ skippedCount++;
+ combinedResults[imageObject.id] = "Decorative element";
+ continue;
+ }
+
const localFilePath = path.join(__dirname, `${imageObject.path.split('/').pop()}`);
logger.info(`Filename: ${filebasename} | Local File Path: ${localFilePath}`);
fs_1.writeFileSync(localFilePath, fileBuffer);
@@ -522,26 +603,33 @@ async function startProcess() {
logger.info(`Filename: ${filebasename} | Alt text generation succeeded for image ${imageObject.id} (${successCount} succeeded, ${failureCount} failed)`);
} catch (error) {
failureCount++;
- logger.error(`Filename: ${filebasename} | Alt text generation failed for image ${imageObject.id}: ${error.message || error}`);
+ const pageDesc = imageObject.page_num ? ` | page=${imageObject.page_num}` : "";
+ logger.error(`File: ${filebasename}, Status: FAILED | station=alttext | image=${imageObject.id}${pageDesc} | reason=BEDROCK_API | ${error.message || error}`);
logger.info(`Filename: ${filebasename} | Progress: ${successCount} succeeded, ${failureCount} failed`);
}
await sleep(2000);
}
- // Check if we have any images and if all of them failed
- if (imageObjects.length > 0 && successCount === 0) {
+ const processedImages = imageObjects.length - skippedCount;
+ if (processedImages > 0 && successCount === 0) {
logger.error(`Filename: ${filebasename} | All ${failureCount} alt text generation requests failed - likely due to throttling or Bedrock API issues`);
logger.error(`File: ${filebasename}, Status: Failed in second ECS task - All Bedrock requests failed`);
+ const failedPages = [...new Set(
+ imageObjects
+ .filter(img => img.page_num != null && !combinedResults.hasOwnProperty(img.id))
+ .map(img => img.page_num)
+ )].sort((a, b) => a - b);
+ const pageInfo = failedPages.length > 0 ? ` Failed on pages: ${failedPages.join(', ')}.` : '';
+ await reportFailure(bucketName, filebasename, "BEDROCK_API",
+ `All ${failureCount} Bedrock alt-text requests failed (throttling or Bedrock API issues).${pageInfo}`);
process.exit(1);
}
-
- logger.info(`Filename: ${filebasename} | Alt text generation complete: ${successCount} succeeded, ${failureCount} failed out of ${imageObjects.length} images`);
- let defaultText = "No text available";
+ logger.info(`Filename: ${filebasename} | Alt text generation complete: ${successCount} succeeded, ${failureCount} failed, ${skippedCount} decorative out of ${imageObjects.length} images`);
for (const imageObject of imageObjects) {
if (!combinedResults.hasOwnProperty(imageObject.id)) {
- combinedResults[imageObject.id] = defaultText;
+ combinedResults[imageObject.id] = "Decorative element";
}
}
@@ -558,6 +646,8 @@ async function startProcess() {
} catch (error) {
logger.info(`File: ${filebasename}, Status: Error in second ECS task`);
logger.error(`Filename: ${filebasename} | Error processing images: ${error}`);
+ await reportFailure(bucketName, filebasename, "UNKNOWN",
+ `Error processing images: ${error.message || error}`);
process.exit(1);
}
}
diff --git a/app.py b/app.py
index 1148d0bd..2950dcf7 100644
--- a/app.py
+++ b/app.py
@@ -174,7 +174,8 @@ def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
cluster=pdf_remediation_cluster,
task_definition=adobe_autotag_task_def,
assign_public_ip=False,
-
+ result_path="$.ecsResult",
+
container_overrides=[tasks.ContainerOverride(
container_definition = adobe_autotag_container_def,
environment=[
@@ -213,11 +214,11 @@ def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
environment=[
tasks.TaskEnvironmentVariable(
name="S3_BUCKET_NAME",
- value=sfn.JsonPath.string_at("$.Overrides.ContainerOverrides[0].Environment[0].Value")
+ value=sfn.JsonPath.string_at("$.s3_bucket")
),
tasks.TaskEnvironmentVariable(
name="S3_FILE_KEY",
- value=sfn.JsonPath.string_at("$.Overrides.ContainerOverrides[0].Environment[1].Value")
+ value=sfn.JsonPath.string_at("$.s3_key")
),
tasks.TaskEnvironmentVariable(
name="AWS_REGION",
@@ -366,6 +367,57 @@ def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
parallel_accessibility_workflow.branch(remediation_chain)
parallel_accessibility_workflow.branch(pre_remediation_accessibility_checker_task)
+ # ---------------------------------------------------------------------
+ # Failure-handler: the single, exhaustive safety net for the workflow.
+ #
+ # The frontend detects a finished job by polling the S3 result/ folder.
+ # Previously, any failure left the workflow FAILED with nothing written
+ # to result/, so the UI polled forever ("silent failure"). This Lambda
+ # is wired to a Catch that fires on EVERY error -- in-code (Adobe/Bedrock/
+ # complexity) and infrastructure (container can't start, OOM, timeout) --
+ # and writes result/FAILED_<basename>.json carrying the reason category and
+ # the failing chunk/page range, so the UI can show the user what happened.
+ # It is invoked only on failure, so its cost is effectively zero.
+ # ---------------------------------------------------------------------
+ failure_handler_lambda = lambda_.Function(
+ self, 'WorkflowFailureHandlerLambda',
+ runtime=lambda_.Runtime.PYTHON_3_12,
+ handler='main.lambda_handler',
+ code=lambda_.Code.from_asset('lambda/failure-handler'),
+ timeout=Duration.seconds(120),
+ memory_size=256,
+ environment={
+ 'BUCKET_NAME': pdf_processing_bucket.bucket_name,
+ }
+ )
+ pdf_processing_bucket.grant_read_write(failure_handler_lambda)
+ failure_handler_lambda.add_to_role_policy(cloudwatch_metrics_policy)
+
+ # Pass the original state ($) plus the execution ARN from the Step Functions
+ # context object ($$). The ARN lets the failure marker reference the exact
+ # failed execution for support/tracing. chunks/s3_bucket/failureInfo are
+ # carried through from the state so the handler can identify the file and
+ # the failure reason.
+ failure_handler_task = tasks.LambdaInvoke(self, "HandleWorkflowFailure",
+ lambda_function=failure_handler_lambda,
+ payload=sfn.TaskInput.from_object({
+ "chunks": sfn.JsonPath.string_at("$.chunks"),
+ "s3_bucket": sfn.JsonPath.string_at("$.s3_bucket"),
+ "failureInfo": sfn.JsonPath.string_at("$.failureInfo"),
+ "executionArn": sfn.JsonPath.string_at("$$.Execution.Id"),
+ }),
+ output_path="$.Payload")
+
+ # Route every failure of the parallel workflow to the failure handler.
+ # result_path keeps the original input (chunks, s3_bucket) intact and
+ # nests the Step Functions Error/Cause under $.failureInfo so the handler
+ # can recover both the file identity and the failure reason.
+ parallel_accessibility_workflow.add_catch(
+ failure_handler_task,
+ errors=["States.ALL"],
+ result_path="$.failureInfo"
+ )
+
pdf_remediation_workflow_log_group = logs.LogGroup(self, "PdfRemediationWorkflowLogs",
log_group_name="/aws/states/pdf-accessibility-remediation-workflow",
retention=logs.RetentionDays.ONE_MONTH,
diff --git a/docs/ERROR_HANDLING.md b/docs/ERROR_HANDLING.md
new file mode 100644
index 00000000..02f37814
--- /dev/null
+++ b/docs/ERROR_HANDLING.md
@@ -0,0 +1,133 @@
+# Error Handling & Failure Reporting
+
+This document describes how the PDF-to-PDF remediation pipeline reports
+failures so that **no failure is ever silent**, and the contract the frontend
+uses to detect and display those failures.
+
+## Background: why this exists
+
+The frontend detects a finished job by **polling the S3 `result/` folder**:
+
+- On success the pipeline writes `result/COMPLIANT_<basename>.pdf`.
+- Previously, on **any** failure the workflow ended with nothing written to
+ `result/`. The UI kept polling indefinitely and the user saw the job "spin"
+ for hours with no explanation.
+
+The error-handling layer closes that gap by guaranteeing that every failed job
+produces a **failure marker** in the same `result/` folder the frontend already
+polls, carrying a human-readable reason.
+
+## The two-layer design
+
+```
+Each station (on failure)            Step Functions Catch (catches EVERYTHING)
+  writes a detail file  ────────►    routes any error to the failure-handler
+  temp/<basename>/_errors/           Lambda, which aggregates the detail
+    <station>_*.json                 files and writes the user-facing marker
+  (station, reason)                  result/FAILED_<basename>.json
+        │
+        └──► structured CloudWatch line:
+             "File: <basename>, Status: FAILED | station=adobe | reason=ADOBE_API | <message>"
+```
+
+1. **Station detail files** (`temp/<basename>/_errors/*.json`) — each station writes
+   the rich detail only it knows (the exact reason and, where available, the page range).
+2. **Step Functions `Catch` → failure-handler Lambda** — the exhaustive safety
+ net. It fires on *every* failure, including infrastructure failures where a
+ container dies before it can write a detail file (image-pull failure, OOM,
+ task timeout, IAM error). It aggregates any detail files and always writes the
+ marker. If no detail file exists, it derives the reason from the Step
+ Functions error `Cause`.
+
+> The **PDF splitter** is a special case: it runs *before* the Step Functions
+> execution starts, so the Catch cannot cover it. The splitter therefore writes
+> the `result/FAILED_<basename>.json` marker directly.
+
+## Frontend contract
+
+When polling for results for an uploaded file `<basename>.pdf` (base name `<basename>`),
+the frontend should check for **both** of these keys:
+
+| Key | Meaning |
+|---|---|
+| `result/COMPLIANT_<basename>.pdf` | Success — the remediated PDF is ready. |
+| `result/FAILED_<basename>.json` | Failure — stop polling and show the reason. |
+
+### `FAILED_<basename>.json` schema
+
+```json
+{
+ "status": "FAILED",
+  "filename": "<basename>",
+ "reason_category": "ADOBE_API",
+ "summary": "Adobe API failed for this document. This document may be too complex for our Adobe API to handle.",
+ "failed_chunks": [
+ {
+ "station": "adobe",
+ "reason_category": "ADOBE_API",
+ "message": "Adobe API failed for this document..."
+ }
+ ],
+ "execution_arn": "arn:aws:states:...:execution:...",
+ "timestamp": "2026-06-08T17:42:11.123456+00:00"
+}
+```
+
+- `reason_category` and `summary` are safe to show directly to the user.
+- `message` is the raw technical detail — useful for support, not necessarily
+ for end users.
+
+### Reason categories
+
+| `reason_category` | Meaning |
+|---|---|
+| `SPLIT` | The document could not be split into pages. |
+| `ADOBE_API` | The Adobe API failed for this document (it may be too complex for the Adobe API to handle), or the Adobe API quota/credits were exhausted. |
+| `BEDROCK_API` | Amazon Bedrock (alt text / title) failed. |
+| `COMPLEXITY` | The document exceeded processing complexity limits. |
+| `MERGE` | The processed pages could not be merged. |
+| `TITLE` | The title/metadata step failed. |
+| `INFRA` | A task failed to run (infrastructure-level failure). |
+| `UNKNOWN` | Unexpected failure; see `message`. |
+
+## CloudWatch visibility
+
+Each station emits structured lines for the CloudWatch dashboard:
+
+**Adobe failures:**
+```
+File: <basename>, Status: FAILED | station=adobe | reason=ADOBE_API | Adobe API failed for this document...
+```
+
+**Bedrock per-image failures (with exact page number):**
+```
+File: <basename>, Status: FAILED | station=alttext | image=<image_id> | page=6 | reason=BEDROCK_API | <error message>
+```
+
+**Images tagged decorative due to aspect ratio (with exact page number):**
+```
+File: <basename>, Status: INFO | station=alttext | image=<image_id> | page=6 | action=decorative | reason=complex_image_aspect_ratio_25.0:1
+```
+
+This lets operators see, per file, exactly which station failed and on which
+page — without opening individual log streams.
+
+## Bedrock graceful handling
+
+Images that pass Adobe but cannot be processed by Bedrock are handled as follows:
+
+- **Aspect ratio > 20:1**: Tagged as "Decorative element" (known problematic images
+ like separator lines, thin borders). Logged to CloudWatch with exact page number.
+- **Individual Bedrock API failure**: The specific image failure is logged with its
+ exact page number. If not all images fail, the pipeline continues.
+- **All images fail Bedrock**: The container reports total failure and exits. The user
+  sees the error in the UI.
+
+## Notes for maintainers
+
+- The failure-handler is invoked **only on failure**, so its cost is effectively
+ zero. No always-on services or new datastores are introduced.
+- Stations report failures on a **best-effort** basis: a failure while writing a
+ detail file is logged but never masks the original error.
+- The `page_num` column in the SQLite image database (written by the Adobe
+ container) enables exact page-level reporting in the alt-text station.
diff --git a/lambda/failure-handler/main.py b/lambda/failure-handler/main.py
new file mode 100644
index 00000000..36a6a0bd
--- /dev/null
+++ b/lambda/failure-handler/main.py
@@ -0,0 +1,222 @@
+"""
+Failure-handler Lambda for the PDF Accessibility remediation workflow.
+
+This function is the single, exhaustive safety net for the Step Functions
+state machine. It is wired to a `Catch` block that captures EVERY failure in
+the pipeline -- both in-code errors (Adobe/Bedrock API failures, complexity
+issues) and infrastructure failures (a container that cannot start, an
+out-of-memory kill, a task timeout, an IAM error). Whenever the workflow
+fails, Step Functions routes here instead of ending silently.
+
+The frontend detects a finished job by polling the S3 `result/` folder. On
+success the pipeline writes `result/COMPLIANT_<basename>.pdf`. This function
+provides the missing failure signal in the SAME place the frontend already looks:
+
+    result/FAILED_<basename>.json
+
+so the UI can stop polling and show the user WHY the job failed and WHERE
+(which chunk / page range), instead of spinning indefinitely.
+
+Detail sources (in priority order):
+  1. Per-station detail files at `temp/<basename>/_errors/*.json`, written by
+     each station right before it exits on failure. These carry the rich
+     reason + chunk + page-range context that only the station knows.
+ 2. The Step Functions error `Cause`/`Error`, used as a fallback when a
+ station died before it could write a detail file (pure infra failure).
+
+The function NEVER raises: a safety net that can itself fail is not a safety
+net. Any internal problem is logged and a best-effort marker is still written.
+"""
+
+import json
+import os
+import boto3
+from datetime import datetime, timezone
+
+s3_client = boto3.client("s3")
+
+# Human-readable summaries for each reason category. The category itself is
+# emitted by the stations; this map is only for the user-facing message.
+REASON_SUMMARIES = {
+ "SPLIT": "The document could not be split into pages for processing.",
+ "ADOBE_API": "Adobe API failed for this document. This document may be too complex for our Adobe API to handle.",
+ "BEDROCK_API": "The AI model (Amazon Bedrock) failed while generating alt text.",
+ "COMPLEXITY": "The document exceeded processing complexity limits.",
+ "MERGE": "The processed pages could not be merged back into one document.",
+ "TITLE": "The document title/metadata step failed.",
+ "INFRA": "A processing task failed to run (infrastructure-level failure).",
+ "UNKNOWN": "Processing failed for an unexpected reason.",
+}
+
+
+def _derive_basename(event):
+    """Best-effort extraction of the original file's base name from the event.
+
+    The state machine input is `{"chunks": [...], "s3_bucket": ...}` where each
+    chunk is a dict whose `s3_key` (or `chunk_key`) looks like
+    `temp/<basename>/<basename>_chunk_<n>.pdf`. The second path component is
+    returned so the caller can both locate the `_errors/` folder and name the
+    marker `result/FAILED_<basename>.json`.
+
+    Returns None when the event is not a dict, `chunks` is not a list, or no
+    chunk carries a usable `temp/...` key. Malformed chunk entries are skipped
+    rather than raising, because this helper runs inside the failure handler.
+    """
+    if not isinstance(event, dict):
+        return None
+    chunks = event.get("chunks")
+    if not isinstance(chunks, list):
+        return None
+    for chunk in chunks:
+        if not isinstance(chunk, dict):
+            continue
+        key = chunk.get("s3_key") or chunk.get("chunk_key") or ""
+        if not isinstance(key, str):
+            continue
+        parts = key.split("/")
+        if len(parts) >= 2 and parts[0] == "temp" and parts[1]:
+            return parts[1]
+    return None
+
+
+def _resolve_bucket(event):
+    """Return the processing bucket name.
+
+    The event's `s3_bucket` field is preferred, then its `bucket` field; if
+    neither holds a truthy value the `BUCKET_NAME` environment variable is
+    used (which may itself be unset, giving None).
+    """
+    for field in ("s3_bucket", "bucket"):
+        candidate = event.get(field)
+        if candidate:
+            return candidate
+    return os.environ.get("BUCKET_NAME")
+
+
+def _collect_station_errors(bucket, basename):
+    """Read every `temp/<basename>/_errors/*.json` detail file.
+
+    Returns a list of dicts, one per detail file that parsed to a JSON object.
+    Files that cannot be read, are not valid JSON, or hold something other
+    than a JSON object are logged and skipped -- a corrupt detail file must
+    never prevent the marker from being written, and callers rely on every
+    returned item supporting `.get()`.
+
+    Returns an empty list when `bucket` or `basename` is missing, or when the
+    prefix cannot be listed.
+    """
+    if not bucket or not basename:
+        return []
+
+    prefix = f"temp/{basename}/_errors/"
+    errors = []
+    try:
+        paginator = s3_client.get_paginator("list_objects_v2")
+        for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
+            for obj in page.get("Contents", []):
+                key = obj["Key"]
+                if not key.endswith(".json"):
+                    continue
+                try:
+                    body = s3_client.get_object(Bucket=bucket, Key=key)["Body"].read()
+                    detail = json.loads(body)
+                except Exception as e:  # noqa: BLE001 - tolerate any bad file
+                    print(f"[failure-handler] Could not read detail file {key}: {e}")
+                    continue
+                if isinstance(detail, dict):
+                    errors.append(detail)
+                else:
+                    # Valid JSON but not an object (e.g. a list or a string).
+                    print(
+                        f"[failure-handler] Ignoring detail file {key}: expected a "
+                        f"JSON object, got {type(detail).__name__}"
+                    )
+    except Exception as e:  # noqa: BLE001
+        print(f"[failure-handler] Could not list {prefix}: {e}")
+    return errors
+
+
+def _parse_sfn_cause(event):
+    """Extract a reason from the Step Functions Catch error payload.
+
+    When a station dies before writing a detail file (e.g. the container could
+    not start), Step Functions still hands us `Error` and `Cause`. We map that
+    to the INFRA category so the user always gets *something* actionable.
+
+    The Catch in app.py nests this under `$.failureInfo` (result_path), so we
+    look there first and fall back to the top level for robustness.
+
+    `Cause` is often a JSON string. When it parses to an object, the first
+    non-empty of `errorMessage`, `StoppedReason` and `Cause` is used as the
+    message; any other JSON value (number, list, quoted string) or non-JSON
+    text is used verbatim.
+
+    Returns a dict with `station`, `reason_category` and a string `message`,
+    or None when neither `Error` nor `Cause` is present.
+    """
+    if not isinstance(event, dict):
+        return None
+    failure_info = event.get("failureInfo")
+    if not failure_info or not isinstance(failure_info, dict):
+        failure_info = event
+
+    err = failure_info.get("Error")
+    cause_raw = failure_info.get("Cause")
+    cause = cause_raw
+    if cause_raw:
+        try:
+            parsed = json.loads(cause_raw)
+        except (ValueError, TypeError):
+            parsed = None
+        # json.loads may legitimately return a list/str/number; only a JSON
+        # object has named fields to look up.
+        if isinstance(parsed, dict):
+            cause = (
+                parsed.get("errorMessage")
+                # NOTE(review): ECS task results appear PascalCase in this
+                # workflow (cf. `$.Overrides.ContainerOverrides`); confirm
+                # `StoppedReason` against a real failed execution.
+                or parsed.get("StoppedReason")
+                or parsed.get("Cause")
+                or cause_raw
+            )
+
+    message = cause or err
+    if not message:
+        return None
+    return {
+        "station": "workflow",
+        "reason_category": "INFRA",
+        # Always a string so downstream slicing/formatting cannot fail.
+        "message": str(message),
+    }
+
+
+def lambda_handler(event, context):
+    """Aggregate failure detail and write the user-facing FAILED_ marker.
+
+    `event` is the payload built by the HandleWorkflowFailure task: `chunks`,
+    `s3_bucket`, `failureInfo` (the Catch's Error/Cause) and `executionArn`.
+    `context` is the Lambda context object and is not used.
+
+    Steps:
+      1. Resolve the bucket and the document base name from the event.
+      2. Collect station detail files; if there are none, fall back to the
+         Step Functions Error/Cause.
+      3. Pick the primary category (first non-UNKNOWN entry, else the first
+         entry, else UNKNOWN) and a summary (that entry's message, or the
+         generic text from REASON_SUMMARIES when the message is empty).
+      4. Log a structured line and write `result/FAILED_<basename>.json`.
+
+    Returns the marker dict. This function is intentionally exception-proof:
+    malformed events and malformed detail entries are normalised or skipped
+    instead of raising, and an S3 write failure is logged, not raised.
+    """
+    if not isinstance(event, dict):
+        print(
+            f"[failure-handler] Unexpected event type {type(event).__name__}; "
+            "treating it as empty."
+        )
+        event = {}
+    print(f"[failure-handler] Invoked with event: {json.dumps(event, default=str)}")
+
+    bucket = _resolve_bucket(event)
+    basename = _derive_basename(event)
+
+    station_errors = _collect_station_errors(bucket, basename)
+
+    # Fall back to the Step Functions cause if no station detail was written.
+    if not station_errors:
+        sfn_error = _parse_sfn_cause(event)
+        if sfn_error:
+            station_errors = [sfn_error]
+
+    # Normalise every entry to plain strings so nothing below can raise on a
+    # detail file with missing, null or oddly-typed fields.
+    failed_chunks = []
+    for err in station_errors:
+        if not isinstance(err, dict):
+            continue
+        raw_message = err.get("message")
+        entry = {
+            "station": str(err.get("station") or "unknown"),
+            "reason_category": str(err.get("reason_category") or "UNKNOWN"),
+            "message": "" if raw_message is None else str(raw_message),
+        }
+        if err.get("page_start") is not None:
+            entry["page_start"] = err["page_start"]
+            entry["page_end"] = err.get("page_end", err["page_start"])
+        failed_chunks.append(entry)
+
+    # Choose the primary entry (first non-UNKNOWN, else the first one).
+    primary = next(
+        (c for c in failed_chunks if c["reason_category"] != "UNKNOWN"), None
+    )
+    if primary is None and failed_chunks:
+        primary = failed_chunks[0]
+    primary_category = primary["reason_category"] if primary else "UNKNOWN"
+    # Use the station's message as summary if it provides one; it carries more
+    # detail than the generic category description (e.g. quota exhaustion vs.
+    # document complexity).
+    station_message = primary["message"] if primary else ""
+    summary = station_message or REASON_SUMMARIES.get(
+        primary_category, REASON_SUMMARIES["UNKNOWN"]
+    )
+
+    marker = {
+        "status": "FAILED",
+        "filename": basename,
+        "reason_category": primary_category,
+        "summary": summary,
+        "failed_chunks": failed_chunks,
+        "execution_arn": event.get("executionArn"),
+        "timestamp": datetime.now(timezone.utc).isoformat(),
+    }
+
+    # Structured CloudWatch line for the dashboard "File status" widget.
+    detail_desc = "; ".join(
+        f"{c['station']}: {c['message'][:100]}" for c in failed_chunks if c["message"]
+    )
+    print(
+        f"File: {basename}, Status: FAILED | reason={primary_category}"
+        + (f" | {detail_desc}" if detail_desc else "")
+    )
+
+    # Write the marker where the frontend already polls. Best-effort: if even
+    # this fails, we log loudly but do not raise (raising would re-fail the
+    # state machine and produce a confusing double-failure).
+    if bucket and basename:
+        marker_key = f"result/FAILED_{basename}.json"
+        try:
+            s3_client.put_object(
+                Bucket=bucket,
+                Key=marker_key,
+                Body=json.dumps(marker, indent=2, default=str).encode("utf-8"),
+                ContentType="application/json",
+            )
+            print(f"[failure-handler] Wrote failure marker to s3://{bucket}/{marker_key}")
+        except Exception as e:  # noqa: BLE001
+            print(f"[failure-handler] CRITICAL: could not write marker {marker_key}: {e}")
+    else:
+        print(
+            "[failure-handler] CRITICAL: missing bucket/basename; "
+            f"bucket={bucket} basename={basename}. Marker NOT written."
+        )
+
+    return marker
diff --git a/lambda/pdf-merger-lambda/PDFMergerLambda/src/main/java/com/example/App.java b/lambda/pdf-merger-lambda/PDFMergerLambda/src/main/java/com/example/App.java
index 7b73ad16..c35d60f8 100644
--- a/lambda/pdf-merger-lambda/PDFMergerLambda/src/main/java/com/example/App.java
+++ b/lambda/pdf-merger-lambda/PDFMergerLambda/src/main/java/com/example/App.java
@@ -6,9 +6,12 @@
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
import org.apache.pdfbox.multipdf.PDFMergerUtility;
+import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -79,10 +82,51 @@ public String handleRequest(Map input, Context context) {
return String.format("PDFs merged successfully.\nBucket: %s\nMerged File Key: %s\nMerged File Name: %s",
bucketName, outputKey, baseFileName);
} catch (Exception e) {
- baseFileName = baseFileName.replace(".pdf", "");
- System.out.println("File: " + baseFileName + ", Status: Failed in Merging the PDF");
- System.out.println(String.format("Filename: %s, File not found: %s", baseFileName, e.getMessage()));
- return "Failed to merge PDFs.";
+ String reportName = baseFileName.replace(".pdf", "");
+ System.out.println("File: " + reportName + ", Status: FAILED | station=merge | reason=MERGE | " + e.getMessage());
+ // Write a station detail file the Step Functions failure-handler aggregates
+ // into the user-facing result/FAILED_<basename>.json marker.
+ reportFailure(bucketName, reportName, "MERGE", e.getMessage());
+ // Re-throw so the state machine's Catch fires. Returning a string here
+ // would be treated as SUCCESS by Step Functions and continue silently.
+ throw new RuntimeException("Failed to merge PDFs for " + reportName, e);
+ }
+ }
+
+    /**
+     * Escapes a value so it can be embedded inside a JSON string literal.
+     *
+     * Backslashes and double quotes are backslash-escaped. Newline, carriage
+     * return and tab are flattened to a single space so the message stays on
+     * one line. Every other control character below U+0020 is written as a
+     * six-character Unicode escape (backslash, 'u', four hex digits), because
+     * JSON forbids raw control characters inside strings.
+     *
+     * @param value The raw text; {@code null} is treated as the empty string.
+     * @return The escaped text, without surrounding quotes.
+     */
+    private static String escapeJsonString(String value) {
+        if (value == null) {
+            return "";
+        }
+        StringBuilder escaped = new StringBuilder(value.length() + 16);
+        for (int i = 0; i < value.length(); i++) {
+            char c = value.charAt(i);
+            if (c == '\\' || c == '"') {
+                escaped.append('\\').append(c);
+            } else if (c == '\n' || c == '\r' || c == '\t') {
+                escaped.append(' ');
+            } else if (c < 0x20) {
+                escaped.append(String.format("\\u%04x", (int) c));
+            } else {
+                escaped.append(c);
+            }
+        }
+        return escaped.toString();
+    }
+
+    /**
+     * Writes a structured failure-detail file to
+     * {@code temp/<fileBaseName>/_errors/merge.json}, intended for the Step
+     * Functions failure-handler to aggregate into the user-facing failure
+     * marker. Station: 'merge'.
+     *
+     * Best-effort: any exception raised while reporting is logged and
+     * swallowed so it cannot mask the original error. Nothing is written when
+     * the bucket or base name is missing.
+     *
+     * @param bucketName     The S3 bucket.
+     * @param fileBaseName   The base file name (no extension) used as the temp/ sub-folder.
+     * @param reasonCategory The failure category (e.g. "MERGE").
+     * @param message        The error message; may be {@code null}. Capped at 2000
+     *                       characters, matching the Python stations.
+     */
+    private void reportFailure(String bucketName, String fileBaseName, String reasonCategory, String message) {
+        if (bucketName == null || fileBaseName == null) {
+            return;
+        }
+        try {
+            // Cap the raw text BEFORE escaping so an escape sequence is never cut in half.
+            final int maxMessageChars = 2000;
+            String rawMessage = message == null ? "" : message;
+            if (rawMessage.length() > maxMessageChars) {
+                int end = maxMessageChars;
+                // Do not leave a dangling high surrogate at the cut point.
+                if (Character.isHighSurrogate(rawMessage.charAt(end - 1))) {
+                    end--;
+                }
+                rawMessage = rawMessage.substring(0, end);
+            }
+            String json = String.format(
+                    "{\"station\":\"merge\",\"reason_category\":\"%s\",\"message\":\"%s\"}",
+                    escapeJsonString(reasonCategory), escapeJsonString(rawMessage));
+            byte[] bytes = json.getBytes(StandardCharsets.UTF_8);
+            ObjectMetadata metadata = new ObjectMetadata();
+            metadata.setContentLength(bytes.length);
+            metadata.setContentType("application/json");
+            String key = String.format("temp/%s/_errors/merge.json", fileBaseName);
+            s3Client.putObject(new PutObjectRequest(bucketName, key, new ByteArrayInputStream(bytes), metadata));
+        } catch (Exception ex) {
+            System.out.println(String.format("Filename: %s, Could not write failure detail: %s", fileBaseName, ex.getMessage()));
         }
     }
diff --git a/lambda/pdf-merger-lambda/PDFMergerLambda/target/PDFMergerLambda-1.0-SNAPSHOT.jar b/lambda/pdf-merger-lambda/PDFMergerLambda/target/PDFMergerLambda-1.0-SNAPSHOT.jar
index 99f5a0d5..d9e93f0e 100644
Binary files a/lambda/pdf-merger-lambda/PDFMergerLambda/target/PDFMergerLambda-1.0-SNAPSHOT.jar and b/lambda/pdf-merger-lambda/PDFMergerLambda/target/PDFMergerLambda-1.0-SNAPSHOT.jar differ
diff --git a/lambda/pdf-merger-lambda/PDFMergerLambda/target/classes/com/example/App.class b/lambda/pdf-merger-lambda/PDFMergerLambda/target/classes/com/example/App.class
index 68125709..86272587 100644
Binary files a/lambda/pdf-merger-lambda/PDFMergerLambda/target/classes/com/example/App.class and b/lambda/pdf-merger-lambda/PDFMergerLambda/target/classes/com/example/App.class differ
diff --git a/lambda/pdf-merger-lambda/PDFMergerLambda/target/original-PDFMergerLambda-1.0-SNAPSHOT.jar b/lambda/pdf-merger-lambda/PDFMergerLambda/target/original-PDFMergerLambda-1.0-SNAPSHOT.jar
index 175c54d9..76c540af 100644
Binary files a/lambda/pdf-merger-lambda/PDFMergerLambda/target/original-PDFMergerLambda-1.0-SNAPSHOT.jar and b/lambda/pdf-merger-lambda/PDFMergerLambda/target/original-PDFMergerLambda-1.0-SNAPSHOT.jar differ
diff --git a/lambda/pdf-splitter-lambda/main.py b/lambda/pdf-splitter-lambda/main.py
index b9b0fcab..07c41e2a 100644
--- a/lambda/pdf-splitter-lambda/main.py
+++ b/lambda/pdf-splitter-lambda/main.py
@@ -14,6 +14,7 @@
import urllib.parse
import io
import os
+import datetime
# Initialize AWS clients
cloudwatch = boto3.client('cloudwatch')
@@ -22,6 +23,41 @@
state_machine_arn = os.environ['STATE_MACHINE_ARN']
+
+def report_failure(bucket_name, file_basename, reason_category, message):
+    """Log a split-station failure and publish the FAILED marker to S3.
+
+    The splitter is the station that starts the Step Functions execution, so
+    a failure here happens before any state machine exists and cannot be
+    picked up by a Step Functions Catch. The marker is therefore written
+    directly to result/FAILED_<file_basename>.json. Station: 'split'.
+
+    Args:
+        bucket_name: Target S3 bucket; nothing is written when falsy.
+        file_basename: Name used in the marker key; nothing is written when falsy.
+        reason_category: Failure category recorded in the log line and marker.
+        message: Error detail; stored as str(message) capped at 2000 characters.
+
+    Best-effort: an error while building the payload or writing to S3 is
+    printed and swallowed so reporting never crashes the handler.
+    """
+    # Structured log line, emitted even when no marker can be written.
+    print(f"File: {file_basename}, Status: FAILED | station=split | reason={reason_category} | {message}")
+
+    if not (bucket_name and file_basename):
+        return
+
+    chunk_detail = {
+        "station": "split",
+        "reason_category": reason_category,
+        "message": str(message)[:2000],
+    }
+    marker = {
+        "status": "FAILED",
+        "filename": file_basename,
+        "reason_category": reason_category,
+        "summary": "The document could not be split into pages for processing.",
+        "failed_chunks": [chunk_detail],
+        "timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(),
+    }
+
+    try:
+        # Key and payload are built inside the try so a formatting or
+        # serialization error is swallowed along with S3 errors.
+        marker_key = f"result/FAILED_{file_basename}.json"
+        payload = json.dumps(marker, indent=2).encode("utf-8")
+        s3_client.put_object(
+            Bucket=bucket_name,
+            Key=marker_key,
+            Body=payload,
+            ContentType="application/json",
+        )
+    except Exception as err:
+        print(f"Filename - {file_basename} | CRITICAL: could not write failure marker: {err}")
+
def log_chunk_created(filename):
"""
Logs the creation of a PDF chunk.
@@ -98,7 +134,9 @@ def split_pdf_into_pages(source_content, original_key, s3_client, bucket_name, p
chunks.append({
"s3_bucket": bucket_name,
"s3_key": s3_key,
- "chunk_key": s3_key # Key for the chunk
+ "chunk_key": s3_key,
+ "total_pages": num_pages,
+ "pages_in_chunk": min(pages_per_chunk, num_pages - start),
})
return chunks
@@ -120,8 +158,11 @@ def lambda_handler(event, context):
Returns:
dict: HTTP response indicating the success or failure of the Lambda function execution.
"""
+ bucket_name = None
+ pdf_file_key = None
+ file_basename = None
try:
-
+
print("Received event: " + json.dumps(event, indent=2))
# Access the S3 event structure
@@ -155,17 +196,19 @@ def lambda_handler(event, context):
print(f"Filename - {pdf_file_key} | Step Function started: {response['executionArn']}")
except KeyError as e:
-
+
print(f"File: {file_basename}, Status: Failed in split lambda function")
print(f"Filename - {pdf_file_key} | KeyError: {str(e)}")
+ report_failure(bucket_name, file_basename, "SPLIT", f"Missing key in event: {str(e)}")
return {
'statusCode': 500,
'body': json.dumps(f"Error: Missing key in event: {str(e)}")
}
except ValueError as e:
-
+
print(f"File: {file_basename}, Status: Failed in split lambda function")
print(f"Filename - {pdf_file_key} | ValueError: {str(e)}")
+ report_failure(bucket_name, file_basename, "SPLIT", f"Invalid input: {str(e)}")
return {
'statusCode': 500,
'body': json.dumps(f"Error: {str(e)}")
@@ -173,7 +216,8 @@ def lambda_handler(event, context):
except Exception as e:
print(f"File: {file_basename}, Status: Failed in split lambda function")
- print(f"Filename - {pdf_file_key} | Error occurred: {str(e)}", exc_info=True)
+ print(f"Filename - {pdf_file_key} | Error occurred: {str(e)}")
+ report_failure(bucket_name, file_basename, "SPLIT", f"Error processing event: {str(e)}")
return {
'statusCode': 500,
'body': json.dumps(f"Error processing event: {str(e)}")
diff --git a/lambda/title-generator-lambda/title_generator.py b/lambda/title-generator-lambda/title_generator.py
index 9aa9ac0c..5a68e3c8 100644
--- a/lambda/title-generator-lambda/title_generator.py
+++ b/lambda/title-generator-lambda/title_generator.py
@@ -5,6 +5,33 @@
import random
import fitz # PyMuPDF
+
+def report_failure(bucket, file_basename, reason_category, message):
+    """Write the title-station failure-detail file, once per exception.
+
+    Logs a structured status line and writes
+    temp/<file_basename>/_errors/title.json, intended for the Step Functions
+    failure-handler to aggregate into the user-facing
+    result/FAILED_<basename>.json marker. Station: 'title'.
+
+    De-duplication: lambda_handler's nested except blocks report a specific
+    reason and then re-raise into its outer ``except Exception``, which
+    reports again with a generic message. Both calls use the same S3 key, so
+    the generic report would overwrite the specific one (e.g. BEDROCK_API
+    replaced by TITLE). The exception currently being handled is therefore
+    tagged on its first report, and later calls for that same exception
+    object do nothing. Calls made outside an ``except`` block are never
+    de-duplicated.
+
+    Args:
+        bucket: Target S3 bucket; nothing is written when falsy.
+        file_basename: Folder name under temp/; nothing is written when falsy.
+        reason_category: Failure category recorded in the log line and file.
+        message: Error detail; stored as str(message) capped at 2000 characters.
+
+    Best-effort: an error while writing the detail file is printed and
+    swallowed so reporting cannot mask the original failure.
+    """
+    # Function-scope import: the visible module imports do not include sys.
+    import sys
+
+    # The exception being handled by the caller's except block, if any.
+    in_flight = sys.exc_info()[1]
+    if getattr(in_flight, "_title_failure_reported", False):
+        # A more specific report was already made closer to the failure.
+        return
+
+    print(f"File: {file_basename}, Status: FAILED | station=title | reason={reason_category} | {message}")
+
+    if in_flight is not None:
+        try:
+            in_flight._title_failure_reported = True
+        except (AttributeError, TypeError):
+            # Exception type rejects new attributes: fall back to the old
+            # behaviour (a later report may overwrite this one).
+            pass
+
+    if not bucket or not file_basename:
+        return
+    detail = {
+        "station": "title",
+        "reason_category": reason_category,
+        "message": str(message)[:2000],
+    }
+    try:
+        boto3.client('s3').put_object(
+            Bucket=bucket,
+            Key=f"temp/{file_basename}/_errors/title.json",
+            Body=json.dumps(detail).encode("utf-8"),
+            ContentType="application/json",
+        )
+    except Exception as e:
+        print(f"Filename: {file_basename} | Could not write failure detail: {e}")
+
# Helper function for exponential backoff and retry
def exponential_backoff_retry(
func,
@@ -199,12 +226,16 @@ def generate_title(extracted_text, current_title):
def lambda_handler(event, context):
+ file_info = {}
+ file_basename = None
try:
payload = event.get("Payload")
file_info = parse_payload(payload)
print(f"(lambda_handler | Parsed file information: {file_info})")
file_name = file_info['merged_file_name']
+ # Folder basename matches temp/<file_basename>/ created upstream (no extension).
+ file_basename = file_name.rsplit('.', 1)[0]
local_path = f'/tmp/{file_name}'
download_file_from_s3(file_info['bucket'], file_info['merged_file_key'], local_path, file_info['merged_file_name'])
@@ -212,13 +243,8 @@ def lambda_handler(event, context):
pdf_document = fitz.open(local_path)
except Exception as e:
print(f"(lambda_handler | Failed to open PDF file {file_name}: {e})")
- return {
- "statusCode": 500,
- "body": {
- "error": f"Failed to open PDF file {file_name}.",
- "details": f"{file_name} - {str(e)}"
- }
- }
+ report_failure(file_info.get('bucket'), file_basename, "TITLE", f"Failed to open merged PDF: {e}")
+ raise
try:
extracted_text = extract_text_from_pdf(pdf_document)
@@ -226,13 +252,8 @@ def lambda_handler(event, context):
except Exception as e:
print(f"(lambda_handler | Failed to extract text from PDF: {e})")
pdf_document.close()
- return {
- "statusCode": 500,
- "body": {
- "error": "Failed to extract text from PDF.",
- "details": f"{file_name} - {str(e)}"
- }
- }
+ report_failure(file_info.get('bucket'), file_basename, "TITLE", f"Failed to extract text: {e}")
+ raise
try:
title = generate_title(extracted_text, file_name)
@@ -240,13 +261,8 @@ def lambda_handler(event, context):
except Exception as e:
print(f"(lambda_handler | Failed to generate title: {e})")
pdf_document.close()
- return {
- "statusCode": 500,
- "body": {
- "error": "Failed to generate title.",
- "details": f"{file_name} - {str(e)}"
- }
- }
+ report_failure(file_info.get('bucket'), file_basename, "BEDROCK_API", f"Failed to generate title via Bedrock: {e}")
+ raise
try:
set_custom_metadata(pdf_document, file_name, title)
@@ -255,26 +271,16 @@ def lambda_handler(event, context):
except Exception as e:
print(f"(lambda_handler | Failed to set metadata or save PDF: {e})")
pdf_document.close()
- return {
- "statusCode": 500,
- "body": {
- "error": "Failed to set metadata or save PDF.",
- "details": f"{file_name} - {str(e)}"
- }
- }
+ report_failure(file_info.get('bucket'), file_basename, "TITLE", f"Failed to set metadata or save PDF: {e}")
+ raise
try:
save_path = save_to_s3(local_path, file_info['bucket'], file_name)
print(f"(lambda_handler | Saved file to S3 at: {save_path})")
except Exception as e:
print(f"(lambda_handler | Failed to save file to S3: {e})")
- return {
- "statusCode": 500,
- "body": {
- "error": "Failed to save file to S3.",
- "details": f"{file_name} - {str(e)}"
- }
- }
+ report_failure(file_info.get('bucket'), file_basename, "TITLE", f"Failed to save final PDF to S3: {e}")
+ raise
return {
"statusCode": 200,
@@ -286,10 +292,8 @@ def lambda_handler(event, context):
}
except Exception as e:
print(f"(lambda_handler | General error in lambda_handler: {e})")
- return {
- "statusCode": 500,
- "body": {
- "error": "An unexpected error occurred.",
- "details": f"Filename: {file_info.get('merged_file_name','Unknown')} - {str(e)}"
- }
- }
+ # Report and re-raise: returning a 500 dict would be treated as SUCCESS by
+ # the Step Functions LambdaInvoke and let the workflow continue silently.
+ # Raising ensures the state machine's Catch fires and the user is notified.
+ report_failure(file_info.get('bucket'), file_basename, "TITLE", f"Unexpected error: {e}")
+ raise