diff --git a/README.md b/README.md
index 90c25a3..d1a87a8 100755
--- a/README.md
+++ b/README.md
@@ -22,7 +22,7 @@ By leveraging this framework, you can build a cost-effective pipeline to run ad
 * Cloudwatch log access (logs:CreateLogGroup, logs:CreateLogStream, logs:PutLogEvents)
 * X-Ray write access (xray:PutTraceSegments, xray:PutTelemetryRecords)
 
-Check policy.json for a sample that you can use or extend.
+Check cf_template.yaml, which you can extend as needed.
 
 * To execute the driver locally, make sure that you configure your AWS profile with access to:
     * [S3](http://docs.aws.amazon.com/AmazonS3/latest/dev/example-policies-s3.html)
@@ -33,62 +33,39 @@ Check policy.json for a sample that you can use or extend.
 
 To run the example, you must have the AWS CLI set up. Your credentials must have access to create and invoke Lambda and access to list, read, and write to a S3 bucket.
 
-1. Create your S3 bucket to store the intermediaries and result
-(remember to use your own bucket name due to S3 namespace)
+1. Start the CloudFormation console and create a new stack using cf_template.yaml. CloudFormation will create:
+* an S3 bucket for the results,
+* the biglambda_role IAM role for AWS Lambda execution with an appropriate inline policy,
+* SSM Parameter Store parameters used by the Lambda functions,
+* (optionally) an AWS Cloud9 IDE environment.
 
-        $ aws s3 mb s3://biglambda-s3-bucket
+2. [Run the AWS X-Ray daemon locally](https://docs.aws.amazon.com/xray/latest/devguide/xray-daemon-local.html); otherwise you will not be able to see traces from the local driver in the AWS X-Ray console. Traces from the Reducer Coordinator Lambda functions will still be present.
 
-2. Update the policy.json with your S3 bucket name
-
-        $ sed -i 's/s3:::MY-S3-BUCKET/s3:::biglambda-s3-bucket/' policy.json
+3. Run the driver
+
+        $ python driver.py
 
-3. Create the IAM role with respective policy
+### AWS Cloud9 IDE
+You can select the AWS Cloud9 IDE instance type while creating the CloudFormation stack. By default it is set to "None" (no IDE is created). After the stack is created with an instance type selected, check the Outputs section of the stack description for the Cloud9 IDE URL. The code from this Git repository will already be pulled to that instance. You will need to install Boto3 and the X-Ray Python SDK by running the following commands in the IDE Bash tab:
 
-        $ python create-biglambda-role.py
+        $ sudo python -m pip install boto3
+        $ sudo python -m pip install aws-xray-sdk
 
-4. Use the output ARN from the script. Set the serverless_mapreduce_role environment variable:
+Navigate to the code location
 
-        $ export serverless_mapreduce_role=arn:aws:iam::MY-ACCOUNT-ID:role/biglambda_role
+        $ cd lambda-refarch-mapreduce/src/python
 
-5. Make edits to driverconfig.json and verify
+Run the driver
 
-        $ cat driverconfig.json
+        $ python driver.py
 
-6. [Run AWS X-Ray Daemon locally](https://docs.aws.amazon.com/xray/latest/devguide/xray-daemon-local.html), otherwise you will not be able to see traces from the local driver in AWS X-Ray console. However, traces from Reducer Coordinator Lambda functions will be present.
+If you'd like to run the code directly from the IDE, make sure to update the current working directory (CWD) in the default Runner or create a new [Runner](https://docs.aws.amazon.com/cloud9/latest/user-guide/build-run-debug.html).
 
-7. Run the driver
-
-        $ python driver.py
+Note that deleting the CloudFormation stack also deletes the Cloud9 IDE created as part of it.
 
-### Modifying the Job (driverconfig.json)
+### Modifying the Job
 
-For the jobBucket field, enter an S3 bucket in your account that you wish to use for the example. Make changes to the other fields if you have different source data, or if you have renamed the files.
-
-```
-
-{
-    "bucket": "big-data-benchmark",
-    "prefix": "pavlo/text/1node/uservisits/",
-    "jobBucket": "biglambda-s3-bucket",
-    "concurrentLambdas": 100,
-    "mapper": {
-        "name": "mapper.py",
-        "handler": "mapper.lambda_handler",
-        "zip": "mapper.zip"
-    },
-    "reducer":{
-        "name": "reducer.py",
-        "handler": "reducer.lambda_handler",
-        "zip": "reducer.zip"
-    },
-    "reducerCoordinator":{
-        "name": "reducerCoordinator.py",
-        "handler": "reducerCoordinator.lambda_handler",
-        "zip": "reducerCoordinator.zip"
-    },
-}
-
-```
+You can modify cf_template.yaml and update the CloudFormation stack.
 
 ### Outputs
 
@@ -111,10 +88,8 @@ smallya$ head –n 3 result
 
 To remove all resources created by this example, do the following:
 1. Delete all objects from the S3 bucket listed in `jobBucket` created by the job.
-1. Delete the Cloudwatch log groups for each of the Lambda functions created by the job.
-1. Delete the created IAM role
-
-        $ python delete-biglambda-role.py
+2. Delete the CloudFormation stack created for the job.
+3. Delete the Cloudwatch log groups for each of the Lambda functions created by the job.
 
 ## Languages
 * Python 2.7 (active development)
@@ -179,4 +154,4 @@ Serverless MapReduce Cost:
 ```
 
 ## License
-This reference architecture sample is licensed under the Amazon Software License.
+This reference architecture sample is licensed under the Amazon Software License.
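The stack defined in cf_template.yaml below can also be created from the command line instead of the console. This is a minimal sketch, not part of the patch itself: the stack name and bucket name are placeholders, and `--capabilities CAPABILITY_NAMED_IAM` is required because the template creates the named role biglambda_role.

        $ aws cloudformation create-stack \
            --stack-name biglambda \
            --template-body file://cf_template.yaml \
            --capabilities CAPABILITY_NAMED_IAM \
            --parameters ParameterKey=JobBucket,ParameterValue=my-biglambda-results \
                         ParameterKey=Region,ParameterValue=us-west-2 \
                         ParameterKey=Cloud9InstanceType,ParameterValue=None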
diff --git a/cf_template.yaml b/cf_template.yaml
new file mode 100644
index 0000000..f301a00
--- /dev/null
+++ b/cf_template.yaml
@@ -0,0 +1,289 @@
+AWSTemplateFormatVersion: 2010-09-09
+Resources:
+  SSMJobID:
+    Type: 'AWS::SSM::Parameter'
+    Properties:
+      Value: bl-release
+      Type: String
+      Name: !Join
+        - '/'
+        - - !Ref SSMPrefix
+          - 'jobId'
+  SSMMapCount:
+    Type: 'AWS::SSM::Parameter'
+    Properties:
+      Value: 0
+      Type: String
+      Name: !Join
+        - '/'
+        - - !Ref SSMPrefix
+          - 'mapCount'
+  SSMReducerFunction:
+    Type: 'AWS::SSM::Parameter'
+    Properties:
+      Value: ' '
+      Type: String
+      Name: !Join
+        - '/'
+        - - !Ref SSMPrefix
+          - 'reducerFunction'
+  SSMBotoMaxConnections:
+    Type: 'AWS::SSM::Parameter'
+    Properties:
+      Value: 1000
+      Type: String
+      Name: !Join
+        - '/'
+        - - !Ref SSMPrefix
+          - 'boto_max_connections'
+  SSMBucket:
+    Type: 'AWS::SSM::Parameter'
+    Properties:
+      Value: big-data-benchmark
+      Type: String
+      Name: !Join
+        - '/'
+        - - !Ref SSMPrefix
+          - 'bucket'
+  SSMBucketPrefix:
+    Type: 'AWS::SSM::Parameter'
+    Properties:
+      Value: pavlo/text/1node/uservisits/
+      Type: String
+      Name: !Join
+        - '/'
+        - - !Ref SSMPrefix
+          - 'prefix'
+  SSMJobBucket:
+    Type: 'AWS::SSM::Parameter'
+    Properties:
+      Value: !Ref JobBucket
+      Type: String
+      Name: !Join
+        - '/'
+        - - !Ref SSMPrefix
+          - 'jobBucket'
+  SSMRegion:
+    Type: 'AWS::SSM::Parameter'
+    Properties:
+      Value: !Ref Region
+      Type: String
+      Name: !Join
+        - '/'
+        - - !Ref SSMPrefix
+          - 'region'
+  SSMLambdaMemory:
+    Type: 'AWS::SSM::Parameter'
+    Properties:
+      Value: 1536
+      Type: String
+      Name: !Join
+        - '/'
+        - - !Ref SSMPrefix
+          - 'lambdaMemory'
+  SSMConcurrentLambdas:
+    Type: 'AWS::SSM::Parameter'
+    Properties:
+      Value: 1000
+      Type: String
+      Name: !Join
+        - '/'
+        - - !Ref SSMPrefix
+          - 'concurrentLambdas'
+  SSMMapper:
+    Type: 'AWS::SSM::Parameter'
+    Properties:
+      Value: '{ "name": "mapper.py","handler": "mapper.lambda_handler","zip": "mapper.zip"}'
+      Type: String
+      Name: !Join
+        - '/'
+        - - !Ref SSMPrefix
+          - 'mapper'
+  SSMReducer:
+    Type: 'AWS::SSM::Parameter'
+    Properties:
+      Value: '{ "name": "reducer.py","handler": "reducer.lambda_handler","zip": "reducer.zip"}'
+      Type: String
+      Name: !Join
+        - '/'
+        - - !Ref SSMPrefix
+          - 'reducer'
+  SSMReducerCoordinator:
+    Type: 'AWS::SSM::Parameter'
+    Properties:
+      Value: '{ "name": "reducerCoordinator.py","handler": "reducerCoordinator.lambda_handler","zip": "reducerCoordinator.zip"}'
+      Type: String
+      Name: !Join
+        - '/'
+        - - !Ref SSMPrefix
+          - 'reducerCoordinator'
+  SSMLambdaReadTimeout:
+    Type: 'AWS::SSM::Parameter'
+    Properties:
+      Value: 300
+      Type: String
+      Name: !Join
+        - '/'
+        - - !Ref SSMPrefix
+          - 'lambdaReadTimeout'
+  SSMLambdaExecutionRole:
+    Type: 'AWS::SSM::Parameter'
+    Properties:
+      Value: !GetAtt BigLambdaRole.Arn
+      Type: String
+      Name: !Join
+        - '/'
+        - - !Ref SSMPrefix
+          - 'lambdaExecutionRole'
+  BigLambdaRole:
+    Type: 'AWS::IAM::Role'
+    Properties:
+      RoleName: biglambda_role
+      AssumeRolePolicyDocument:
+        Statement:
+          - Effect: Allow
+            Principal:
+              Service: lambda.amazonaws.com
+            Action:
+              - sts:AssumeRole
+      Policies:
+        -
+          PolicyName: BigLambdaPolicy
+          PolicyDocument:
+            Version: "2012-10-17"
+            Statement:
+              -
+                Action:
+                  - 'lambda:AddPermission'
+                  - 'lambda:CreateEventSourceMapping'
+                  - 'lambda:CreateFunction'
+                  - 'lambda:DeleteEventSourceMapping'
+                  - 'lambda:DeleteFunction'
+                  - 'lambda:GetEventSourceMapping'
+                  - 'lambda:InvokeAsync'
+                  - 'lambda:InvokeFunction'
+                  - 'lambda:ListEventSourceMappings'
+                  - 'lambda:RemovePermission'
+                  - 'lambda:UpdateEventSourceMapping'
+                  - 'lambda:UpdateFunctionCode'
+                  - 'lambda:UpdateFunctionConfiguration'
+                Effect: Allow
+                Resource: "*"
+              -
+                Action:
+                  - 'logs:CreateLogGroup'
+                  - 'logs:CreateLogStream'
+                  - 'logs:DeleteLogGroup'
+                  - 'logs:DeleteLogStream'
+                  - 'logs:GetLogEvents'
+                  - 'logs:PutLogEvents'
+                Effect: Allow
+                Resource: "*"
+              -
+                Action:
+                  - 'xray:PutTraceSegments'
+                  - 'xray:PutTelemetryRecords'
+                Effect: Allow
+                Resource: "*"
+              -
+                Action:
+                  - 'ssm:DescribeParameters'
+                Effect: Allow
+                Resource: "*"
+              -
+                Action:
+                  - 's3:GetObject'
+                  - 's3:ListBucket'
+                Effect: Allow
+                Resource: "*"
+              -
+                Action:
+                  - 's3:DeleteObject'
+                  - 's3:DeleteObjectVersion'
+                  - 's3:GetBucketAcl'
+                  - 's3:GetBucketLocation'
+                  - 's3:GetBucketLogging'
+                  - 's3:GetBucketNotification'
+                  - 's3:GetBucketPolicy'
+                  - 's3:GetObjectAcl'
+                  - 's3:GetObjectVersion'
+                  - 's3:GetObjectVersionAcl'
+                  - 's3:ListAllMyBuckets'
+                  - 's3:PutBucketNotification'
+                  - 's3:PutObject'
+                  - 's3:PutObjectAcl'
+                  - 's3:PutObjectVersionAcl'
+                Effect: Allow
+                Resource: !Join
+                  - ''
+                  - - 'arn:aws:s3:::'
+                    - !Ref JobBucket
+                    - '/*'
+              -
+                Action:
+                  - 'ssm:GetParameters'
+                  - 'ssm:GetParametersByPath'
+                  - 'ssm:GetParameter'
+                Effect: Allow
+                Resource: !Join
+                  - ''
+                  - - 'arn:aws:ssm:'
+                    - !Ref Region
+                    - ':'
+                    - !Ref AWS::AccountId
+                    - ':parameter'
+                    - !Ref SSMPrefix
+                    - '/*'
+  S3JobBucket:
+    Type: 'AWS::S3::Bucket'
+    Properties:
+      BucketName: !Ref JobBucket
+  Cloud9IDE:
+    Condition: CreateCloud9IDE
+    Type: AWS::Cloud9::EnvironmentEC2
+    Properties:
+      Repositories:
+        - RepositoryUrl: https://github.com/giedri/lambda-refarch-mapreduce.git
+          PathComponent: lambda-refarch-mapreduce
+      Description: Lambda MapReduce Cloud9 IDE
+      InstanceType: t2.medium
+      AutomaticStopTimeMinutes: 30
+      Name:
+        Ref: AWS::StackName
+Conditions:
+  CreateCloud9IDE: !Not [!Equals [ !Ref Cloud9InstanceType, None ]]
+Parameters:
+  Cloud9InstanceType:
+    Description: Select Cloud9 IDE instance type or None if you do not want an IDE to be created
+    Default: None
+    Type: String
+    AllowedValues:
+      - None
+      - t2.micro
+      - t2.small
+      - t2.large
+    ConstraintDescription: Must specify instance type or select None
+  SSMPrefix:
+    Type: String
+    Default: /biglambda
+    Description: Enter SSM parameter prefix. Must start with / (default is /biglambda). You will need to update SSM_PATH constant in lambdautils.py file with the value entered here.
+  JobBucket:
+    Type: String
+    Default: ENTER YOUR BUCKET HERE
+    Description: S3 bucket for results
+  Region:
+    Type: String
+    Default: us-west-2
+    Description: AWS Region to use for AWS Lambda
+Outputs:
+  Cloud9URL:
+    Condition: CreateCloud9IDE
+    Value:
+      Fn::Join:
+        - ''
+        - - https://
+          - Ref: Region
+          - .console.aws.amazon.com/cloud9/home/environments/
+          - Ref: Cloud9IDE
+    Description: Lambda Map/Reduce Cloud9 environment
+
diff --git a/create-biglambda-role.py b/create-biglambda-role.py
deleted file mode 100644
index 846f222..0000000
--- a/create-biglambda-role.py
+++ /dev/null
@@ -1,37 +0,0 @@
-import boto3,json,botocore
-client = boto3.client('iam')
-
-trust_role = {
-    "Version": "2012-10-17",
-    "Statement": [
-        {
-            "Sid": "",
-            "Effect": "Allow",
-            "Principal": {
-                "Service": "lambda.amazonaws.com"
-            },
-            "Action": "sts:AssumeRole"
-        }
-    ]
-}
-
-rn='biglambda_role'
-rp='biglambda_policy'
-
-try:
-    response = client.create_role(RoleName=rn,AssumeRolePolicyDocument=json.dumps(trust_role))
-    print response['Role']['Arn']
-    print "Success: done creating role"
-except botocore.exceptions.ClientError as e:
-    print "Error: {0}".format(e)
-
-try:
-    with open('policy.json') as json_data:
-        response = client.put_role_policy(RoleName=rn,PolicyName=rp,
-            PolicyDocument=json.dumps(json.load(json_data))
-        )
-    print "Success: done adding inline policy to role"
-except botocore.exceptions.ClientError as e:
-    print "Error: {0}".format(e)
-
-
diff --git a/delete-biglambda-role.py b/delete-biglambda-role.py
deleted file mode 100644
index 3129879..0000000
--- a/delete-biglambda-role.py
+++ /dev/null
@@ -1,17 +0,0 @@
-import boto3,botocore
-client = boto3.client('iam')
-
-rn = 'biglambda_role'
-rp = 'biglambda_policy'
-
-try:
-    response = client.delete_role_policy(RoleName=rn,PolicyName=rp)
-    print "Success: done deleting role policy"
-except botocore.exceptions.ClientError as e:
-    print "Error: {0}".format(e)
-
-try:
-    response = client.delete_role(RoleName=rn)
-    print "Success: done deleting role"
-except botocore.exceptions.ClientError as e:
-    print "Error: {0}".format(e)
diff --git a/policy.json b/policy.json
deleted file mode 100755
index 598949d..0000000
--- a/policy.json
+++ /dev/null
@@ -1,94 +0,0 @@
-{
-    "Version": "2012-10-17",
-    "Statement": [
-        {
-            "Sid": "Stmt1475879631000",
-            "Effect": "Allow",
-            "Action": [
-                "lambda:AddPermission",
-                "lambda:CreateEventSourceMapping",
-                "lambda:CreateFunction",
-                "lambda:DeleteEventSourceMapping",
-                "lambda:DeleteFunction",
-                "lambda:GetEventSourceMapping",
-                "lambda:InvokeAsync",
-                "lambda:InvokeFunction",
-                "lambda:ListEventSourceMappings",
-                "lambda:RemovePermission",
-                "lambda:UpdateEventSourceMapping",
-                "lambda:UpdateFunctionCode",
-                "lambda:UpdateFunctionConfiguration"
-            ],
-            "Resource": [
-                "*"
-            ]
-        },
-
-        {
-            "Sid": "Stmt1475879730000",
-            "Effect": "Allow",
-            "Action": [
-                "logs:CreateLogGroup",
-                "logs:CreateLogStream",
-                "logs:DeleteLogGroup",
-                "logs:DeleteLogStream",
-                "logs:GetLogEvents",
-                "logs:PutLogEvents"
-            ],
-            "Resource": [
-                "*"
-            ]
-        },
-
-{
-            "Sid": "Stmt1475879783000",
-            "Effect": "Allow",
-            "Action": [
-                "s3:DeleteObject",
-                "s3:DeleteObjectVersion",
-                "s3:GetBucketAcl",
-                "s3:GetBucketLocation",
-                "s3:GetBucketLogging",
-                "s3:GetBucketNotification",
-                "s3:GetBucketPolicy",
-                "s3:GetObjectAcl",
-                "s3:GetObjectVersion",
-                "s3:GetObjectVersionAcl",
-                "s3:ListAllMyBuckets",
-                "s3:PutBucketNotification",
-                "s3:PutObject",
-                "s3:PutObjectAcl",
-                "s3:PutObjectVersionAcl"
-            ],
-            "Resource": [
-                "arn:aws:s3:::MY-S3-BUCKET/*"
-            ]
-        },
-
-        {
-            "Sid": "StmXrayAllow",
-            "Effect": "Allow",
-            "Action": [
-                "xray:PutTraceSegments",
-                "xray:PutTelemetryRecords"
-            ],
-            "Resource": [
-                "*"
-            ]
-        },
-
-        {
-            "Effect": "Allow",
-            "Action": [
-                "s3:GetObject",
-                "s3:ListBucket"
-            ],
-            "Resource": [
-                "*"
-            ]
-        }
-
-
-
-    ]
-}
diff --git a/src/python/driver.py b/src/python/driver.py
index 9e3509e..543efc6 100755
--- a/src/python/driver.py
+++ b/src/python/driver.py
@@ -40,13 +40,14 @@ logging.basicConfig(level='WARNING')
 logging.getLogger('aws_xray_sdk').setLevel(logging.ERROR)
 
 # collect all tracing samples
-SAMPLING_RULES = {"version": 1, "default": {"fixed_target": 1, "rate": 1}}
+SAMPLING_RULES={"version": 1, "default": {"fixed_target": 1,"rate": 1}}
 xray_recorder.configure(sampling_rules=SAMPLING_RULES)
 xray_recorder.begin_segment('Map Reduce Driver')
 
 # create an S3 session
 s3 = boto3.resource('s3')
 s3_client = boto3.client('s3')
+ssm_client = boto3.client('ssm')
 
 JOB_INFO = 'jobinfo.json'
 
@@ -54,7 +55,7 @@
 @xray_recorder.capture('zipLambda')
 def zipLambda(fname, zipname):
     # faster to zip with shell exec
-    subprocess.call(['zip', zipname] + glob.glob(fname) + glob.glob(JOB_INFO) + 
+    subprocess.call(['zip', zipname] + glob.glob(fname) + glob.glob(JOB_INFO) +
             glob.glob("lambdautils.py"))
 
 @xray_recorder.capture('write_to_s3')
@@ -62,7 +63,7 @@ def write_to_s3(bucket, key, data, metadata):
     s3.Bucket(bucket).put_object(Key=key, Body=data, Metadata=metadata)
 
 @xray_recorder.capture('write_job_config')
-def write_job_config(job_id, job_bucket, n_mappers, r_func, r_handler):
+def write_job_config(job_id, job_bucket, n_mappers, r_func, r_handler, lambdaMemory, concurrent_lambdas):
     fname = "jobinfo.json";
     with open(fname, 'w') as f:
         data = json.dumps({
@@ -70,39 +71,47 @@ def write_job_config(job_id, job_bucket, n_mappers, r_func, r_handler):
             "jobBucket" : job_bucket,
             "mapCount": n_mappers,
             "reducerFunction": r_func,
-            "reducerHandler": r_handler
+            "reducerHandler": r_handler,
+            "lambdaMemory": lambdaMemory,
+            "concurrentLambdas": concurrent_lambdas
             }, indent=4);
         f.write(data)
 
 ######### MAIN #############
 
-## JOB ID
-job_id = "bl-release"
-
 # Config
-config = json.loads(open('driverconfig.json', 'r').read())
+config = lambdautils.load_config()
 
 # 1. Get all keys to be processed
 xray_recorder.begin_subsegment('Get all keys to be processed')
+
 # init
+job_id = config["jobId"]
 bucket = config["bucket"]
 job_bucket = config["jobBucket"]
 region = config["region"]
-lambda_memory = config["lambdaMemory"]
-concurrent_lambdas = config["concurrentLambdas"]
-lambda_read_timeout = config["lambda_read_timeout"]
+lambda_memory = int(config["lambdaMemory"])
+concurrent_lambdas = int(config["concurrentLambdas"])
+mapper_config = json.loads(config["mapper"])
+reducer_config = json.loads(config["reducer"])
+reducerCoordinator_config = json.loads(config["reducerCoordinator"])
+lambda_env = {'Variables': {'ssmPath': config['ssmPath']}}
+lambda_read_timeout = int(config["lambdaReadTimeout"])
+ssm_path = config["ssmPath"]
+lambda_execution_role = config["lambdaExecutionRole"]
+boto_max_connections = int(config["boto_max_connections"])
 
 # Setting longer timeout for reading lambda results and larger connections pool
-lambda_config = Config(read_timeout=lambda_read_timeout, max_pool_connections=50)
-lambda_client = boto3.client('lambda', config=lambda_config)
+lambda_config=Config(read_timeout=lambda_read_timeout, max_pool_connections=boto_max_connections)
+lambda_client = boto3.client('lambda',config=lambda_config)
 
 # Fetch all the keys that match the prefix
 all_keys = []
 for obj in s3.Bucket(bucket).objects.filter(Prefix=config["prefix"]).all():
     all_keys.append(obj)
 
-bsize = lambdautils.compute_batch_size(all_keys, lambda_memory)
+bsize = lambdautils.compute_batch_size(all_keys, lambda_memory, concurrent_lambdas)
 batches = lambdautils.batch_creator(all_keys, bsize)
 n_mappers = len(batches)
 
 document = xray_recorder.current_subsegment()
@@ -120,31 +129,31 @@ def write_job_config(job_id, job_bucket, n_mappers, r_func, r_handler):
 rc_lambda_name = L_PREFIX + "-rc-" + job_id;
 
 # write job config
-write_job_config(job_id, job_bucket, n_mappers, reducer_lambda_name, config["reducer"]["handler"]);
+write_job_config(job_id, job_bucket, n_mappers, reducer_lambda_name, reducer_config["handler"], lambda_memory, concurrent_lambdas)
 
-zipLambda(config["mapper"]["name"], config["mapper"]["zip"])
-zipLambda(config["reducer"]["name"], config["reducer"]["zip"])
-zipLambda(config["reducerCoordinator"]["name"], config["reducerCoordinator"]["zip"])
+zipLambda(mapper_config["name"], mapper_config["zip"])
+zipLambda(reducer_config["name"], reducer_config["zip"])
+zipLambda(reducerCoordinator_config["name"], reducerCoordinator_config["zip"])
 xray_recorder.end_subsegment() #Prepare Lambda functions
 
 # mapper
 xray_recorder.begin_subsegment('Create mapper Lambda function')
-l_mapper = lambdautils.LambdaManager(lambda_client, s3_client, region, config["mapper"]["zip"], job_id,
-        mapper_lambda_name, config["mapper"]["handler"])
+l_mapper = lambdautils.LambdaManager(lambda_client, s3_client, region, mapper_config["zip"], job_id,
+        mapper_lambda_name, mapper_config["handler"], lambda_execution_role, lambda_memory)
 l_mapper.update_code_or_create_on_noexist()
 xray_recorder.end_subsegment() #Create mapper Lambda function
 
 # Reducer func
 xray_recorder.begin_subsegment('Create reducer Lambda function')
-l_reducer = lambdautils.LambdaManager(lambda_client, s3_client, region, config["reducer"]["zip"], job_id,
-        reducer_lambda_name, config["reducer"]["handler"])
+l_reducer = lambdautils.LambdaManager(lambda_client, s3_client, region, reducer_config["zip"], job_id,
+        reducer_lambda_name, reducer_config["handler"], lambda_execution_role, lambda_memory)
 l_reducer.update_code_or_create_on_noexist()
 xray_recorder.end_subsegment() #Create reducer Lambda function
 
 # Coordinator
 xray_recorder.begin_subsegment('Create reducer coordinator Lambda function')
-l_rc = lambdautils.LambdaManager(lambda_client, s3_client, region, config["reducerCoordinator"]["zip"], job_id,
-        rc_lambda_name, config["reducerCoordinator"]["handler"])
+l_rc = lambdautils.LambdaManager(lambda_client, s3_client, region, reducerCoordinator_config["zip"], job_id,
+        rc_lambda_name, reducerCoordinator_config["handler"], lambda_execution_role, lambda_memory)
 l_rc.update_code_or_create_on_noexist()
 
 # Add permission to the coordinator
@@ -162,7 +171,7 @@ def write_job_config(job_id, job_bucket, n_mappers, r_func, r_handler):
     "totalS3Files": len(all_keys),
     "startTime": time.time()
     })
-xray_recorder.current_subsegment().put_metadata("Job data: ", data, "Write job data to S3")
+xray_recorder.current_subsegment().put_metadata("Job data: ", data, "Write job data to S3");
 write_to_s3(job_bucket, j_key, data, {})
 xray_recorder.end_subsegment() #Write job data to S3
 
@@ -177,11 +186,8 @@ def invoke_lambda(batches, m_id):
     '''
     lambda invoke function
     '''
-
-    #batch = [k['Key'] for k in batches[m_id-1]]
     batch = [k.key for k in batches[m_id-1]]
-    xray_recorder.current_segment().put_annotation("batch_for_mapper_"+str(m_id), str(batch))
-    #print "invoking", m_id, len(batch)
+    xray_recorder.current_segment().put_annotation("batch_for_mapper_"+str(m_id), str(batch));
     resp = lambda_client.invoke(
             FunctionName = mapper_lambda_name,
             InvocationType = 'RequestResponse',
@@ -209,7 +215,7 @@ def invoke_lambda(batches, m_id):
         nm = min(concurrent_lambdas, n_mappers)
         results = pool.map(invoke_lambda_partial, Ids[mappers_executed: mappers_executed + nm])
         mappers_executed += nm
-        xray_recorder.current_subsegment().put_metadata("Mapper lambdas executed: ", mappers_executed, "Invoke mappers")
+        xray_recorder.current_subsegment().put_metadata("Mapper lambdas executed: ", mappers_executed, "Invoke mappers");
 
 pool.close()
 pool.join()
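With driverconfig.json gone, the driver above reads its settings from SSM Parameter Store via lambdautils.load_config(). The stored values can be inspected, or individual ones tweaked, from the AWS CLI. A minimal sketch assuming the default /biglambda prefix; the prefix value below is a placeholder for your own input data, and edits made this way are not tracked by CloudFormation:

        $ aws ssm get-parameters-by-path --path /biglambda
        $ aws ssm put-parameter --name /biglambda/prefix --value my/input/prefix/ --type String --overwrite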
diff --git a/src/python/driverconfig.json b/src/python/driverconfig.json
deleted file mode 100644
index e9ac6ba..0000000
--- a/src/python/driverconfig.json
+++ /dev/null
@@ -1,24 +0,0 @@
-{
-    "bucket": "big-data-benchmark",
-    "prefix": "pavlo/text/1node/uservisits/",
-    "jobBucket": "smallya-useast-1",
-    "region": "us-east-1",
-    "lambdaMemory": 1536,
-    "concurrentLambdas": 100,
-    "lambda_read_timeout": 300,
-    "mapper": {
-        "name": "mapper.py",
-        "handler": "mapper.lambda_handler",
-        "zip": "mapper.zip"
-    },
-    "reducer":{
-        "name": "reducer.py",
-        "handler": "reducer.lambda_handler",
-        "zip": "reducer.zip"
-    },
-    "reducerCoordinator":{
-        "name": "reducerCoordinator.py",
-        "handler": "reducerCoordinator.lambda_handler",
-        "zip": "reducerCoordinator.zip"
-    }
-}
diff --git a/src/python/jobinfo.json b/src/python/jobinfo.json
deleted file mode 100644
index b49dd7b..0000000
--- a/src/python/jobinfo.json
+++ /dev/null
@@ -1,7 +0,0 @@
-{
-    "jobBucket": "smallya-useast-1",
-    "mapCount": 29,
-    "reducerFunction": "BL-reducer-bl-release",
-    "reducerHandler": "reducer.lambda_handler",
-    "jobId": "bl-release"
-}
\ No newline at end of file
diff --git a/src/python/lambdautils.py b/src/python/lambdautils.py
index 76a59e9..314ea04 100755
--- a/src/python/lambdautils.py
+++ b/src/python/lambdautils.py
@@ -16,8 +16,10 @@ import botocore
 
 import os
 
+SSM_PATH = '/biglambda/'
+
 class LambdaManager(object):
-    def __init__ (self, l, s3, region, codepath, job_id, fname, handler, lmem=1536):
+    def __init__ (self, l, s3, region, codepath, job_id, fname, handler, role, lmem=1536):
         self.awslambda = l;
         self.region = "us-east-1" if region is None else region
         self.s3 = s3
@@ -25,7 +27,7 @@ def __init__ (self, l, s3, region, codepath, job_id, fname, handler, lmem=1536):
         self.job_id = job_id
         self.function_name = fname
         self.handler = handler
-        self.role = os.environ.get('serverless_mapreduce_role')
+        self.role = role
         self.memory = lmem
         self.timeout = 300
         self.function_arn = None # set after creation
@@ -45,7 +47,7 @@ def create_lambda_function(self):
             Description = self.function_name,
             MemorySize = self.memory,
             Timeout = self.timeout,
-            TracingConfig={'Mode':'Active'}
+            TracingConfig={'Mode':'PassThrough'}
             )
         self.function_arn = response['FunctionArn']
         print response
@@ -127,7 +129,7 @@ def cleanup_logs(cls, func_name):
         response = log_client.delete_log_group(logGroupName='/aws/lambda/' + func_name)
         return response
 
-def compute_batch_size(keys, lambda_memory, gzip=False):
+def compute_batch_size(keys, lambda_memory, concurrent_lambdas):
     max_mem_for_data = 0.6 * lambda_memory * 1000 * 1000;
     size = 0.0
     for key in keys:
@@ -137,8 +139,11 @@ def compute_batch_size(keys, lambda_memory, gzip=False):
         size += key.size
     avg_object_size = size/len(keys)
     print "Dataset size: %s, nKeys: %s, avg: %s" %(size, len(keys), avg_object_size)
-    b_size = int(round(max_mem_for_data/avg_object_size))
-    return b_size
+    if avg_object_size < max_mem_for_data and len(keys) < concurrent_lambdas:
+        b_size = 1
+    else:
+        b_size = int(round(max_mem_for_data/avg_object_size))
+    return b_size
 
 def batch_creator(all_keys, batch_size):
     '''
@@ -156,3 +161,35 @@ def batch_creator(all_keys, batch_size):
     if len(batch):
         batches.append(batch)
     return batches
+
+def load_config():
+    ssm_client = boto3.client('ssm')
+    config_dict={}
+
+    # based on https://gist.github.com/sonodar/b3c80c8b9e60f4e6dcda9108c46a6089
+    def read_params(NextToken = None):
+        params = {
+            'Path': SSM_PATH,
+            'Recursive': False,
+            'WithDecryption': False
+        }
+        if NextToken is not None:
+            params['NextToken'] = NextToken
+        return ssm_client.get_parameters_by_path(**params)
+    def parameters():
+        NextToken = None
+        while True:
+            response = read_params(NextToken)
+            parameters = response['Parameters']
+            if len(parameters) == 0:
+                break
+            for parameter in parameters:
+                yield parameter
+            if 'NextToken' not in response:
+                break
+            NextToken = response['NextToken']
+
+    config_dict['ssmPath']=SSM_PATH
+    for parameter in parameters():
+        config_dict[parameter.get('Name').replace(SSM_PATH,'')]= parameter.get('Value')
+    return config_dict
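The load_config() helper above replaces the old driverconfig.json lookup. A quick illustrative use from src/python, assuming the cf_template.yaml stack exists and SSM_PATH matches the stack's SSMPrefix parameter (default /biglambda):

```python
# Illustrative sketch only: requires AWS credentials that can read the /biglambda parameters.
import lambdautils

config = lambdautils.load_config()            # dict keyed by parameter name minus the prefix
print config["jobBucket"], config["region"]   # e.g. results bucket and target region
print int(config["concurrentLambdas"])        # SSM returns strings, hence the int() casts in driver.py
```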
diff --git a/src/python/reducerCoordinator.py b/src/python/reducerCoordinator.py
index 9723cbc..5e0b66e 100644
--- a/src/python/reducerCoordinator.py
+++ b/src/python/reducerCoordinator.py
@@ -58,9 +58,8 @@ def get_mapper_files(files):
         ret.append(mf)
     return ret
 
-def get_reducer_batch_size(keys):
-    #TODO: Paramertize memory size
-    batch_size = lambdautils.compute_batch_size(keys, 1536)
+def get_reducer_batch_size(keys, lambda_memory, concurrent_lambdas):
+    batch_size = lambdautils.compute_batch_size(keys, lambda_memory, concurrent_lambdas)
     return max(batch_size, 2) # At least 2 in a batch - Condition for termination
 
 def check_job_done(files):
@@ -130,6 +129,8 @@ def lambda_handler(event, context):
     map_count = config["mapCount"]
     r_function_name = config["reducerFunction"]
     r_handler = config["reducerHandler"]
+    lambda_memory = config["lambdaMemory"]
+    concurrent_lambdas = config["concurrentLambdas"]
 
     ### Get Mapper Finished Count ###
 
@@ -160,7 +161,7 @@ def lambda_handler(event, context):
             return
 
     # Compute this based on metadata of files
-    r_batch_size = get_reducer_batch_size(reducer_keys);
+    r_batch_size = get_reducer_batch_size(reducer_keys, lambda_memory, concurrent_lambdas);
     print "Starting the the reducer step", step_number
     print "Batch Size", r_batch_size
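Cleanup can likewise be scripted to mirror the updated README steps. A minimal sketch; the bucket, stack, and log-group names are placeholders, and the log-group deletion should be repeated for each Lambda function the job created:

        $ aws s3 rm s3://my-biglambda-results --recursive
        $ aws cloudformation delete-stack --stack-name biglambda
        $ aws logs delete-log-group --log-group-name /aws/lambda/FUNCTION_NAME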