heavily optimize the write speed of the callback receiver #5618

Merged
100 changes: 0 additions & 100 deletions awx/api/serializers.py

@@ -3874,26 +3874,6 @@ def to_representation(self, obj):
        return data


class JobEventWebSocketSerializer(JobEventSerializer):
    created = serializers.SerializerMethodField()
    modified = serializers.SerializerMethodField()
    event_name = serializers.CharField(source='event')
    group_name = serializers.SerializerMethodField()

    class Meta:
        model = JobEvent
        fields = ('*', 'event_name', 'group_name',)

    def get_created(self, obj):
        return obj.created.isoformat()

    def get_modified(self, obj):
        return obj.modified.isoformat()

    def get_group_name(self, obj):
        return 'job_events'


class ProjectUpdateEventSerializer(JobEventSerializer):
    stdout = serializers.SerializerMethodField()
    event_data = serializers.SerializerMethodField()
@@ -3925,26 +3905,6 @@ def get_event_data(self, obj):
        return {}


class ProjectUpdateEventWebSocketSerializer(ProjectUpdateEventSerializer):
    created = serializers.SerializerMethodField()
    modified = serializers.SerializerMethodField()
    event_name = serializers.CharField(source='event')
    group_name = serializers.SerializerMethodField()

    class Meta:
        model = ProjectUpdateEvent
        fields = ('*', 'event_name', 'group_name',)

    def get_created(self, obj):
        return obj.created.isoformat()

    def get_modified(self, obj):
        return obj.modified.isoformat()

    def get_group_name(self, obj):
        return 'project_update_events'


class AdHocCommandEventSerializer(BaseSerializer):

    event_display = serializers.CharField(source='get_event_display', read_only=True)
@@ -3976,26 +3936,6 @@ def to_representation(self, obj):
        return data


class AdHocCommandEventWebSocketSerializer(AdHocCommandEventSerializer):
    created = serializers.SerializerMethodField()
    modified = serializers.SerializerMethodField()
    event_name = serializers.CharField(source='event')
    group_name = serializers.SerializerMethodField()

    class Meta:
        model = AdHocCommandEvent
        fields = ('*', 'event_name', 'group_name',)

    def get_created(self, obj):
        return obj.created.isoformat()

    def get_modified(self, obj):
        return obj.modified.isoformat()

    def get_group_name(self, obj):
        return 'ad_hoc_command_events'


class InventoryUpdateEventSerializer(AdHocCommandEventSerializer):

    class Meta:
@@ -4011,26 +3951,6 @@ def get_related(self, obj):
        return res


class InventoryUpdateEventWebSocketSerializer(InventoryUpdateEventSerializer):
    created = serializers.SerializerMethodField()
    modified = serializers.SerializerMethodField()
    event_name = serializers.CharField(source='event')
    group_name = serializers.SerializerMethodField()

    class Meta:
        model = InventoryUpdateEvent
        fields = ('*', 'event_name', 'group_name',)

    def get_created(self, obj):
        return obj.created.isoformat()

    def get_modified(self, obj):
        return obj.modified.isoformat()

    def get_group_name(self, obj):
        return 'inventory_update_events'


class SystemJobEventSerializer(AdHocCommandEventSerializer):

    class Meta:
@@ -4046,26 +3966,6 @@ def get_related(self, obj):
        return res


class SystemJobEventWebSocketSerializer(SystemJobEventSerializer):
    created = serializers.SerializerMethodField()
    modified = serializers.SerializerMethodField()
    event_name = serializers.CharField(source='event')
    group_name = serializers.SerializerMethodField()

    class Meta:
        model = SystemJobEvent
        fields = ('*', 'event_name', 'group_name',)

    def get_created(self, obj):
        return obj.created.isoformat()

    def get_modified(self, obj):
        return obj.modified.isoformat()

    def get_group_name(self, obj):
        return 'system_job_events'


class JobLaunchSerializer(BaseSerializer):

    # Representational fields
16 changes: 16 additions & 0 deletions awx/conf/settings.py
@@ -28,6 +28,8 @@
from awx.conf.models import Setting
from awx.conf.migrations._reencrypt import decrypt_field as old_decrypt_field

import cachetools

# FIXME: Gracefully handle when settings are accessed before the database is
# ready (or during migrations).

@@ -136,6 +138,14 @@ def filter_sensitive(registry, key, value):
    return value


# settings.__getattr__ is called *constantly*, and the LOG_AGGREGATOR_ settings
# are so ubiquitous when external logging is enabled that they should be kept in
# memory with a short TTL to avoid even having to contact memcached.
# The primary use case for this optimization is the callback receiver when
# external logging is enabled.
LOGGING_SETTINGS_CACHE = cachetools.TTLCache(maxsize=50, ttl=1)
ryanpetrello (Contributor, Author):

@AlanCoding I decided to change this back to 1 second, because I'm still seeing drastic speed improvements with 1 second, and it's likely to be much less noticeable for users.

Member:

well, no need to document that
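
As background on the cache type chosen here: cachetools.TTLCache evicts each entry a fixed number of seconds after it is written, so every process re-reads a given setting from memcached at most once per TTL. A minimal sketch of those semantics (the key name is illustrative):

import time

import cachetools

# same shape as LOGGING_SETTINGS_CACHE above: up to 50 keys, each expiring
# 1 second after it is written
cache = cachetools.TTLCache(maxsize=50, ttl=1)

cache['LOG_AGGREGATOR_ENABLED'] = True
print(cache.get('LOG_AGGREGATOR_ENABLED'))  # True: served from process memory

time.sleep(1.1)
print(cache.get('LOG_AGGREGATOR_ENABLED'))  # None: the entry has expired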



class EncryptedCacheProxy(object):

    def __init__(self, cache, registry, encrypter=None, decrypter=None):
@@ -437,11 +447,17 @@ def SETTINGS_MODULE(self):
        return self._get_default('SETTINGS_MODULE')

    def __getattr__(self, name):
        if name.startswith('LOG_AGGREGATOR_'):
ryanpetrello (Contributor, Author), Jan 9, 2020:

cc @AlanCoding I think this is going to help a lot with performance across the board when external logging is enabled

Member:

> I think this is going to help a lot with performance across the board when external logging is enabled

It will help when external logging is not enabled. When we replaced service restarts with more frequent hits to memcache, this included tests for enablement.

I'm wondering if there is anything we need to do to document this. And I think the answer is yes, but not much.

awx/awx/main/conf.py, lines 701 to 709 in c33d2a1:

register(
    'LOG_AGGREGATOR_ENABLED',
    field_class=fields.BooleanField,
    default=False,
    label=_('Enable External Logging'),
    help_text=_('Enable sending logs to external log aggregator.'),
    category=_('Logging'),
    category_slug='logging',
)

Within this setting help text, I would suggest:

> Enable sending logs to external log aggregator. This will take a minimum of 10 seconds to take effect, as will any other change to logging settings.
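
Applied to the register() call quoted above, that suggestion would read roughly as follows (a sketch; only the help_text wording changes):

register(
    'LOG_AGGREGATOR_ENABLED',
    field_class=fields.BooleanField,
    default=False,
    label=_('Enable External Logging'),
    # suggested revision of the help text, per the comment above
    help_text=_('Enable sending logs to external log aggregator. This will '
                'take a minimum of 10 seconds to take effect, as will any '
                'other change to logging settings.'),
    category=_('Logging'),
    category_slug='logging',
)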

I think that this per-process TTL could be rolled out as a parameter to settings definitions generally. In most cases, it probably still makes sense to not cache it locally like this, but there are still many cases where we check a value multiple times within the same request / operation, which is never a thing we should want to do.

ryanpetrello (Contributor, Author), Jan 10, 2020:

That's true, but it yields even more of a boost when logging is enabled.

I've set it to 5 seconds, so I question whether we need to do much to actually tell people about this, especially since the old behavior in prior releases was that we restarted actual processes. I think it's reasonable that it takes a few seconds for logs to show up.

I expect it takes most mortals at least this long to switch the context of browser tabs and log in to/pull up logstash/Splunk, especially if you consider this enablement is probably something you ideally set up and get working once.

            cached = LOGGING_SETTINGS_CACHE.get(name)
            if cached:
                return cached
        value = empty
        if name in self.all_supported_settings:
            with _ctit_db_wrapper(trans_safe=True):
                value = self._get_local(name)
                if value is not empty:
                    if name.startswith('LOG_AGGREGATOR_'):
                        LOGGING_SETTINGS_CACHE[name] = value
                    return value
        value = self._get_default(name)
        # sometimes users specify RabbitMQ passwords that contain
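
The addition above is a read-through cache. A standalone sketch of the same pattern, where expensive_lookup and the returned value are stand-ins for the real memcached/database path; note that `if cached:` treats falsy cached values such as False as misses, so those fall through to the full lookup every time:

import cachetools

SETTINGS_CACHE = cachetools.TTLCache(maxsize=50, ttl=1)


def expensive_lookup(name):
    # stand-in for self._get_local(name): memcached first, then the database
    return 'https://logstash.example.org'


def get_setting(name):
    # read-through: serve from the per-process cache when possible
    cached = SETTINGS_CACHE.get(name)
    if cached:
        return cached
    value = expensive_lookup(name)
    if name.startswith('LOG_AGGREGATOR_'):
        SETTINGS_CACHE[name] = value
    return value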
2 changes: 1 addition & 1 deletion awx/main/dispatch/pool.py
@@ -277,7 +277,7 @@ def write(self, preferred_queue, body):
                logger.warn("could not write to queue %s" % preferred_queue)
                logger.warn("detail: {}".format(tb))
            write_attempt_order.append(preferred_queue)
-        logger.warn("could not write payload to any queue, attempted order: {}".format(write_attempt_order))
+        logger.error("could not write payload to any queue, attempted order: {}".format(write_attempt_order))
        return None

    def stop(self, signum):
5 changes: 4 additions & 1 deletion awx/main/dispatch/worker/base.py
@@ -119,6 +119,9 @@ def stop(self, signum, frame):

class BaseWorker(object):

    def read(self, queue):
        return queue.get(block=True, timeout=1)

    def work_loop(self, queue, finished, idx, *args):
        ppid = os.getppid()
        signal_handler = WorkerSignalHandler()
@@ -128,7 +131,7 @@ def work_loop(self, queue, finished, idx, *args):
            if os.getppid() != ppid:
                break
            try:
-                body = queue.get(block=True, timeout=1)
+                body = self.read(queue)

(An earlier comment here was marked as resolved.)

Member:

no, looks like this change is just to accommodate the pattern in the callback-specific worker code

ryanpetrello (Contributor, Author), Jan 9, 2020:

Right, this is just to persist the behavior for the base case (the dispatcher).

                if body == 'QUIT':
                    break
            except QueueEmpty:
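
The new read() hook exists so a subclass can change how messages are fetched without copying the whole loop. A hypothetical sketch of the kind of override the callback-specific worker pattern implies (CallbackWorker, its buffer, and flush() are illustrative, not the PR's actual code):

from queue import Empty as QueueEmpty


class CallbackWorker(BaseWorker):
    # illustrative subclass: buffer incoming events so they can be written
    # with one bulk database INSERT instead of one INSERT per event

    def __init__(self):
        self.buff = []

    def read(self, queue):
        try:
            return queue.get(block=True, timeout=1)
        except QueueEmpty:
            # the queue went quiet; use the idle tick to flush what we have,
            # then let work_loop's QueueEmpty handler continue the loop
            self.flush()
            raise

    def flush(self):
        if self.buff:
            # stand-in for a bulk write, e.g. JobEvent.objects.bulk_create(...)
            print('flushing %d events' % len(self.buff))
            self.buff = []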