remove multiprocessing.Queue usage from the callback receiver #8191
Conversation
Build failed.
Force-pushed from c6ef967 to 4bce21b
Build succeeded.
Force-pushed from a962d9f to ad8b955
@@ -123,6 +163,8 @@ def perform_work(self, body):
            job_identifier = body[key]
            break

        self.last_event = f'\n\t- {cls.__name__} for #{job_identifier} ({body.get("event", "")} {body.get("uuid", "")})'  # noqa
This leaves me wanting more. Having a snapshot of the last job event processed is interesting, but I suppose I want a log of all events processed. Not saying this is the place for that, but maybe this could be more of a summary: the last 10-20 events processed?
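A hedged sketch of what that might look like, assuming a `deque`-backed rolling window; the attribute and method names here are illustrative, not the code in this PR:

```python
from collections import deque

class CallbackBrokerWorker:
    def __init__(self):
        # keep a rolling window of recent events instead of only the latest one
        self.recent_events = deque(maxlen=20)
        self.last_event = ''

    def note_event(self, cls, job_identifier, body):
        summary = f'{cls.__name__} for #{job_identifier} ({body.get("event", "")} {body.get("uuid", "")})'
        self.last_event = summary           # the snapshot this diff adds
        self.recent_events.append(summary)  # plus the last 20 for a fuller picture
```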
Build succeeded.
res = self.redis.blpop(self.queues)
time_to_sleep = 1
res = json.loads(res[1])
self.process_task(res)
After this diff, `process_task` still exists and is still being used, apparently by the postgres consumer. This method writes to the multiprocessing queue.

So let me echo my understanding: you are removing use of the multiprocessing queue only for the redis-connected callback receiver. However, other messages, such as "delete this inventory" or "start running this job", are sent through the postgres messaging system and still go through the dispatcher's multiprocessing queue. So one node in the cluster may have 4 or 8 redis connections, but 1 connection to the postgres message bus.

I'm just trying to state facts, and hopefully get them right. I don't recall any evidence that the main dispatcher ever had significant buildup of messages in the queue. Perhaps this is because autoscaling is enabled for the ordinary dispatcher but not for the callback receiver (going by my memory, it may be the other way around).
Yep, you've got it right.
So under this new model, `process_task` (and the model where we utilize a `multiprocessing.Queue` to dispatch messages to the worker processes) is really only used by the dispatcher. The callback receiver still has a "main process", but it only exists to fork its children and hang out until they exit.
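In other words, something shaped roughly like this; a simplified sketch, not the actual AWX code, and `run_worker` / `WORKER_COUNT` are made-up names for illustration:

```python
import os

WORKER_COUNT = 4

def run_worker():
    # each child opens its own redis connection and blpop()s events in a loop
    pass

children = []
for _ in range(WORKER_COUNT):
    pid = os.fork()
    if pid == 0:          # child: do the actual event processing
        run_worker()
        os._exit(0)
    children.append(pid)

# parent: no multiprocessing.Queue to feed; it just waits for its children
for pid in children:
    os.waitpid(pid, 0)
```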
> I don't recall any evidence that the main dispatcher ever had significant buildup of messages in the queue. Perhaps this is because autoscaling is enabled for the ordinary dispatcher but not for the callback receiver.
This is mostly because the volume is way lower: you might be running hundreds (or maybe thousands) of parallel jobs/tasks, but it's not uncommon for the callback receiver to deal with far more events in parallel.
The greatest volume we had there was the computed fields task. That did fail things in fun ways. I think those failures tended to be related to autoscaling, memory, database connection limits, and so on. So filling up that IPC queue didn't tend to come up much, and computed fields should be muzzled now anyway. So I agree with your assessment about volume.
Yeah, the dispatcher, unlike the callback receiver, can detect when all of its workers are busy, and can autoscale up new workers to avoid a backlog in the IPC queues. There is a limit to this, of course, but on well-provisioned hardware and clusters it's difficult to hit (we're talking hundreds to thousands of tasks).
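For illustration only (this is not the dispatcher's actual code; `pool`, `workers`, and `spawn_worker` are hypothetical names), the check amounts to something like:

```python
def maybe_scale_up(pool, max_workers=16):
    # if every worker is busy and there is headroom, add a worker before
    # the IPC queue starts to back up
    if len(pool.workers) < max_workers and all(w.busy for w in pool.workers):
        pool.spawn_worker()
```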
awx/main/dispatch/worker/callback.py (Outdated)
def record_statistics(self):
    if time.time() - self.last_stats > 1:  # buffer stat recording to once per second
        try:
            self.redis.set(f'awx_callback_receiver_statistics_{os.getpid()}', self.debug())
This seems a little non-ideal. The fraction of the time that a human is actively watching the `--status` command is minuscule. I don't think it's particularly expensive, maybe around 2e-4 seconds (per every second), but it's adding to the overall background noise for messages that will never be read. I don't have an alternative idea that's not ugly, so it's just a passing thought.
Yep, it does have a cost. That said, I think it's a drop in the bucket compared to other bottlenecks in event processing, like the overhead of the Django ORM.
How often are workers recycled, and do the keys get cleared? Should this line pass the kwarg `ex=~1`?
https://redis-py.readthedocs.io/en/stable/_modules/redis/client.html
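For reference, redis-py's `set()` does accept an expiry, so the key could age out on its own; a small sketch (the value and the 5-second TTL here are just illustrative):

```python
import os
import redis

r = redis.Redis()
key = f'awx_callback_receiver_statistics_{os.getpid()}'

# ex= gives the key a TTL in seconds, so stale stats disappear on their own
# if a worker dies without cleaning up after itself
r.set(key, 'stats payload', ex=5)
```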
Callback workers don't get recycled; they exist until the process exits.
At process startup time, I unset all of these keys to wipe out the stats from the previous workers:
https://github.com/ansible/awx/pull/8191/files#diff-f37b92a11438678a6a32ac23a7790f05R54
Force-pushed from ad8b955 to 0247c69
Build succeeded.
Force-pushed from 0247c69 to 4143064
Build succeeded.
Some more metrics on the current version of this PR, on a single node install:

[root@ip-10-0-15-214 ec2-user]# awx-manage print_settings JOB_EVENT_WORKERS
JOB_EVENT_WORKERS = 16
[root@ip-10-0-15-214 ec2-user]# awx-manage print_settings UI_LIVE_UPDATES_ENABLED
UI_LIVE_UPDATES_ENABLED = False
[root@ip-10-0-15-214 ec2-user]# awx-manage callback_stats
main_jobevent
↳ last minute 1835721
main_inventoryupdateevent
↳ last minute 0
main_projectupdateevent
↳ last minute 0
main_adhoccommandevent
↳ last minute 0

Obviously your mileage will vary depending on the actual size of your stdout, other parallel work, and things like database IOPS; these numbers represent "best case" performance on a single-node system that otherwise has no other load or CPU contention. Turning on external logging generally adds a 20-30% cost due to all of the string munging/formatting we do in Python and the associated cpython overhead. This could probably be made slightly faster by throwing more IOPS at a separate, distinct database VM, but at this point we're coming up against the overhead of the Django ORM.
Build succeeded.
awx/main/dispatch/worker/callback.py (Outdated)
@@ -26,7 +30,7 @@

 # the number of seconds to buffer events in memory before flushing
 # using JobEvent.objects.bulk_create()
-BUFFER_SECONDS = .1
+BUFFER_SECONDS = 1
Put this in `settings/defaults.py`.
👍
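A sketch of the suggested shape (the setting name `JOB_EVENT_BUFFER_SECONDS` and the surrounding method names are assumptions, not necessarily what the PR ended up using):

```python
# awx/settings/defaults.py (sketch)
JOB_EVENT_BUFFER_SECONDS = 1

# awx/main/dispatch/worker/callback.py (sketch)
import time

from django.conf import settings

class CallbackBrokerWorker:
    def __init__(self):
        self.last_flush = time.time()

    def flush(self):
        ...  # JobEvent.objects.bulk_create() of the buffered events

    def maybe_flush(self):
        # flush the in-memory buffer once the configured interval has elapsed
        if time.time() - self.last_flush > settings.JOB_EVENT_BUFFER_SECONDS:
            self.flush()
            self.last_flush = time.time()
```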
self.pid = os.getpid()
self.redis = redis.Redis.from_url(settings.BROKER_URL)
for key in self.redis.keys('awx_callback_receiver_statistics_*'):
    self.redis.delete(key)
This could race other callback workers on init, cause a traceback, and then get re-spawned? We should at least try/except this.
A key miss on delete doesn't raise a traceback:
In [4]: r.delete('foo')
Out[4]: 0
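If we want to guard it anyway, a sketch of the suggested try/except (the helper and logger names are illustrative; `redis.RedisError` is redis-py's base exception):

```python
import logging

import redis

logger = logging.getLogger('awx.main.dispatch')  # illustrative logger name

def clear_stale_stats_keys(conn):
    try:
        for key in conn.keys('awx_callback_receiver_statistics_*'):
            conn.delete(key)
    except redis.RedisError:
        # a failed cleanup of stale stats keys shouldn't kill a worker at startup
        logger.exception('failed to clear stale callback receiver statistics keys')
```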
awx/main/dispatch/worker/callback.py (Outdated)
    return {'event': 'FLUSH'}

def record_statistics(self):
    if time.time() - self.last_stats > 5:  # buffer stat recording to once per 5s
Make the `5` a setting in `settings/defaults.py`.
👍
awx/main/dispatch/worker/callback.py (Outdated)
@property
def mb(self):
    return '{:0.3f}'.format(
        psutil.Process(os.getpid()).memory_info().rss / 1024.0 / 1024.0
Replace `os.getpid()` with `self.pid`.
👍
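The suggested tweak, sketched against the snippet above: reuse the pid captured at init time instead of calling `os.getpid()` on every access.

```python
import os

import psutil

class CallbackBrokerWorker:
    def __init__(self):
        self.pid = os.getpid()

    @property
    def mb(self):
        # resident set size of this worker, in megabytes
        return '{:0.3f}'.format(
            psutil.Process(self.pid).memory_info().rss / 1024.0 / 1024.0
        )
```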
instead, just have each worker connect directly to redis

this has a few benefits:
- it's simpler to explain and debug
- back pressure on the queue keeps messages around in redis (which is observable, and survives the restart of Python processes)
- it's likely notably more performant at high loads
Force-pushed from 06c2055 to cd0b9de
Build succeeded.
Build succeeded (gate pipeline).