From 09a0448c3e29d8be5c662cc78e58f05cff77026f Mon Sep 17 00:00:00 2001
From: Chris Meyers
Date: Thu, 15 Oct 2020 15:25:47 -0400
Subject: [PATCH 1/2] reduce parent->child lock contention

* We update the parent unified job template to point at new jobs created.
  We also update a similar foreign key when the job finishes running. This
  causes lock contention when the job template is allow_simultaneous and
  there are a lot of jobs from that job template running in parallel. I've
  seen waits as bad as 5 minutes for the lock when a job finishes.
* This change moves the parent->child update OUTSIDE of the transaction if
  the job is allow_simultaneous (inherited from the parent unified job).
  We sacrifice a bit of correctness for performance. The logic is: if you
  are launching 1,000 parallel jobs, do you really care that the job
  template contains a pointer to the last one you launched? Probably not.
  If you do, you can always query jobs related to the job template, sorted
  by created time.
---
 awx/main/models/unified_jobs.py | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py
index 1abbb29fcb67..c50c8668d5b3 100644
--- a/awx/main/models/unified_jobs.py
+++ b/awx/main/models/unified_jobs.py
@@ -873,7 +873,13 @@ def save(self, *args, **kwargs):
 
         # If status changed, update the parent instance.
         if self.status != status_before:
-            self._update_parent_instance()
+            # Update parent outside of the transaction for Job w/ allow_simultaneous=True
+            # This dodges lock contention at the expense of the foreign key not being
+            # completely correct.
+            if getattr(self, 'allow_simultaneous', False):
+                connection.on_commit(self._update_parent_instance)
+            else:
+                self._update_parent_instance()
 
         # Done.
         return result
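The deferral above relies on Django's on_commit hook: the callback is queued on the current database connection and runs only after the enclosing transaction commits, so the UPDATE on the parent row no longer competes with locks held for the rest of the job's own transaction. A minimal sketch of the pattern, where finish_job() is a hypothetical helper standing in for the real UnifiedJob.save() path; _update_parent_instance() and allow_simultaneous are the real names used in the patch above:

    # Minimal sketch of the deferred parent update; not the actual AWX code path.
    from django.db import connection, transaction


    def finish_job(job):
        with transaction.atomic():
            job.status = 'successful'
            job.save(update_fields=['status'])
            if getattr(job, 'allow_simultaneous', False):
                # Queued callbacks run after COMMIT, so the parent row is not
                # locked for the remainder of this (potentially long) transaction.
                connection.on_commit(job._update_parent_instance)
            else:
                # Low-concurrency case: keep the stricter, fully transactional update.
                job._update_parent_instance()

The trade-off is the one the commit message accepts: if the process dies after COMMIT but before the queued callback runs, the parent's pointer is simply left stale.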
From 2eac5a88730deff82b542c0a135dc8f7fa8be400 Mon Sep 17 00:00:00 2001
From: Chris Meyers
Date: Fri, 2 Oct 2020 13:16:21 -0400
Subject: [PATCH 2/2] reduce per-job database query count

* Do not query the database, once per job, for the set of Instances that
  belong to the instance group we are trying to fit the job onto.
* Instead, cache the set of instances per instance group.
---
 awx/main/models/ha.py                 | 10 +++--
 awx/main/scheduler/task_manager.py    | 53 +++++++++++++++++++++------
 awx/main/tests/unit/models/test_ha.py | 35 +++++++-----------
 3 files changed, 61 insertions(+), 37 deletions(-)

diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py
index fc4e9c022ecc..50717866535b 100644
--- a/awx/main/models/ha.py
+++ b/awx/main/models/ha.py
@@ -261,18 +261,20 @@ class Meta:
         app_label = 'main'
 
-    def fit_task_to_most_remaining_capacity_instance(self, task):
+    @staticmethod
+    def fit_task_to_most_remaining_capacity_instance(task, instances):
         instance_most_capacity = None
-        for i in self.instances.filter(capacity__gt=0, enabled=True).order_by('hostname'):
+        for i in instances:
             if i.remaining_capacity >= task.task_impact and \
                     (instance_most_capacity is None or
                      i.remaining_capacity > instance_most_capacity.remaining_capacity):
                 instance_most_capacity = i
         return instance_most_capacity
 
-    def find_largest_idle_instance(self):
+    @staticmethod
+    def find_largest_idle_instance(instances):
         largest_instance = None
-        for i in self.instances.filter(capacity__gt=0, enabled=True).order_by('hostname'):
+        for i in instances:
             if i.jobs_running == 0:
                 if largest_instance is None:
                     largest_instance = i
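With both helpers turned into static methods that take the candidate list as an argument, they no longer touch the ORM at all; any iterable of objects exposing remaining_capacity, capacity and jobs_running will do. A rough illustration with made-up SimpleNamespace stand-ins (the hostnames and numbers are invented, and the import assumes the same AWX Django environment the unit tests below run in; the real caller passes the cached snapshots built in after_lock_init() further down):

    # Illustration only: the instances and numbers here are made up.
    from types import SimpleNamespace

    from awx.main.models.ha import InstanceGroup

    task = SimpleNamespace(task_impact=40)
    instances = [
        SimpleNamespace(hostname='node1', capacity=100, remaining_capacity=30, jobs_running=2),
        SimpleNamespace(hostname='node2', capacity=80, remaining_capacity=80, jobs_running=0),
    ]

    # Prefer the instance with the most remaining capacity that still fits the task...
    chosen = InstanceGroup.fit_task_to_most_remaining_capacity_instance(task, instances)
    # ...falling back to the largest idle instance, mirroring the scheduler change below.
    chosen = chosen or InstanceGroup.find_largest_idle_instance(instances)
    print(chosen.hostname)  # node2: the only instance with remaining_capacity >= task_impact

The capacity/enabled filtering and the hostname ordering that used to live inside these methods now happen once, in the caller.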
diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py
index 9f4818bd37c5..861aa0b63fd6 100644
--- a/awx/main/scheduler/task_manager.py
+++ b/awx/main/scheduler/task_manager.py
@@ -7,6 +7,7 @@
 import uuid
 import json
 import random
+from types import SimpleNamespace
 
 # Django
 from django.db import transaction, connection
@@ -45,6 +46,15 @@ class TaskManager():
 
     def __init__(self):
+        '''
+        Do NOT put database queries or other potentially expensive operations
+        in the task manager init. The task manager object is created every time a
+        job is created or transitions state, and every 30 seconds on each tower node.
+        More often than not, the object is destroyed quickly because the NOOP case is hit.
+
+        The NOOP case is short-circuit logic. If the task manager realizes that another instance
+        of the task manager is already running, then it short-circuits and decides not to run.
+        '''
         self.graph = dict()
         # start task limit indicates how many pending jobs can be started on this
         # .schedule() run. Starting jobs is expensive, and there is code in place to reap
@@ -52,10 +62,30 @@ def __init__(self):
         # 5 minutes to start pending jobs. If this limit is reached, pending jobs
         # will no longer be started and will be started on the next task manager cycle.
         self.start_task_limit = settings.START_TASK_LIMIT
+
+    def after_lock_init(self):
+        '''
+        Init AFTER we know this instance of the task manager will run because the lock is acquired.
+        '''
+        instances = Instance.objects.filter(capacity__gt=0, enabled=True)
+        self.real_instances = {i.hostname: i for i in instances}
+
+        instances_partial = [SimpleNamespace(obj=instance,
+                                             remaining_capacity=instance.remaining_capacity,
+                                             capacity=instance.capacity,
+                                             jobs_running=instance.jobs_running,
+                                             hostname=instance.hostname) for instance in instances]
+
+        instances_by_hostname = {i.hostname: i for i in instances_partial}
+
         for rampart_group in InstanceGroup.objects.prefetch_related('instances'):
             self.graph[rampart_group.name] = dict(graph=DependencyGraph(rampart_group.name),
                                                   capacity_total=rampart_group.capacity,
-                                                  consumed_capacity=0)
+                                                  consumed_capacity=0,
+                                                  instances=[])
+            for instance in rampart_group.instances.filter(capacity__gt=0, enabled=True).order_by('hostname'):
+                if instance.hostname in instances_by_hostname:
+                    self.graph[rampart_group.name]['instances'].append(instances_by_hostname[instance.hostname])
 
     def is_job_blocked(self, task):
         # TODO: I'm not happy with this, I think blocking behavior should be decided outside of the dependency graph
@@ -466,7 +496,6 @@ def process_pending_tasks(self, pending_tasks):
                 continue
             preferred_instance_groups = task.preferred_instance_groups
             found_acceptable_queue = False
-            idle_instance_that_fits = None
             if isinstance(task, WorkflowJob):
                 if task.unified_job_template_id in running_workflow_templates:
                     if not task.allow_simultaneous:
@@ -483,24 +512,23 @@
                     found_acceptable_queue = True
                     break
 
-                if idle_instance_that_fits is None:
-                    idle_instance_that_fits = rampart_group.find_largest_idle_instance()
                 remaining_capacity = self.get_remaining_capacity(rampart_group.name)
                 if not rampart_group.is_containerized and self.get_remaining_capacity(rampart_group.name) <= 0:
                     logger.debug("Skipping group {}, remaining_capacity {} <= 0".format(
                                  rampart_group.name, remaining_capacity))
                     continue
 
-                execution_instance = rampart_group.fit_task_to_most_remaining_capacity_instance(task)
-                if execution_instance:
-                    logger.debug("Starting {} in group {} instance {} (remaining_capacity={})".format(
-                                 task.log_format, rampart_group.name, execution_instance.hostname, remaining_capacity))
-                elif not execution_instance and idle_instance_that_fits:
+                execution_instance = InstanceGroup.fit_task_to_most_remaining_capacity_instance(task, self.graph[rampart_group.name]['instances']) or \
+                    InstanceGroup.find_largest_idle_instance(self.graph[rampart_group.name]['instances'])
+
+                if execution_instance or rampart_group.is_containerized:
                     if not rampart_group.is_containerized:
-                        execution_instance = idle_instance_that_fits
+                        execution_instance.remaining_capacity = max(0, execution_instance.remaining_capacity - task.task_impact)
+                        execution_instance.jobs_running += 1
                         logger.debug("Starting {} in group {} instance {} (remaining_capacity={})".format(
                                      task.log_format, rampart_group.name, execution_instance.hostname, remaining_capacity))
-                if execution_instance or rampart_group.is_containerized:
+
+                        execution_instance = self.real_instances[execution_instance.hostname]
                     self.graph[rampart_group.name]['graph'].add_job(task)
                     self.start_task(task, rampart_group, task.get_jobs_fail_chain(), execution_instance)
                     found_acceptable_queue = True
@@ -572,6 +600,9 @@ def process_tasks(self, all_sorted_tasks):
     def _schedule(self):
         finished_wfjs = []
         all_sorted_tasks = self.get_tasks()
+
+        self.after_lock_init()
+
         if len(all_sorted_tasks) > 0:
             # TODO: Deal with
             # latest_project_updates = self.get_latest_project_update_tasks(all_sorted_tasks)
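The SimpleNamespace snapshots are deliberately mutable: when a task is placed, process_pending_tasks() charges the task's impact against the cached copy, so later tasks in the same scheduling pass see the reduced capacity without another query, and the real Instance row (via self.real_instances) is only looked up when the task is actually started. A toy, standalone version of that bookkeeping with made-up numbers:

    # Toy version of the in-memory capacity accounting; the snapshot and the
    # task impacts below are made up, not real scheduler state.
    from types import SimpleNamespace

    snapshot = SimpleNamespace(hostname='node1', remaining_capacity=50, jobs_running=1)

    for task_impact in (20, 20, 20):
        if snapshot.remaining_capacity >= task_impact:
            # Charge the cached copy so the next task in this pass sees the new numbers.
            snapshot.remaining_capacity = max(0, snapshot.remaining_capacity - task_impact)
            snapshot.jobs_running += 1
            print('placed on', snapshot.hostname, 'remaining', snapshot.remaining_capacity)
        else:
            print('no room for a task of impact', task_impact)

The third task no longer fits and falls through to the else branch, all without re-reading the Instance table.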
diff --git a/awx/main/tests/unit/models/test_ha.py b/awx/main/tests/unit/models/test_ha.py
index 0e29caf8aa15..2534acfd15f1 100644
--- a/awx/main/tests/unit/models/test_ha.py
+++ b/awx/main/tests/unit/models/test_ha.py
@@ -45,19 +45,14 @@ class TestInstanceGroup(object):
         (T(100), Is([50, 0, 20, 99, 11, 1, 5, 99]), None, "The task don't a fit, you must a quit!"),
     ])
     def test_fit_task_to_most_remaining_capacity_instance(self, task, instances, instance_fit_index, reason):
-        with mock.patch.object(InstanceGroup,
-                               'instances',
-                               Mock(spec_set=['filter'],
-                                    filter=lambda *args, **kargs: Mock(spec_set=['order_by'],
-                                                                       order_by=lambda x: instances))):
-            ig = InstanceGroup(id=10)
+        ig = InstanceGroup(id=10)
 
-            if instance_fit_index is None:
-                assert ig.fit_task_to_most_remaining_capacity_instance(task) is None, reason
-            else:
-                assert ig.fit_task_to_most_remaining_capacity_instance(task) == \
-                    instances[instance_fit_index], reason
+        instance_picked = ig.fit_task_to_most_remaining_capacity_instance(task, instances)
+
+        if instance_fit_index is None:
+            assert instance_picked is None, reason
+        else:
+            assert instance_picked == instances[instance_fit_index], reason
 
     @pytest.mark.parametrize('instances,instance_fit_index,reason', [
         (Is([(0, 100)]), 0, "One idle instance, pick it"),
@@ -70,16 +65,12 @@ def test_find_largest_idle_instance(self, instances, instance_fit_index, reason)
         def filter_offline_instances(*args):
             return filter(lambda i: i.capacity > 0, instances)
 
-        with mock.patch.object(InstanceGroup,
-                               'instances',
-                               Mock(spec_set=['filter'],
-                                    filter=lambda *args, **kargs: Mock(spec_set=['order_by'],
-                                                                       order_by=filter_offline_instances))):
-            ig = InstanceGroup(id=10)
+        ig = InstanceGroup(id=10)
+        instances_online_only = filter_offline_instances(instances)
 
-            if instance_fit_index is None:
-                assert ig.find_largest_idle_instance() is None, reason
-            else:
-                assert ig.find_largest_idle_instance() == \
-                    instances[instance_fit_index], reason
+        if instance_fit_index is None:
+            assert ig.find_largest_idle_instance(instances_online_only) is None, reason
+        else:
+            assert ig.find_largest_idle_instance(instances_online_only) == \
+                instances[instance_fit_index], reason
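With the ORM access gone from these helpers, tests in this style need no mock.patch at all. A hypothetical extra case, not part of this patch, could be written directly against SimpleNamespace stand-ins, assuming the same AWX unit-test environment the existing tests use:

    # Hypothetical extra test, not part of this patch: the helpers are plain
    # static methods now, so lightweight stand-in objects are enough.
    from types import SimpleNamespace

    from awx.main.models.ha import InstanceGroup


    def test_skips_instances_without_room():
        task = SimpleNamespace(task_impact=25)
        instances = [
            SimpleNamespace(hostname='a', capacity=30, remaining_capacity=10, jobs_running=1),
            SimpleNamespace(hostname='b', capacity=30, remaining_capacity=30, jobs_running=0),
        ]
        # 'a' lacks remaining capacity and is not idle, so both helpers pick 'b'.
        assert InstanceGroup.fit_task_to_most_remaining_capacity_instance(task, instances).hostname == 'b'
        assert InstanceGroup.find_largest_idle_instance(instances).hostname == 'b'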