Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: start tasks based on processing batch size in the queue-processing example #220

Merged
merged 3 commits into from
Jul 1, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 27 additions & 23 deletions application-code/lambda-function-queue-trigger/lambda_function.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
import boto3, os
import boto3
from math import ceil

sqs = boto3.client('sqs')
ecs = boto3.client('ecs')
ssm = boto3.client('ssm')

batch_size = 10
max_tasks_per_run = 100


def lambda_handler(event, context):
max_tasks = None
sqs_url = None
job_mode = None
pipeline_enabled = None
TASK_CLUSTER = None
TASK_CONTAINER = None
Expand Down Expand Up @@ -62,48 +65,48 @@ def lambda_handler(event, context):
return
sqs_response = sqs.get_queue_attributes(
QueueUrl=sqs_url,
AttributeNames=[ 'ApproximateNumberOfMessages' ]
AttributeNames=['ApproximateNumberOfMessages']
)
sqs_queue_size = int(sqs_response['Attributes']['ApproximateNumberOfMessages'])
print("Current SQS Queue size: " + str(sqs_queue_size))
if sqs_queue_size == 0:
return
ecs_response = ecs.list_tasks(
cluster=TASK_CLUSTER,maxResults=100,desiredStatus='RUNNING',family=TASK_CONTAINER)
cluster=TASK_CLUSTER, maxResults=100, desiredStatus='RUNNING', family=TASK_CONTAINER)
current_running_tasks = len(ecs_response["taskArns"])
available_tasks = max_tasks - current_running_tasks
tasks_to_start = min([sqs_queue_size, available_tasks, max_tasks_per_run, max_tasks])
tasks_to_start = min([ceil(sqs_queue_size / batch_size), available_tasks, max_tasks_per_run, max_tasks])
print("ECS Tasks to start: " + str(tasks_to_start))
if tasks_to_start<=0:
if tasks_to_start <= 0:
return
run_task_response = ecs.run_task(
capacityProviderStrategy=[
{
'capacityProvider': 'FARGATE',
'weight': 1,
'base': 2
'capacityProvider': 'FARGATE',
'weight': 1,
'base': 2
}, {
'capacityProvider': 'FARGATE_SPOT',
'weight': 4,
'base': 0
'capacityProvider': 'FARGATE_SPOT',
'weight': 4,
'base': 0
}
],
cluster=TASK_CLUSTER,
taskDefinition=TASK_DEFINITION,
overrides={
'containerOverrides': [
{
'name': TASK_CONTAINER,
'environment': [
{
'name': 'PIPELINE_ECS_JOB_MODE',
'value': '1'
}, {
'name': 'PIPELINE_S3_DEST_PREFIX',
'value': S3_DEST_PREFIX
'name': TASK_CONTAINER,
'environment': [
{
'name': 'PIPELINE_ECS_JOB_MODE',
'value': '1'
}, {
'name': 'PIPELINE_S3_DEST_PREFIX',
'value': S3_DEST_PREFIX
}
]
}
]
}
]
},
count=tasks_to_start,
Expand All @@ -114,6 +117,7 @@ def lambda_handler(event, context):
'securityGroups': [TASK_SECURITYGROUP],
'assignPublicIp': 'DISABLED'
}
}
},
propagateTags='TASK_DEFINITION'
)
return tasks_to_start
Loading