From 7b410f4eb32691dcd9ecb540030d6e15859c73f3 Mon Sep 17 00:00:00 2001
From: Ryan Petrello
Date: Thu, 9 Jan 2020 10:32:38 -0500
Subject: [PATCH] optimize per-event host lookups and changed/failed
 propagation lookups

we've always performed these (fairly expensive) queries *on every event
save*; if you're processing tens of thousands of events in short bursts,
this is far too slow

see: https://github.com/ansible/awx/issues/5514
---
 awx/conf/settings.py                 |  16 ++++-
 awx/main/dispatch/worker/callback.py |   8 ++-
 awx/main/models/events.py            | 104 +++++++++------------
 awx/main/signals.py                  |   8 +--
 awx/main/tasks.py                    |  17 ++++-
 5 files changed, 73 insertions(+), 80 deletions(-)

diff --git a/awx/conf/settings.py b/awx/conf/settings.py
index 350a4df0490b..cdcd78be396e 100644
--- a/awx/conf/settings.py
+++ b/awx/conf/settings.py
@@ -28,6 +28,8 @@
 from awx.conf.models import Setting
 from awx.conf.migrations._reencrypt import decrypt_field as old_decrypt_field
 
+import cachetools
+
 
 # FIXME: Gracefully handle when settings are accessed before the database is
 # ready (or during migrations).
@@ -136,6 +138,14 @@ def filter_sensitive(registry, key, value):
     return value
 
 
+# settings.__getattr__ is called *constantly*, and the LOGGING_ ones are
+# so ubiquitous when external logging is enabled that they should be kept
+# in memory with a short TTL to avoid even having to contact memcached;
+# the primary use case for this optimization is the callback receiver
+# when external logging is enabled
+LOGGING_SETTINGS_CACHE = cachetools.TTLCache(maxsize=50, ttl=5)
+
+
 class EncryptedCacheProxy(object):
 
     def __init__(self, cache, registry, encrypter=None, decrypter=None):
@@ -437,11 +447,16 @@ def SETTINGS_MODULE(self):
         return self._get_default('SETTINGS_MODULE')
 
     def __getattr__(self, name):
+        if name.startswith('LOG_AGGREGATOR_'):
+            if name in LOGGING_SETTINGS_CACHE:
+                return LOGGING_SETTINGS_CACHE[name]
         value = empty
         if name in self.all_supported_settings:
             with _ctit_db_wrapper(trans_safe=True):
                 value = self._get_local(name)
             if value is not empty:
+                if name.startswith('LOG_AGGREGATOR_'):
+                    LOGGING_SETTINGS_CACHE[name] = value
                 return value
         value = self._get_default(name)
         # sometimes users specify RabbitMQ passwords that contain
diff --git a/awx/main/dispatch/worker/callback.py b/awx/main/dispatch/worker/callback.py
index 551d1ca2a027..18a09de8bd4c 100644
--- a/awx/main/dispatch/worker/callback.py
+++ b/awx/main/dispatch/worker/callback.py
@@ -133,8 +133,12 @@ def perform_work(self, body):
             retries = 0
             while retries <= self.MAX_RETRIES:
                 try:
-                    event = cls(**cls.create_from_data(**body))
-                    event.remove_me()
+                    kwargs = cls.create_from_data(**body)
+                    workflow_job_id = kwargs.pop('workflow_job_id', None)
+                    event = cls(**kwargs)
+                    if workflow_job_id:
+                        setattr(event, 'workflow_job_id', workflow_job_id)
+                    event._update_from_event_data()
                     self.buff.setdefault(cls, []).append(event)
                     self.flush()
                     break
diff --git a/awx/main/models/events.py b/awx/main/models/events.py
index 8a6d13b92ca2..74fbc6077b04 100644
--- a/awx/main/models/events.py
+++ b/awx/main/models/events.py
@@ -63,7 +63,7 @@ class BasePlaybookEvent(CreatedModifiedModel):
     VALID_KEYS = [
         'event', 'event_data', 'playbook', 'play', 'role', 'task', 'created',
         'counter', 'uuid', 'stdout', 'parent_uuid', 'start_line', 'end_line',
-        'verbosity'
+        'host_id', 'host_name', 'verbosity',
     ]
 
     class Meta:
@@ -288,10 +288,35 @@ def _update_from_event_data(self):
                 self.changed = bool(sum(changed_dict.values()))
             except (AttributeError, TypeError):
                 pass
+
+            if isinstance(self, JobEvent):
+                hostnames = self._hostnames()
+                self._update_host_summary_from_stats(hostnames)
+                try:
+                    self.job.inventory.update_computed_fields()
+                except DatabaseError:
+                    logger.exception('Computed fields database error saving event {}'.format(self.pk))
+
+                # find parent links and propagate changed=True and failed=True
+                changed = self.job.job_events.filter(changed=True).exclude(parent_uuid=None).only('parent_uuid').values_list('parent_uuid', flat=True).distinct()  # noqa
+                failed = self.job.job_events.filter(failed=True).exclude(parent_uuid=None).only('parent_uuid').values_list('parent_uuid', flat=True).distinct()  # noqa
+
+                JobEvent.objects.filter(
+                    job_id=self.job_id, uuid__in=changed
+                ).update(changed=True)
+                JobEvent.objects.filter(
+                    job_id=self.job_id, uuid__in=failed
+                ).update(failed=True)
+
         for field in ('playbook', 'play', 'task', 'role'):
             value = force_text(event_data.get(field, '')).strip()
             if value != getattr(self, field):
                 setattr(self, field, value)
+        if isinstance(self, JobEvent):
+            analytics_logger.info(
+                'Event data saved.',
+                extra=dict(python_objects=dict(job_event=self))
+            )
 
     @classmethod
     def create_from_data(cls, **kwargs):
@@ -317,58 +342,12 @@ def create_from_data(cls, **kwargs):
         kwargs.pop('created', None)
         sanitize_event_keys(kwargs, cls.VALID_KEYS)
-        workflow_job_id = kwargs.pop('workflow_job_id', None)
+        return kwargs
-        job_event = cls.objects.create(**kwargs)
-        if workflow_job_id:
-            setattr(job_event, 'workflow_job_id', workflow_job_id)
-        analytics_logger.info('Event data saved.', extra=dict(python_objects=dict(job_event=job_event)))
-        return job_event
 
     @property
     def job_verbosity(self):
         return 0
 
-    def remove_me(self, *args, **kwargs):
-        # Update model fields from event data.
-        self._update_from_event_data()
-        # Update host related field from host_name.
- if hasattr(self, 'job') and not self.host_id and self.host_name: - if self.job.inventory.kind == 'smart': - # optimization to avoid calling inventory.hosts, which - # can take a long time to run under some circumstances - from awx.main.models.inventory import SmartInventoryMembership - membership = SmartInventoryMembership.objects.filter( - inventory=self.job.inventory, host__name=self.host_name - ).first() - if membership: - host_id = membership.host_id - else: - host_id = None - else: - host_qs = self.job.inventory.hosts.filter(name=self.host_name) - host_id = host_qs.only('id').values_list('id', flat=True).first() - if host_id != self.host_id: - self.host_id = host_id - - if self.parent_uuid: - kwargs = {} - if self.changed is True: - kwargs['changed'] = True - if self.failed is True: - kwargs['failed'] = True - if kwargs: - JobEvent.objects.filter(job_id=self.job_id, uuid=self.parent_uuid).update(**kwargs) - - if self.event == 'playbook_on_stats': - hostnames = self._hostnames() - self._update_host_summary_from_stats(hostnames) - try: - self.job.inventory.update_computed_fields() - except DatabaseError: - logger.exception('Computed fields database error saving event {}'.format(self.pk)) - - class JobEvent(BasePlaybookEvent): ''' @@ -432,13 +411,6 @@ def get_absolute_url(self, request=None): def __str__(self): return u'%s @ %s' % (self.get_event_display2(), self.created.isoformat()) - def _update_from_event_data(self): - # Update job event hostname - super(JobEvent, self)._update_from_event_data() - value = force_text(self.event_data.get('host', '')).strip() - if value != getattr(self, 'host_name'): - setattr(self, 'host_name', value) - def _hostnames(self): hostnames = set() try: @@ -570,14 +542,7 @@ def create_from_data(cls, **kwargs): kwargs.pop('created', None) sanitize_event_keys(kwargs, cls.VALID_KEYS) - kwargs.pop('workflow_job_id', None) - event = cls.objects.create(**kwargs) - if isinstance(event, AdHocCommandEvent): - analytics_logger.info( - 'Event data saved.', - extra=dict(python_objects=dict(job_event=event)) - ) - return event + return kwargs def get_event_display(self): ''' @@ -670,22 +635,17 @@ class Meta: def get_absolute_url(self, request=None): return reverse('api:ad_hoc_command_event_detail', kwargs={'pk': self.pk}, request=request) - def remove_me(self, *args, **kwargs): + def _update_from_event_data(self): res = self.event_data.get('res', None) if self.event in self.FAILED_EVENTS: if not self.event_data.get('ignore_errors', False): self.failed = True if isinstance(res, dict) and res.get('changed', False): self.changed = True - self.host_name = self.event_data.get('host', '').strip() - if not self.host_id and self.host_name: - host_qs = self.ad_hoc_command.inventory.hosts.filter(name=self.host_name) - try: - host_id = host_qs.only('id').values_list('id', flat=True) - if host_id.exists(): - self.host_id = host_id[0] - except (IndexError, AttributeError): - pass + analytics_logger.info( + 'Event data saved.', + extra=dict(python_objects=dict(job_event=self)) + ) class InventoryUpdateEvent(BaseCommandEvent): diff --git a/awx/main/signals.py b/awx/main/signals.py index 16cf59869d6d..4daedf0de989 100644 --- a/awx/main/signals.py +++ b/awx/main/signals.py @@ -100,10 +100,10 @@ def emit_event_detail(cls, relation, **kwargs): 'event': instance.event, 'failed': instance.failed, 'changed': instance.changed, - 'event_level': instance.event_level, - 'play': instance.play, - 'role': instance.role, - 'task': instance.task + 'event_level': getattr(instance, 'event_level', ''), 
+            'play': getattr(instance, 'play', ''),
+            'role': getattr(instance, 'role', ''),
+            'task': getattr(instance, 'task', ''),
         }
     )
 
diff --git a/awx/main/tasks.py b/awx/main/tasks.py
index 9b263f550dd0..2d0ebaba3d85 100644
--- a/awx/main/tasks.py
+++ b/awx/main/tasks.py
@@ -703,6 +703,7 @@ class BaseTask(object):
     def __init__(self):
         self.cleanup_paths = []
         self.parent_workflow_job_id = None
+        self.host_map = {}
 
     def update_model(self, pk, _attempt=0, **updates):
         """Reload the model instance from the database and update the
@@ -1001,11 +1002,17 @@ def should_use_proot(self, instance):
         return False
 
     def build_inventory(self, instance, private_data_dir):
-        script_params = dict(hostvars=True)
+        script_params = dict(hostvars=True, towervars=True)
         if hasattr(instance, 'job_slice_number'):
             script_params['slice_number'] = instance.job_slice_number
             script_params['slice_count'] = instance.job_slice_count
         script_data = instance.inventory.get_script_data(**script_params)
+        # maintain a mapping of host_name -> host_id so we can associate
+        # emitted events to Host objects
+        self.host_map = {
+            hostname: hv['remote_tower_id']
+            for hostname, hv in script_data.get('_meta', {}).get('hostvars', {}).items()
+        }
         json_data = json.dumps(script_data)
         handle, path = tempfile.mkstemp(dir=private_data_dir)
         f = os.fdopen(handle, 'w')
@@ -1114,6 +1121,14 @@ def event_handler(self, event_data):
             event_data.pop('parent_uuid', None)
         if self.parent_workflow_job_id:
             event_data['workflow_job_id'] = self.parent_workflow_job_id
+        if self.host_map:
+            host = event_data.get('event_data', {}).get('host', '').strip()
+            if host:
+                event_data['host_name'] = host
+                event_data['host_id'] = self.host_map.get(host, '')
+            else:
+                event_data['host_name'] = ''
+                event_data['host_id'] = ''
         should_write_event = False
         event_data.setdefault(self.event_data_key, self.instance.id)
         self.dispatcher.dispatch(event_data)
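
A note on the conf/settings.py change: cachetools.TTLCache is a dict-like
container whose entries silently expire ttl seconds after insertion, so the
read-through pattern in __getattr__ needs no explicit invalidation. Here is a
minimal, self-contained sketch of that pattern; fetch_from_backend() is a
hypothetical stand-in for the real _get_local() database/memcached lookup:

    import cachetools

    # entries vanish 5 seconds after insertion; maxsize bounds memory use
    LOGGING_SETTINGS_CACHE = cachetools.TTLCache(maxsize=50, ttl=5)

    def fetch_from_backend(name):
        # stand-in for the expensive database + memcached settings lookup
        print('expensive lookup for %s' % name)
        return 'https://logstash.example.org'

    def get_setting(name):
        if name.startswith('LOG_AGGREGATOR_'):
            try:
                return LOGGING_SETTINGS_CACHE[name]  # hit: no backend contact
            except KeyError:
                pass
        value = fetch_from_backend(name)
        if name.startswith('LOG_AGGREGATOR_'):
            LOGGING_SETTINGS_CACHE[name] = value
        return value

    get_setting('LOG_AGGREGATOR_HOST')  # miss: contacts the backend
    get_setting('LOG_AGGREGATOR_HOST')  # served from memory for ~5 seconds

The 5-second TTL bounds how stale a logging setting can be after an admin
changes it, which is a reasonable trade for skipping a cache round trip on
nearly every settings access in the callback receiver.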
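
On the callback worker change: workflow_job_id must be popped from the kwargs
before the model is constructed, because Django model constructors raise
TypeError for keyword arguments that do not correspond to a model field, and
workflow_job_id is not a column on the event tables; it is only carried on the
instance as a plain attribute for downstream use. A toy illustration of that
constraint, with FakeJobEvent as a hypothetical stand-in for the Django model:

    class FakeJobEvent:
        FIELDS = {'event', 'counter', 'stdout'}  # the "columns" this fake model accepts

        def __init__(self, **kwargs):
            unexpected = set(kwargs) - self.FIELDS
            if unexpected:
                # mirrors Django's "... is an invalid keyword argument" TypeError
                raise TypeError('invalid keyword arguments: %s' % sorted(unexpected))
            self.__dict__.update(kwargs)

    kwargs = {'event': 'runner_on_ok', 'counter': 1, 'workflow_job_id': 17}
    workflow_job_id = kwargs.pop('workflow_job_id', None)  # pop first...
    event = FakeJobEvent(**kwargs)                         # ...so this succeeds
    if workflow_job_id:
        # plain attribute on the instance, never written to the database
        setattr(event, 'workflow_job_id', workflow_job_id)

Note also that the worker now builds the instance, applies
_update_from_event_data(), and buffers the unsaved event instead of saving it
immediately; flush() is then responsible for persisting the buffered events.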
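
On the events.py change: the changed/failed propagation used to happen on
every event save, one UPDATE against the parent row per flagged event. It now
happens once per playbook run, when the playbook_on_stats event arrives: two
SELECT DISTINCT queries collect the parent UUIDs of flagged events, and two
bulk UPDATEs flip the flags, so the query count no longer scales with the
number of events. Roughly, assuming the JobEvent model from this patch and a
finished job:

    # before: executed on *every* saved event that had a truthy flag
    # JobEvent.objects.filter(job_id=job.id, uuid=parent_uuid).update(changed=True)

    # after: a constant number of queries per playbook run
    changed = (job.job_events.filter(changed=True)
                             .exclude(parent_uuid=None)
                             .values_list('parent_uuid', flat=True)
                             .distinct())
    failed = (job.job_events.filter(failed=True)
                            .exclude(parent_uuid=None)
                            .values_list('parent_uuid', flat=True)
                            .distinct())

    # querysets are lazy, so each of these runs as a single statement of the
    # form UPDATE ... WHERE uuid IN (SELECT DISTINCT parent_uuid ...)
    JobEvent.objects.filter(job_id=job.id, uuid__in=changed).update(changed=True)
    JobEvent.objects.filter(job_id=job.id, uuid__in=failed).update(failed=True)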
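
Finally, on the tasks.py change: passing towervars=True to get_script_data()
makes the generated inventory include a remote_tower_id hostvar for each host,
so build_inventory() can derive a name-to-primary-key map once per job and
event_handler() can annotate every event with host_id without touching the
database. A small sketch with made-up hostnames and ids:

    # illustrative shape of script_data when towervars is enabled
    script_data = {
        '_meta': {
            'hostvars': {
                'web1.example.com': {'remote_tower_id': 101, 'ansible_host': '10.0.0.5'},
                'db1.example.com': {'remote_tower_id': 102},
            }
        }
    }

    # built once per job in build_inventory()
    host_map = {
        hostname: hv['remote_tower_id']
        for hostname, hv in script_data.get('_meta', {}).get('hostvars', {}).items()
    }

    # consulted per event in event_handler(); .get() covers hosts that show up
    # in events but not in the inventory (e.g. targets of add_host)
    assert host_map.get('web1.example.com', '') == 101
    assert host_map.get('transient-host', '') == ''

This is what allows the per-event inventory.hosts.filter(name=...) lookups in
events.py to be deleted outright.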