Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unit tests to use a random port for the dashboard #5060

Merged
merged 17 commits into from
Jul 28, 2021
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 5 additions & 9 deletions distributed/cli/tests/test_dask_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,17 @@


def test_defaults(loop):
with popen(["dask-scheduler", "--no-dashboard"]) as proc:
with popen(["dask-scheduler"]):

async def f():
# Default behaviour is to listen on all addresses
await assert_can_connect_from_everywhere_4_6(8786, timeout=5.0)

with Client("127.0.0.1:%d" % Scheduler.default_port, loop=loop) as c:
with Client(f"127.0.0.1:{Scheduler.default_port}", loop=loop) as c:
c.sync(f)

response = requests.get("http://127.0.0.1:8787/status/")
assert response.status_code == 404

with pytest.raises(Exception):
response = requests.get("http://127.0.0.1:9786/info.json")
response.raise_for_status()
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

404 is tested by test_no_dashboard below



def test_hostport(loop):
Expand All @@ -55,9 +52,8 @@ async def f():


def test_no_dashboard(loop):
pytest.importorskip("bokeh")
with popen(["dask-scheduler", "--no-dashboard"]) as proc:
with Client("127.0.0.1:%d" % Scheduler.default_port, loop=loop) as c:
with popen(["dask-scheduler", "--no-dashboard"]):
with Client(f"127.0.0.1:{Scheduler.default_port}", loop=loop):
response = requests.get("http://127.0.0.1:8787/status/")
assert response.status_code == 404

Expand Down
14 changes: 7 additions & 7 deletions distributed/cli/tests/test_dask_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@

from distributed import Client
from distributed.scheduler import COMPILED
from distributed.utils_test import popen
from distributed.utils_test import gen_test, popen


@pytest.mark.skipif(COMPILED, reason="Fails with cythonized scheduler")
@pytest.mark.asyncio
async def test_text(cleanup):
@gen_test(timeout=120)
async def test_text():
with popen(
[
sys.executable,
Expand All @@ -19,7 +19,7 @@ async def test_text(cleanup):
"--spec",
'{"cls": "dask.distributed.Scheduler", "opts": {"port": 9373}}',
]
) as sched:
):
with popen(
[
sys.executable,
Expand All @@ -29,7 +29,7 @@ async def test_text(cleanup):
"--spec",
'{"cls": "dask.distributed.Worker", "opts": {"nanny": false, "nthreads": 3, "name": "foo"}}',
]
) as w:
):
async with Client("tcp://localhost:9373", asynchronous=True) as client:
await client.wait_for_workers(1)
info = await client.scheduler.identity()
Expand All @@ -51,7 +51,7 @@ async def test_file(cleanup, tmp_path):
f,
)

with popen(["dask-scheduler", "--port", "9373", "--no-dashboard"]) as sched:
with popen(["dask-scheduler", "--port", "9373", "--no-dashboard"]):
with popen(
[
sys.executable,
Expand All @@ -61,7 +61,7 @@ async def test_file(cleanup, tmp_path):
"--spec-file",
fn,
]
) as w:
):
async with Client("tcp://localhost:9373", asynchronous=True) as client:
await client.wait_for_workers(1)
info = await client.scheduler.identity()
Expand Down
84 changes: 40 additions & 44 deletions distributed/cli/tests/test_dask_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@
from distributed.deploy.utils import nprocesses_nthreads
from distributed.metrics import time
from distributed.utils import parse_ports, sync, tmpfile
from distributed.utils_test import popen, terminate_process, wait_for_port
from distributed.utils_test import gen_cluster, popen, terminate_process, wait_for_port


def test_nanny_worker_ports(loop):
with popen(["dask-scheduler", "--port", "9359", "--no-dashboard"]) as sched:
with popen(["dask-scheduler", "--port", "9359", "--no-dashboard"]):
with popen(
[
"dask-worker",
Expand All @@ -34,7 +34,7 @@ def test_nanny_worker_ports(loop):
"5273",
"--no-dashboard",
]
) as worker:
):
with Client("127.0.0.1:9359", loop=loop) as c:
start = time()
while True:
Expand All @@ -50,6 +50,7 @@ def test_nanny_worker_ports(loop):
)


@pytest.mark.slow
def test_nanny_worker_port_range(loop):
with popen(["dask-scheduler", "--port", "9359", "--no-dashboard"]) as sched:
nprocs = 3
Expand All @@ -69,7 +70,7 @@ def test_nanny_worker_port_range(loop):
nanny_port,
"--no-dashboard",
]
) as worker:
):
with Client("127.0.0.1:9359", loop=loop) as c:
start = time()
while len(c.scheduler_info()["workers"]) < nprocs:
Expand All @@ -89,7 +90,7 @@ def get_port(dask_worker):


def test_nanny_worker_port_range_too_many_workers_raises(loop):
with popen(["dask-scheduler", "--port", "9359", "--no-dashboard"]) as sched:
with popen(["dask-scheduler", "--port", "9359", "--no-dashboard"]):
with popen(
[
"dask-worker",
Expand All @@ -111,7 +112,7 @@ def test_nanny_worker_port_range_too_many_workers_raises(loop):


def test_memory_limit(loop):
with popen(["dask-scheduler", "--no-dashboard"]) as sched:
with popen(["dask-scheduler", "--no-dashboard"]):
with popen(
[
"dask-worker",
Expand All @@ -120,7 +121,7 @@ def test_memory_limit(loop):
"2e3MB",
"--no-dashboard",
]
) as worker:
):
with Client("127.0.0.1:8786", loop=loop) as c:
while not c.nthreads():
sleep(0.1)
Expand All @@ -131,7 +132,7 @@ def test_memory_limit(loop):


def test_no_nanny(loop):
with popen(["dask-scheduler", "--no-dashboard"]) as sched:
with popen(["dask-scheduler", "--no-dashboard"]):
with popen(
["dask-worker", "127.0.0.1:8786", "--no-nanny", "--no-dashboard"]
) as worker:
Expand All @@ -157,11 +158,11 @@ def test_no_reconnect(nanny, loop):
start = time()
while worker.poll() is None:
sleep(0.1)
assert time() < start + 10
assert time() < start + 30


def test_resources(loop):
with popen(["dask-scheduler", "--no-dashboard"]) as sched:
with popen(["dask-scheduler", "--no-dashboard"]):
with popen(
[
"dask-worker",
Expand All @@ -170,7 +171,7 @@ def test_resources(loop):
"--resources",
"A=1 B=2,C=3",
]
) as worker:
):
with Client("127.0.0.1:8786", loop=loop) as c:
while not c.scheduler_info()["workers"]:
sleep(0.1)
Expand All @@ -182,7 +183,7 @@ def test_resources(loop):
@pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"])
def test_local_directory(loop, nanny):
with tmpfile() as fn:
with popen(["dask-scheduler", "--no-dashboard"]) as sched:
with popen(["dask-scheduler", "--no-dashboard"]):
with popen(
[
"dask-worker",
Expand All @@ -192,7 +193,7 @@ def test_local_directory(loop, nanny):
"--local-directory",
fn,
]
) as worker:
):
with Client("127.0.0.1:8786", loop=loop, timeout=10) as c:
start = time()
while not c.scheduler_info()["workers"]:
Expand All @@ -206,9 +207,7 @@ def test_local_directory(loop, nanny):
@pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"])
def test_scheduler_file(loop, nanny):
with tmpfile() as fn:
with popen(
["dask-scheduler", "--no-dashboard", "--scheduler-file", fn]
) as sched:
with popen(["dask-scheduler", "--no-dashboard", "--scheduler-file", fn]):
with popen(
["dask-worker", "--scheduler-file", fn, nanny, "--no-dashboard"]
):
Expand All @@ -221,7 +220,7 @@ def test_scheduler_file(loop, nanny):

def test_scheduler_address_env(loop, monkeypatch):
monkeypatch.setenv("DASK_SCHEDULER_ADDRESS", "tcp://127.0.0.1:8786")
with popen(["dask-scheduler", "--no-dashboard"]) as sched:
with popen(["dask-scheduler", "--no-dashboard"]):
with popen(["dask-worker", "--no-dashboard"]):
with Client(os.environ["DASK_SCHEDULER_ADDRESS"], loop=loop) as c:
start = time()
Expand All @@ -231,7 +230,7 @@ def test_scheduler_address_env(loop, monkeypatch):


def test_nprocs_requires_nanny(loop):
with popen(["dask-scheduler", "--no-dashboard"]) as sched:
with popen(["dask-scheduler", "--no-dashboard"]):
with popen(
["dask-worker", "127.0.0.1:8786", "--nprocs=2", "--no-nanny"]
) as worker:
Expand All @@ -242,31 +241,29 @@ def test_nprocs_requires_nanny(loop):


def test_nprocs_negative(loop):
with popen(["dask-scheduler", "--no-dashboard"]) as sched:
with popen(["dask-worker", "127.0.0.1:8786", "--nprocs=-1"]) as worker:
with popen(["dask-scheduler", "--no-dashboard"]):
with popen(["dask-worker", "127.0.0.1:8786", "--nprocs=-1"]):
with Client("tcp://127.0.0.1:8786", loop=loop) as c:
c.wait_for_workers(cpu_count(), timeout="10 seconds")


def test_nprocs_auto(loop):
with popen(["dask-scheduler", "--no-dashboard"]) as sched:
with popen(["dask-worker", "127.0.0.1:8786", "--nprocs=auto"]) as worker:
with popen(["dask-scheduler", "--no-dashboard"]):
with popen(["dask-worker", "127.0.0.1:8786", "--nprocs=auto"]):
with Client("tcp://127.0.0.1:8786", loop=loop) as c:
procs, _ = nprocesses_nthreads()
c.wait_for_workers(procs, timeout="10 seconds")


def test_nprocs_expands_name(loop):
with popen(["dask-scheduler", "--no-dashboard"]) as sched:
with popen(
["dask-worker", "127.0.0.1:8786", "--nprocs", "2", "--name", "0"]
) as worker:
with popen(["dask-worker", "127.0.0.1:8786", "--nprocs", "2"]) as worker:
with popen(["dask-scheduler", "--no-dashboard"]):
with popen(["dask-worker", "127.0.0.1:8786", "--nprocs", "2", "--name", "0"]):
with popen(["dask-worker", "127.0.0.1:8786", "--nprocs", "2"]):
with Client("tcp://127.0.0.1:8786", loop=loop) as c:
start = time()
while len(c.scheduler_info()["workers"]) < 4:
sleep(0.2)
assert time() < start + 10
assert time() < start + 30

info = c.scheduler_info()
names = [d["name"] for d in info["workers"].values()]
Expand All @@ -281,7 +278,7 @@ def test_nprocs_expands_name(loop):
"listen_address", ["tcp://0.0.0.0:39837", "tcp://127.0.0.2:39837"]
)
def test_contact_listen_address(loop, nanny, listen_address):
with popen(["dask-scheduler", "--no-dashboard"]) as sched:
with popen(["dask-scheduler", "--no-dashboard"]):
with popen(
[
"dask-worker",
Expand All @@ -293,7 +290,7 @@ def test_contact_listen_address(loop, nanny, listen_address):
"--listen-address",
listen_address,
]
) as worker:
):
with Client("127.0.0.1:8786") as client:
while not client.nthreads():
sleep(0.1)
Expand All @@ -313,14 +310,14 @@ def func(dask_worker):
@pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"])
@pytest.mark.parametrize("host", ["127.0.0.2", "0.0.0.0"])
def test_respect_host_listen_address(loop, nanny, host):
with popen(["dask-scheduler", "--no-dashboard"]) as sched:
with popen(["dask-scheduler", "--no-dashboard"]):
with popen(
["dask-worker", "127.0.0.1:8786", nanny, "--no-dashboard", "--host", host]
) as worker:
with Client("127.0.0.1:8786") as client:
while not client.nthreads():
sleep(0.1)
info = client.scheduler_info()
client.scheduler_info()

# roundtrip works
assert client.submit(lambda x: x + 1, 10).result() == 11
Expand All @@ -341,7 +338,7 @@ def test_dashboard_non_standard_ports(loop):
except ImportError:
proxy_exists = False

with popen(["dask-scheduler", "--port", "3449"]) as s:
with popen(["dask-scheduler", "--port", "3449"]):
with popen(
[
"dask-worker",
Expand All @@ -351,7 +348,7 @@ def test_dashboard_non_standard_ports(loop):
"--host",
"127.0.0.1",
]
) as proc:
):
with Client("127.0.0.1:3449", loop=loop) as c:
c.wait_for_workers(1)
pass
Expand Down Expand Up @@ -406,14 +403,13 @@ def test_bokeh_deprecation():
pass


@pytest.mark.asyncio
async def test_integer_names(cleanup):
async with Scheduler(port=0) as s:
with popen(["dask-worker", s.address, "--name", "123"]) as worker:
while not s.workers:
await asyncio.sleep(0.01)
[ws] = s.workers.values()
assert ws.name == 123
@gen_cluster(nthreads=[])
async def test_integer_names(s):
with popen(["dask-worker", s.address, "--name", "123"]):
while not s.workers:
await asyncio.sleep(0.01)
[ws] = s.workers.values()
assert ws.name == 123


@pytest.mark.asyncio
Expand All @@ -438,7 +434,7 @@ class MyWorker(Worker):
else:
env["PYTHONPATH"] = tmpdir

async with Scheduler(port=0) as s:
async with Scheduler(port=0, dashboard_address=":0") as s:
async with Client(s.address, asynchronous=True) as c:
with popen(
[
Expand All @@ -449,7 +445,7 @@ class MyWorker(Worker):
"myworker.MyWorker",
],
env=env,
) as worker:
):
await c.wait_for_workers(1)

def worker_type(dask_worker):
Expand Down
6 changes: 3 additions & 3 deletions distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1467,10 +1467,10 @@ def close(self, timeout=no_default):
pc.stop()

if self.asynchronous:
future = self._close()
coro = self._close()
if timeout:
future = asyncio.wait_for(future, timeout)
return future
coro = asyncio.wait_for(coro, timeout)
return coro

if self._start_arg is None:
with suppress(AttributeError):
Expand Down
Loading