Skip to content

Commit f2fac5b

Browse files
Avoid KeyError building SQS partial batch failures without messageId
Read messageId from the underlying event dict when handler failures occur on malformed SQS records missing that field.
1 parent 683ba5f commit f2fac5b

2 files changed

Lines changed: 37 additions & 9 deletions

File tree

aws_lambda_powertools/utilities/batch/base.py

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -335,18 +335,22 @@ def _get_messages_to_report(self) -> list[PartialItemFailures]:
335335

336336
# Event Source Data Classes follow python idioms for fields
337337
# while Parser/Pydantic follows the event field names to the latter
338+
def _sqs_failure_item_identifier(self, msg) -> str:
339+
# If a message failed due to model validation (e.g., poison pill)
340+
# we convert to an event source data class...but self.model is still true
341+
# therefore, we do an additional check on whether the failed message is still a model
342+
# see https://github.com/aws-powertools/powertools-lambda-python/issues/2091
343+
if self.model and getattr(msg, "model_validate", None):
344+
return msg.messageId
345+
data = msg._data if hasattr(msg, "_data") else msg
346+
if isinstance(data, dict):
347+
return data.get("messageId", "")
348+
return msg.message_id
349+
338350
def _collect_sqs_failures(self):
339351
failures = []
340352
for msg in self.fail_messages:
341-
# If a message failed due to model validation (e.g., poison pill)
342-
# we convert to an event source data class...but self.model is still true
343-
# therefore, we do an additional check on whether the failed message is still a model
344-
# see https://github.com/aws-powertools/powertools-lambda-python/issues/2091
345-
if self.model and getattr(msg, "model_validate", None):
346-
msg_id = msg.messageId
347-
else:
348-
msg_id = msg.message_id
349-
failures.append({"itemIdentifier": msg_id})
353+
failures.append({"itemIdentifier": self._sqs_failure_item_identifier(msg)})
350354
return failures
351355

352356
def _collect_kinesis_failures(self):

tests/functional/batch/required_dependencies/test_utilities_batch.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -861,3 +861,27 @@ async def simple_async_handler(record: SQSRecord):
861861
# THEN record is processed successfully using asyncio.run()
862862
assert result == {"batchItemFailures": []}
863863
assert result == {"batchItemFailures": []}
864+
865+
866+
def test_sqs_batch_processor_missing_message_id_does_not_crash_on_handler_failure():
867+
processor = BatchProcessor(event_type=EventType.SQS, raise_on_entire_batch_failure=False)
868+
869+
def record_handler(record):
870+
raise RuntimeError("boom")
871+
872+
malformed_record = {
873+
"body": "{}",
874+
"receiptHandle": "rh",
875+
"attributes": {"ApproximateReceiveCount": "1"},
876+
"messageAttributes": {},
877+
"md5OfBody": "abc",
878+
"eventSource": "aws:sqs",
879+
"eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:my-queue",
880+
"awsRegion": "us-east-1",
881+
}
882+
883+
with processor(records=[malformed_record], handler=record_handler):
884+
processor.process()
885+
886+
response = processor.response()
887+
assert response == {"batchItemFailures": [{"itemIdentifier": ""}]}

0 commit comments

Comments
 (0)