A DRY, configurable, declarative Python library for working with AWS Lambdas that encourages Happy Path Programming — validate first, execute later — eliminating defensive try/except chains and tangled conditionals.
Learn how to wire API Gateway routes, validate payloads with OpenAPI or Pydantic, and process event-based services in the Acai AWS docs.
Building Lambda functions shouldn’t require boilerplate or ad-hoc validation. Acai AWS provides:
- 🚀 Zero Boilerplate – Auto-discover handlers based on directory, glob, or mapping modes
- ✅ Built-in Validation – OpenAPI schema enforcement or Pydantic models with no extra glue code
- 🛡️ Declarative Requirements – Decorators to plug in auth, before/after hooks, timeouts, and schema rules
- 🔄 Event Processing – Consistent abstractions for DynamoDB Streams, SQS, S3, SNS, Kinesis, Firehose, MSK, MQ, DocumentDB, and ALB
- 🧪 Easy Testing – Lightweight objects make unittest/pytest straightforward
- ⚙️ IDE-Friendly – Intuitive, type-friendly request/response objects for a better developer experience
Acai AWS embraces Happy Path Programming (HPP) — validate inputs upfront so business logic stays clean:
# ❌ Without Acai AWS: Defend every line
def handler(event, _context):
body = json.loads(event.get('body') or '{}')
if 'email' not in body:
return {"statusCode": 400, "body": '{"error": "Email required"}'}
if not EMAIL_REGEX.match(body['email']):
return {"statusCode": 400, "body": '{"error": "Invalid email"}'}
# ... additional checks ...
return {"statusCode": 200, "body": json.dumps(do_work(body))}# ✅ With Acai AWS: Validation is centralized
from acai_aws.apigateway.requirements import requirements
@requirements(required_body='v1-user-post-request')
def post(request, response):
# request.body already validated
response.body = {'userId': '123', 'email': request.body['email']}
return responsepip install acai_aws
# pipenv install acai_aws
# poetry add acai_aws- Python: 3.8+
- AWS SDK: Optional, only needed for features like S3 object fetching (
boto3is installed by default)
# app.py (entry point for your Lambda)
from acai_aws.apigateway.router import Router
def authenticate(request, response, requirements):
if request.headers.get('x-api-key') != 'secret-key':
response.code = 401
response.set_error('auth', 'Unauthorized')
router = Router(
base_path='api/v1',
handlers='handlers', # directory mode
schema='openapi.yml', # optional OpenAPI document
auto_validate=True,
validate_response=True,
with_auth=authenticate
)
router.auto_load()
def handler(event, context):
return router.route(event, context)# handlers/users.py
from acai_aws.apigateway.requirements import requirements
@requirements(
auth_required=True,
required_body={ # can be a dataclass or reference to a schema in openapi.yml
'type': 'object',
'required': ['email', 'name'],
'properties': {
'email': {'type': 'string', 'format': 'email'},
'name': {'type': 'string'}
}
}
)
def post(request, response):
response.body = {
'id': 'user-123',
'email': request.body['email'],
'name': request.body['name']
}
return response
def get(_request, response):
response.body = {'users': []}
return responsefrom acai_aws.apigateway.router import Router
router = Router(
base_path='your-service/v1',
handlers='api/handlers',
schema='api/openapi.yml'
)
router.auto_load()
def handle(event, context):
return router.route(event, context)The router automatically maps file structure to routes (see the table in the docs). For alternative modes—pattern globbing or explicit mappings—refer to the configuration guide.
~~ Directory ~~ ~~ Route ~~
===================================================================
📦api/ |
│---📂handlers |
│---📜router.py |
│---📜org.py | /org
│---📂grower |
│---📜__init__.py | /grower
│---📜_grower_id.py | /grower/{grower_id}
│---📂farm |
│---📜__init__.py | /farm
│---📂_farm_id |
│---📜__init__.py | /farm/{farm_id}
│---📂field |
│---📜__init__.py | /farm/{farm_id}/field
│---📜_field_id.py | /farm/{farm_id}/field/{field_id}
acai_aws.alb.Router provides the same routing, validation, and middleware as the API Gateway router for Lambda functions invoked by an Application Load Balancer. It subclasses apigateway.Router and reuses the same handler files, @requirements decorator, OpenAPI auto-validation, and directory/mapping/pattern resolution — only the request/response transport differs.
from acai_aws.alb.router import Router
router = Router(
base_path='api/v1',
handlers='handlers',
)
router.auto_load()
def handler(event, context):
return router.route(event, context)ALB-specific behavior layered on top of the apigateway router:
- Base64 bodies — when the ALB target group sends
isBase64Encoded: true,request.bodyis transparently decoded. statusDescription— every response includes thestatusDescriptionfield ALB expects (e.g."200 OK","404 Not Found").- Request extras —
request.target_group_arnandrequest.source_ip(read fromx-forwarded-for).
For records-style ALB processing (treating each invocation as a batch event rather than HTTP routing), see the ALB section below.
pipenv run generate
# → loads handlers, inspects @requirements metadata, and updates openapi.yml/jsonAcai AWS provides consistent event objects for AWS stream and queue services. Decorate your handler with acai_aws.common.records.requirements.requirements to auto-detect the source and wrap records.
from acai_aws.dynamodb.requirements import requirements
class ProductRecord:
def __init__(self, record):
self.id = record.body['id']
self.payload = record.body
@requirements(
operations=['created', 'updated'],
timeout=10,
data_class=ProductRecord
)
def handler(records):
for record in records.records:
process_product(record.id, record.payload)
return {'processed': len(records.records)}Supported services include:
DynamoDB Streams
from acai_aws.dynamodb.requirements import requirements as ddb_requirements
@ddb_requirements()
def dynamodb_handler(records):
for record in records.records:
handle_ddb_change(record.operation, record.body)Amazon SQS
from acai_aws.sqs.requirements import requirements as sqs_requirements
@sqs_requirements()
def sqs_handler(records):
for record in records.records:
handle_message(record.body, record.attributes)Amazon SNS
from acai_aws.sns.requirements import requirements as sns_requirements
@sns_requirements()
def sns_handler(records):
for record in records.records:
handle_notification(record.body, record.subject)Amazon S3
from acai_aws.s3.requirements import requirements as s3_requirements
@s3_requirements(get_object=True, data_type='json')
def s3_handler(records):
for record in records.records:
handle_object(record.bucket, record.key, record.body)Amazon Kinesis
from acai_aws.kinesis.requirements import requirements as kinesis_requirements
@kinesis_requirements()
def kinesis_handler(records):
for record in records.records:
handle_stream_event(record.partition_key, record.body)Amazon Firehose
from acai_aws.firehose.requirements import requirements as firehose_requirements
@firehose_requirements()
def firehose_handler(records):
for record in records.records:
handle_delivery(record.record_id, record.body)Amazon MSK
from acai_aws.msk.requirements import requirements as msk_requirements
@msk_requirements()
def msk_handler(records):
for record in records.records:
handle_msk_message(record.topic, record.body)Amazon MQ
from acai_aws.mq.requirements import requirements as mq_requirements
@mq_requirements()
def mq_handler(records):
for record in records.records:
handle_mq_message(record.message_id, record.body)Amazon DocumentDB Change Streams
from acai_aws.documentdb.requirements import requirements as docdb_requirements
@docdb_requirements()
def docdb_handler(records):
for record in records.records:
handle_docdb_change(record.operation, record.full_document)Application Load Balancer (ALB)
from acai_aws.alb.requirements import requirements as alb_requirements
@alb_requirements()
def alb_handler(records):
for record in records.records:
handle_request(record.http_method, record.body, record.source_ip)Each record exposes intuitive properties like record.operation, record.body, or service-specific metadata (bucket, partition, headers, etc.).
Records events (SQS, Kinesis, DynamoDB, SNS, S3, MSK, MQ, Firehose, DocumentDB, ALB) accept the same required_body validation as API Gateway — either a JSON Schema dict, an OpenAPI component name string, or a Pydantic BaseModel subclass.
from pydantic import BaseModel
from acai_aws.sqs.requirements import requirements
class OrderEvent(BaseModel):
order_id: str
amount: float
@requirements(required_body=OrderEvent)
def handler(event):
for record in event.records: # invalid records filtered before this loop
process(record.body)By default, invalid records are silently filtered out of event.records — preserving existing behavior. Opt into louder modes with failure_mode (also controls the operations=[...] filter).
from acai_aws.base.event import FailureMode| Mode | Behavior |
|---|---|
FailureMode.SILENT_IGNORE (default) |
Invalid records dropped silently. Handler receives only valid records. |
FailureMode.LOG_WARN |
Invalid records dropped and each failure emitted as a structured WARN log entry. |
FailureMode.RAISE_ERROR |
First invalid record raises RecordException immediately, before the handler runs. Replaces the deprecated raise_body_error=True / raise_operation_error=True booleans. |
FailureMode.RETURN_FAILURE |
Invalid records collected on event.invalid_records. After the handler returns, the decorator auto-merges framework-detected failures into the response's batchItemFailures list (SQS/Kinesis/DynamoDB) — compatible with AWS Lambda partial batch response. |
When failure_mode=FailureMode.RETURN_FAILURE is set on a source that supports partial batch response, the framework merges its detected failures into whatever your handler returns — letting the event-source mapping retry malformed messages correctly instead of silently dropping them.
from pydantic import BaseModel
from acai_aws.base.event import FailureMode
from acai_aws.sqs.requirements import requirements
class OrderEvent(BaseModel):
order_id: str
amount: float
@requirements(required_body=OrderEvent, failure_mode=FailureMode.RETURN_FAILURE)
def handler(event):
handler_failures = []
for record in event.records: # only valid records
try:
process(record.body)
except TransientError:
handler_failures.append({'itemIdentifier': record.message_id})
return {'batchItemFailures': handler_failures}
# Framework appends validation failures to batchItemFailures automatically.
# SQS retries both kinds. No silent data loss.Per-source identifiers used by the auto-merge:
| Source | itemIdentifier |
|---|---|
| SQS | record.message_id |
| Kinesis | record.sequence_number |
| DynamoDB Streams | record.sequence_number |
| MSK | f'{record.topic}-{record.partition}-{record.offset}' |
Sources without partial-batch support (SNS, S3, Firehose, MQ, DocumentDB) fall back to SILENT_IGNORE semantics for RETURN_FAILURE — use LOG_WARN for those instead.
ALB is synchronous HTTP — partial-batch semantics don't apply. When required_body is set on an ALB handler and the body fails validation, the framework short-circuits the handler and returns an HTTP 400 response in the same shape API Gateway produces:
{
"statusCode": 400,
"headers": {"Content-Type": "application/json"},
"body": "{\"errors\": [{\"key_path\": \"amount\", \"message\": \"Input should be a valid number\"}]}",
"isBase64Encoded": false
}from acai_aws.alb.requirements import requirements
@requirements(required_body=OrderEvent)
def alb_handler(event):
# Only runs when the body validates. Otherwise framework returns 400.
return {'statusCode': 200, 'body': '{"ok": true}'}The old raise_body_error=True and raise_operation_error=True kwargs still work and translate to failure_mode=FailureMode.RAISE_ERROR, but emit a deprecation warning. Prefer the unified enum going forward.
- OpenAPI Generator – CLI (
python -m acai_aws.apigateway generate-openapi) scans handlers and updates schema docs - Request/Response Helpers – Access JSON, GraphQL, form, XML, or raw bodies via
Request.json,Request.form, etc. - Logging – Configurable JSON/inline logging via
acai_aws.common.logger - Validation – JSON Schema (Draft 7) and Pydantic support with helpful error messages
pipenv install --dev
pipenv run test # run unittest discovery
pipenv run coverage # run pytest suite with coverage reports
pipenv run lint # run pylint with bundled rulesimport json
from unittest import TestCase
from acai_aws.apigateway.router import Router
class UsersEndpointTest(TestCase):
def setUp(self):
self.router = Router(base_path='api/v1', handlers='tests/handlers')
def test_creates_user(self):
event = {
'path': 'api/v1/users',
'httpMethod': 'POST',
'headers': {'content-type': 'application/json'},
'body': json.dumps({'email': 'unit@example.com', 'name': 'Unit'})
}
result = self.router.route(event, None)
payload = json.loads(result['body'])
self.assertEqual(200, result['statusCode'])
self.assertEqual('unit@example.com', payload['email'])Contributions welcome! Follow the usual GitHub flow:
- Fork the repository
- Create a feature branch (
git checkout -b feature/amazing-idea) - Write tests and code (
pipenv run test) - Run linting (
pipenv run lint) - Open a Pull Request
git clone https://github.com/syngenta/acai-python.git
cd acai-python
pipenv install --dev
pipenv run test
pipenv run lintApache 2.0 © Paul Cruse III
Acai AWS continues the Happy Path philosophy introduced in acai-js and expanded by Acai-TS. Thanks to the original contributors who made Lambda development less painful.
- 📖 Documentation: https://syngenta.github.io/acai-python-docs/
- 💻 Examples: https://github.com/syngenta/acai-python-docs/tree/main/examples
- 🐛 Issues: GitHub Issues
- 💬 Discussions: GitHub Discussions
Made with 💙 by developers who believe AWS Lambda development should be enjoyable.