Don't start http server in Scheduler.__init__ #4928
base: main
Conversation
```diff
@@ -422,7 +422,7 @@ async def handle_comm(self, comm):
         logger.debug("Connection from %r to %s", address, type(self).__name__)
         self._comms[comm] = op
         await self
```
This is a very interesting change. I encountered a race condition where this `await` would actually trigger the warning after I moved the HTTP server startup code to `Scheduler.start`. Apparently the start is not idempotent, and sometimes we'd restart/start a scheduler twice, but only under some strange timing-sensitive conditions (I believe the handler was already active while the scheduler was still starting...). I can dig deeper here, but I figured there shouldn't be a reason to `await self` here since this handler should only be registered during or after the start anyhow.
My hope would be that `await self` is idempotent. If in the future we find an easy way to make this true, I would be in support.
I'd assume an `if status == running: return` guard would already do the trick. If we prefer this route, I can revert the `await self` change here.
That, combined with using an async lock to avoid many concurrent `await self` calls. I think we do this in other situations, like `Client` and `Cluster`. I'm surprised that we don't do it in `Server`.
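A minimal sketch of what that combination (a status check plus an async startup lock) could look like; the class below is illustrative only, assuming a simplified `Status` enum, and is not the actual `distributed` implementation:

```python
import asyncio
from enum import Enum


class Status(Enum):
    undefined = "undefined"
    running = "running"


class Server:
    def __init__(self):
        self.status = Status.undefined
        self._startup_lock = asyncio.Lock()  # serializes concurrent start attempts

    async def start(self):
        # Many callers may `await self` at the same time; the lock ensures the
        # actual startup work runs at most once.
        async with self._startup_lock:
            if self.status == Status.running:
                return self  # already running: starting again is a no-op
            ...  # real startup work (listeners, HTTP server, ...) would go here
            self.status = Status.running
            return self

    def __await__(self):
        return self.start().__await__()
```

With this shape, `await server` from any number of coroutines resolves to the same started instance without re-running the startup code.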
The idempotence you are referring to is implemented once for `Server`; see distributed/core.py, lines 264 to 269 in d9bc3c6:

```python
    def __await__(self):
        async def _():
            timeout = getattr(self, "death_timeout", 0)
            async with self._startup_lock:
                if self.status == Status.running:
                    return self
```
For a reason I haven't understood yet, this still caused issues for me. My suspicion is that the scheduler has already arrived in a non-running but not properly closed state by the time I see this message. The `await` would then try to revive a kind-of-dead scheduler and cause this warning. I'll let CI run on this a few times and see if I can reproduce it.
I remembered running into this await before. I had a similar problem over in #4734
Indeed, it turns out a properly timed connection request to a dead or dying scheduler can revive it. hrhr, I guess this is merely an edge case relevant to our async test suite. Regardless, IMHO this is much more cleanly fixed in #4734, and I suggest merging that one first.
Note after I became a bit smarter: this `await` is here to ensure that the server is actually up. The problem is that the start is not idempotent; I stumbled over this in #4734 as well.
```diff
@@ -5393,7 +5405,7 @@ async def gather(self, comm=None, keys=None, serializers=None):
     def clear_task_state(self):
         # XXX what about nested state such as ClientState.wants_what
         # (see also fire-and-forget...)
-        logger.info("Clear task state")
+        logger.debug("Clear task state")
```
I changed this log level since it felt kind of wrong. This prints `Clear task state` to the console whenever I call `dask-scheduler`, and that's currently failing the tests. I assume this change is OK?
Sure, I don't have a strong opinion on this
Force-pushed from f28e187 to 284e45c.
```diff
@@ -45,6 +45,9 @@ parentdir_prefix = distributed-
 addopts = -v -rsxfE --durations=20
 filterwarnings =
     error:Since distributed.*:PendingDeprecationWarning
+
+    # See https://github.com/dask/distributed/issues/4806
+    error:Port:UserWarning:distributed.node
```
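For background (not part of the diff): each `filterwarnings` entry follows Python's warning-filter syntax, `action:message:category:module`, so this entry turns any `UserWarning` whose message starts with "Port" and that is issued from `distributed.node` into an error, failing the test. A rough pure-Python equivalent of that single line would be:

```python
import warnings

# Roughly the same effect as the ini entry "error:Port:UserWarning:distributed.node":
# escalate matching warnings to exceptions so port collisions fail loudly.
warnings.filterwarnings(
    "error",                      # action: raise the warning as an exception
    message="Port",               # regex matched against the start of the message
    category=UserWarning,         # only UserWarning (and subclasses)
    module=r"distributed\.node",  # regex matched against the issuing module's name
)
```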
Woo
This might be a bit too optimistic, but let's wait and see. It may well interfere if the code base is run using `pytest-xdist`: we will always get port collisions when running in parallel.
```diff
     ) as cluster:
 
         # This will never work but is a reliable way to block without hard
         # coding any sleep values
-        async with Client(cluster) as c:
+        async with Client(cluster, asynchronous=True) as c:
```
Nice catch
```python
try:
    import bokeh  # noqa: F401

    HAS_BOKEH = True
except ImportError:
    HAS_BOKEH = False
```
Nitpick: this is subjective and not worth spending much time on, but more commonly throughout the codebase we catch the `ImportError` and assign `None` to the module instead of defining a new `HAS_*` variable. For example, distributed/utils.py, lines 33 to 36 in 7d0f010:

```python
try:
    import resource
except ImportError:
    resource = None
```

Then later on we would do `if bokeh is not None` instead of `if HAS_BOKEH`.
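Applied to the hunk above, the suggested pattern would look roughly like this (an illustrative sketch, not the code that ended up in the PR):

```python
try:
    import bokeh  # noqa: F401
except ImportError:
    bokeh = None

# Later, gate bokeh-dependent behaviour on the module object instead of a flag:
if bokeh is not None:
    ...  # e.g. expect the dashboard routes to be served
```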
If we do this in other places already, I'll change it. I think keeping a consistent style in a codebase is worth the effort
```diff
@@ -225,3 +227,27 @@ def test_tls_cluster(tls_client):
 async def test_tls_scheduler(security, cleanup):
     async with Scheduler(security=security, host="localhost") as s:
         assert s.address.startswith("tls")
+
+
+from distributed.core import Status
```
Nitpick: can you move this import to the top of the module, next to the existing `distributed.core` import?
Sure. I'm used to either linters or formatters making me aware of this; for some reason our config considers this perfectly fine 🤷♂️
```python
kwargs = self.kwargs.copy()
kwargs.pop("dashboard_address")
```
This would silently ignore any `dashboard_address` that has been specified in a test. Since this is only ever used in one place, perhaps we should just specify `dashboard_address=":0"` there instead? This isn't worth spending too much time on; it was just slightly surprising to see us manually setting `dashboard_address=":54321"`.
Yes, I think I changed this before I had the idea of simply using a random port, which avoids the warning. I'll fix that.
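For illustration, the alternative suggested above would mean requesting a random port at the single call site rather than dropping the keyword; roughly as follows, where the surrounding test shape is assumed and not taken from the PR:

```python
from distributed import Scheduler


async def example():
    # ":0" lets the OS pick a free port, so parallel test runs don't collide
    # on a hard-coded dashboard port such as ":54321".
    async with Scheduler(dashboard_address=":0") as s:
        assert s.address.startswith("tcp")
```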
```python
kwargs = self.kwargs.copy()
kwargs.pop("dashboard_address")
```
Similar comment here
I'm curious, what is the status here?

Status here is a bit awkward. I needed/wanted to remove the … Options:

I think today I will try option 2, and if that fails we'll go for option 1, since having no warnings in most of the runs is still a win, and we can follow up once #4734 is properly resolved.
Force-pushed from 284e45c to d6e9b99.
This should deal with the CI "port already in use" warning situation reported in #4806.

Highlights: the HTTP server was previously started in `Scheduler.__init__`, which made some tests really awkward. This PR moves the start of the HTTP server to the `Scheduler.start` method.

Supersedes #4896 and #4921.
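In rough terms, the shape of the change is to defer the HTTP server setup from object construction to the asynchronous start path. A simplified sketch follows; the attribute and helper names are illustrative assumptions, not the exact ones in `distributed`:

```python
class Scheduler:
    def __init__(self, dashboard_address=":0", **kwargs):
        # Constructing the object no longer binds any ports; we only remember
        # what will be needed once the scheduler actually starts.
        self._dashboard_address = dashboard_address
        self._http_server = None

    async def start(self):
        # The HTTP/dashboard server is started here instead of in __init__,
        # which keeps synchronous construction free of network side effects.
        if self._http_server is None:
            self._http_server = await self._start_http_server(self._dashboard_address)
        return self

    async def _start_http_server(self, address):
        """Hypothetical helper that would create and start the HTTP application."""
        ...
```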
`black distributed` / `flake8 distributed` / `isort distributed`