Improve general performance for a variety of high-load job launch use cases #8403

Merged
10 changes: 6 additions & 4 deletions awx/main/models/ha.py
@@ -261,18 +261,20 @@ class Meta:
app_label = 'main'


def fit_task_to_most_remaining_capacity_instance(self, task):
@staticmethod
def fit_task_to_most_remaining_capacity_instance(task, instances):
instance_most_capacity = None
for i in self.instances.filter(capacity__gt=0, enabled=True).order_by('hostname'):
for i in instances:
if i.remaining_capacity >= task.task_impact and \
(instance_most_capacity is None or
i.remaining_capacity > instance_most_capacity.remaining_capacity):
instance_most_capacity = i
return instance_most_capacity

def find_largest_idle_instance(self):
@staticmethod
def find_largest_idle_instance(instances):
largest_instance = None
for i in self.instances.filter(capacity__gt=0, enabled=True).order_by('hostname'):
for i in instances:
if i.jobs_running == 0:
if largest_instance is None:
largest_instance = i
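
Both helpers are now static methods that operate on a caller-supplied list of instances rather than issuing their own query per call. A minimal sketch of how a caller might build that list once and reuse it for both lookups; group and task are hypothetical objects, and the query mirrors the one removed above:

# Hypothetical caller: run the eligible-instance query once, reuse it for both helpers.
eligible = list(group.instances.filter(capacity__gt=0, enabled=True).order_by('hostname'))
execution_instance = (InstanceGroup.fit_task_to_most_remaining_capacity_instance(task, eligible)
                      or InstanceGroup.find_largest_idle_instance(eligible))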
8 changes: 7 additions & 1 deletion awx/main/models/unified_jobs.py
@@ -873,7 +873,13 @@ def save(self, *args, **kwargs):

# If status changed, update the parent instance.
if self.status != status_before:
self._update_parent_instance()
# Update parent outside of the transaction for Job w/ allow_simultaneous=True
# This dodges lock contention at the expense of the foreign key not being
# completely correct.
if getattr(self, 'allow_simultaneous', False):
connection.on_commit(self._update_parent_instance)
else:
self._update_parent_instance()

# Done.
return result
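
For reference, connection.on_commit registers a callback that Django runs only after the enclosing transaction commits (transaction.on_commit exposes the same hook as a helper), which is why the parent update above no longer competes for locks inside the job's own save() transaction. A minimal standalone sketch of the pattern, with a hypothetical callback:

from django.db import transaction

def update_parent():
    # Runs only after COMMIT, outside any row locks held by the atomic block.
    print("parent instance updated")

with transaction.atomic():
    # ... save the job inside the transaction ...
    transaction.on_commit(update_parent)  # queued here, not executed yet
# COMMIT happens when the block exits; update_parent() runs now.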
53 changes: 42 additions & 11 deletions awx/main/scheduler/task_manager.py
@@ -7,6 +7,7 @@
import uuid
import json
import random
from types import SimpleNamespace

# Django
from django.db import transaction, connection
@@ -45,17 +46,46 @@
class TaskManager():

def __init__(self):
'''
Do NOT put database queries or other potentially expensive operations
in the task manager init. The task manager object is created every time a
job is created or transitions state, and every 30 seconds on each tower node.
More often than not, the object is destroyed quickly because the NOOP case is hit.
The NOOP case is short-circuit logic. If the task manager realizes that another instance
of the task manager is already running, then it short-circuits and decides not to run.
'''
self.graph = dict()
# start task limit indicates how many pending jobs can be started on this
# .schedule() run. Starting jobs is expensive, and there is code in place to reap
# the task manager after 5 minutes. At scale, the task manager can easily take more than
# 5 minutes to start pending jobs. If this limit is reached, the remaining pending
# jobs are not started in this run and will be picked up on the next task manager cycle.
self.start_task_limit = settings.START_TASK_LIMIT

def after_lock_init(self):
'''
Init AFTER we know this instance of the task manager will run because the lock is acquired.
'''
instances = Instance.objects.filter(capacity__gt=0, enabled=True)
self.real_instances = {i.hostname: i for i in instances}

instances_partial = [SimpleNamespace(obj=instance,
remaining_capacity=instance.remaining_capacity,
capacity=instance.capacity,
jobs_running=instance.jobs_running,
hostname=instance.hostname) for instance in instances]

instances_by_hostname = {i.hostname: i for i in instances_partial}

for rampart_group in InstanceGroup.objects.prefetch_related('instances'):
self.graph[rampart_group.name] = dict(graph=DependencyGraph(rampart_group.name),
capacity_total=rampart_group.capacity,
consumed_capacity=0)
consumed_capacity=0,
instances=[])
for instance in rampart_group.instances.filter(capacity__gt=0, enabled=True).order_by('hostname'):
if instance.hostname in instances_by_hostname:
self.graph[rampart_group.name]['instances'].append(instances_by_hostname[instance.hostname])

def is_job_blocked(self, task):
# TODO: I'm not happy with this, I think blocking behavior should be decided outside of the dependency graph
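
after_lock_init snapshots each instance's capacity fields into plain SimpleNamespace objects so the scheduling pass can do its bookkeeping in memory and only touch real Instance rows when a task is actually started. A minimal standalone sketch of that idea; the hostnames and numbers are made up:

from types import SimpleNamespace

# Made-up snapshot mirroring the fields the scheduler reads and mutates.
snapshot = [
    SimpleNamespace(hostname='node1', capacity=100, remaining_capacity=80, jobs_running=1),
    SimpleNamespace(hostname='node2', capacity=100, remaining_capacity=30, jobs_running=4),
]

task_impact = 43
fits = [i for i in snapshot if i.remaining_capacity >= task_impact]
candidate = max(fits, key=lambda i: i.remaining_capacity, default=None)
if candidate is not None:
    # Bookkeeping stays on the snapshot; the real Instance row is only looked
    # up by hostname when the task is dispatched.
    candidate.remaining_capacity = max(0, candidate.remaining_capacity - task_impact)
    candidate.jobs_running += 1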
@@ -466,7 +496,6 @@ def process_pending_tasks(self, pending_tasks):
continue
preferred_instance_groups = task.preferred_instance_groups
found_acceptable_queue = False
idle_instance_that_fits = None
if isinstance(task, WorkflowJob):
if task.unified_job_template_id in running_workflow_templates:
if not task.allow_simultaneous:
@@ -483,24 +512,23 @@
found_acceptable_queue = True
break

if idle_instance_that_fits is None:
idle_instance_that_fits = rampart_group.find_largest_idle_instance()
remaining_capacity = self.get_remaining_capacity(rampart_group.name)
if not rampart_group.is_containerized and self.get_remaining_capacity(rampart_group.name) <= 0:
logger.debug("Skipping group {}, remaining_capacity {} <= 0".format(
rampart_group.name, remaining_capacity))
continue

execution_instance = rampart_group.fit_task_to_most_remaining_capacity_instance(task)
if execution_instance:
logger.debug("Starting {} in group {} instance {} (remaining_capacity={})".format(
task.log_format, rampart_group.name, execution_instance.hostname, remaining_capacity))
elif not execution_instance and idle_instance_that_fits:
execution_instance = InstanceGroup.fit_task_to_most_remaining_capacity_instance(task, self.graph[rampart_group.name]['instances']) or \
InstanceGroup.find_largest_idle_instance(self.graph[rampart_group.name]['instances'])

if execution_instance or rampart_group.is_containerized:
if not rampart_group.is_containerized:
execution_instance = idle_instance_that_fits
execution_instance.remaining_capacity = max(0, execution_instance.remaining_capacity - task.task_impact)
execution_instance.jobs_running += 1
logger.debug("Starting {} in group {} instance {} (remaining_capacity={})".format(
task.log_format, rampart_group.name, execution_instance.hostname, remaining_capacity))
if execution_instance or rampart_group.is_containerized:

execution_instance = self.real_instances[execution_instance.hostname]
self.graph[rampart_group.name]['graph'].add_job(task)
self.start_task(task, rampart_group, task.get_jobs_fail_chain(), execution_instance)
found_acceptable_queue = True
@@ -572,6 +600,9 @@ def process_tasks(self, all_sorted_tasks):
def _schedule(self):
finished_wfjs = []
all_sorted_tasks = self.get_tasks()

self.after_lock_init()

if len(all_sorted_tasks) > 0:
# TODO: Deal with
# latest_project_updates = self.get_latest_project_update_tasks(all_sorted_tasks)
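
Taken together, the __init__ docstring and the new after_lock_init() call describe a two-phase startup: the cheap constructor always runs, while the expensive instance snapshot happens only once the scheduler knows it holds the lock. A hedged sketch of that shape; the in-process Lock is only a stand-in for AWX's real cross-node locking:

from threading import Lock

_task_manager_lock = Lock()  # stand-in for the real cross-node lock

class MiniTaskManager:
    def __init__(self):
        # Cheap: runs on every job state change and on every periodic tick.
        self.graph = {}

    def after_lock_init(self):
        # Expensive setup (instance snapshots, etc.) deferred until the lock is held.
        self.instances = ['node1', 'node2']  # placeholder for the real query

    def schedule(self):
        if not _task_manager_lock.acquire(blocking=False):
            return  # NOOP short-circuit: another task manager is already running
        try:
            self.after_lock_init()
            # ... sort and process tasks here ...
        finally:
            _task_manager_lock.release()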
35 changes: 13 additions & 22 deletions awx/main/tests/unit/models/test_ha.py
@@ -45,19 +45,14 @@ class TestInstanceGroup(object):
(T(100), Is([50, 0, 20, 99, 11, 1, 5, 99]), None, "The task don't a fit, you must a quit!"),
])
def test_fit_task_to_most_remaining_capacity_instance(self, task, instances, instance_fit_index, reason):
with mock.patch.object(InstanceGroup,
'instances',
Mock(spec_set=['filter'],
filter=lambda *args, **kargs: Mock(spec_set=['order_by'],
order_by=lambda x: instances))):
ig = InstanceGroup(id=10)
ig = InstanceGroup(id=10)

if instance_fit_index is None:
assert ig.fit_task_to_most_remaining_capacity_instance(task) is None, reason
else:
assert ig.fit_task_to_most_remaining_capacity_instance(task) == \
instances[instance_fit_index], reason
instance_picked = ig.fit_task_to_most_remaining_capacity_instance(task, instances)

if instance_fit_index is None:
assert instance_picked is None, reason
else:
assert instance_picked == instances[instance_fit_index], reason

@pytest.mark.parametrize('instances,instance_fit_index,reason', [
(Is([(0, 100)]), 0, "One idle instance, pick it"),
@@ -70,16 +65,12 @@ def test_find_largest_idle_instance(self, instances, instance_fit_index, reason)
def filter_offline_instances(*args):
return filter(lambda i: i.capacity > 0, instances)

with mock.patch.object(InstanceGroup,
'instances',
Mock(spec_set=['filter'],
filter=lambda *args, **kargs: Mock(spec_set=['order_by'],
order_by=filter_offline_instances))):
ig = InstanceGroup(id=10)
ig = InstanceGroup(id=10)
instances_online_only = filter_offline_instances(instances)

if instance_fit_index is None:
assert ig.find_largest_idle_instance() is None, reason
else:
assert ig.find_largest_idle_instance() == \
instances[instance_fit_index], reason
if instance_fit_index is None:
assert ig.find_largest_idle_instance(instances_online_only) is None, reason
else:
assert ig.find_largest_idle_instance(instances_online_only) == \
instances[instance_fit_index], reason