diff --git a/application-code/lambda-function-queue-trigger/lambda_function.py b/application-code/lambda-function-queue-trigger/lambda_function.py index cd37f6bb..71e5098c 100644 --- a/application-code/lambda-function-queue-trigger/lambda_function.py +++ b/application-code/lambda-function-queue-trigger/lambda_function.py @@ -1,14 +1,17 @@ -import boto3, os +import boto3 +from math import ceil sqs = boto3.client('sqs') ecs = boto3.client('ecs') ssm = boto3.client('ssm') + +batch_size = 10 max_tasks_per_run = 100 + def lambda_handler(event, context): max_tasks = None sqs_url = None - job_mode = None pipeline_enabled = None TASK_CLUSTER = None TASK_CONTAINER = None @@ -62,48 +65,48 @@ def lambda_handler(event, context): return sqs_response = sqs.get_queue_attributes( QueueUrl=sqs_url, - AttributeNames=[ 'ApproximateNumberOfMessages' ] + AttributeNames=['ApproximateNumberOfMessages'] ) sqs_queue_size = int(sqs_response['Attributes']['ApproximateNumberOfMessages']) print("Current SQS Queue size: " + str(sqs_queue_size)) if sqs_queue_size == 0: return ecs_response = ecs.list_tasks( - cluster=TASK_CLUSTER,maxResults=100,desiredStatus='RUNNING',family=TASK_CONTAINER) + cluster=TASK_CLUSTER, maxResults=100, desiredStatus='RUNNING', family=TASK_CONTAINER) current_running_tasks = len(ecs_response["taskArns"]) available_tasks = max_tasks - current_running_tasks - tasks_to_start = min([sqs_queue_size, available_tasks, max_tasks_per_run, max_tasks]) + tasks_to_start = min([ceil(sqs_queue_size / batch_size), available_tasks, max_tasks_per_run, max_tasks]) print("ECS Tasks to start: " + str(tasks_to_start)) - if tasks_to_start<=0: + if tasks_to_start <= 0: return run_task_response = ecs.run_task( capacityProviderStrategy=[ { - 'capacityProvider': 'FARGATE', - 'weight': 1, - 'base': 2 + 'capacityProvider': 'FARGATE', + 'weight': 1, + 'base': 2 }, { - 'capacityProvider': 'FARGATE_SPOT', - 'weight': 4, - 'base': 0 + 'capacityProvider': 'FARGATE_SPOT', + 'weight': 4, + 'base': 0 } ], cluster=TASK_CLUSTER, taskDefinition=TASK_DEFINITION, overrides={ 'containerOverrides': [ - { - 'name': TASK_CONTAINER, - 'environment': [ { - 'name': 'PIPELINE_ECS_JOB_MODE', - 'value': '1' - }, { - 'name': 'PIPELINE_S3_DEST_PREFIX', - 'value': S3_DEST_PREFIX + 'name': TASK_CONTAINER, + 'environment': [ + { + 'name': 'PIPELINE_ECS_JOB_MODE', + 'value': '1' + }, { + 'name': 'PIPELINE_S3_DEST_PREFIX', + 'value': S3_DEST_PREFIX + } + ] } - ] - } ] }, count=tasks_to_start, @@ -114,6 +117,7 @@ def lambda_handler(event, context): 'securityGroups': [TASK_SECURITYGROUP], 'assignPublicIp': 'DISABLED' } - } + }, + propagateTags='TASK_DEFINITION' ) return tasks_to_start