Skip to content

Commit

Permalink
Model changes for instance last_seen field to replace modified (#10870)
Browse files Browse the repository at this point in the history
* Model changes for instance last_seen field to replace modified

* Break up refresh_capacity into smaller units

* Rename execution node methods, fix last_seen clustering

* Use update_fields to make it clear save only affects capacity

* Restructuring to pass unit tests

* Fix bug where a PATCH did not update capacity value
  • Loading branch information
AlanCoding authored and pull[bot] committed Jun 1, 2022
1 parent e7bd511 commit d2845a9
Show file tree
Hide file tree
Showing 11 changed files with 247 additions and 128 deletions.
9 changes: 9 additions & 0 deletions awx/api/views/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,15 @@ class InstanceDetail(RetrieveUpdateAPIView):
model = models.Instance
serializer_class = serializers.InstanceSerializer

def update(self, request, *args, **kwargs):
    """Apply the PUT/PATCH, then recompute and persist capacity so the response reflects it.

    A change to capacity_adjustment alters the derived capacity value, so after a
    successful write we recompute capacity, save only that column, and re-serialize
    the refreshed object into the response body.
    """
    response = super(InstanceDetail, self).update(request, *args, **kwargs)
    if not status.is_success(response.status_code):
        return response
    instance = self.get_object()
    instance.set_capacity_value()
    # update_fields makes it explicit that only the derived capacity column is written
    instance.save(update_fields=['capacity'])
    serializer = serializers.InstanceSerializer(instance, context=self.get_serializer_context())
    response.data = serializer.to_representation(instance)
    return response


class InstanceUnifiedJobsList(SubListAPIView):

Expand Down
2 changes: 1 addition & 1 deletion awx/main/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def me(self):
"""Return the currently active instance."""
# If we are running unit tests, return a stub record.
if settings.IS_TESTING(sys.argv) or hasattr(sys, '_called_from_test'):
return self.model(id=1, hostname='localhost', uuid='00000000-0000-0000-0000-000000000000')
return self.model(id=1, hostname=settings.CLUSTER_HOST_ID, uuid='00000000-0000-0000-0000-000000000000')

node = self.filter(hostname=settings.CLUSTER_HOST_ID)
if node.exists():
Expand Down
27 changes: 27 additions & 0 deletions awx/main/migrations/0153_instance_last_seen.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Generated by Django 2.2.20 on 2021-08-12 13:55

from django.db import migrations, models


class Migration(migrations.Migration):
    """Add Instance.last_seen and widen Instance.memory to a big integer."""

    dependencies = [('main', '0152_instance_node_type')]

    operations = [
        # New heartbeat timestamp; nullable because existing rows have no reading yet.
        migrations.AddField(
            model_name='instance',
            name='last_seen',
            field=models.DateTimeField(
                null=True,
                editable=False,
                help_text='Last time instance ran its heartbeat task for main cluster nodes. Last known connection to receptor mesh for execution nodes.',
            ),
        ),
        # Memory is now stored in bytes, which can exceed a 32-bit integer.
        migrations.AlterField(
            model_name='instance',
            name='memory',
            field=models.BigIntegerField(
                default=0,
                editable=False,
                help_text='Total system memory of this instance in bytes.',
            ),
        ),
    ]
113 changes: 84 additions & 29 deletions awx/main/models/ha.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from awx.main.fields import JSONField
from awx.main.models.base import BaseModel, HasEditsMixin, prevent_search
from awx.main.models.unified_jobs import UnifiedJob
from awx.main.utils import get_cpu_capacity, get_mem_capacity, get_system_task_capacity
from awx.main.utils.common import measure_cpu, get_corrected_cpu, get_cpu_effective_capacity, measure_memory, get_corrected_memory, get_mem_effective_capacity
from awx.main.models.mixins import RelatedJobsMixin

__all__ = ('Instance', 'InstanceGroup', 'TowerScheduleState')
Expand Down Expand Up @@ -52,6 +52,7 @@ class Instance(HasPolicyEditsMixin, BaseModel):

objects = InstanceManager()

# Fields set in instance registration
uuid = models.CharField(max_length=40)
hostname = models.CharField(max_length=250, unique=True)
ip_address = models.CharField(
Expand All @@ -61,24 +62,34 @@ class Instance(HasPolicyEditsMixin, BaseModel):
max_length=50,
unique=True,
)
# Auto-fields, implementation is different from BaseModel
created = models.DateTimeField(auto_now_add=True)
modified = models.DateTimeField(auto_now=True)
# Fields defined in health check or heartbeat
version = models.CharField(max_length=120, blank=True)
capacity = models.PositiveIntegerField(
default=100,
editable=False,
)
capacity_adjustment = models.DecimalField(default=Decimal(1.0), max_digits=3, decimal_places=2, validators=[MinValueValidator(0)])
enabled = models.BooleanField(default=True)
managed_by_policy = models.BooleanField(default=True)
cpu = models.IntegerField(
default=0,
editable=False,
)
memory = models.BigIntegerField(
default=0,
editable=False,
help_text=_('Total system memory of this instance in bytes.'),
)
last_seen = models.DateTimeField(
null=True,
editable=False,
help_text=_('Last time instance ran its heartbeat task for main cluster nodes. Last known connection to receptor mesh for execution nodes.'),
)
# Capacity management
capacity = models.PositiveIntegerField(
default=100,
editable=False,
)
capacity_adjustment = models.DecimalField(default=Decimal(1.0), max_digits=3, decimal_places=2, validators=[MinValueValidator(0)])
enabled = models.BooleanField(default=True)
managed_by_policy = models.BooleanField(default=True)

cpu_capacity = models.IntegerField(
default=0,
editable=False,
Expand Down Expand Up @@ -126,39 +137,83 @@ def choose_online_control_plane_node():
return random.choice(Instance.objects.filter(enabled=True).filter(node_type__in=['control', 'hybrid']).values_list('hostname', flat=True))

def is_lost(self, ref_time=None):
    """Return True if this instance has not checked in recently.

    An instance with no recorded last_seen is always considered lost.
    The grace period is twice the cluster heartbeat period; execution nodes
    get an extra allowance of the receptor advertisement period, since their
    last_seen is driven by mesh advertisements rather than the heartbeat task.

    :param ref_time: datetime to compare against; defaults to now().
    """
    # NOTE: this span previously interleaved the removed modified-based
    # implementation with the new last_seen-based one; only the new logic remains.
    if self.last_seen is None:
        return True
    if ref_time is None:
        ref_time = now()
    grace_period = settings.CLUSTER_NODE_HEARTBEAT_PERIOD * 2
    if self.node_type == 'execution':
        grace_period += settings.RECEPTOR_SERVICE_ADVERTISEMENT_PERIOD
    return self.last_seen < ref_time - timedelta(seconds=grace_period)

def mark_offline(self, update_last_seen=False, perform_save=True):
    """Zero out this instance's capacity fields.

    :param update_last_seen: also stamp last_seen with the current time.
    :param perform_save: when True, persist only the fields touched here.
    Does nothing if the node is already offline and last_seen is not being updated.
    """
    already_offline = self.cpu_capacity == 0 and self.mem_capacity == 0 and self.capacity == 0
    if already_offline and not update_last_seen:
        return

    self.cpu_capacity = 0
    self.mem_capacity = 0
    self.capacity = 0
    if update_last_seen:
        self.last_seen = now()

    if not perform_save:
        return
    fields = ['capacity', 'cpu_capacity', 'mem_capacity']
    if update_last_seen:
        fields.append('last_seen')
    self.save(update_fields=fields)

def set_capacity_value(self):
    """Sets capacity according to capacity adjustment rule (no save).

    capacity_adjustment interpolates between the two resource-derived
    capacities: 0 pins capacity to min(mem_capacity, cpu_capacity),
    1 pins it to the max. A disabled instance always has zero capacity.
    """
    # Removed a stale leftover line that called the deleted helper
    # get_system_task_capacity() and clobbered the computed value below.
    if self.enabled:
        lower_cap = min(self.mem_capacity, self.cpu_capacity)
        higher_cap = max(self.mem_capacity, self.cpu_capacity)
        self.capacity = lower_cap + (higher_cap - lower_cap) * self.capacity_adjustment
    else:
        self.capacity = 0

def refresh_capacity_fields(self):
    """Recompute derived capacity fields from raw cpu and memory readings (no save)."""
    # The two derived fields are independent of each other; both feed set_capacity_value.
    self.mem_capacity = get_mem_effective_capacity(self.memory)
    self.cpu_capacity = get_cpu_effective_capacity(self.cpu)
    self.set_capacity_value()

def save_health_data(self, version, cpu, memory, last_seen=None, has_error=False):
    """Persist the results of a heartbeat or health check.

    Tracks which fields actually changed and saves only those, plus the
    derived capacity fields, which are always recomputed and written.

    :param version: version string reported by the node.
    :param cpu: raw cpu reading (corrected via get_corrected_cpu).
    :param memory: raw memory reading in bytes (corrected via get_corrected_memory).
    :param last_seen: optional heartbeat timestamp to record.
    :param has_error: when True, zero out capacity instead of recomputing it.
    """
    changed = []

    if last_seen is not None and self.last_seen != last_seen:
        self.last_seen = last_seen
        changed.append('last_seen')

    if version != self.version:
        self.version = version
        changed.append('version')

    corrected_cpu = get_corrected_cpu(cpu)
    if corrected_cpu != self.cpu:
        self.cpu = corrected_cpu
        changed.append('cpu')

    corrected_memory = get_corrected_memory(memory)
    if corrected_memory != self.memory:
        self.memory = corrected_memory
        changed.append('memory')

    if has_error:
        # errored node: zero the capacity fields but keep the raw readings
        self.mark_offline(perform_save=False)
    else:
        self.refresh_capacity_fields()
    changed.extend(['cpu_capacity', 'mem_capacity', 'capacity'])

    self.save(update_fields=changed)

def local_health_check(self):
    """Only call this method on the instance that this record represents.

    Measures local cpu/memory, checks redis connectivity, and persists the
    results (including last_seen) via save_health_data.
    """
    # Removed leftover pre-refactor lines that referenced undefined locals
    # (cpu/mem) and performed a second, conflicting save.
    has_error = False
    try:
        # if redis is down for some reason, that means we can't persist
        # playbook event data; we should consider this a zero capacity event
        redis.Redis.from_url(settings.BROKER_URL).ping()
    except redis.ConnectionError:
        has_error = True

    self.save_health_data(awx_application_version, measure_cpu(), measure_memory(), last_seen=now(), has_error=has_error)


class InstanceGroup(HasPolicyEditsMixin, BaseModel, RelatedJobsMixin):
Expand Down
93 changes: 46 additions & 47 deletions awx/main/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,6 @@
parse_yaml_or_json,
cleanup_new_process,
create_partition,
get_cpu_effective_capacity,
get_mem_effective_capacity,
get_system_task_capacity,
)
from awx.main.utils.execution_environments import get_default_pod_spec, CONTAINER_ROOT, to_container_path
from awx.main.utils.ansible import read_ansible_config
Expand Down Expand Up @@ -180,7 +177,7 @@ def dispatch_startup():
def inform_cluster_of_shutdown():
try:
this_inst = Instance.objects.get(hostname=settings.CLUSTER_HOST_ID)
this_inst.mark_offline(on_good_terms=True) # No thank you to new jobs while shut down
this_inst.mark_offline(update_last_seen=True) # No thank you to new jobs while shut down
try:
reaper.reap(this_inst)
except Exception:
Expand Down Expand Up @@ -403,72 +400,74 @@ def cleanup_execution_environment_images():


@task(queue=get_local_queuename)
def execution_node_health_check(node):
    """Run a worker_info health check against one execution node and persist the result.

    Saves version/cpu/memory via Instance.save_health_data; on reported errors the
    capacity is zeroed there, and logging distinguishes a newly-lost node (had
    capacity before) from one that never had capacity.
    """
    # Removed interleaved diff residue: the old check_heartbeat def line, the
    # duplicate capacity condition, and the orphaned pre-refactor else-block.
    try:
        instance = Instance.objects.get(hostname=node)
    except Instance.DoesNotExist:
        logger.warn(f'Instance record for {node} missing, could not check capacity.')
        return

    data = worker_info(node)

    prior_capacity = instance.capacity

    instance.save_health_data(
        'ansible-runner-' + data.get('Version', '???'),
        data.get('CPU Capacity', 0),  # TODO: rename field on runner side to not say "Capacity"
        data.get('Memory Capacity', 0) * 1000,  # TODO: double-check the multiplier here
        has_error=bool(data.get('Errors')),
    )

    if data['Errors']:
        formatted_error = "\n".join(data["Errors"])
        if prior_capacity:
            logger.warn(f'Health check marking execution node {node} as lost, errors:\n{formatted_error}')
        else:
            logger.info(f'Failed to find capacity of new or lost execution node {node}, errors:\n{formatted_error}')
    else:
        logger.info('Set capacity of execution node {} to {}, worker info data:\n{}'.format(node, instance.capacity, json.dumps(data, indent=2)))


def discover_receptor_nodes():
def inspect_execution_nodes(instance_list):
node_lookup = {}
for inst in instance_list:
if inst.node_type == 'execution':
node_lookup[inst.hostname] = inst

ctl = get_receptor_ctl()
connections = ctl.simple_command('status')['Advertisements']
nowtime = now()
for ad in connections:
hostname = ad['NodeID']
commands = ad['WorkCommands'] or []
commands = ad.get('WorkCommands') or []
if 'ansible-runner' not in commands:
continue
(changed, instance) = Instance.objects.register(hostname=hostname, node_type='execution')
changed = False
if hostname in node_lookup:
instance = node_lookup[hostname]
else:
(changed, instance) = Instance.objects.register(hostname=hostname, node_type='execution')
was_lost = instance.is_lost(ref_time=nowtime)
last_seen = parse_date(ad['Time'])
if instance.modified == last_seen:
continue
instance.modified = last_seen
if instance.is_lost(ref_time=nowtime):
# if the instance hasn't advertised in awhile, don't save a new modified time
# this is so multiple cluster nodes do all make repetitive updates

if instance.last_seen and instance.last_seen >= last_seen:
continue
instance.last_seen = last_seen
instance.save(update_fields=['last_seen'])

instance.save(update_fields=['modified'])
if changed:
logger.warn("Registered execution node '{}'".format(hostname))
check_heartbeat.apply_async([hostname])
execution_node_health_check.apply_async([hostname])
elif was_lost:
# if the instance *was* lost, but has appeared again,
# attempt to re-establish the initial capacity and version
# check
logger.warn(f'Execution node attempting to rejoin as instance {hostname}.')
check_heartbeat.apply_async([hostname])
execution_node_health_check.apply_async([hostname])
elif instance.capacity == 0:
# Periodically re-run the health check of errored nodes, in case someone fixed it
# TODO: perhaps decrease the frequency of these checks
logger.debug(f'Restarting health check for execution node {hostname} with known errors.')
check_heartbeat.apply_async([hostname])
execution_node_health_check.apply_async([hostname])


@task(queue=get_local_queuename)
Expand All @@ -479,34 +478,34 @@ def cluster_node_heartbeat():
this_inst = None
lost_instances = []

(changed, instance) = Instance.objects.get_or_register()
if changed:
logger.info("Registered tower control node '{}'".format(instance.hostname))

discover_receptor_nodes()

for inst in list(instance_list):
for inst in instance_list:
if inst.hostname == settings.CLUSTER_HOST_ID:
this_inst = inst
instance_list.remove(inst)
elif inst.node_type == 'execution': # TODO: zero out capacity of execution nodes that are MIA
# Only considering control plane for this logic
continue
elif inst.is_lost(ref_time=nowtime):
break
else:
(changed, this_inst) = Instance.objects.get_or_register()
if changed:
logger.info("Registered tower control node '{}'".format(this_inst.hostname))

inspect_execution_nodes(instance_list)

for inst in list(instance_list):
if inst.is_lost(ref_time=nowtime):
lost_instances.append(inst)
instance_list.remove(inst)

if this_inst:
startup_event = this_inst.is_lost(ref_time=nowtime)
this_inst.refresh_capacity()
if startup_event:
this_inst.local_health_check()
if startup_event and this_inst.capacity != 0:
logger.warning('Rejoining the cluster as instance {}.'.format(this_inst.hostname))
return
else:
raise RuntimeError("Cluster Host Not Found: {}".format(settings.CLUSTER_HOST_ID))
# IFF any node has a greater version than we do, then we'll shutdown services
for other_inst in instance_list:
if other_inst.version == "" or other_inst.version.startswith('ansible-runner'):
if other_inst.version == "" or other_inst.version.startswith('ansible-runner') or other_inst.node_type == 'execution':
continue
if Version(other_inst.version.split('-', 1)[0]) > Version(awx_application_version.split('-', 1)[0]) and not settings.DEBUG:
logger.error(
Expand Down Expand Up @@ -534,7 +533,7 @@ def cluster_node_heartbeat():
# since we will delete the node anyway.
if other_inst.capacity != 0 and not settings.AWX_AUTO_DEPROVISION_INSTANCES:
other_inst.mark_offline()
logger.error("Host {} last checked in at {}, marked as lost.".format(other_inst.hostname, other_inst.modified))
logger.error("Host {} last checked in at {}, marked as lost.".format(other_inst.hostname, other_inst.last_seen))
elif settings.AWX_AUTO_DEPROVISION_INSTANCES:
deprovision_hostname = other_inst.hostname
other_inst.delete()
Expand Down
Loading

0 comments on commit d2845a9

Please sign in to comment.