Ensure test_scheduler does not leak open sockets #4921
Conversation
```python
with dask.config.set({"distributed.scheduler.blocked-handlers": ["test-handler"]}):
    s = Scheduler()
assert s.blocked_handlers == ["test-handler"]
await s.close()
```
Suggested change:

```diff
 with dask.config.set({"distributed.scheduler.blocked-handlers": ["test-handler"]}):
-    s = Scheduler()
-assert s.blocked_handlers == ["test-handler"]
-await s.close()
+    async with Scheduler() as s:
+        assert s.blocked_handlers == ["test-handler"]
```
Based on the comment earlier in this test, it looks like we want to check that `Scheduler.__init__` handles blocked handlers. Using `async with Scheduler() as s:` will also call `start`, which makes it difficult to isolate the `Scheduler.__init__` behavior.
OK
Force-pushed from 879ab37 to e129015.
```diff
@@ -45,6 +45,9 @@ parentdir_prefix = distributed-
 addopts = -v -rsxfE --durations=20
 filterwarnings =
     error:Since distributed.*:PendingDeprecationWarning
+
+    # See https://github.com/dask/distributed/issues/4806
+    error:Port:UserWarning:distributed.node
```
This would raise and fail the tests whenever the warning is raised during CI. AFAIU, we should not hit this condition accidentally if we properly close all servers.
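For context, pytest's `filterwarnings` ini option uses Python's standard `action:message:category:module` warning-filter syntax, so an `error:` entry escalates matching warnings into exceptions. A minimal plain-Python sketch of the equivalent behavior (the module field is omitted here so the demo matches a warning raised from `__main__`):

```python
import warnings

# Escalate UserWarnings whose message starts with "Port" into errors,
# mirroring the "error:Port:UserWarning:distributed.node" filter above.
warnings.filterwarnings("error", message="Port", category=UserWarning)

try:
    warnings.warn("Port 8787 is already in use", UserWarning)
except UserWarning as exc:
    print(f"escalated to error: {exc}")
```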
```python
try:
    await self._correct_state()
    if self.workers:
        await asyncio.wait(
            list(self.workers.values())
        )  # maybe there are more
    return self
except Exception:
    await self.scheduler.close()
    raise
```
This is new and was also a cause of this warning message. If an exception was raised during worker startup, the scheduler was not cleaned up properly. This should have been implemented regardless of the warning, I believe.
+1
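As a standalone illustration of the pattern above (hypothetical names, not the actual cluster code): if any worker fails to start, the already-started scheduler is closed before the exception propagates, so its listening socket is not leaked:

```python
import asyncio

async def start_cluster(scheduler, workers):
    # Start the scheduler first, then the workers; on any worker failure,
    # close the scheduler before re-raising so its socket is not leaked.
    await scheduler.start()
    try:
        await asyncio.gather(*(w.start() for w in workers))
        return scheduler
    except Exception:
        await scheduler.close()
        raise
```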
```diff
@@ -1406,6 +1416,7 @@ def test_deque_handler():
     msg = deque_handler.deque[-1]
     assert "distributed.scheduler" in deque_handler.format(msg)
     assert any(msg.msg == "foo123" for msg in deque_handler.deque)
+    await s.close()
```
We can probably use `async with Scheduler() as s` here?
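For illustration, a minimal sketch of that shape (assuming the scheduler's async context manager, which starts it on entry and closes it on exit even if an assertion inside the block fails):

```python
import asyncio
from distributed import Scheduler

async def main():
    # __aenter__ starts the scheduler; __aexit__ closes it even on failure,
    # so the listening socket cannot leak out of the block.
    async with Scheduler() as s:
        assert "Scheduler" in repr(s)

asyncio.run(main())
```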
Closing this in favour of #4928.
Poor man's fix for #4806: manually close the scheduler in the affected instances. I haven't added a test, since testing this would either require root (to enumerate all sockets of a process) or very brittle hard-coding of ports.
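For reference, a hedged sketch of the kind of leak check alluded to here, using `psutil` (this inspects only the current process's sockets, which works without root on Linux but is not fully portable):

```python
import psutil

def open_listen_sockets():
    # Listening inet sockets held by the current process; comparing the
    # count before and after a test hints at a leaked server socket.
    proc = psutil.Process()
    return [
        conn
        for conn in proc.connections(kind="inet")
        if conn.status == psutil.CONN_LISTEN
    ]
```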