diff --git a/lambda-ecs-durable-python-sam/README.md b/lambda-ecs-durable-python-sam/README.md new file mode 100644 index 000000000..4c14c9ea6 --- /dev/null +++ b/lambda-ecs-durable-python-sam/README.md @@ -0,0 +1,138 @@ +# Lambda Durable Functions to Amazon ECS with Python + +This pattern demonstrates how to invoke Amazon ECS tasks from AWS Lambda durable functions using Python. The workflow starts an ECS task, waits for a callback, and resumes based on the task result while maintaining state across the pause/resume cycle. + +Learn more about this pattern at Serverless Land Patterns: https://serverlessland.com/patterns/lambda-ecs-python-sam + +Important: this application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example. + +## Requirements + +* [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources. +* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured +* [Git Installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git) +* [AWS Serverless Application Model](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html) (AWS SAM) installed +* [Docker](https://docs.docker.com/get-docker/) installed (for building Lambda container images) +* [Python 3.13](https://www.python.org/downloads/) or later + +## Deployment Instructions + +1. Create a new directory, navigate to that directory in a terminal and clone the GitHub repository: + ``` + git clone https://github.com/aws-samples/serverless-patterns + ``` +1. Change directory to the pattern directory: + ``` + cd lambda-ecs-python-sam + ``` +1. From the command line, use AWS SAM to build the application: + ``` + sam build + ``` +1. From the command line, use AWS SAM to deploy the AWS resources for the pattern as specified in the template.yaml file: + ``` + sam deploy --guided + ``` +1. During the prompts: + * Enter a stack name + * Enter the desired AWS Region + * Enter the VpcCIDR parameter (default: 10.0.0.0/16) + * Allow SAM CLI to create IAM roles with the required permissions. + * Create managed ECR repositories for all functions (required for container images) + + Once you have run `sam deploy --guided` mode once and saved arguments to a configuration file (samconfig.toml), you can use `sam deploy` in future to use these defaults. + +1. Note the outputs from the SAM deployment process. These contain the resource names and/or ARNs which are used for testing. + +## How it works + +This pattern implements an ECS task orchestration workflow using Lambda durable functions with callback pattern: + +1. **Sync Lambda** starts an ECS task and polls for completion using durable waits (no compute charges during waits) +2. **Callback Lambda** starts an ECS task, pauses execution using `callback.result()`, and waits for a callback +3. The ECS task processes work and calls Lambda durable execution callback API when complete +4. The Lambda function resumes automatically when the callback is invoked and returns the result + +The pattern uses the AWS Durable Execution SDK for Python with the `@durable_execution` decorator to maintain state across the pause/resume cycle. The callback pattern ensures no compute charges while waiting for ECS task completion. + +### Architecture Components + +- **Sync Lambda**: Orchestrates ECS tasks using Lambda durable functions SDK with polling pattern and durable waits +- **Callback Lambda**: Orchestrates ECS tasks using Lambda durable functions SDK with callback pattern +- **ECS Tasks**: Process work and send callbacks to Lambda using durable execution callback APIs +- **VPC and Networking**: Provides network connectivity for ECS tasks to pull Docker images and call AWS APIs +- **CloudWatch Logs**: Stores execution logs for Lambda functions and ECS tasks + +## Testing + +### Set Environment Variables + +```bash +export AWS_DEFAULT_REGION=us-east-1 +export STACK_NAME= + +# Get function names from CloudFormation outputs +export SYNC_FUNCTION=$(aws cloudformation describe-stacks \ + --stack-name $STACK_NAME \ + --query 'Stacks[0].Outputs[?OutputKey==`SyncLambdaFunctionArn`].OutputValue' \ + --output text | awk -F: '{print $NF}') + +export CALLBACK_FUNCTION=$(aws cloudformation describe-stacks \ + --stack-name $STACK_NAME \ + --query 'Stacks[0].Outputs[?OutputKey==`CallbackLambdaFunctionArn`].OutputValue' \ + --output text | awk -F: '{print $NF}') +``` + +### Test Synchronous Pattern + +```bash +# Invoke the sync function (must use qualified ARN with :$LATEST) +aws lambda invoke \ + --function-name $SYNC_FUNCTION:\$LATEST \ + --invocation-type Event \ + --cli-binary-format raw-in-base64-out \ + --payload '{"message": "Hello from sync pattern", "processingTime": 10}' \ + response.json + +# Monitor Lambda logs +aws logs tail /aws/lambda/$SYNC_FUNCTION --follow + +# Monitor ECS task logs +aws logs tail /ecs/$STACK_NAME --follow +``` + +### Test Callback Pattern + +```bash +# Invoke the callback function (must use qualified ARN with :$LATEST) +aws lambda invoke \ + --function-name $CALLBACK_FUNCTION:\$LATEST \ + --invocation-type Event \ + --cli-binary-format raw-in-base64-out \ + --payload '{"message": "Hello from callback pattern", "processingTime": 30}' \ + response.json + +# Monitor Lambda logs +aws logs tail /aws/lambda/$CALLBACK_FUNCTION --follow + +# Monitor ECS task logs +aws logs tail /ecs/$STACK_NAME --follow +``` + +Expected output: The Lambda function should complete and return the ECS task result. The logs should show the callback being received and the function resuming execution. + +## Cleanup + +1. Delete the stack + ```bash + sam delete + ``` +1. Confirm the stack has been deleted + ```bash + aws cloudformation list-stacks --query "StackSummaries[?contains(StackName,'$STACK_NAME')].StackStatus" + ``` + +---- +Copyright 2025 Amazon.com, Inc. or its affiliates. All Rights Reserved. + +SPDX-License-Identifier: MIT-0 diff --git a/lambda-ecs-durable-python-sam/example-pattern.json b/lambda-ecs-durable-python-sam/example-pattern.json new file mode 100644 index 000000000..51f7d21da --- /dev/null +++ b/lambda-ecs-durable-python-sam/example-pattern.json @@ -0,0 +1,68 @@ +{ + "title": "AWS Lambda Durable Functions to Amazon ECS with Python", + "description": "Invoke ECS tasks from Lambda Durable Functions with automatic checkpointing, state management, and resilient execution patterns", + "language": "Python", + "level": "300", + "framework": "SAM", + "introBox": { + "headline": "How it works", + "text": [ + "This pattern demonstrates AWS Lambda Durable Functions invoking Amazon ECS tasks with resilient, long-running execution capabilities:", + "1. Durable Synchronous Pattern: Lambda uses checkpointed steps and durable waits to poll ECS task status. Can run for up to 1 year with automatic recovery from failures. No compute charges during wait periods.", + "2. Durable Callback Pattern: Lambda uses checkpointed steps to reliably initiate ECS tasks. Each step (create record, start task, update status) is automatically checkpointed for guaranteed execution.", + "The pattern uses the AWS Durable Execution SDK for Python, providing automatic state management, checkpoint-based recovery, and cost-effective long-running workflows. Includes inline Python code in ECS containers, VPC networking, and DynamoDB for callback tracking." + ] + }, + "gitHub": { + "template": { + "repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/lambda-ecs-python-sam", + "templateURL": "serverless-patterns/lambda-ecs-python-sam", + "projectFolder": "lambda-ecs-python-sam", + "templateFile": "template.yaml" + } + }, + "resources": { + "bullets": [ + { + "text": "Lambda Durable Functions", + "link": "https://docs.aws.amazon.com/lambda/latest/dg/durable-functions.html" + }, + { + "text": "Durable Execution SDK", + "link": "https://docs.aws.amazon.com/lambda/latest/dg/durable-execution-sdk.html" + }, + { + "text": "Run Amazon ECS or Fargate tasks", + "link": "https://docs.aws.amazon.com/AmazonECS/latest/developerguide/ecs_run_task.html" + }, + { + "text": "Amazon ECS Task Definitions", + "link": "https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task_definitions.html" + } + ] + }, + "deploy": { + "text": [ + "sam build", + "sam deploy --guided" + ] + }, + "testing": { + "text": [ + "See the GitHub repo for detailed testing instructions." + ] + }, + "cleanup": { + "text": [ + "Delete the stack: sam delete" + ] + }, + "authors": [ + { + "name": "Mian Tariq", + "image": "", + "bio": "Senior Delivery Consultant", + "linkedin": "" + } + ] +} diff --git a/lambda-ecs-durable-python-sam/src/Dockerfile b/lambda-ecs-durable-python-sam/src/Dockerfile new file mode 100644 index 000000000..730a1f347 --- /dev/null +++ b/lambda-ecs-durable-python-sam/src/Dockerfile @@ -0,0 +1,14 @@ +FROM public.ecr.aws/lambda/python:3.13 + +# Copy requirements file +COPY requirements.txt ${LAMBDA_TASK_ROOT}/ + +# Install dependencies including durable SDK +RUN pip install -r requirements.txt + +# Copy function code +COPY sync_handler.py ${LAMBDA_TASK_ROOT}/ +COPY callback_handler.py ${LAMBDA_TASK_ROOT}/ + +# Default handler (will be overridden by template) +CMD [ "sync_handler.lambda_handler" ] diff --git a/lambda-ecs-durable-python-sam/src/callback_handler.py b/lambda-ecs-durable-python-sam/src/callback_handler.py new file mode 100644 index 000000000..39d223b73 --- /dev/null +++ b/lambda-ecs-durable-python-sam/src/callback_handler.py @@ -0,0 +1,114 @@ +import json +import boto3 +import os +from aws_durable_execution_sdk_python import ( + DurableContext, + durable_execution, +) + +ecs_client = boto3.client('ecs') + +def start_ecs_task_with_callback(cluster, task_definition, subnet1, subnet2, security_group, + callback_token, message, processing_time): + """ + Starts an ECS task and passes the callback token via environment variable. + The ECS task will call Lambda durable execution callback APIs when complete. + """ + print(f"[CALLBACK] Starting ECS task with callback token") + + response = ecs_client.run_task( + cluster=cluster, + taskDefinition=task_definition, + launchType='FARGATE', + networkConfiguration={ + 'awsvpcConfiguration': { + 'subnets': [subnet1, subnet2], + 'securityGroups': [security_group], + 'assignPublicIp': 'ENABLED' + } + }, + overrides={ + 'containerOverrides': [ + { + 'name': 'python-callback-container', + 'environment': [ + {'name': 'CALLBACK_TOKEN', 'value': callback_token}, + {'name': 'MESSAGE', 'value': message}, + {'name': 'PROCESSING_TIME', 'value': str(processing_time)} + ] + } + ] + } + ) + + if not response['tasks']: + raise Exception("Failed to start ECS task") + + task_arn = response['tasks'][0]['taskArn'] + print(f"[CALLBACK] Task started: {task_arn}") + + return task_arn + +@durable_execution +def lambda_handler(event, context: DurableContext): + """ + Lambda durable function that invokes an ECS task and waits for callback. + + The ECS task receives a callback token and calls Lambda durable execution + callback APIs (SendDurableExecutionCallbackSuccess/Failure) when complete. + + This function pauses execution while waiting for the callback, with no + compute charges during the wait period. + """ + + # Get configuration from environment variables + cluster = os.environ['ECS_CLUSTER'] + task_definition = os.environ['TASK_DEFINITION'] + subnet1 = os.environ['SUBNET_1'] + subnet2 = os.environ['SUBNET_2'] + security_group = os.environ['SECURITY_GROUP'] + + # Get input parameters + message = event.get('message', 'No message provided') + processing_time = event.get('processingTime', 5) + + try: + # Create callback to get callback token + callback = context.create_callback() + + print(f"[CALLBACK] Created callback with token: {callback.callback_id[:20]}...") + + # Start ECS task with callback token (call directly, no context.step!) + task_arn = start_ecs_task_with_callback( + cluster, task_definition, subnet1, subnet2, security_group, + callback.callback_id, message, processing_time + ) + + print(f"[CALLBACK] Waiting for callback from ECS task...") + + # Wait for callback (pauses execution here, no compute charges) + result = callback.result() + + print(f"[CALLBACK] Received callback with result") + + # Return the result from the callback + return { + 'statusCode': 200, + 'body': json.dumps({ + 'status': 'success', + 'message': 'ECS task completed and sent callback', + 'taskArn': task_arn, + 'result': result + }) + } + + except Exception as e: + context.logger.error(f"[CALLBACK] Error: {str(e)}") + + return { + 'statusCode': 500, + 'body': json.dumps({ + 'status': 'error', + 'error': str(e) + }) + } diff --git a/lambda-ecs-durable-python-sam/src/requirements.txt b/lambda-ecs-durable-python-sam/src/requirements.txt new file mode 100644 index 000000000..46ba39952 --- /dev/null +++ b/lambda-ecs-durable-python-sam/src/requirements.txt @@ -0,0 +1,2 @@ +boto3 +aws-durable-execution-sdk-python diff --git a/lambda-ecs-durable-python-sam/src/sync_handler.py b/lambda-ecs-durable-python-sam/src/sync_handler.py new file mode 100644 index 000000000..8ba2739e6 --- /dev/null +++ b/lambda-ecs-durable-python-sam/src/sync_handler.py @@ -0,0 +1,153 @@ +import json +import boto3 +import os +from aws_durable_execution_sdk_python import ( + DurableContext, + durable_execution, + durable_step, +) +from aws_durable_execution_sdk_python.config import Duration + +ecs_client = boto3.client('ecs') + +@durable_step +def start_ecs_task(step_context, cluster, task_definition, subnet1, subnet2, security_group, message, processing_time): + """ + Durable step that starts an ECS task. + This step is checkpointed, so if interrupted, it won't re-execute. + """ + step_context.logger.info(f"[SYNC] Starting ECS task with message: {message}") + + response = ecs_client.run_task( + cluster=cluster, + taskDefinition=task_definition, + launchType='FARGATE', + networkConfiguration={ + 'awsvpcConfiguration': { + 'subnets': [subnet1, subnet2], + 'securityGroups': [security_group], + 'assignPublicIp': 'ENABLED' + } + }, + overrides={ + 'containerOverrides': [ + { + 'name': 'python-sync-container', + 'environment': [ + {'name': 'MESSAGE', 'value': message}, + {'name': 'PROCESSING_TIME', 'value': str(processing_time)} + ] + } + ] + } + ) + + if not response['tasks']: + raise Exception("Failed to start ECS task") + + task_arn = response['tasks'][0]['taskArn'] + step_context.logger.info(f"[SYNC] Task started: {task_arn}") + + return task_arn + +@durable_step +def check_task_status(step_context, cluster, task_arn): + """ + Durable step that checks ECS task status. + This step is checkpointed and can be retried if it fails. + """ + step_context.logger.info(f"[SYNC] Checking task status: {task_arn}") + + describe_response = ecs_client.describe_tasks( + cluster=cluster, + tasks=[task_arn] + ) + + if not describe_response['tasks']: + raise Exception(f"Task not found: {task_arn}") + + task = describe_response['tasks'][0] + last_status = task['lastStatus'] + + step_context.logger.info(f"[SYNC] Task status: {last_status}") + + return { + 'status': last_status, + 'task': task + } + +@durable_execution +def lambda_handler(event, context: DurableContext): + """ + Lambda Durable Function that invokes an ECS task and waits for completion. + Uses the Durable Execution SDK for automatic checkpointing and replay. + + This function can run for up to 1 year, with automatic state management + and recovery from failures. + """ + + # Get configuration from environment variables + cluster = os.environ['ECS_CLUSTER'] + task_definition = os.environ['TASK_DEFINITION'] + subnet1 = os.environ['SUBNET_1'] + subnet2 = os.environ['SUBNET_2'] + security_group = os.environ['SECURITY_GROUP'] + + # Get input parameters + message = event.get('message', 'No message provided') + processing_time = event.get('processingTime', 5) + + try: + # Step 1: Start ECS task (checkpointed) + task_arn = context.step(start_ecs_task( + cluster, task_definition, subnet1, subnet2, + security_group, message, processing_time + )) + + # Poll for task completion using durable waits + max_attempts = 60 # 5 minutes max (60 * 5 seconds) + poll_interval = 5 # Check every 5 seconds + + for attempt in range(max_attempts): + # Wait before checking status (no compute charges during wait) + context.wait(Duration.from_seconds(poll_interval)) + + # Step 2: Check task status (checkpointed) + status_result = context.step(check_task_status(cluster, task_arn)) + + if status_result['status'] == 'STOPPED': + # Task completed + task = status_result['task'] + stop_code = task.get('stopCode', 'Unknown') + + if stop_code == 'EssentialContainerExited': + exit_code = task['containers'][0].get('exitCode', 1) + + if exit_code == 0: + context.logger.info(f"[SYNC] Task completed successfully") + return { + 'statusCode': 200, + 'body': json.dumps({ + 'status': 'success', + 'message': f'Processed: {message}', + 'processingTime': processing_time, + 'taskArn': task_arn + }) + } + else: + raise Exception(f"Task failed with exit code: {exit_code}") + else: + raise Exception(f"Task stopped unexpectedly: {stop_code}") + + # Timeout + raise Exception(f"Task did not complete within {max_attempts * poll_interval} seconds") + + except Exception as e: + context.logger.error(f"[SYNC] Error: {str(e)}") + return { + 'statusCode': 500, + 'body': json.dumps({ + 'status': 'error', + 'error': str(e) + }) + } diff --git a/lambda-ecs-durable-python-sam/template.yaml b/lambda-ecs-durable-python-sam/template.yaml new file mode 100644 index 000000000..d958d9faa --- /dev/null +++ b/lambda-ecs-durable-python-sam/template.yaml @@ -0,0 +1,398 @@ +AWSTemplateFormatVersion: '2010-09-09' +Transform: AWS::Serverless-2016-10-31 +Description: > + Lambda Durable Functions to ECS with Python - Demonstrates durable execution patterns with ECS tasks + +Parameters: + VpcCIDR: + Type: String + Default: 10.0.0.0/16 + Description: CIDR block for VPC + +Resources: + # VPC and Networking + VPC: + Type: AWS::EC2::VPC + Properties: + CidrBlock: !Ref VpcCIDR + EnableDnsHostnames: true + EnableDnsSupport: true + Tags: + - Key: Name + Value: !Sub ${AWS::StackName}-vpc + + PublicSubnet1: + Type: AWS::EC2::Subnet + Properties: + VpcId: !Ref VPC + CidrBlock: 10.0.1.0/24 + AvailabilityZone: !Select [0, !GetAZs ''] + MapPublicIpOnLaunch: true + Tags: + - Key: Name + Value: !Sub ${AWS::StackName}-public-subnet-1 + + PublicSubnet2: + Type: AWS::EC2::Subnet + Properties: + VpcId: !Ref VPC + CidrBlock: 10.0.2.0/24 + AvailabilityZone: !Select [1, !GetAZs ''] + MapPublicIpOnLaunch: true + Tags: + - Key: Name + Value: !Sub ${AWS::StackName}-public-subnet-2 + + InternetGateway: + Type: AWS::EC2::InternetGateway + Properties: + Tags: + - Key: Name + Value: !Sub ${AWS::StackName}-igw + + AttachGateway: + Type: AWS::EC2::VPCGatewayAttachment + Properties: + VpcId: !Ref VPC + InternetGatewayId: !Ref InternetGateway + + PublicRouteTable: + Type: AWS::EC2::RouteTable + Properties: + VpcId: !Ref VPC + Tags: + - Key: Name + Value: !Sub ${AWS::StackName}-public-rt + + PublicRoute: + Type: AWS::EC2::Route + DependsOn: AttachGateway + Properties: + RouteTableId: !Ref PublicRouteTable + DestinationCidrBlock: 0.0.0.0/0 + GatewayId: !Ref InternetGateway + + SubnetRouteTableAssociation1: + Type: AWS::EC2::SubnetRouteTableAssociation + Properties: + SubnetId: !Ref PublicSubnet1 + RouteTableId: !Ref PublicRouteTable + + SubnetRouteTableAssociation2: + Type: AWS::EC2::SubnetRouteTableAssociation + Properties: + SubnetId: !Ref PublicSubnet2 + RouteTableId: !Ref PublicRouteTable + + # Security Group + ECSSecurityGroup: + Type: AWS::EC2::SecurityGroup + Properties: + GroupDescription: Security group for ECS tasks + VpcId: !Ref VPC + SecurityGroupEgress: + - IpProtocol: -1 + CidrIp: 0.0.0.0/0 + Tags: + - Key: Name + Value: !Sub ${AWS::StackName}-ecs-sg + + # ECS Cluster + ECSCluster: + Type: AWS::ECS::Cluster + Properties: + ClusterName: !Sub ${AWS::StackName}-cluster + ClusterSettings: + - Name: containerInsights + Value: enabled + + # CloudWatch Log Group + ECSLogGroup: + Type: AWS::Logs::LogGroup + Properties: + LogGroupName: !Sub /ecs/${AWS::StackName} + RetentionInDays: 7 + + # ECS Task Execution Role + ECSTaskExecutionRole: + Type: AWS::IAM::Role + Properties: + AssumeRolePolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Principal: + Service: ecs-tasks.amazonaws.com + Action: sts:AssumeRole + ManagedPolicyArns: + - arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy + + # ECS Task Role (for callback pattern) + ECSTaskRole: + Type: AWS::IAM::Role + Properties: + AssumeRolePolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Principal: + Service: ecs-tasks.amazonaws.com + Action: sts:AssumeRole + Policies: + - PolicyName: LambdaCallbackPolicy + PolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - lambda:SendDurableExecutionCallbackSuccess + - lambda:SendDurableExecutionCallbackFailure + Resource: !Sub 'arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:${AWS::StackName}-callback-function:$LATEST/durable-execution/*' + + # ECS Task Definition for Sync Pattern + SyncTaskDefinition: + Type: AWS::ECS::TaskDefinition + Properties: + Family: !Sub ${AWS::StackName}-sync-task + NetworkMode: awsvpc + RequiresCompatibilities: + - FARGATE + Cpu: '256' + Memory: '512' + ExecutionRoleArn: !GetAtt ECSTaskExecutionRole.Arn + ContainerDefinitions: + - Name: python-sync-container + Image: public.ecr.aws/docker/library/python:3.11-slim + Essential: true + Command: + - /bin/sh + - -c + - | + pip install --quiet boto3 && python3 << 'EOF' + import os + import time + import json + from datetime import datetime + + def main(): + message = os.environ.get('MESSAGE', 'No message provided') + processing_time = int(os.environ.get('PROCESSING_TIME', '5')) + + print(f"[SYNC] Starting processing: {message}") + print(f"[SYNC] Will process for {processing_time} seconds") + + # Simulate processing + time.sleep(processing_time) + + result = { + "status": "success", + "message": f"Processed: {message}", + "processingTime": processing_time, + "timestamp": datetime.utcnow().isoformat() + "Z" + } + + print(f"[SYNC] Completed: {json.dumps(result)}") + return result + + if __name__ == "__main__": + main() + EOF + LogConfiguration: + LogDriver: awslogs + Options: + awslogs-group: !Ref ECSLogGroup + awslogs-region: !Ref AWS::Region + awslogs-stream-prefix: sync + + # ECS Task Definition for Callback Pattern + CallbackTaskDefinition: + Type: AWS::ECS::TaskDefinition + Properties: + Family: !Sub ${AWS::StackName}-callback-task + NetworkMode: awsvpc + RequiresCompatibilities: + - FARGATE + Cpu: '256' + Memory: '512' + ExecutionRoleArn: !GetAtt ECSTaskExecutionRole.Arn + TaskRoleArn: !GetAtt ECSTaskRole.Arn + ContainerDefinitions: + - Name: python-callback-container + Image: public.ecr.aws/docker/library/python:3.11-slim + Essential: true + Command: + - /bin/sh + - -c + - | + pip install --quiet boto3 && python3 << 'EOF' + import os + import boto3 + import json + import time + from datetime import datetime + + def main(): + callback_token = os.environ.get('CALLBACK_TOKEN') + message = os.environ.get('MESSAGE', 'No message provided') + processing_time = int(os.environ.get('PROCESSING_TIME', '5')) + + if not callback_token: + print("[CALLBACK] ERROR: No callback token provided!") + return + + lambda_client = boto3.client('lambda') + + try: + print(f"[CALLBACK] Starting processing: {message}") + print(f"[CALLBACK] Callback token: {callback_token[:20]}...") + print(f"[CALLBACK] Will process for {processing_time} seconds") + + # Simulate processing + time.sleep(processing_time) + + result = { + "status": "success", + "message": f"Processed: {message}", + "processingTime": processing_time, + "timestamp": datetime.utcnow().isoformat() + "Z" + } + + print(f"[CALLBACK] Sending success callback to Lambda") + lambda_client.send_durable_execution_callback_success( + CallbackId=callback_token, + Result=json.dumps(result) + ) + print(f"[CALLBACK] Success callback sent!") + + except Exception as e: + print(f"[CALLBACK] ERROR: {str(e)}") + try: + lambda_client.send_durable_execution_callback_failure( + CallbackId=callback_token, + Error={'ErrorType': 'ProcessingError', 'ErrorMessage': str(e)} + ) + print(f"[CALLBACK] Failure callback sent") + except Exception as callback_error: + print(f"[CALLBACK] Failed to send callback: {str(callback_error)}") + + if __name__ == "__main__": + main() + EOF + LogConfiguration: + LogDriver: awslogs + Options: + awslogs-group: !Ref ECSLogGroup + awslogs-region: !Ref AWS::Region + awslogs-stream-prefix: callback + + # Lambda Function for Sync Pattern (Durable) + SyncLambdaFunction: + Type: AWS::Serverless::Function + Metadata: + Dockerfile: Dockerfile + DockerContext: ./src + DockerTag: python3.13-v1 + Properties: + FunctionName: !Sub ${AWS::StackName}-sync-function + PackageType: Image + ImageConfig: + Command: + - sync_handler.lambda_handler + Timeout: 900 # 15 minutes (max Lambda timeout) + MemorySize: 256 + DurableConfig: + ExecutionTimeout: 3600 # 1 hour for durable execution + Environment: + Variables: + ECS_CLUSTER: !Ref ECSCluster + TASK_DEFINITION: !Ref SyncTaskDefinition + SUBNET_1: !Ref PublicSubnet1 + SUBNET_2: !Ref PublicSubnet2 + SECURITY_GROUP: !Ref ECSSecurityGroup + Policies: + - Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - ecs:RunTask + - ecs:DescribeTasks + Resource: '*' + - Effect: Allow + Action: + - iam:PassRole + Resource: + - !GetAtt ECSTaskExecutionRole.Arn + - !GetAtt ECSTaskRole.Arn + - Effect: Allow + Action: + - lambda:CheckpointDurableExecutions + - lambda:GetDurableExecutionState + Resource: '*' + + # Lambda Function for Callback Pattern (Durable) + CallbackLambdaFunction: + Type: AWS::Serverless::Function + Metadata: + Dockerfile: Dockerfile + DockerContext: ./src + DockerTag: python3.13-v1 + Properties: + FunctionName: !Sub ${AWS::StackName}-callback-function + PackageType: Image + ImageConfig: + Command: + - callback_handler.lambda_handler + Timeout: 900 # 15 minutes (max Lambda timeout) + MemorySize: 256 + DurableConfig: + ExecutionTimeout: 3600 # 1 hour for durable execution + Environment: + Variables: + ECS_CLUSTER: !Ref ECSCluster + TASK_DEFINITION: !Ref CallbackTaskDefinition + SUBNET_1: !Ref PublicSubnet1 + SUBNET_2: !Ref PublicSubnet2 + SECURITY_GROUP: !Ref ECSSecurityGroup + Policies: + - Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - ecs:RunTask + Resource: '*' + - Effect: Allow + Action: + - iam:PassRole + Resource: + - !GetAtt ECSTaskExecutionRole.Arn + - !GetAtt ECSTaskRole.Arn + - Effect: Allow + Action: + - lambda:CheckpointDurableExecutions + - lambda:GetDurableExecutionState + Resource: !Sub 'arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:${AWS::StackName}-callback-function' + +Outputs: + SyncLambdaFunctionArn: + Description: ARN of the Synchronous Pattern Lambda Function + Value: !GetAtt SyncLambdaFunction.Arn + + CallbackLambdaFunctionArn: + Description: ARN of the Callback Pattern Lambda Function + Value: !GetAtt CallbackLambdaFunction.Arn + + ECSClusterName: + Description: Name of the ECS Cluster + Value: !Ref ECSCluster + + SyncTaskDefinitionArn: + Description: ARN of the Sync Task Definition + Value: !Ref SyncTaskDefinition + + CallbackTaskDefinitionArn: + Description: ARN of the Callback Task Definition + Value: !Ref CallbackTaskDefinition + + LogGroupName: + Description: CloudWatch Log Group for ECS tasks + Value: !Ref ECSLogGroup