Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion infrastructure/instance/sqs_id_sync.tf
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
resource "aws_sqs_queue" "id_sync_queue" {
name = "imms-${local.resource_scope}-id-sync-queue"
kms_master_key_id = data.aws_kms_key.existing_id_sync_sqs_encryption_key.arn
visibility_timeout_seconds = 360
visibility_timeout_seconds = 1080 # as per AWS docs to be 6 times the Lambda function timeout but kept to 3 times
redrive_policy = jsonencode({
deadLetterTargetArn = aws_sqs_queue.id_sync_dlq.arn
maxReceiveCount = 4
Expand Down
47 changes: 25 additions & 22 deletions lambdas/id_sync/src/id_sync.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
"""
- Parses the incoming AWS event into `AwsLambdaEvent` and iterate its `records`.
- Delegate each record to `process_record` and collect `nhs_number` from each result.
- If any record has status == "error" raise `IdSyncException` with aggregated nhs_numbers.
- Any unexpected error is wrapped into `IdSyncException(message="Error processing id_sync event")`.
- Parses the incoming AWS event into `AwsLambdaEvent` and iterates its `records`.
- Delegates each record to `process_record` with per-record exception isolation.
- Returns {"batchItemFailures": [...]} for any failed records so SQS only re-drives the failing messages.
- A handler-level exception (bad event schema etc.) re-raises to trigger full batch retry.
"""

from typing import Any

from common.aws_lambda_event import AwsLambdaEvent
from common.clients import STREAM_NAME, logger
from common.log_decorator import logging_decorator
from exceptions.id_sync_exception import IdSyncException
from record_processor import process_record


Expand All @@ -25,28 +24,32 @@ def handler(event_data: dict[str, Any], _context) -> dict[str, Any]:

logger.info("id_sync processing event with %d records", len(records))

error_count = 0
batch_item_failures = []

for record in records:
result = process_record(record)

if result.get("status") == "error":
error_count += 1

if error_count > 0:
raise IdSyncException(
message=f"Processed {len(records)} records with {error_count} errors",
)
try:
result = process_record(record)
if result.get("status") == "error":
message_id = record.get("messageId")
logger.error(
"id_sync record processing failed for messageId: %s — %s",
message_id,
result.get("message"),
)
batch_item_failures.append({"itemIdentifier": message_id})
except Exception:
message_id = record.get("messageId")
logger.exception("Unexpected error processing messageId: %s", message_id)
batch_item_failures.append({"itemIdentifier": message_id})

if batch_item_failures:
logger.error("id_sync completed with %d/%d failures", len(batch_item_failures), len(records))
return {"batchItemFailures": batch_item_failures}

response = {"status": "success", "message": f"Successfully processed {len(records)} records"}

logger.info("id_sync handler completed: %s", response)
return response

except IdSyncException as e:
logger.exception(f"id_sync error: {e.message}")
raise
except Exception:
msg = "Error processing id_sync event"
logger.exception(msg)
raise IdSyncException(message=msg)
logger.exception("Unexpected error processing id_sync event")
raise
Loading
Loading