From dfd4801899410f89b8bd33a2ef602a13b1d514c6 Mon Sep 17 00:00:00 2001 From: Ryan Petrello Date: Wed, 8 Jan 2020 16:14:47 -0500 Subject: [PATCH] optimize the callback receiver to buffer writes on high throughput additionaly, optimize away several per-event host lookups and changed/failed propagation lookups we've always performed these (fairly expensive) queries *on every event save* - if you're processing tens of thousands of events in short bursts, this is way too slow this commit also introduces a new command for profiling the insertion rate of events, `awx-manage callback_stats` see: https://github.com/ansible/awx/issues/5514 --- awx/api/serializers.py | 100 -------- awx/conf/settings.py | 15 ++ awx/main/dispatch/pool.py | 2 +- awx/main/dispatch/worker/base.py | 5 +- awx/main/dispatch/worker/callback.py | 187 +++++++++------ .../management/commands/callback_stats.py | 38 +++ .../management/commands/replay_job_events.py | 28 +-- awx/main/models/events.py | 221 +++++++----------- awx/main/signals.py | 52 +---- awx/main/tasks.py | 18 +- awx/main/tests/functional/api/test_events.py | 4 +- .../tests/functional/api/test_settings.py | 2 + .../commands/test_secret_key_regeneration.py | 14 +- .../tests/functional/models/test_events.py | 102 +++----- .../unit/commands/test_replay_job_events.py | 4 +- awx/main/tests/unit/models/test_events.py | 42 ++-- awx/settings/defaults.py | 3 - 17 files changed, 340 insertions(+), 497 deletions(-) create mode 100644 awx/main/management/commands/callback_stats.py diff --git a/awx/api/serializers.py b/awx/api/serializers.py index e75b343711e3..2598fd0e1e28 100644 --- a/awx/api/serializers.py +++ b/awx/api/serializers.py @@ -3874,26 +3874,6 @@ def to_representation(self, obj): return data -class JobEventWebSocketSerializer(JobEventSerializer): - created = serializers.SerializerMethodField() - modified = serializers.SerializerMethodField() - event_name = serializers.CharField(source='event') - group_name = serializers.SerializerMethodField() - - class Meta: - model = JobEvent - fields = ('*', 'event_name', 'group_name',) - - def get_created(self, obj): - return obj.created.isoformat() - - def get_modified(self, obj): - return obj.modified.isoformat() - - def get_group_name(self, obj): - return 'job_events' - - class ProjectUpdateEventSerializer(JobEventSerializer): stdout = serializers.SerializerMethodField() event_data = serializers.SerializerMethodField() @@ -3925,26 +3905,6 @@ def get_event_data(self, obj): return {} -class ProjectUpdateEventWebSocketSerializer(ProjectUpdateEventSerializer): - created = serializers.SerializerMethodField() - modified = serializers.SerializerMethodField() - event_name = serializers.CharField(source='event') - group_name = serializers.SerializerMethodField() - - class Meta: - model = ProjectUpdateEvent - fields = ('*', 'event_name', 'group_name',) - - def get_created(self, obj): - return obj.created.isoformat() - - def get_modified(self, obj): - return obj.modified.isoformat() - - def get_group_name(self, obj): - return 'project_update_events' - - class AdHocCommandEventSerializer(BaseSerializer): event_display = serializers.CharField(source='get_event_display', read_only=True) @@ -3976,26 +3936,6 @@ def to_representation(self, obj): return data -class AdHocCommandEventWebSocketSerializer(AdHocCommandEventSerializer): - created = serializers.SerializerMethodField() - modified = serializers.SerializerMethodField() - event_name = serializers.CharField(source='event') - group_name = serializers.SerializerMethodField() - - class Meta: - model = AdHocCommandEvent - fields = ('*', 'event_name', 'group_name',) - - def get_created(self, obj): - return obj.created.isoformat() - - def get_modified(self, obj): - return obj.modified.isoformat() - - def get_group_name(self, obj): - return 'ad_hoc_command_events' - - class InventoryUpdateEventSerializer(AdHocCommandEventSerializer): class Meta: @@ -4011,26 +3951,6 @@ def get_related(self, obj): return res -class InventoryUpdateEventWebSocketSerializer(InventoryUpdateEventSerializer): - created = serializers.SerializerMethodField() - modified = serializers.SerializerMethodField() - event_name = serializers.CharField(source='event') - group_name = serializers.SerializerMethodField() - - class Meta: - model = InventoryUpdateEvent - fields = ('*', 'event_name', 'group_name',) - - def get_created(self, obj): - return obj.created.isoformat() - - def get_modified(self, obj): - return obj.modified.isoformat() - - def get_group_name(self, obj): - return 'inventory_update_events' - - class SystemJobEventSerializer(AdHocCommandEventSerializer): class Meta: @@ -4046,26 +3966,6 @@ def get_related(self, obj): return res -class SystemJobEventWebSocketSerializer(SystemJobEventSerializer): - created = serializers.SerializerMethodField() - modified = serializers.SerializerMethodField() - event_name = serializers.CharField(source='event') - group_name = serializers.SerializerMethodField() - - class Meta: - model = SystemJobEvent - fields = ('*', 'event_name', 'group_name',) - - def get_created(self, obj): - return obj.created.isoformat() - - def get_modified(self, obj): - return obj.modified.isoformat() - - def get_group_name(self, obj): - return 'system_job_events' - - class JobLaunchSerializer(BaseSerializer): # Representational fields diff --git a/awx/conf/settings.py b/awx/conf/settings.py index 350a4df0490b..100f510fe325 100644 --- a/awx/conf/settings.py +++ b/awx/conf/settings.py @@ -28,6 +28,8 @@ from awx.conf.models import Setting from awx.conf.migrations._reencrypt import decrypt_field as old_decrypt_field +import cachetools + # FIXME: Gracefully handle when settings are accessed before the database is # ready (or during migrations). @@ -136,6 +138,14 @@ def filter_sensitive(registry, key, value): return value +# settings.__getattr__ is called *constantly*, and the LOG_AGGREGATOR_ ones are +# so ubiquitous when external logging is enabled that they should kept in memory +# with a short TTL to avoid even having to contact memcached +# the primary use case for this optimization is the callback receiver +# when external logging is enabled +LOGGING_SETTINGS_CACHE = cachetools.TTLCache(maxsize=50, ttl=1) + + class EncryptedCacheProxy(object): def __init__(self, cache, registry, encrypter=None, decrypter=None): @@ -437,11 +447,16 @@ def SETTINGS_MODULE(self): return self._get_default('SETTINGS_MODULE') def __getattr__(self, name): + if name.startswith('LOG_AGGREGATOR_'): + if name in LOGGING_SETTINGS_CACHE: + return LOGGING_SETTINGS_CACHE[name] value = empty if name in self.all_supported_settings: with _ctit_db_wrapper(trans_safe=True): value = self._get_local(name) if value is not empty: + if name.startswith('LOG_AGGREGATOR_'): + LOGGING_SETTINGS_CACHE[name] = value return value value = self._get_default(name) # sometimes users specify RabbitMQ passwords that contain diff --git a/awx/main/dispatch/pool.py b/awx/main/dispatch/pool.py index 2351f90c3fdd..fce8ff7281bc 100644 --- a/awx/main/dispatch/pool.py +++ b/awx/main/dispatch/pool.py @@ -277,7 +277,7 @@ def write(self, preferred_queue, body): logger.warn("could not write to queue %s" % preferred_queue) logger.warn("detail: {}".format(tb)) write_attempt_order.append(preferred_queue) - logger.warn("could not write payload to any queue, attempted order: {}".format(write_attempt_order)) + logger.error("could not write payload to any queue, attempted order: {}".format(write_attempt_order)) return None def stop(self, signum): diff --git a/awx/main/dispatch/worker/base.py b/awx/main/dispatch/worker/base.py index e73ed4baded5..9a0b4735df03 100644 --- a/awx/main/dispatch/worker/base.py +++ b/awx/main/dispatch/worker/base.py @@ -119,6 +119,9 @@ def stop(self, signum, frame): class BaseWorker(object): + def read(self, queue): + return queue.get(block=True, timeout=1) + def work_loop(self, queue, finished, idx, *args): ppid = os.getppid() signal_handler = WorkerSignalHandler() @@ -128,7 +131,7 @@ def work_loop(self, queue, finished, idx, *args): if os.getppid() != ppid: break try: - body = queue.get(block=True, timeout=1) + body = self.read(queue) if body == 'QUIT': break except QueueEmpty: diff --git a/awx/main/dispatch/worker/callback.py b/awx/main/dispatch/worker/callback.py index 578901c5f327..fec131c197b8 100644 --- a/awx/main/dispatch/worker/callback.py +++ b/awx/main/dispatch/worker/callback.py @@ -1,7 +1,9 @@ import logging import time import traceback +from queue import Empty as QueueEmpty +from django.utils.timezone import now as tz_now from django.conf import settings from django.db import DatabaseError, OperationalError, connection as django_connection from django.db.utils import InterfaceError, InternalError @@ -9,11 +11,16 @@ from awx.main.consumers import emit_channel_notification from awx.main.models import (JobEvent, AdHocCommandEvent, ProjectUpdateEvent, InventoryUpdateEvent, SystemJobEvent, UnifiedJob) +from awx.main.models.events import emit_event_detail from .base import BaseWorker logger = logging.getLogger('awx.main.commands.run_callback_receiver') +# the number of seconds to buffer events in memory before flushing +# using JobEvent.objects.bulk_create() +BUFFER_SECONDS = .1 + class CallbackBrokerWorker(BaseWorker): ''' @@ -26,89 +33,127 @@ class CallbackBrokerWorker(BaseWorker): MAX_RETRIES = 2 + def __init__(self): + self.buff = {} + + def read(self, queue): + try: + return queue.get(block=True, timeout=BUFFER_SECONDS) + except QueueEmpty: + return {'event': 'FLUSH'} + + def flush(self, force=False): + now = tz_now() + if ( + force or + any([len(events) == 1000 for events in self.buff.values()]) + ): + for cls, events in self.buff.items(): + logger.debug(f'{cls.__name__}.objects.bulk_create({len(events)})') + for e in events: + if not e.created: + e.created = now + e.modified = now + try: + cls.objects.bulk_create(events) + except Exception: + # if an exception occurs, we should re-attempt to save the + # events one-by-one, because something in the list is + # broken/stale (e.g., an IntegrityError on a specific event) + for e in events: + try: + if getattr(e, 'host_id', ''): + # this is one potential IntegrityError we can + # work around - if the host disappears before + # the event can be processed + del e.host_id + e.save() + except Exception: + logger.exception('Database Error Saving Job Event') + for e in events: + emit_event_detail(e) + self.buff = {} + def perform_work(self, body): try: - event_map = { - 'job_id': JobEvent, - 'ad_hoc_command_id': AdHocCommandEvent, - 'project_update_id': ProjectUpdateEvent, - 'inventory_update_id': InventoryUpdateEvent, - 'system_job_id': SystemJobEvent, - } - - if not any([key in body for key in event_map]): - raise Exception('Payload does not have a job identifier') - - def _save_event_data(): + flush = body.get('event') == 'FLUSH' + if not flush: + event_map = { + 'job_id': JobEvent, + 'ad_hoc_command_id': AdHocCommandEvent, + 'project_update_id': ProjectUpdateEvent, + 'inventory_update_id': InventoryUpdateEvent, + 'system_job_id': SystemJobEvent, + } + + job_identifier = 'unknown job' + job_key = 'unknown' for key, cls in event_map.items(): if key in body: - cls.create_from_data(**body) - - job_identifier = 'unknown job' - job_key = 'unknown' - for key in event_map.keys(): - if key in body: - job_identifier = body[key] - job_key = key - break + job_identifier = body[key] + job_key = key + break + + if settings.DEBUG: + from pygments import highlight + from pygments.lexers import PythonLexer + from pygments.formatters import Terminal256Formatter + from pprint import pformat + if body.get('event') == 'EOF': + event_thing = 'EOF event' + else: + event_thing = 'event {}'.format(body.get('counter', 'unknown')) + logger.debug('Callback worker received {} for {} {}'.format( + event_thing, job_key[:-len('_id')], job_identifier + )) + logger.debug('Body: {}'.format( + highlight(pformat(body, width=160), PythonLexer(), Terminal256Formatter(style='friendly')) + )[:1024 * 4]) - if settings.DEBUG: - from pygments import highlight - from pygments.lexers import PythonLexer - from pygments.formatters import Terminal256Formatter - from pprint import pformat if body.get('event') == 'EOF': - event_thing = 'EOF event' - else: - event_thing = 'event {}'.format(body.get('counter', 'unknown')) - logger.info('Callback worker received {} for {} {}'.format( - event_thing, job_key[:-len('_id')], job_identifier - )) - logger.debug('Body: {}'.format( - highlight(pformat(body, width=160), PythonLexer(), Terminal256Formatter(style='friendly')) - )[:1024 * 4]) - - if body.get('event') == 'EOF': - try: - final_counter = body.get('final_counter', 0) - logger.info('Event processing is finished for Job {}, sending notifications'.format(job_identifier)) - # EOF events are sent when stdout for the running task is - # closed. don't actually persist them to the database; we - # just use them to report `summary` websocket events as an - # approximation for when a job is "done" - emit_channel_notification( - 'jobs-summary', - dict(group_name='jobs', unified_job_id=job_identifier, final_counter=final_counter) - ) - # Additionally, when we've processed all events, we should - # have all the data we need to send out success/failure - # notification templates - uj = UnifiedJob.objects.get(pk=job_identifier) - if hasattr(uj, 'send_notification_templates'): - retries = 0 - while retries < 5: - if uj.finished: - uj.send_notification_templates('succeeded' if uj.status == 'successful' else 'failed') - break - else: - # wait a few seconds to avoid a race where the - # events are persisted _before_ the UJ.status - # changes from running -> successful - retries += 1 - time.sleep(1) - uj = UnifiedJob.objects.get(pk=job_identifier) - except Exception: - logger.exception('Worker failed to emit notifications: Job {}'.format(job_identifier)) - return + try: + final_counter = body.get('final_counter', 0) + logger.info('Event processing is finished for Job {}, sending notifications'.format(job_identifier)) + # EOF events are sent when stdout for the running task is + # closed. don't actually persist them to the database; we + # just use them to report `summary` websocket events as an + # approximation for when a job is "done" + emit_channel_notification( + 'jobs-summary', + dict(group_name='jobs', unified_job_id=job_identifier, final_counter=final_counter) + ) + # Additionally, when we've processed all events, we should + # have all the data we need to send out success/failure + # notification templates + uj = UnifiedJob.objects.get(pk=job_identifier) + if hasattr(uj, 'send_notification_templates'): + retries = 0 + while retries < 5: + if uj.finished: + uj.send_notification_templates('succeeded' if uj.status == 'successful' else 'failed') + break + else: + # wait a few seconds to avoid a race where the + # events are persisted _before_ the UJ.status + # changes from running -> successful + retries += 1 + time.sleep(1) + uj = UnifiedJob.objects.get(pk=job_identifier) + except Exception: + logger.exception('Worker failed to emit notifications: Job {}'.format(job_identifier)) + return + + event = cls.create_from_data(**body) + self.buff.setdefault(cls, []).append(event) retries = 0 while retries <= self.MAX_RETRIES: try: - _save_event_data() + self.flush(force=flush) break except (OperationalError, InterfaceError, InternalError): if retries >= self.MAX_RETRIES: - logger.exception('Worker could not re-establish database connectivity, giving up on event for Job {}'.format(job_identifier)) + logger.exception('Worker could not re-establish database connectivity, giving up on one or more events.') return delay = 60 * retries logger.exception('Database Error Saving Job Event, retry #{i} in {delay} seconds:'.format( @@ -119,7 +164,7 @@ def _save_event_data(): time.sleep(delay) retries += 1 except DatabaseError: - logger.exception('Database Error Saving Job Event for Job {}'.format(job_identifier)) + logger.exception('Database Error Saving Job Event') break except Exception as exc: tb = traceback.format_exc() diff --git a/awx/main/management/commands/callback_stats.py b/awx/main/management/commands/callback_stats.py new file mode 100644 index 000000000000..64d6c41f946d --- /dev/null +++ b/awx/main/management/commands/callback_stats.py @@ -0,0 +1,38 @@ +import time +import sys + +from django.db import connection +from django.core.management.base import BaseCommand + + +class Command(BaseCommand): + + def handle(self, *args, **options): + super(Command, self).__init__() + with connection.cursor() as cursor: + clear = False + while True: + lines = [] + for relation in ( + 'main_jobevent', 'main_inventoryupdateevent', + 'main_projectupdateevent', 'main_adhoccommandevent' + ): + lines.append(relation) + for label, interval in ( + ('last minute: ', '1 minute'), + ('last 5 minutes:', '5 minutes'), + ('last hour: ', '1 hour'), + ): + cursor.execute( + f"SELECT MAX(id) - MIN(id) FROM {relation} WHERE modified > now() - '{interval}'::interval;" + ) + events = cursor.fetchone()[0] or 0 + lines.append(f'↳ {label} {events}') + lines.append('') + if clear: + for i in range(20): + sys.stdout.write('\x1b[1A\x1b[2K') + for l in lines: + print(l) + clear = True + time.sleep(.25) diff --git a/awx/main/management/commands/replay_job_events.py b/awx/main/management/commands/replay_job_events.py index 875578ec298d..ad8d7a5f5fd9 100644 --- a/awx/main/management/commands/replay_job_events.py +++ b/awx/main/management/commands/replay_job_events.py @@ -9,6 +9,7 @@ from django.utils import timezone from django.core.management.base import BaseCommand +from awx.main.models.events import emit_event_detail from awx.main.models import ( UnifiedJob, Job, @@ -17,14 +18,6 @@ InventoryUpdate, SystemJob ) -from awx.main.consumers import emit_channel_notification -from awx.api.serializers import ( - JobEventWebSocketSerializer, - AdHocCommandEventWebSocketSerializer, - ProjectUpdateEventWebSocketSerializer, - InventoryUpdateEventWebSocketSerializer, - SystemJobEventWebSocketSerializer -) class JobStatusLifeCycle(): @@ -96,21 +89,6 @@ def get_job_events(self, job): raise RuntimeError("No events for job id {}".format(job.id)) return job_events, count - def get_serializer(self, job): - if type(job) is Job: - return JobEventWebSocketSerializer - elif type(job) is AdHocCommand: - return AdHocCommandEventWebSocketSerializer - elif type(job) is ProjectUpdate: - return ProjectUpdateEventWebSocketSerializer - elif type(job) is InventoryUpdate: - return InventoryUpdateEventWebSocketSerializer - elif type(job) is SystemJob: - return SystemJobEventWebSocketSerializer - else: - raise RuntimeError("Job is of type {} and replay is not yet supported.".format(type(job))) - sys.exit(1) - def run(self, job_id, speed=1.0, verbosity=0, skip_range=[], random_seed=0, final_status_delay=0, debug=False): stats = { 'events_ontime': { @@ -136,7 +114,6 @@ def run(self, job_id, speed=1.0, verbosity=0, skip_range=[], random_seed=0, fina try: job = self.get_job(job_id) job_events, job_event_count = self.get_job_events(job) - serializer = self.get_serializer(job) except RuntimeError as e: print("{}".format(e.message)) sys.exit(1) @@ -162,8 +139,7 @@ def run(self, job_id, speed=1.0, verbosity=0, skip_range=[], random_seed=0, fina stats['replay_start'] = self.replay_start je_previous = je_current - je_serialized = serializer(je_current).data - emit_channel_notification('{}-{}'.format(je_serialized['group_name'], job.id), je_serialized) + emit_event_detail(je_current) replay_offset = self.replay_offset(je_previous.created, speed) recording_diff = (je_current.created - je_previous.created).total_seconds() * (1.0 / speed) diff --git a/awx/main/models/events.py b/awx/main/models/events.py index 94886d48f95a..01f494c6fbd2 100644 --- a/awx/main/models/events.py +++ b/awx/main/models/events.py @@ -2,7 +2,6 @@ import logging from collections import defaultdict -from django.conf import settings from django.db import models, DatabaseError from django.utils.dateparse import parse_datetime from django.utils.text import Truncator @@ -11,9 +10,10 @@ from django.utils.encoding import force_text from awx.api.versioning import reverse +from awx.main import consumers from awx.main.fields import JSONField from awx.main.models.base import CreatedModifiedModel -from awx.main.utils import ignore_inventory_computed_fields +from awx.main.utils import ignore_inventory_computed_fields, camelcase_to_underscore analytics_logger = logging.getLogger('awx.analytics.job_events') @@ -55,6 +55,51 @@ def create_host_status_counts(event_data): return dict(host_status_counts) +def emit_event_detail(event): + cls = event.__class__ + relation = { + JobEvent: 'job_id', + AdHocCommandEvent: 'ad_hoc_command_id', + ProjectUpdateEvent: 'project_update_id', + InventoryUpdateEvent: 'inventory_update_id', + SystemJobEvent: 'system_job_id', + }[cls] + url = '' + if isinstance(event, JobEvent): + url = '/api/v2/job_events/{}'.format(event.id) + if isinstance(event, AdHocCommandEvent): + url = '/api/v2/ad_hoc_command_events/{}'.format(event.id) + group = camelcase_to_underscore(cls.__name__) + 's' + timestamp = event.created.isoformat() + consumers.emit_channel_notification( + '-'.join([group, str(getattr(event, relation))]), + { + 'id': event.id, + relation.replace('_id', ''): getattr(event, relation), + 'created': timestamp, + 'modified': timestamp, + 'group_name': group, + 'url': url, + 'stdout': event.stdout, + 'counter': event.counter, + 'uuid': event.uuid, + 'parent_uuid': getattr(event, 'parent_uuid', ''), + 'start_line': event.start_line, + 'end_line': event.end_line, + 'event': event.event, + 'event_data': getattr(event, 'event_data', {}), + 'failed': event.failed, + 'changed': event.changed, + 'event_level': getattr(event, 'event_level', ''), + 'play': getattr(event, 'play', ''), + 'role': getattr(event, 'role', ''), + 'task': getattr(event, 'task', ''), + } + ) + + + + class BasePlaybookEvent(CreatedModifiedModel): ''' An event/message logged from a playbook callback for each host. @@ -63,7 +108,7 @@ class BasePlaybookEvent(CreatedModifiedModel): VALID_KEYS = [ 'event', 'event_data', 'playbook', 'play', 'role', 'task', 'created', 'counter', 'uuid', 'stdout', 'parent_uuid', 'start_line', 'end_line', - 'verbosity' + 'host_id', 'host_name', 'verbosity', ] class Meta: @@ -271,34 +316,53 @@ def get_event_display2(self): def _update_from_event_data(self): # Update event model fields from event data. - updated_fields = set() event_data = self.event_data res = event_data.get('res', None) if self.event in self.FAILED_EVENTS and not event_data.get('ignore_errors', False): self.failed = True - updated_fields.add('failed') if isinstance(res, dict): if res.get('changed', False): self.changed = True - updated_fields.add('changed') if self.event == 'playbook_on_stats': try: failures_dict = event_data.get('failures', {}) dark_dict = event_data.get('dark', {}) self.failed = bool(sum(failures_dict.values()) + sum(dark_dict.values())) - updated_fields.add('failed') changed_dict = event_data.get('changed', {}) self.changed = bool(sum(changed_dict.values())) - updated_fields.add('changed') except (AttributeError, TypeError): pass + + if isinstance(self, JobEvent): + hostnames = self._hostnames() + self._update_host_summary_from_stats(hostnames) + if self.job.inventory: + try: + self.job.inventory.update_computed_fields() + except DatabaseError: + logger.exception('Computed fields database error saving event {}'.format(self.pk)) + + # find parent links and progagate changed=T and failed=T + changed = self.job.job_events.filter(changed=True).exclude(parent_uuid=None).only('parent_uuid').values_list('parent_uuid', flat=True).distinct() # noqa + failed = self.job.job_events.filter(failed=True).exclude(parent_uuid=None).only('parent_uuid').values_list('parent_uuid', flat=True).distinct() # noqa + + JobEvent.objects.filter( + job_id=self.job_id, uuid__in=changed + ).update(changed=True) + JobEvent.objects.filter( + job_id=self.job_id, uuid__in=failed + ).update(failed=True) + for field in ('playbook', 'play', 'task', 'role'): value = force_text(event_data.get(field, '')).strip() if value != getattr(self, field): setattr(self, field, value) - updated_fields.add(field) - return updated_fields + if isinstance(self, JobEvent): + analytics_logger.info( + 'Event data saved.', + extra=dict(python_objects=dict(job_event=self)) + ) @classmethod def create_from_data(cls, **kwargs): @@ -325,74 +389,16 @@ def create_from_data(cls, **kwargs): sanitize_event_keys(kwargs, cls.VALID_KEYS) workflow_job_id = kwargs.pop('workflow_job_id', None) - job_event = cls.objects.create(**kwargs) + event = cls(**kwargs) if workflow_job_id: - setattr(job_event, 'workflow_job_id', workflow_job_id) - analytics_logger.info('Event data saved.', extra=dict(python_objects=dict(job_event=job_event))) - return job_event + setattr(event, 'workflow_job_id', workflow_job_id) + event._update_from_event_data() + return event @property def job_verbosity(self): return 0 - def save(self, *args, **kwargs): - # If update_fields has been specified, add our field names to it, - # if it hasn't been specified, then we're just doing a normal save. - update_fields = kwargs.get('update_fields', []) - # Update model fields and related objects unless we're only updating - # failed/changed flags triggered from a child event. - from_parent_update = kwargs.pop('from_parent_update', False) - if not from_parent_update: - # Update model fields from event data. - updated_fields = self._update_from_event_data() - for field in updated_fields: - if field not in update_fields: - update_fields.append(field) - - # Update host related field from host_name. - if hasattr(self, 'job') and not self.host_id and self.host_name: - if self.job.inventory.kind == 'smart': - # optimization to avoid calling inventory.hosts, which - # can take a long time to run under some circumstances - from awx.main.models.inventory import SmartInventoryMembership - membership = SmartInventoryMembership.objects.filter( - inventory=self.job.inventory, host__name=self.host_name - ).first() - if membership: - host_id = membership.host_id - else: - host_id = None - else: - host_qs = self.job.inventory.hosts.filter(name=self.host_name) - host_id = host_qs.only('id').values_list('id', flat=True).first() - if host_id != self.host_id: - self.host_id = host_id - if 'host_id' not in update_fields: - update_fields.append('host_id') - super(BasePlaybookEvent, self).save(*args, **kwargs) - - # Update related objects after this event is saved. - if hasattr(self, 'job') and not from_parent_update: - if getattr(settings, 'CAPTURE_JOB_EVENT_HOSTS', False): - self._update_hosts() - if self.parent_uuid: - kwargs = {} - if self.changed is True: - kwargs['changed'] = True - if self.failed is True: - kwargs['failed'] = True - if kwargs: - JobEvent.objects.filter(job_id=self.job_id, uuid=self.parent_uuid).update(**kwargs) - - if self.event == 'playbook_on_stats': - hostnames = self._hostnames() - self._update_host_summary_from_stats(hostnames) - try: - self.job.inventory.update_computed_fields() - except DatabaseError: - logger.exception('Computed fields database error saving event {}'.format(self.pk)) - - class JobEvent(BasePlaybookEvent): ''' @@ -456,38 +462,6 @@ def get_absolute_url(self, request=None): def __str__(self): return u'%s @ %s' % (self.get_event_display2(), self.created.isoformat()) - def _update_from_event_data(self): - # Update job event hostname - updated_fields = super(JobEvent, self)._update_from_event_data() - value = force_text(self.event_data.get('host', '')).strip() - if value != getattr(self, 'host_name'): - setattr(self, 'host_name', value) - updated_fields.add('host_name') - return updated_fields - - def _update_hosts(self, extra_host_pks=None): - # Update job event hosts m2m from host_name, propagate to parent events. - extra_host_pks = set(extra_host_pks or []) - hostnames = set() - if self.host_name: - hostnames.add(self.host_name) - if self.event == 'playbook_on_stats': - try: - for v in self.event_data.values(): - hostnames.update(v.keys()) - except AttributeError: # In case event_data or v isn't a dict. - pass - qs = self.job.inventory.hosts.all() - qs = qs.filter(models.Q(name__in=hostnames) | models.Q(pk__in=extra_host_pks)) - qs = qs.exclude(job_events__pk=self.id).only('id') - for host in qs: - self.hosts.add(host) - if self.parent_uuid: - parent = JobEvent.objects.filter(uuid=self.parent_uuid) - if parent.exists(): - parent = parent[0] - parent._update_hosts(qs.values_list('id', flat=True)) - def _hostnames(self): hostnames = set() try: @@ -619,14 +593,7 @@ def create_from_data(cls, **kwargs): kwargs.pop('created', None) sanitize_event_keys(kwargs, cls.VALID_KEYS) - kwargs.pop('workflow_job_id', None) - event = cls.objects.create(**kwargs) - if isinstance(event, AdHocCommandEvent): - analytics_logger.info( - 'Event data saved.', - extra=dict(python_objects=dict(job_event=event)) - ) - return event + return cls(**kwargs) def get_event_display(self): ''' @@ -640,6 +607,9 @@ def get_event_display2(self): def get_host_status_counts(self): return create_host_status_counts(getattr(self, 'event_data', {})) + def _update_from_event_data(self): + pass + class AdHocCommandEvent(BaseCommandEvent): @@ -719,34 +689,17 @@ class Meta: def get_absolute_url(self, request=None): return reverse('api:ad_hoc_command_event_detail', kwargs={'pk': self.pk}, request=request) - def save(self, *args, **kwargs): - # If update_fields has been specified, add our field names to it, - # if it hasn't been specified, then we're just doing a normal save. - update_fields = kwargs.get('update_fields', []) + def _update_from_event_data(self): res = self.event_data.get('res', None) if self.event in self.FAILED_EVENTS: if not self.event_data.get('ignore_errors', False): self.failed = True - if 'failed' not in update_fields: - update_fields.append('failed') if isinstance(res, dict) and res.get('changed', False): self.changed = True - if 'changed' not in update_fields: - update_fields.append('changed') - self.host_name = self.event_data.get('host', '').strip() - if 'host_name' not in update_fields: - update_fields.append('host_name') - if not self.host_id and self.host_name: - host_qs = self.ad_hoc_command.inventory.hosts.filter(name=self.host_name) - try: - host_id = host_qs.only('id').values_list('id', flat=True) - if host_id.exists(): - self.host_id = host_id[0] - if 'host_id' not in update_fields: - update_fields.append('host_id') - except (IndexError, AttributeError): - pass - super(AdHocCommandEvent, self).save(*args, **kwargs) + analytics_logger.info( + 'Event data saved.', + extra=dict(python_objects=dict(job_event=self)) + ) class InventoryUpdateEvent(BaseCommandEvent): diff --git a/awx/main/signals.py b/awx/main/signals.py index accd00e367c7..8e2c6fe030e0 100644 --- a/awx/main/signals.py +++ b/awx/main/signals.py @@ -30,12 +30,11 @@ # AWX from awx.main.models import ( - ActivityStream, AdHocCommandEvent, Group, Host, InstanceGroup, Inventory, - InventorySource, InventoryUpdateEvent, Job, JobEvent, JobHostSummary, - JobTemplate, OAuth2AccessToken, Organization, Project, ProjectUpdateEvent, - Role, SystemJob, SystemJobEvent, SystemJobTemplate, UnifiedJob, - UnifiedJobTemplate, User, UserSessionMembership, WorkflowJobTemplateNode, - WorkflowApproval, WorkflowApprovalTemplate, ROLE_SINGLETON_SYSTEM_ADMINISTRATOR + ActivityStream, Group, Host, InstanceGroup, Inventory, InventorySource, + Job, JobHostSummary, JobTemplate, OAuth2AccessToken, Organization, Project, + Role, SystemJob, SystemJobTemplate, UnifiedJob, UnifiedJobTemplate, User, + UserSessionMembership, WorkflowJobTemplateNode, WorkflowApproval, + WorkflowApprovalTemplate, ROLE_SINGLETON_SYSTEM_ADMINISTRATOR ) from awx.main.constants import CENSOR_VALUE from awx.main.utils import model_instance_diff, model_to_dict, camelcase_to_underscore, get_current_apps @@ -72,42 +71,6 @@ def get_current_user_or_none(): return u -def emit_event_detail(serializer, relation, **kwargs): - instance = kwargs['instance'] - created = kwargs['created'] - if created: - event_serializer = serializer(instance) - consumers.emit_channel_notification( - '-'.join([event_serializer.get_group_name(instance), str(getattr(instance, relation))]), - event_serializer.data - ) - - -def emit_job_event_detail(sender, **kwargs): - from awx.api import serializers - emit_event_detail(serializers.JobEventWebSocketSerializer, 'job_id', **kwargs) - - -def emit_ad_hoc_command_event_detail(sender, **kwargs): - from awx.api import serializers - emit_event_detail(serializers.AdHocCommandEventWebSocketSerializer, 'ad_hoc_command_id', **kwargs) - - -def emit_project_update_event_detail(sender, **kwargs): - from awx.api import serializers - emit_event_detail(serializers.ProjectUpdateEventWebSocketSerializer, 'project_update_id', **kwargs) - - -def emit_inventory_update_event_detail(sender, **kwargs): - from awx.api import serializers - emit_event_detail(serializers.InventoryUpdateEventWebSocketSerializer, 'inventory_update_id', **kwargs) - - -def emit_system_job_event_detail(sender, **kwargs): - from awx.api import serializers - emit_event_detail(serializers.SystemJobEventWebSocketSerializer, 'system_job_id', **kwargs) - - def emit_update_inventory_computed_fields(sender, **kwargs): logger.debug("In update inventory computed fields") if getattr(_inventory_updates, 'is_updating', False): @@ -258,11 +221,6 @@ def connect_computed_field_signals(): post_save.connect(save_related_job_templates, sender=Project) post_save.connect(save_related_job_templates, sender=Inventory) -post_save.connect(emit_job_event_detail, sender=JobEvent) -post_save.connect(emit_ad_hoc_command_event_detail, sender=AdHocCommandEvent) -post_save.connect(emit_project_update_event_detail, sender=ProjectUpdateEvent) -post_save.connect(emit_inventory_update_event_detail, sender=InventoryUpdateEvent) -post_save.connect(emit_system_job_event_detail, sender=SystemJobEvent) m2m_changed.connect(rebuild_role_ancestor_list, Role.parents.through) m2m_changed.connect(rbac_activity_stream, Role.members.through) m2m_changed.connect(rbac_activity_stream, Role.parents.through) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 9b263f550dd0..09dc866d3063 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -703,6 +703,7 @@ class BaseTask(object): def __init__(self): self.cleanup_paths = [] self.parent_workflow_job_id = None + self.host_map = {} def update_model(self, pk, _attempt=0, **updates): """Reload the model instance from the database and update the @@ -1001,11 +1002,17 @@ def should_use_proot(self, instance): return False def build_inventory(self, instance, private_data_dir): - script_params = dict(hostvars=True) + script_params = dict(hostvars=True, towervars=True) if hasattr(instance, 'job_slice_number'): script_params['slice_number'] = instance.job_slice_number script_params['slice_count'] = instance.job_slice_count script_data = instance.inventory.get_script_data(**script_params) + # maintain a list of host_name --> host_id + # so we can associate emitted events to Host objects + self.host_map = { + hostname: hv.pop('remote_tower_id', '') + for hostname, hv in script_data.get('_meta', {}).get('hostvars', {}).items() + } json_data = json.dumps(script_data) handle, path = tempfile.mkstemp(dir=private_data_dir) f = os.fdopen(handle, 'w') @@ -1114,6 +1121,15 @@ def event_handler(self, event_data): event_data.pop('parent_uuid', None) if self.parent_workflow_job_id: event_data['workflow_job_id'] = self.parent_workflow_job_id + if self.host_map: + host = event_data.get('event_data', {}).get('host', '').strip() + if host: + event_data['host_name'] = host + if host in self.host_map: + event_data['host_id'] = self.host_map[host] + else: + event_data['host_name'] = '' + event_data['host_id'] = '' should_write_event = False event_data.setdefault(self.event_data_key, self.instance.id) self.dispatcher.dispatch(event_data) diff --git a/awx/main/tests/functional/api/test_events.py b/awx/main/tests/functional/api/test_events.py index 00b5459eafe4..036183e93d03 100644 --- a/awx/main/tests/functional/api/test_events.py +++ b/awx/main/tests/functional/api/test_events.py @@ -15,7 +15,7 @@ def test_job_events_sublist_truncation(get, organization_factory, job_template_f inventory='test_inv', project='test_proj').job_template job = jt.create_unified_job() JobEvent.create_from_data(job_id=job.pk, uuid='abc123', event='runner_on_start', - stdout='a' * 1025) + stdout='a' * 1025).save() url = reverse('api:job_job_events_list', kwargs={'pk': job.pk}) if not truncate: @@ -35,7 +35,7 @@ def test_ad_hoc_events_sublist_truncation(get, organization_factory, job_templat adhoc = AdHocCommand() adhoc.save() AdHocCommandEvent.create_from_data(ad_hoc_command_id=adhoc.pk, uuid='abc123', event='runner_on_start', - stdout='a' * 1025) + stdout='a' * 1025).save() url = reverse('api:ad_hoc_command_ad_hoc_command_events_list', kwargs={'pk': adhoc.pk}) if not truncate: diff --git a/awx/main/tests/functional/api/test_settings.py b/awx/main/tests/functional/api/test_settings.py index 03d7620883e6..a88aa8c20b9e 100644 --- a/awx/main/tests/functional/api/test_settings.py +++ b/awx/main/tests/functional/api/test_settings.py @@ -5,6 +5,7 @@ # Python import pytest import os +import time from django.conf import settings from kombu.utils.url import parse_url @@ -276,6 +277,7 @@ def test_logging_aggregrator_connection_test_valid(mocker, get, post, admin): def test_logging_aggregrator_connection_test_with_masked_password(mocker, patch, post, admin): url = reverse('api:setting_singleton_detail', kwargs={'category_slug': 'logging'}) patch(url, user=admin, data={'LOG_AGGREGATOR_PASSWORD': 'password123'}, expect=200) + time.sleep(1) # log settings are cached slightly with mock.patch.object(AWXProxyHandler, 'perform_test') as perform_test: url = reverse('api:setting_logging_test') diff --git a/awx/main/tests/functional/commands/test_secret_key_regeneration.py b/awx/main/tests/functional/commands/test_secret_key_regeneration.py index 811363ee4778..dffaacb866f8 100644 --- a/awx/main/tests/functional/commands/test_secret_key_regeneration.py +++ b/awx/main/tests/functional/commands/test_secret_key_regeneration.py @@ -37,26 +37,26 @@ def test_encrypted_ssh_password(self, credential): def test_encrypted_setting_values(self): # test basic decryption - settings.LOG_AGGREGATOR_PASSWORD = 'sensitive' - s = Setting.objects.filter(key='LOG_AGGREGATOR_PASSWORD').first() + settings.REDHAT_PASSWORD = 'sensitive' + s = Setting.objects.filter(key='REDHAT_PASSWORD').first() assert s.value.startswith(PREFIX) - assert settings.LOG_AGGREGATOR_PASSWORD == 'sensitive' + assert settings.REDHAT_PASSWORD == 'sensitive' # re-key the setting value new_key = regenerate_secret_key.Command().handle() - new_setting = Setting.objects.filter(key='LOG_AGGREGATOR_PASSWORD').first() + new_setting = Setting.objects.filter(key='REDHAT_PASSWORD').first() assert s.value != new_setting.value # wipe out the local cache so the value is pulled from the DB again - settings.cache.delete('LOG_AGGREGATOR_PASSWORD') + settings.cache.delete('REDHAT_PASSWORD') # verify that the old SECRET_KEY doesn't work with pytest.raises(InvalidToken): - settings.LOG_AGGREGATOR_PASSWORD + settings.REDHAT_PASSWORD # verify that the new SECRET_KEY *does* work with override_settings(SECRET_KEY=new_key): - assert settings.LOG_AGGREGATOR_PASSWORD == 'sensitive' + assert settings.REDHAT_PASSWORD == 'sensitive' def test_encrypted_notification_secrets(self, notification_template_with_encrypt): # test basic decryption diff --git a/awx/main/tests/functional/models/test_events.py b/awx/main/tests/functional/models/test_events.py index d16c60bb6051..a61c3fda2550 100644 --- a/awx/main/tests/functional/models/test_events.py +++ b/awx/main/tests/functional/models/test_events.py @@ -1,18 +1,15 @@ from unittest import mock import pytest -from awx.main.models import (Job, JobEvent, ProjectUpdate, ProjectUpdateEvent, - AdHocCommand, AdHocCommandEvent, InventoryUpdate, - InventorySource, InventoryUpdateEvent, SystemJob, - SystemJobEvent) +from awx.main.models import Job, JobEvent @pytest.mark.django_db -@mock.patch('awx.main.consumers.emit_channel_notification') +@mock.patch('awx.main.models.events.emit_event_detail') def test_parent_changed(emit): j = Job() j.save() - JobEvent.create_from_data(job_id=j.pk, uuid='abc123', event='playbook_on_task_start') + JobEvent.create_from_data(job_id=j.pk, uuid='abc123', event='playbook_on_task_start').save() assert JobEvent.objects.count() == 1 for e in JobEvent.objects.all(): assert e.changed is False @@ -24,19 +21,26 @@ def test_parent_changed(emit): event_data={ 'res': {'changed': ['localhost']} } - ) - assert JobEvent.objects.count() == 2 - for e in JobEvent.objects.all(): + ).save() + # the `playbook_on_stats` event is where we update the parent changed linkage + JobEvent.create_from_data( + job_id=j.pk, + parent_uuid='abc123', + event='playbook_on_stats' + ).save() + events = JobEvent.objects.filter(event__in=['playbook_on_task_start', 'runner_on_ok']) + assert events.count() == 2 + for e in events.all(): assert e.changed is True @pytest.mark.django_db @pytest.mark.parametrize('event', JobEvent.FAILED_EVENTS) -@mock.patch('awx.main.consumers.emit_channel_notification') +@mock.patch('awx.main.models.events.emit_event_detail') def test_parent_failed(emit, event): j = Job() j.save() - JobEvent.create_from_data(job_id=j.pk, uuid='abc123', event='playbook_on_task_start') + JobEvent.create_from_data(job_id=j.pk, uuid='abc123', event='playbook_on_task_start').save() assert JobEvent.objects.count() == 1 for e in JobEvent.objects.all(): assert e.failed is False @@ -45,69 +49,15 @@ def test_parent_failed(emit, event): job_id=j.pk, parent_uuid='abc123', event=event - ) - assert JobEvent.objects.count() == 2 - for e in JobEvent.objects.all(): - assert e.failed is True - - -@pytest.mark.django_db -@mock.patch('awx.main.consumers.emit_channel_notification') -def test_job_event_websocket_notifications(emit): - j = Job(id=123) - j.save() - JobEvent.create_from_data(job_id=j.pk) - assert len(emit.call_args_list) == 1 - topic, payload = emit.call_args_list[0][0] - assert topic == 'job_events-123' - assert payload['job'] == 123 - - -@pytest.mark.django_db -@mock.patch('awx.main.consumers.emit_channel_notification') -def test_ad_hoc_event_websocket_notifications(emit): - ahc = AdHocCommand(id=123) - ahc.save() - AdHocCommandEvent.create_from_data(ad_hoc_command_id=ahc.pk) - assert len(emit.call_args_list) == 1 - topic, payload = emit.call_args_list[0][0] - assert topic == 'ad_hoc_command_events-123' - assert payload['ad_hoc_command'] == 123 + ).save() - -@pytest.mark.django_db -@mock.patch('awx.main.consumers.emit_channel_notification') -def test_project_update_event_websocket_notifications(emit, project): - pu = ProjectUpdate(id=123, project=project) - pu.save() - ProjectUpdateEvent.create_from_data(project_update_id=pu.pk) - assert len(emit.call_args_list) == 1 - topic, payload = emit.call_args_list[0][0] - assert topic == 'project_update_events-123' - assert payload['project_update'] == 123 - - -@pytest.mark.django_db -@mock.patch('awx.main.consumers.emit_channel_notification') -def test_inventory_update_event_websocket_notifications(emit, inventory): - source = InventorySource() - source.save() - iu = InventoryUpdate(id=123, inventory_source=source) - iu.save() - InventoryUpdateEvent.create_from_data(inventory_update_id=iu.pk) - assert len(emit.call_args_list) == 1 - topic, payload = emit.call_args_list[0][0] - assert topic == 'inventory_update_events-123' - assert payload['inventory_update'] == 123 - - -@pytest.mark.django_db -@mock.patch('awx.main.consumers.emit_channel_notification') -def test_system_job_event_websocket_notifications(emit, inventory): - j = SystemJob(id=123) - j.save() - SystemJobEvent.create_from_data(system_job_id=j.pk) - assert len(emit.call_args_list) == 1 - topic, payload = emit.call_args_list[0][0] - assert topic == 'system_job_events-123' - assert payload['system_job'] == 123 + # the `playbook_on_stats` event is where we update the parent failed linkage + JobEvent.create_from_data( + job_id=j.pk, + parent_uuid='abc123', + event='playbook_on_stats' + ).save() + events = JobEvent.objects.filter(event__in=['playbook_on_task_start', event]) + assert events.count() == 2 + for e in events.all(): + assert e.failed is True diff --git a/awx/main/tests/unit/commands/test_replay_job_events.py b/awx/main/tests/unit/commands/test_replay_job_events.py index 35042dd840e6..2dab4443adcb 100644 --- a/awx/main/tests/unit/commands/test_replay_job_events.py +++ b/awx/main/tests/unit/commands/test_replay_job_events.py @@ -60,7 +60,7 @@ def replayer(self, mocker, job_events, mock_serializer_fn): r.emit_job_status = lambda job, status: True return r - @mock.patch('awx.main.management.commands.replay_job_events.emit_channel_notification', lambda *a, **kw: None) + @mock.patch('awx.main.management.commands.replay_job_events.emit_event_detail', lambda *a, **kw: None) def test_sleep(self, mocker, replayer): replayer.run(3, 1) @@ -74,7 +74,7 @@ def test_sleep(self, mocker, replayer): mock.call(0.000001), ]) - @mock.patch('awx.main.management.commands.replay_job_events.emit_channel_notification', lambda *a, **kw: None) + @mock.patch('awx.main.management.commands.replay_job_events.emit_event_detail', lambda *a, **kw: None) def test_speed(self, mocker, replayer): replayer.run(3, 2) diff --git a/awx/main/tests/unit/models/test_events.py b/awx/main/tests/unit/models/test_events.py index 79d23a375719..3b824da242c5 100644 --- a/awx/main/tests/unit/models/test_events.py +++ b/awx/main/tests/unit/models/test_events.py @@ -1,6 +1,5 @@ from datetime import datetime from django.utils.timezone import utc -from unittest import mock import pytest from awx.main.models import (JobEvent, ProjectUpdateEvent, AdHocCommandEvent, @@ -18,16 +17,11 @@ datetime(2018, 1, 1).isoformat(), datetime(2018, 1, 1) ]) def test_event_parse_created(job_identifier, cls, created): - with mock.patch.object(cls, 'objects') as manager: - cls.create_from_data(**{ - job_identifier: 123, - 'created': created - }) - expected_created = datetime(2018, 1, 1).replace(tzinfo=utc) - manager.create.assert_called_with(**{ - job_identifier: 123, - 'created': expected_created - }) + event = cls.create_from_data(**{ + job_identifier: 123, + 'created': created + }) + assert event.created == datetime(2018, 1, 1).replace(tzinfo=utc) @pytest.mark.parametrize('job_identifier, cls', [ @@ -38,24 +32,20 @@ def test_event_parse_created(job_identifier, cls, created): ['system_job_id', SystemJobEvent], ]) def test_playbook_event_strip_invalid_keys(job_identifier, cls): - with mock.patch.object(cls, 'objects') as manager: - cls.create_from_data(**{ - job_identifier: 123, - 'extra_key': 'extra_value' - }) - manager.create.assert_called_with(**{job_identifier: 123}) + event = cls.create_from_data(**{ + job_identifier: 123, + 'extra_key': 'extra_value' + }) + assert getattr(event, job_identifier) == 123 + assert not hasattr(event, 'extra_key') @pytest.mark.parametrize('field', [ 'play', 'role', 'task', 'playbook' ]) def test_really_long_event_fields(field): - with mock.patch.object(JobEvent, 'objects') as manager: - JobEvent.create_from_data(**{ - 'job_id': 123, - 'event_data': {field: 'X' * 4096} - }) - manager.create.assert_called_with(**{ - 'job_id': 123, - 'event_data': {field: 'X' * 1023 + '…'} - }) + event = JobEvent.create_from_data(**{ + 'job_id': 123, + 'event_data': {field: 'X' * 4096} + }) + assert event.event_data[field] == 'X' * 1023 + '…' diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 51436473a14c..a84b9403eed4 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -576,9 +576,6 @@ def IS_TESTING(argv=None): # Additional environment variables to be passed to the ansible subprocesses AWX_TASK_ENV = {} -# Flag to enable/disable updating hosts M2M when saving job events. -CAPTURE_JOB_EVENT_HOSTS = False - # Rebuild Host Smart Inventory memberships. AWX_REBUILD_SMART_MEMBERSHIP = False