diff --git a/distributed/cli/tests/test_dask_scheduler.py b/distributed/cli/tests/test_dask_scheduler.py index 6b310d8e544..a04e84d4370 100644 --- a/distributed/cli/tests/test_dask_scheduler.py +++ b/distributed/cli/tests/test_dask_scheduler.py @@ -26,20 +26,17 @@ def test_defaults(loop): - with popen(["dask-scheduler", "--no-dashboard"]) as proc: + with popen(["dask-scheduler"]): async def f(): # Default behaviour is to listen on all addresses await assert_can_connect_from_everywhere_4_6(8786, timeout=5.0) - with Client("127.0.0.1:%d" % Scheduler.default_port, loop=loop) as c: + with Client(f"127.0.0.1:{Scheduler.default_port}", loop=loop) as c: c.sync(f) response = requests.get("http://127.0.0.1:8787/status/") - assert response.status_code == 404 - - with pytest.raises(Exception): - response = requests.get("http://127.0.0.1:9786/info.json") + response.raise_for_status() def test_hostport(loop): @@ -55,9 +52,8 @@ async def f(): def test_no_dashboard(loop): - pytest.importorskip("bokeh") - with popen(["dask-scheduler", "--no-dashboard"]) as proc: - with Client("127.0.0.1:%d" % Scheduler.default_port, loop=loop) as c: + with popen(["dask-scheduler", "--no-dashboard"]): + with Client(f"127.0.0.1:{Scheduler.default_port}", loop=loop): response = requests.get("http://127.0.0.1:8787/status/") assert response.status_code == 404 diff --git a/distributed/cli/tests/test_dask_spec.py b/distributed/cli/tests/test_dask_spec.py index d90fccf954b..45f88c894b3 100644 --- a/distributed/cli/tests/test_dask_spec.py +++ b/distributed/cli/tests/test_dask_spec.py @@ -5,12 +5,12 @@ from distributed import Client from distributed.scheduler import COMPILED -from distributed.utils_test import popen +from distributed.utils_test import gen_test, popen @pytest.mark.skipif(COMPILED, reason="Fails with cythonized scheduler") -@pytest.mark.asyncio -async def test_text(cleanup): +@gen_test(timeout=120) +async def test_text(): with popen( [ sys.executable, @@ -19,7 +19,7 @@ async def test_text(cleanup): "--spec", '{"cls": "dask.distributed.Scheduler", "opts": {"port": 9373}}', ] - ) as sched: + ): with popen( [ sys.executable, @@ -29,7 +29,7 @@ async def test_text(cleanup): "--spec", '{"cls": "dask.distributed.Worker", "opts": {"nanny": false, "nthreads": 3, "name": "foo"}}', ] - ) as w: + ): async with Client("tcp://localhost:9373", asynchronous=True) as client: await client.wait_for_workers(1) info = await client.scheduler.identity() @@ -51,7 +51,7 @@ async def test_file(cleanup, tmp_path): f, ) - with popen(["dask-scheduler", "--port", "9373", "--no-dashboard"]) as sched: + with popen(["dask-scheduler", "--port", "9373", "--no-dashboard"]): with popen( [ sys.executable, @@ -61,7 +61,7 @@ async def test_file(cleanup, tmp_path): "--spec-file", fn, ] - ) as w: + ): async with Client("tcp://localhost:9373", asynchronous=True) as client: await client.wait_for_workers(1) info = await client.scheduler.identity() diff --git a/distributed/cli/tests/test_dask_worker.py b/distributed/cli/tests/test_dask_worker.py index 2d49b3b0977..d807791ea7d 100644 --- a/distributed/cli/tests/test_dask_worker.py +++ b/distributed/cli/tests/test_dask_worker.py @@ -17,11 +17,11 @@ from distributed.deploy.utils import nprocesses_nthreads from distributed.metrics import time from distributed.utils import parse_ports, sync, tmpfile -from distributed.utils_test import popen, terminate_process, wait_for_port +from distributed.utils_test import gen_cluster, popen, terminate_process, wait_for_port def test_nanny_worker_ports(loop): - with popen(["dask-scheduler", "--port", "9359", "--no-dashboard"]) as sched: + with popen(["dask-scheduler", "--port", "9359", "--no-dashboard"]): with popen( [ "dask-worker", @@ -34,7 +34,7 @@ def test_nanny_worker_ports(loop): "5273", "--no-dashboard", ] - ) as worker: + ): with Client("127.0.0.1:9359", loop=loop) as c: start = time() while True: @@ -50,6 +50,7 @@ def test_nanny_worker_ports(loop): ) +@pytest.mark.slow def test_nanny_worker_port_range(loop): with popen(["dask-scheduler", "--port", "9359", "--no-dashboard"]) as sched: nprocs = 3 @@ -69,7 +70,7 @@ def test_nanny_worker_port_range(loop): nanny_port, "--no-dashboard", ] - ) as worker: + ): with Client("127.0.0.1:9359", loop=loop) as c: start = time() while len(c.scheduler_info()["workers"]) < nprocs: @@ -89,7 +90,7 @@ def get_port(dask_worker): def test_nanny_worker_port_range_too_many_workers_raises(loop): - with popen(["dask-scheduler", "--port", "9359", "--no-dashboard"]) as sched: + with popen(["dask-scheduler", "--port", "9359", "--no-dashboard"]): with popen( [ "dask-worker", @@ -111,7 +112,7 @@ def test_nanny_worker_port_range_too_many_workers_raises(loop): def test_memory_limit(loop): - with popen(["dask-scheduler", "--no-dashboard"]) as sched: + with popen(["dask-scheduler", "--no-dashboard"]): with popen( [ "dask-worker", @@ -120,7 +121,7 @@ def test_memory_limit(loop): "2e3MB", "--no-dashboard", ] - ) as worker: + ): with Client("127.0.0.1:8786", loop=loop) as c: while not c.nthreads(): sleep(0.1) @@ -131,7 +132,7 @@ def test_memory_limit(loop): def test_no_nanny(loop): - with popen(["dask-scheduler", "--no-dashboard"]) as sched: + with popen(["dask-scheduler", "--no-dashboard"]): with popen( ["dask-worker", "127.0.0.1:8786", "--no-nanny", "--no-dashboard"] ) as worker: @@ -157,11 +158,11 @@ def test_no_reconnect(nanny, loop): start = time() while worker.poll() is None: sleep(0.1) - assert time() < start + 10 + assert time() < start + 30 def test_resources(loop): - with popen(["dask-scheduler", "--no-dashboard"]) as sched: + with popen(["dask-scheduler", "--no-dashboard"]): with popen( [ "dask-worker", @@ -170,7 +171,7 @@ def test_resources(loop): "--resources", "A=1 B=2,C=3", ] - ) as worker: + ): with Client("127.0.0.1:8786", loop=loop) as c: while not c.scheduler_info()["workers"]: sleep(0.1) @@ -182,7 +183,7 @@ def test_resources(loop): @pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"]) def test_local_directory(loop, nanny): with tmpfile() as fn: - with popen(["dask-scheduler", "--no-dashboard"]) as sched: + with popen(["dask-scheduler", "--no-dashboard"]): with popen( [ "dask-worker", @@ -192,7 +193,7 @@ def test_local_directory(loop, nanny): "--local-directory", fn, ] - ) as worker: + ): with Client("127.0.0.1:8786", loop=loop, timeout=10) as c: start = time() while not c.scheduler_info()["workers"]: @@ -206,9 +207,7 @@ def test_local_directory(loop, nanny): @pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"]) def test_scheduler_file(loop, nanny): with tmpfile() as fn: - with popen( - ["dask-scheduler", "--no-dashboard", "--scheduler-file", fn] - ) as sched: + with popen(["dask-scheduler", "--no-dashboard", "--scheduler-file", fn]): with popen( ["dask-worker", "--scheduler-file", fn, nanny, "--no-dashboard"] ): @@ -221,7 +220,7 @@ def test_scheduler_file(loop, nanny): def test_scheduler_address_env(loop, monkeypatch): monkeypatch.setenv("DASK_SCHEDULER_ADDRESS", "tcp://127.0.0.1:8786") - with popen(["dask-scheduler", "--no-dashboard"]) as sched: + with popen(["dask-scheduler", "--no-dashboard"]): with popen(["dask-worker", "--no-dashboard"]): with Client(os.environ["DASK_SCHEDULER_ADDRESS"], loop=loop) as c: start = time() @@ -231,7 +230,7 @@ def test_scheduler_address_env(loop, monkeypatch): def test_nprocs_requires_nanny(loop): - with popen(["dask-scheduler", "--no-dashboard"]) as sched: + with popen(["dask-scheduler", "--no-dashboard"]): with popen( ["dask-worker", "127.0.0.1:8786", "--nprocs=2", "--no-nanny"] ) as worker: @@ -242,31 +241,29 @@ def test_nprocs_requires_nanny(loop): def test_nprocs_negative(loop): - with popen(["dask-scheduler", "--no-dashboard"]) as sched: - with popen(["dask-worker", "127.0.0.1:8786", "--nprocs=-1"]) as worker: + with popen(["dask-scheduler", "--no-dashboard"]): + with popen(["dask-worker", "127.0.0.1:8786", "--nprocs=-1"]): with Client("tcp://127.0.0.1:8786", loop=loop) as c: c.wait_for_workers(cpu_count(), timeout="10 seconds") def test_nprocs_auto(loop): - with popen(["dask-scheduler", "--no-dashboard"]) as sched: - with popen(["dask-worker", "127.0.0.1:8786", "--nprocs=auto"]) as worker: + with popen(["dask-scheduler", "--no-dashboard"]): + with popen(["dask-worker", "127.0.0.1:8786", "--nprocs=auto"]): with Client("tcp://127.0.0.1:8786", loop=loop) as c: procs, _ = nprocesses_nthreads() c.wait_for_workers(procs, timeout="10 seconds") def test_nprocs_expands_name(loop): - with popen(["dask-scheduler", "--no-dashboard"]) as sched: - with popen( - ["dask-worker", "127.0.0.1:8786", "--nprocs", "2", "--name", "0"] - ) as worker: - with popen(["dask-worker", "127.0.0.1:8786", "--nprocs", "2"]) as worker: + with popen(["dask-scheduler", "--no-dashboard"]): + with popen(["dask-worker", "127.0.0.1:8786", "--nprocs", "2", "--name", "0"]): + with popen(["dask-worker", "127.0.0.1:8786", "--nprocs", "2"]): with Client("tcp://127.0.0.1:8786", loop=loop) as c: start = time() while len(c.scheduler_info()["workers"]) < 4: sleep(0.2) - assert time() < start + 10 + assert time() < start + 30 info = c.scheduler_info() names = [d["name"] for d in info["workers"].values()] @@ -281,7 +278,7 @@ def test_nprocs_expands_name(loop): "listen_address", ["tcp://0.0.0.0:39837", "tcp://127.0.0.2:39837"] ) def test_contact_listen_address(loop, nanny, listen_address): - with popen(["dask-scheduler", "--no-dashboard"]) as sched: + with popen(["dask-scheduler", "--no-dashboard"]): with popen( [ "dask-worker", @@ -293,7 +290,7 @@ def test_contact_listen_address(loop, nanny, listen_address): "--listen-address", listen_address, ] - ) as worker: + ): with Client("127.0.0.1:8786") as client: while not client.nthreads(): sleep(0.1) @@ -313,14 +310,14 @@ def func(dask_worker): @pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"]) @pytest.mark.parametrize("host", ["127.0.0.2", "0.0.0.0"]) def test_respect_host_listen_address(loop, nanny, host): - with popen(["dask-scheduler", "--no-dashboard"]) as sched: + with popen(["dask-scheduler", "--no-dashboard"]): with popen( ["dask-worker", "127.0.0.1:8786", nanny, "--no-dashboard", "--host", host] ) as worker: with Client("127.0.0.1:8786") as client: while not client.nthreads(): sleep(0.1) - info = client.scheduler_info() + client.scheduler_info() # roundtrip works assert client.submit(lambda x: x + 1, 10).result() == 11 @@ -341,7 +338,7 @@ def test_dashboard_non_standard_ports(loop): except ImportError: proxy_exists = False - with popen(["dask-scheduler", "--port", "3449"]) as s: + with popen(["dask-scheduler", "--port", "3449"]): with popen( [ "dask-worker", @@ -351,7 +348,7 @@ def test_dashboard_non_standard_ports(loop): "--host", "127.0.0.1", ] - ) as proc: + ): with Client("127.0.0.1:3449", loop=loop) as c: c.wait_for_workers(1) pass @@ -406,14 +403,13 @@ def test_bokeh_deprecation(): pass -@pytest.mark.asyncio -async def test_integer_names(cleanup): - async with Scheduler(port=0) as s: - with popen(["dask-worker", s.address, "--name", "123"]) as worker: - while not s.workers: - await asyncio.sleep(0.01) - [ws] = s.workers.values() - assert ws.name == 123 +@gen_cluster(nthreads=[]) +async def test_integer_names(s): + with popen(["dask-worker", s.address, "--name", "123"]): + while not s.workers: + await asyncio.sleep(0.01) + [ws] = s.workers.values() + assert ws.name == 123 @pytest.mark.asyncio @@ -438,7 +434,7 @@ class MyWorker(Worker): else: env["PYTHONPATH"] = tmpdir - async with Scheduler(port=0) as s: + async with Scheduler(dashboard_address=":0") as s: async with Client(s.address, asynchronous=True) as c: with popen( [ @@ -449,7 +445,7 @@ class MyWorker(Worker): "myworker.MyWorker", ], env=env, - ) as worker: + ): await c.wait_for_workers(1) def worker_type(dask_worker): diff --git a/distributed/client.py b/distributed/client.py index 75937435f07..39153bd6228 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -1474,10 +1474,10 @@ def close(self, timeout=no_default): pc.stop() if self.asynchronous: - future = self._close() + coro = self._close() if timeout: - future = asyncio.wait_for(future, timeout) - return future + coro = asyncio.wait_for(coro, timeout) + return coro if self._start_arg is None: with suppress(AttributeError): diff --git a/distributed/comm/tests/test_ucx.py b/distributed/comm/tests/test_ucx.py index 64948b68028..d463104dc79 100644 --- a/distributed/comm/tests/test_ucx.py +++ b/distributed/comm/tests/test_ucx.py @@ -4,12 +4,12 @@ ucp = pytest.importorskip("ucp") -from distributed import Client, Scheduler, Worker, wait +from distributed import Client, Scheduler, wait from distributed.comm import connect, listen, parse_address, ucx from distributed.comm.registry import backends, get_backend from distributed.deploy.local import LocalCluster from distributed.protocol import to_serialize -from distributed.utils_test import gen_test, inc +from distributed.utils_test import gen_cluster, gen_test, inc try: HOST = ucp.get_address() @@ -300,31 +300,25 @@ async def test_stress(cleanup): await wait(x) -@pytest.mark.asyncio -async def test_simple(cleanup): - async with Scheduler(protocol="ucx") as s: - async with Worker(s.address) as a: - async with Client(s.address, asynchronous=True) as c: - result = await c.submit(lambda x: x + 1, 10) - assert result == 11 +@gen_cluster(client=True, scheduler_kwargs={"protocol": "ucx"}) +async def test_simple(c, s, a, b): + assert s.address.startswith("ucx://") + assert await c.submit(lambda x: x + 1, 10) == 11 -@pytest.mark.asyncio -async def test_transpose(cleanup): +@gen_cluster(client=True, scheduler_kwargs={"protocol": "ucx"}) +async def test_transpose(c, s, a, b): da = pytest.importorskip("dask.array") - async with Scheduler(protocol="ucx") as s: - async with Worker(s.address) as a, Worker(s.address) as b: - async with Client(s.address, asynchronous=True) as c: - x = da.ones((10000, 10000), chunks=(1000, 1000)).persist() - await x - - y = (x + x.T).sum() - await y + assert s.address.startswith("ucx://") + x = da.ones((10000, 10000), chunks=(1000, 1000)).persist() + await x + y = (x + x.T).sum() + await y @pytest.mark.asyncio @pytest.mark.parametrize("port", [0, 1234]) async def test_ucx_protocol(cleanup, port): - async with Scheduler(protocol="ucx", port=port) as s: + async with Scheduler(protocol="ucx", port=port, dashboard_address=":0") as s: assert s.address.startswith("ucx://") diff --git a/distributed/comm/tests/test_ws.py b/distributed/comm/tests/test_ws.py index dee4c8f6a8e..08d1cbce0ee 100644 --- a/distributed/comm/tests/test_ws.py +++ b/distributed/comm/tests/test_ws.py @@ -14,6 +14,7 @@ from distributed.security import Security from distributed.utils_test import ( gen_cluster, + gen_test, get_client_ssl_context, get_server_ssl_context, inc, @@ -30,8 +31,8 @@ def test_registered(): assert isinstance(backend, ws.WSBackend) -@pytest.mark.asyncio -async def test_listen_connect(cleanup): +@gen_test() +async def test_listen_connect(): async def handle_comm(comm): while True: msg = await comm.read() @@ -46,8 +47,8 @@ async def handle_comm(comm): await comm.close() -@pytest.mark.asyncio -async def test_listen_connect_wss(cleanup): +@gen_test() +async def test_listen_connect_wss(): async def handle_comm(comm): while True: msg = await comm.read() @@ -66,8 +67,8 @@ async def handle_comm(comm): await comm.close() -@pytest.mark.asyncio -async def test_expect_ssl_context(cleanup): +@gen_test() +async def test_expect_ssl_context(): server_ctx = get_server_ssl_context() async with listen("wss://", lambda comm: comm, ssl_context=server_ctx) as listener: @@ -75,8 +76,8 @@ async def test_expect_ssl_context(cleanup): comm = await connect(listener.contact_address) -@pytest.mark.asyncio -async def test_expect_scheduler_ssl_when_sharing_server(cleanup): +@gen_test() +async def test_expect_scheduler_ssl_when_sharing_server(): with tempfile.TemporaryDirectory() as tempdir: key_path = os.path.join(tempdir, "dask.pem") cert_path = os.path.join(tempdir, "dask.crt") @@ -90,41 +91,30 @@ async def test_expect_scheduler_ssl_when_sharing_server(cleanup): } with dask.config.set(c): with pytest.raises(RuntimeError): - async with Scheduler(protocol="ws://", dashboard=True, port=8787) as s: + async with Scheduler(protocol="ws://", dashboard=True, port=8787): pass -@pytest.mark.asyncio -async def test_roundtrip(cleanup): - async with Scheduler(protocol="ws://") as s: - async with Worker(s.address) as w: - async with Client(s.address, asynchronous=True) as c: - assert c.scheduler.address.startswith("ws://") - assert w.address.startswith("ws://") - future = c.submit(inc, 1) - result = await future - assert result == 2 +@gen_cluster(client=True, scheduler_kwargs={"protocol": "ws://"}) +async def test_roundtrip(c, s, a, b): + assert a.address.startswith("ws://") + assert b.address.startswith("ws://") + assert c.scheduler.address.startswith("ws://") + assert await c.submit(inc, 1) == 2 -@pytest.mark.asyncio -async def test_collections(cleanup): +@gen_cluster(client=True, scheduler_kwargs={"protocol": "ws://"}) +async def test_collections(c, s, a, b): da = pytest.importorskip("dask.array") - async with Scheduler(protocol="ws://") as s: - async with Worker(s.address) as a: - async with Worker(s.address) as b: - async with Client(s.address, asynchronous=True) as c: - x = da.random.random((1000, 1000), chunks=(100, 100)) - x = x + x.T - await x.persist() + x = da.random.random((1000, 1000), chunks=(100, 100)) + x = x + x.T + await x.persist() -@pytest.mark.asyncio -async def test_large_transfer(cleanup): +@gen_cluster(client=True, scheduler_kwargs={"protocol": "ws://"}) +async def test_large_transfer(c, s, a, b): np = pytest.importorskip("numpy") - async with Scheduler(protocol="ws://") as s: - async with Worker(s.address, protocol="ws://"): - async with Client(s.address, asynchronous=True) as c: - await c.scatter(np.random.random(1_000_000)) + await c.scatter(np.random.random(1_000_000)) @pytest.mark.asyncio @@ -182,17 +172,21 @@ async def test_http_and_comm_server(cleanup, dashboard, protocol, security, port ], ) async def test_connection_made_with_extra_conn_args(cleanup, protocol, security): - async with Scheduler(protocol=protocol, security=security) as s: + async with Scheduler( + protocol=protocol, security=security, dashboard_address=":0" + ) as s: connection_args = security.get_connection_args("worker") comm = await connect(s.address, **connection_args) assert comm.sock.request.headers.get("Authorization") == "Token abcd" await comm.close() -@pytest.mark.asyncio -async def test_quiet_close(cleanup): +@gen_test() +async def test_quiet_close(): with warnings.catch_warnings(record=True) as record: - async with Client(protocol="ws", processes=False, asynchronous=True) as c: + async with Client( + protocol="ws", processes=False, asynchronous=True, dashboard_address=":0" + ): pass # For some reason unrelated @coroutine warnings are showing up @@ -201,10 +195,7 @@ async def test_quiet_close(cleanup): assert not record, record[0].message -@gen_cluster( - client=True, - scheduler_kwargs={"protocol": "ws://"}, -) +@gen_cluster(client=True, scheduler_kwargs={"protocol": "ws://"}) async def test_ws_roundtrip(c, s, a, b): x = np.arange(100) future = await c.scatter(x) @@ -212,11 +203,7 @@ async def test_ws_roundtrip(c, s, a, b): assert (x == y).all() -@gen_cluster( - client=True, - security=security, - scheduler_kwargs={"protocol": "wss://"}, -) +@gen_cluster(client=True, security=security, scheduler_kwargs={"protocol": "wss://"}) async def test_wss_roundtrip(c, s, a, b): x = np.arange(100) future = await c.scatter(x) diff --git a/distributed/dashboard/tests/test_scheduler_bokeh.py b/distributed/dashboard/tests/test_scheduler_bokeh.py index f003c46d2a8..473595e0d73 100644 --- a/distributed/dashboard/tests/test_scheduler_bokeh.py +++ b/distributed/dashboard/tests/test_scheduler_bokeh.py @@ -58,7 +58,7 @@ async def test_simple(c, s, a, b): http_client = AsyncHTTPClient() for suffix in applications: - response = await http_client.fetch("http://localhost:%d%s" % (port, suffix)) + response = await http_client.fetch(f"http://localhost:{port}{suffix}") body = response.body.decode() assert "bokeh" in body.lower() assert not re.search("href=./", body) # no absolute links diff --git a/distributed/deploy/tests/test_adaptive.py b/distributed/deploy/tests/test_adaptive.py index 4ad27d85df1..d463796dc5e 100644 --- a/distributed/deploy/tests/test_adaptive.py +++ b/distributed/deploy/tests/test_adaptive.py @@ -7,7 +7,15 @@ import dask -from distributed import Adaptive, Client, LocalCluster, SpecCluster, Worker, wait +from distributed import ( + Adaptive, + Client, + LocalCluster, + Scheduler, + SpecCluster, + Worker, + wait, +) from distributed.compatibility import WINDOWS from distributed.metrics import time from distributed.utils_test import async_wait_for, clean, gen_test, slowinc @@ -16,9 +24,8 @@ def test_adaptive_local_cluster(loop): with LocalCluster( n_workers=0, - scheduler_port=0, silence_logs=False, - dashboard_address=None, + dashboard_address=":0", loop=loop, ) as cluster: alc = cluster.adapt(interval="100 ms") @@ -45,10 +52,9 @@ def test_adaptive_local_cluster(loop): async def test_adaptive_local_cluster_multi_workers(): async with LocalCluster( n_workers=0, - scheduler_port=0, silence_logs=False, processes=False, - dashboard_address=None, + dashboard_address=":0", asynchronous=True, ) as cluster: @@ -76,8 +82,8 @@ async def test_adaptive_local_cluster_multi_workers(): @pytest.mark.xfail(reason="changed API") -@pytest.mark.asyncio -async def test_adaptive_scale_down_override(cleanup): +@gen_test() +async def test_adaptive_scale_down_override(): class TestAdaptive(Adaptive): def __init__(self, *args, **kwargs): self.min_size = kwargs.pop("min_size", 0) @@ -95,7 +101,9 @@ class TestCluster(LocalCluster): def scale_up(self, n, **kwargs): assert False - async with TestCluster(n_workers=10, processes=False, asynchronous=True) as cluster: + async with TestCluster( + n_workers=10, processes=False, asynchronous=True, dashboard_address=":0" + ) as cluster: ta = cluster.adapt( min_size=2, interval=0.1, scale_factor=2, Adaptive=TestAdaptive ) @@ -110,10 +118,9 @@ def scale_up(self, n, **kwargs): async def test_min_max(): cluster = await LocalCluster( n_workers=0, - scheduler_port=0, silence_logs=False, processes=False, - dashboard_address=None, + dashboard_address=":0", asynchronous=True, threads_per_worker=1, ) @@ -167,9 +174,8 @@ async def test_avoid_churn(cleanup): n_workers=0, asynchronous=True, processes=False, - scheduler_port=0, silence_logs=False, - dashboard_address=None, + dashboard_address=":0", ) as cluster: async with Client(cluster, asynchronous=True) as client: adapt = cluster.adapt(interval="20 ms", wait_count=5) @@ -192,9 +198,8 @@ async def test_adapt_quickly(): n_workers=0, asynchronous=True, processes=False, - scheduler_port=0, silence_logs=False, - dashboard_address=None, + dashboard_address=":0", ) client = await Client(cluster, asynchronous=True) adapt = cluster.adapt(interval="20 ms", wait_count=5, maximum=10) @@ -242,12 +247,11 @@ async def test_adapt_quickly(): async def test_adapt_down(): """Ensure that redefining adapt with a lower maximum removes workers""" async with LocalCluster( - 0, + n_workers=0, asynchronous=True, processes=False, - scheduler_port=0, silence_logs=False, - dashboard_address=None, + dashboard_address=":0", ) as cluster: async with Client(cluster, asynchronous=True) as client: cluster.adapt(interval="20ms", maximum=5) @@ -270,11 +274,10 @@ async def test_no_more_workers_than_tasks(): {"distributed.scheduler.default-task-durations": {"slowinc": 1000}} ): async with LocalCluster( - 0, - scheduler_port=0, + n_workers=0, silence_logs=False, processes=False, - dashboard_address=None, + dashboard_address=":0", asynchronous=True, ) as cluster: adapt = cluster.adapt(minimum=0, maximum=4, interval="10 ms") @@ -287,7 +290,7 @@ def test_basic_no_loop(loop): with clean(threads=False): try: with LocalCluster( - 0, scheduler_port=0, silence_logs=False, dashboard_address=None + n_workers=0, silence_logs=False, dashboard_address=":0" ) as cluster: with Client(cluster) as client: cluster.adapt() @@ -309,9 +312,8 @@ async def test_target_duration(): n_workers=0, asynchronous=True, processes=False, - scheduler_port=0, silence_logs=False, - dashboard_address=None, + dashboard_address=":0", ) as cluster: adapt = cluster.adapt(interval="20ms", minimum=2, target_duration="5s") async with Client(cluster, asynchronous=True) as client: @@ -327,6 +329,7 @@ async def test_target_duration(): async def test_worker_keys(cleanup): """Ensure that redefining adapt with a lower maximum removes workers""" async with SpecCluster( + scheduler={"cls": Scheduler, "options": {"dashboard_address": ":0"}}, workers={ "a-1": {"cls": Worker}, "a-2": {"cls": Worker}, @@ -354,13 +357,12 @@ def key(ws): @pytest.mark.asyncio async def test_adapt_cores_memory(cleanup): async with LocalCluster( - 0, + n_workers=0, threads_per_worker=2, memory_limit="3 GB", - scheduler_port=0, silence_logs=False, processes=False, - dashboard_address=None, + dashboard_address=":0", asynchronous=True, ) as cluster: adapt = cluster.adapt(minimum_cores=3, maximum_cores=9) @@ -395,13 +397,12 @@ def test_adaptive_config(): @pytest.mark.asyncio async def test_update_adaptive(cleanup): async with LocalCluster( - 0, + n_workers=0, threads_per_worker=2, memory_limit="3 GB", - scheduler_port=0, silence_logs=False, processes=False, - dashboard_address=None, + dashboard_address=":0", asynchronous=True, ) as cluster: first = cluster.adapt(maxmimum=1) @@ -415,7 +416,11 @@ async def test_update_adaptive(cleanup): async def test_adaptive_no_memory_limit(cleanup): """Make sure that adapt() does not keep creating workers when no memory limit is set.""" async with LocalCluster( - n_workers=0, threads_per_worker=1, memory_limit=0, asynchronous=True + n_workers=0, + threads_per_worker=1, + memory_limit=0, + asynchronous=True, + dashboard_address=":0", ) as cluster: cluster.adapt(minimum=1, maximum=10, interval="1 ms") async with Client(cluster, asynchronous=True) as client: @@ -447,7 +452,9 @@ async def _(): return self.sync(_) - async with RequiresAwaitCluster(n_workers=0, asynchronous=True) as cluster: + async with RequiresAwaitCluster( + n_workers=0, asynchronous=True, dashboard_address=":0" + ) as cluster: async with Client(cluster, asynchronous=True) as client: futures = client.map(slowinc, range(5), delay=0.05) assert len(cluster.workers) == 0 @@ -465,7 +472,9 @@ async def test_adaptive_stopped(): We should ensure that the adapt PC is actually stopped once the cluster stops. """ - async with LocalCluster(n_workers=0, asynchronous=True) as cluster: + async with LocalCluster( + n_workers=0, asynchronous=True, dashboard_address=":0" + ) as cluster: instance = cluster.adapt(interval="10ms") assert instance.periodic_callback is not None diff --git a/distributed/deploy/tests/test_local.py b/distributed/deploy/tests/test_local.py index bddd6b6af0b..80bb39e45ca 100644 --- a/distributed/deploy/tests/test_local.py +++ b/distributed/deploy/tests/test_local.py @@ -5,7 +5,6 @@ import unittest import weakref from distutils.version import LooseVersion -from functools import partial from threading import Lock from time import sleep @@ -41,11 +40,10 @@ def test_simple(loop): with LocalCluster( - 4, - scheduler_port=0, + n_workers=4, processes=False, silence_logs=False, - dashboard_address=None, + dashboard_address=":0", loop=loop, ) as c: with Client(c) as e: @@ -58,7 +56,12 @@ def test_simple(loop): def test_local_cluster_supports_blocked_handlers(loop): - with LocalCluster(blocked_handlers=["run_function"], n_workers=0, loop=loop) as c: + with LocalCluster( + blocked_handlers=["run_function"], + n_workers=0, + loop=loop, + dashboard_address=":0", + ) as c: with Client(c) as client: with pytest.raises(ValueError) as exc: client.run_on_scheduler(lambda x: x, 42) @@ -69,7 +72,7 @@ def test_local_cluster_supports_blocked_handlers(loop): def test_close_twice(): - with LocalCluster() as cluster: + with LocalCluster(dashboard_address=":0") as cluster: with Client(cluster.scheduler_address) as client: f = client.map(inc, range(100)) client.gather(f) @@ -84,10 +87,9 @@ def test_close_twice(): def test_procs(): with LocalCluster( n_workers=2, - scheduler_port=0, processes=False, threads_per_worker=3, - dashboard_address=None, + dashboard_address=":0", silence_logs=False, ) as c: assert len(c.workers) == 2 @@ -99,10 +101,9 @@ def test_procs(): with LocalCluster( n_workers=2, - scheduler_port=0, processes=True, threads_per_worker=3, - dashboard_address=None, + dashboard_address=":0", silence_logs=False, ) as c: assert len(c.workers) == 2 @@ -121,7 +122,7 @@ def test_move_unserializable_data(): transports. """ with LocalCluster( - processes=False, silence_logs=False, dashboard_address=None + processes=False, silence_logs=False, dashboard_address=":0" ) as cluster: assert cluster.scheduler_address.startswith("inproc://") assert cluster.workers[0].address.startswith("inproc://") @@ -137,7 +138,7 @@ def test_transports_inproc(): Test the transport chosen by LocalCluster depending on arguments. """ with LocalCluster( - 1, processes=False, silence_logs=False, dashboard_address=None + n_workers=1, processes=False, silence_logs=False, dashboard_address=":0" ) as c: assert c.scheduler_address.startswith("inproc://") assert c.workers[0].address.startswith("inproc://") @@ -148,7 +149,7 @@ def test_transports_inproc(): def test_transports_tcp(): # Have nannies => need TCP with LocalCluster( - 1, processes=True, silence_logs=False, dashboard_address=None + n_workers=1, processes=True, silence_logs=False, dashboard_address=":0" ) as c: assert c.scheduler_address.startswith("tcp://") assert c.workers[0].address.startswith("tcp://") @@ -159,11 +160,11 @@ def test_transports_tcp(): def test_transports_tcp_port(): # Scheduler port specified => need TCP with LocalCluster( - 1, + n_workers=1, processes=False, scheduler_port=8786, silence_logs=False, - dashboard_address=None, + dashboard_address=":0", ) as c: assert c.scheduler_address == "tcp://127.0.0.1:8786" @@ -173,13 +174,13 @@ def test_transports_tcp_port(): class LocalTest(ClusterTest, unittest.TestCase): - Cluster = partial(LocalCluster, silence_logs=False, dashboard_address=None) - kwargs = {"dashboard_address": None, "processes": False} + Cluster = LocalCluster + kwargs = {"silence_logs": False, "dashboard_address": ":0", "processes": False} def test_Client_with_local(loop): with LocalCluster( - 1, scheduler_port=0, silence_logs=False, dashboard_address=None, loop=loop + n_workers=1, silence_logs=False, dashboard_address=":0", loop=loop ) as c: with Client(c) as e: assert len(e.nthreads()) == len(c.workers) @@ -187,7 +188,7 @@ def test_Client_with_local(loop): def test_Client_solo(loop): - with Client(loop=loop, silence_logs=False) as c: + with Client(loop=loop, silence_logs=False, dashboard_address=":0") as c: pass assert c.cluster.status == Status.closed @@ -221,14 +222,20 @@ async def test_duplicate_clients(): def test_Client_kwargs(loop): - with Client(loop=loop, processes=False, n_workers=2, silence_logs=False) as c: + with Client( + loop=loop, + processes=False, + n_workers=2, + silence_logs=False, + dashboard_address=":0", + ) as c: assert len(c.cluster.workers) == 2 assert all(isinstance(w, Worker) for w in c.cluster.workers.values()) assert c.cluster.status == Status.closed def test_Client_unused_kwargs_with_cluster(loop): - with LocalCluster() as cluster: + with LocalCluster(dashboard_address=":0") as cluster: with pytest.raises(Exception) as argexcept: c = Client(cluster, n_workers=2, dashboard_port=8000, silence_logs=None) assert ( @@ -249,37 +256,36 @@ def test_Client_unused_kwargs_with_address(loop): def test_Client_twice(loop): - with Client(loop=loop, silence_logs=False, dashboard_address=None) as c: - with Client(loop=loop, silence_logs=False, dashboard_address=None) as f: + with Client(loop=loop, silence_logs=False, dashboard_address=":0") as c: + with Client(loop=loop, silence_logs=False, dashboard_address=":0") as f: assert c.cluster.scheduler.port != f.cluster.scheduler.port -@pytest.mark.asyncio -async def test_client_constructor_with_temporary_security(cleanup): +@gen_test() +async def test_client_constructor_with_temporary_security(): pytest.importorskip("cryptography") async with Client( - security=True, silence_logs=False, dashboard_address=None, asynchronous=True + security=True, silence_logs=False, dashboard_address=":0", asynchronous=True ) as c: assert c.cluster.scheduler_address.startswith("tls") assert c.security == c.cluster.security -@pytest.mark.asyncio -async def test_defaults(cleanup): +@gen_test() +async def test_defaults(): async with LocalCluster( - scheduler_port=0, silence_logs=False, dashboard_address=None, asynchronous=True + silence_logs=False, dashboard_address=":0", asynchronous=True ) as c: assert sum(w.nthreads for w in c.workers.values()) == CPU_COUNT assert all(isinstance(w, Nanny) for w in c.workers.values()) -@pytest.mark.asyncio -async def test_defaults_2(cleanup): +@gen_test() +async def test_defaults_2(): async with LocalCluster( processes=False, - scheduler_port=0, silence_logs=False, - dashboard_address=None, + dashboard_address=":0", asynchronous=True, ) as c: assert sum(w.nthreads for w in c.workers.values()) == CPU_COUNT @@ -287,13 +293,12 @@ async def test_defaults_2(cleanup): assert len(c.workers) == 1 -@pytest.mark.asyncio -async def test_defaults_3(cleanup): +@gen_test() +async def test_defaults_3(): async with LocalCluster( n_workers=2, - scheduler_port=0, silence_logs=False, - dashboard_address=None, + dashboard_address=":0", asynchronous=True, ) as c: if CPU_COUNT % 2 == 0: @@ -304,66 +309,61 @@ async def test_defaults_3(cleanup): assert sum(w.nthreads for w in c.workers.values()) == expected_total_threads -@pytest.mark.asyncio -async def test_defaults_4(cleanup): +@gen_test() +async def test_defaults_4(): async with LocalCluster( threads_per_worker=CPU_COUNT * 2, - scheduler_port=0, silence_logs=False, - dashboard_address=None, + dashboard_address=":0", asynchronous=True, ) as c: assert len(c.workers) == 1 -@pytest.mark.asyncio -async def test_defaults_5(cleanup): +@gen_test() +async def test_defaults_5(): async with LocalCluster( n_workers=CPU_COUNT * 2, - scheduler_port=0, silence_logs=False, - dashboard_address=None, + dashboard_address=":0", asynchronous=True, ) as c: assert all(w.nthreads == 1 for w in c.workers.values()) -@pytest.mark.asyncio -async def test_defaults_6(cleanup): +@gen_test() +async def test_defaults_6(): async with LocalCluster( threads_per_worker=2, n_workers=3, - scheduler_port=0, silence_logs=False, - dashboard_address=None, + dashboard_address=":0", asynchronous=True, ) as c: assert len(c.workers) == 3 assert all(w.nthreads == 2 for w in c.workers.values()) -@pytest.mark.asyncio -async def test_worker_params(cleanup): +@gen_test() +async def test_worker_params(): async with LocalCluster( processes=False, n_workers=2, - scheduler_port=0, silence_logs=False, - dashboard_address=None, + dashboard_address=":0", memory_limit=500, asynchronous=True, ) as c: assert [w.memory_limit for w in c.workers.values()] == [500] * 2 -@pytest.mark.asyncio -async def test_memory_limit_none(cleanup): +@gen_test() +async def test_memory_limit_none(): async with LocalCluster( n_workers=2, - scheduler_port=0, silence_logs=False, processes=False, - dashboard_address=None, + dashboard_address=":0", memory_limit=None, asynchronous=True, ) as c: @@ -374,13 +374,11 @@ async def test_memory_limit_none(cleanup): def test_cleanup(): with clean(threads=False): - c = LocalCluster( - 2, scheduler_port=0, silence_logs=False, dashboard_address=None - ) + c = LocalCluster(n_workers=2, silence_logs=False, dashboard_address=":0") port = c.scheduler.port c.close() c2 = LocalCluster( - 2, scheduler_port=port, silence_logs=False, dashboard_address=None + n_workers=2, scheduler_port=port, silence_logs=False, dashboard_address=":0" ) c2.close() @@ -388,12 +386,12 @@ def test_cleanup(): def test_repeated(): with clean(threads=False): with LocalCluster( - 0, scheduler_port=8448, silence_logs=False, dashboard_address=None - ) as c: + n_workers=0, scheduler_port=8448, silence_logs=False, dashboard_address=":0" + ): pass with LocalCluster( - 0, scheduler_port=8448, silence_logs=False, dashboard_address=None - ) as c: + n_workers=0, scheduler_port=8448, silence_logs=False, dashboard_address=":0" + ): pass @@ -403,11 +401,10 @@ def test_bokeh(loop, processes): requests = pytest.importorskip("requests") with LocalCluster( n_workers=0, - scheduler_port=0, silence_logs=False, loop=loop, processes=processes, - dashboard_address=0, + dashboard_address=":0", ) as c: bokeh_port = c.scheduler.http_server.port url = "http://127.0.0.1:%d/status/" % bokeh_port @@ -427,18 +424,17 @@ def test_bokeh(loop, processes): def test_blocks_until_full(loop): - with Client(loop=loop) as c: + with Client(loop=loop, dashboard_address=":0") as c: assert len(c.nthreads()) > 0 -@pytest.mark.asyncio +@gen_test() async def test_scale_up_and_down(): async with LocalCluster( n_workers=0, - scheduler_port=0, processes=False, silence_logs=False, - dashboard_address=None, + dashboard_address=":0", asynchronous=True, ) as cluster: async with Client(cluster, asynchronous=True) as c: @@ -468,7 +464,7 @@ def test_silent_startup(): from distributed import LocalCluster if __name__ == "__main__": - with LocalCluster(1, dashboard_address=None, scheduler_port=0): + with LocalCluster(n_workers=1, dashboard_address=":0"): sleep(.1) """ @@ -486,17 +482,16 @@ def test_silent_startup(): def test_only_local_access(loop): with LocalCluster( - 0, scheduler_port=0, silence_logs=False, dashboard_address=None, loop=loop + n_workers=0, silence_logs=False, dashboard_address=":0", loop=loop ) as c: sync(loop, assert_can_connect_locally_4, c.scheduler.port) def test_remote_access(loop): with LocalCluster( - 0, - scheduler_port=0, + n_workers=0, silence_logs=False, - dashboard_address=None, + dashboard_address=":0", host="", loop=loop, ) as c: @@ -507,10 +502,9 @@ def test_remote_access(loop): def test_memory(loop, n_workers): with LocalCluster( n_workers=n_workers, - scheduler_port=0, processes=False, silence_logs=False, - dashboard_address=None, + dashboard_address=":0", loop=loop, ) as cluster: assert sum(w.memory_limit for w in cluster.workers.values()) <= MEMORY_LIMIT @@ -520,10 +514,9 @@ def test_memory(loop, n_workers): def test_memory_nanny(loop, n_workers): with LocalCluster( n_workers=n_workers, - scheduler_port=0, processes=True, silence_logs=False, - dashboard_address=None, + dashboard_address=":0", loop=loop, ) as cluster: with Client(cluster.scheduler_address, loop=loop) as c: @@ -536,24 +529,22 @@ def test_memory_nanny(loop, n_workers): def test_death_timeout_raises(loop): with pytest.raises(TimeoutError): with LocalCluster( - scheduler_port=0, silence_logs=False, death_timeout=1e-10, - dashboard_address=None, + dashboard_address=":0", loop=loop, ) as cluster: pass LocalCluster._instances.clear() # ignore test hygiene checks -@pytest.mark.asyncio -async def test_bokeh_kwargs(cleanup): +@gen_test() +async def test_bokeh_kwargs(): pytest.importorskip("bokeh") async with LocalCluster( n_workers=0, - scheduler_port=0, silence_logs=False, - dashboard_address=0, + dashboard_address=":0", asynchronous=True, scheduler_kwargs={"http_prefix": "/foo"}, ) as c: @@ -565,9 +556,7 @@ async def test_bokeh_kwargs(cleanup): def test_io_loop_periodic_callbacks(loop): - with LocalCluster( - loop=loop, port=0, dashboard_address=None, silence_logs=False - ) as cluster: + with LocalCluster(loop=loop, dashboard_address=":0", silence_logs=False) as cluster: assert cluster.scheduler.loop is loop for pc in cluster.scheduler.periodic_callbacks.values(): assert pc.io_loop is loop @@ -580,7 +569,7 @@ def test_logging(): """ Workers and scheduler have logs even when silenced """ - with LocalCluster(1, processes=False, dashboard_address=None) as c: + with LocalCluster(n_workers=1, processes=False, dashboard_address=":0") as c: assert c.scheduler._deque_handler.deque assert c.workers[0]._deque_handler.deque @@ -589,10 +578,9 @@ def test_ipywidgets(loop): ipywidgets = pytest.importorskip("ipywidgets") with LocalCluster( n_workers=0, - scheduler_port=0, silence_logs=False, loop=loop, - dashboard_address=False, + dashboard_address=":0", processes=False, ) as cluster: cluster._ipython_display_() @@ -610,10 +598,9 @@ def test_no_ipywidgets(loop, monkeypatch): with LocalCluster( n_workers=0, - scheduler_port=0, silence_logs=False, loop=loop, - dashboard_address=False, + dashboard_address=":0", processes=False, ) as cluster: cluster._ipython_display_() @@ -628,10 +615,9 @@ def test_no_ipywidgets(loop, monkeypatch): def test_scale(loop): """Directly calling scale both up and down works as expected""" with LocalCluster( - scheduler_port=0, silence_logs=False, loop=loop, - dashboard_address=False, + dashboard_address=":0", processes=False, n_workers=0, ) as cluster: @@ -655,10 +641,9 @@ def test_scale(loop): def test_adapt(loop): with LocalCluster( - scheduler_port=0, silence_logs=False, loop=loop, - dashboard_address=False, + dashboard_address=":0", processes=False, n_workers=0, ) as cluster: @@ -687,10 +672,9 @@ def test_adapt(loop): def test_adapt_then_manual(loop): """We can revert from adaptive, back to manual""" with LocalCluster( - scheduler_port=0, silence_logs=False, loop=loop, - dashboard_address=False, + dashboard_address=":0", processes=False, n_workers=8, ) as cluster: @@ -731,7 +715,7 @@ def test_local_tls(loop, temporary): scheduler_port=8786, silence_logs=False, security=security, - dashboard_address=False, + dashboard_address=":0", host="tls://0.0.0.0", loop=loop, ) as c: @@ -763,10 +747,9 @@ def scale_down(self, *args, **kwargs): loop = IOLoop.current() cluster = await MyCluster( n_workers=0, - scheduler_port=0, processes=False, silence_logs=False, - dashboard_address=None, + dashboard_address=":0", loop=loop, asynchronous=True, ) @@ -802,7 +785,7 @@ def test_local_tls_restart(loop): scheduler_port=8786, silence_logs=False, security=security, - dashboard_address=False, + dashboard_address=":0", host="tls://0.0.0.0", loop=loop, ) as c: @@ -817,11 +800,10 @@ def test_local_tls_restart(loop): def test_asynchronous_property(loop): with LocalCluster( - 4, - scheduler_port=0, + n_workers=4, processes=False, silence_logs=False, - dashboard_address=None, + dashboard_address=":0", loop=loop, ) as cluster: @@ -832,13 +814,15 @@ async def _(): def test_protocol_inproc(loop): - with LocalCluster(protocol="inproc://", loop=loop, processes=False) as cluster: + with LocalCluster( + protocol="inproc://", loop=loop, processes=False, dashboard_address=":0" + ) as cluster: assert cluster.scheduler.address.startswith("inproc://") def test_protocol_tcp(loop): with LocalCluster( - protocol="tcp", loop=loop, n_workers=0, processes=False + protocol="tcp", loop=loop, n_workers=0, processes=False, dashboard_address=":0" ) as cluster: assert cluster.scheduler.address.startswith("tcp://") @@ -846,7 +830,11 @@ def test_protocol_tcp(loop): @pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost") def test_protocol_ip(loop): with LocalCluster( - host="tcp://127.0.0.2", loop=loop, n_workers=0, processes=False + host="tcp://127.0.0.2", + loop=loop, + n_workers=0, + processes=False, + dashboard_address=":0", ) as cluster: assert cluster.scheduler.address.startswith("tcp://127.0.0.2") @@ -861,8 +849,7 @@ def test_worker_class_worker(loop): loop=loop, worker_class=MyWorker, processes=False, - scheduler_port=0, - dashboard_address=None, + dashboard_address=":0", ) as cluster: assert all(isinstance(w, MyWorker) for w in cluster.workers.values()) @@ -875,22 +862,20 @@ class MyNanny(Nanny): n_workers=2, loop=loop, worker_class=MyNanny, - scheduler_port=0, - dashboard_address=None, + dashboard_address=":0", ) as cluster: assert all(isinstance(w, MyNanny) for w in cluster.workers.values()) -@pytest.mark.asyncio -async def test_worker_class_nanny_async(cleanup): +@gen_test() +async def test_worker_class_nanny_async(): class MyNanny(Nanny): pass async with LocalCluster( n_workers=2, worker_class=MyNanny, - scheduler_port=0, - dashboard_address=None, + dashboard_address=":0", asynchronous=True, ) as cluster: assert all(isinstance(w, MyNanny) for w in cluster.workers.values()) @@ -901,8 +886,7 @@ def test_starts_up_sync(loop): n_workers=2, loop=loop, processes=False, - scheduler_port=0, - dashboard_address=None, + dashboard_address=":0", ) try: assert len(cluster.scheduler.workers) == 2 @@ -914,7 +898,7 @@ def test_dont_select_closed_worker(): # Make sure distributed does not try to reuse a client from a # closed cluster (https://github.com/dask/distributed/issues/2840). with clean(threads=False): - cluster = LocalCluster(n_workers=0) + cluster = LocalCluster(n_workers=0, dashboard_address=":0") c = Client(cluster) cluster.scale(2) assert c == get_client() @@ -922,7 +906,7 @@ def test_dont_select_closed_worker(): c.close() cluster.close() - cluster2 = LocalCluster(n_workers=0) + cluster2 = LocalCluster(n_workers=0, dashboard_address=":0") c2 = Client(cluster2) cluster2.scale(2) @@ -935,19 +919,20 @@ def test_dont_select_closed_worker(): def test_client_cluster_synchronous(loop): with clean(threads=False): - with Client(loop=loop, processes=False) as c: + with Client(loop=loop, processes=False, dashboard_address=":0") as c: assert not c.asynchronous assert not c.cluster.asynchronous -@pytest.mark.asyncio -async def test_scale_memory_cores(cleanup): +@gen_test() +async def test_scale_memory_cores(): async with LocalCluster( n_workers=0, processes=False, threads_per_worker=2, memory_limit="2GB", asynchronous=True, + dashboard_address=":0", ) as cluster: cluster.scale(cores=4) assert len(cluster.worker_spec) == 2 @@ -963,30 +948,32 @@ async def test_scale_memory_cores(cleanup): @pytest.mark.asyncio -async def test_repr(cleanup): +@pytest.mark.parametrize("memory_limit", ["2 GiB", None]) +async def test_repr(memory_limit, cleanup): async with LocalCluster( n_workers=2, processes=False, threads_per_worker=2, - memory_limit="2GB", + memory_limit=memory_limit, asynchronous=True, + dashboard_address=":0", ) as cluster: - async with Client(cluster, asynchronous=True) as client: - await client.wait_for_workers(2) + # __repr__ uses cluster.scheduler_info, which slightly lags behind + # cluster.scheduler.workers and client.wait_for_workers. + while len(cluster.scheduler_info["workers"]) < 2: + await asyncio.sleep(0.01) + text = repr(cluster) - assert "workers=2" in text assert cluster.scheduler_address in text - assert "cores=4" in text or "threads=4" in text - assert "4.00 GB" in text or "3.73 GiB" in text - - async with LocalCluster( - n_workers=2, processes=False, memory_limit=None, asynchronous=True - ) as cluster: - assert "memory" not in repr(cluster) + assert "workers=2, threads=4" in text + if memory_limit: + assert "memory=4.00 GiB" in text + else: + assert "memory" not in text -@pytest.mark.asyncio -async def test_threads_per_worker_set_to_0(cleanup): +@gen_test() +async def test_threads_per_worker_set_to_0(): with pytest.warns( Warning, match="Setting `threads_per_worker` to 0 has been deprecated." ): @@ -1010,45 +997,49 @@ async def test_capture_security(cleanup, temporary): silence_logs=False, security=security, asynchronous=True, - dashboard_address=False, + dashboard_address=":0", host="tls://0.0.0.0", ) as cluster: async with Client(cluster, asynchronous=True) as client: assert client.security == cluster.security -@pytest.mark.asyncio -async def test_no_danglng_asyncio_tasks(cleanup): +@gen_test() +async def test_no_dangling_asyncio_tasks(): start = asyncio.all_tasks() - async with LocalCluster(asynchronous=True, processes=False): + async with LocalCluster(asynchronous=True, processes=False, dashboard_address=":0"): await asyncio.sleep(0.01) tasks = asyncio.all_tasks() assert tasks == start -@pytest.mark.asyncio +@gen_test() async def test_async_with(): - async with LocalCluster(processes=False, asynchronous=True) as cluster: + async with LocalCluster( + processes=False, asynchronous=True, dashboard_address=":0" + ) as cluster: w = cluster.workers assert w assert not w -@pytest.mark.asyncio -async def test_no_workers(cleanup): +@gen_test() +async def test_no_workers(): async with Client( - n_workers=0, silence_logs=False, dashboard_address=None, asynchronous=True + n_workers=0, silence_logs=False, dashboard_address=":0", asynchronous=True ) as c: pass -@pytest.mark.asyncio +@gen_test() async def test_cluster_names(): - async with LocalCluster(processes=False, asynchronous=True) as unnamed_cluster: + async with LocalCluster( + processes=False, asynchronous=True, dashboard_address=":0" + ) as unnamed_cluster: async with LocalCluster( - processes=False, asynchronous=True, name="mycluster" + processes=False, asynchronous=True, name="mycluster", dashboard_address=":0" ) as named_cluster: assert isinstance(unnamed_cluster.name, str) assert isinstance(named_cluster.name, str) @@ -1057,7 +1048,9 @@ async def test_cluster_names(): assert named_cluster == named_cluster assert unnamed_cluster != named_cluster - async with LocalCluster(processes=False, asynchronous=True) as unnamed_cluster2: + async with LocalCluster( + processes=False, asynchronous=True, dashboard_address=":0" + ) as unnamed_cluster2: assert unnamed_cluster2 != unnamed_cluster @@ -1069,7 +1062,10 @@ async def test_local_cluster_redundant_kwarg(nanny): # whether we use the nanny or not, the error treatment is quite # different and we should assert that an exception is raised async with await LocalCluster( - typo_kwarg="foo", processes=nanny, n_workers=1 + typo_kwarg="foo", + processes=nanny, + n_workers=1, + dashboard_address=":0", ) as cluster: # This will never work but is a reliable way to block without hard diff --git a/distributed/deploy/tests/test_slow_adaptive.py b/distributed/deploy/tests/test_slow_adaptive.py index ac3721d9a2b..853a5c4b4dc 100644 --- a/distributed/deploy/tests/test_slow_adaptive.py +++ b/distributed/deploy/tests/test_slow_adaptive.py @@ -32,7 +32,7 @@ async def close(self): self.status = "closed" -scheduler = {"cls": Scheduler, "options": {"port": 0}} +scheduler = {"cls": Scheduler, "options": {"dashboard_address": ":0"}} @pytest.mark.asyncio diff --git a/distributed/deploy/tests/test_spec_cluster.py b/distributed/deploy/tests/test_spec_cluster.py index 6b8ea8ee0e6..c51748c2549 100644 --- a/distributed/deploy/tests/test_spec_cluster.py +++ b/distributed/deploy/tests/test_spec_cluster.py @@ -14,7 +14,7 @@ from distributed.deploy.spec import ProcessInterface, close_clusters, run_spec from distributed.metrics import time from distributed.utils import is_valid_xml -from distributed.utils_test import gen_test +from distributed.utils_test import gen_cluster, gen_test class MyWorker(Worker): @@ -34,11 +34,11 @@ async def _(): 1: {"cls": Worker, "options": {"nthreads": 2}}, "my-worker": {"cls": MyWorker, "options": {"nthreads": 3}}, } -scheduler = {"cls": Scheduler, "options": {"port": 0}} +scheduler = {"cls": Scheduler, "options": {"dashboard_address": ":0"}} -@pytest.mark.asyncio -async def test_specification(cleanup): +@gen_test() +async def test_specification(): async with SpecCluster( workers=worker_spec, scheduler=scheduler, asynchronous=True ) as cluster: @@ -89,14 +89,12 @@ def test_spec_sync(loop): def test_loop_started(): - with SpecCluster( - worker_spec, scheduler={"cls": Scheduler, "options": {"port": 0}} - ) as cluster: + with SpecCluster(worker_spec, scheduler=scheduler): pass -@pytest.mark.asyncio -async def test_repr(cleanup): +@gen_test() +async def test_repr(): worker = {"cls": Worker, "options": {"nthreads": 1}} class MyCluster(SpecCluster): @@ -108,8 +106,8 @@ class MyCluster(SpecCluster): assert "MyCluster" in str(cluster) -@pytest.mark.asyncio -async def test_scale(cleanup): +@gen_test() +async def test_scale(): worker = {"cls": Worker, "options": {"nthreads": 1}} async with SpecCluster( asynchronous=True, scheduler=scheduler, worker=worker @@ -146,7 +144,7 @@ async def test_adaptive_killed_worker(): async with SpecCluster( asynchronous=True, worker={"cls": Nanny, "options": {"nthreads": 1}}, - scheduler={"cls": Scheduler, "options": {"port": 0}}, + scheduler=scheduler, ) as cluster: async with Client(cluster, asynchronous=True) as client: # Scale up a cluster with 1 worker. @@ -212,32 +210,33 @@ async def test_restart(): @pytest.mark.skipif(WINDOWS, reason="HTTP Server doesn't close out") +# FIXME cleanup fails: +# some RPCs left active by test: {} +# @gen_test() @pytest.mark.asyncio async def test_broken_worker(): with pytest.raises(Exception) as info: async with SpecCluster( asynchronous=True, workers={"good": {"cls": Worker}, "bad": {"cls": BrokenWorker}}, - scheduler={"cls": Scheduler, "options": {"port": 0}}, - ) as cluster: + scheduler=scheduler, + ): pass assert "Broken" in str(info.value) @pytest.mark.skipif(WINDOWS, reason="HTTP Server doesn't close out") -@pytest.mark.slow def test_spec_close_clusters(loop): workers = {0: {"cls": Worker}} - scheduler = {"cls": Scheduler, "options": {"port": 0}} cluster = SpecCluster(workers=workers, scheduler=scheduler, loop=loop) assert cluster in SpecCluster._instances close_clusters() assert cluster.status == Status.closed -@pytest.mark.asyncio -async def test_new_worker_spec(cleanup): +@gen_test() +async def test_new_worker_spec(): class MyCluster(SpecCluster): def new_worker_spec(self): i = len(self.worker_spec) @@ -249,18 +248,14 @@ def new_worker_spec(self): assert cluster.worker_spec[i]["options"]["nthreads"] == i + 1 -@pytest.mark.asyncio +@gen_test() async def test_nanny_port(): - scheduler = {"cls": Scheduler} workers = {0: {"cls": Nanny, "options": {"port": 9200}}} - - async with SpecCluster( - scheduler=scheduler, workers=workers, asynchronous=True - ) as cluster: + async with SpecCluster(scheduler=scheduler, workers=workers, asynchronous=True): pass -@pytest.mark.asyncio +@gen_test() async def test_spec_process(): proc = ProcessInterface() assert proc.status == Status.created @@ -270,8 +265,8 @@ async def test_spec_process(): assert proc.status == Status.closed -@pytest.mark.asyncio -async def test_logs(cleanup): +@gen_test() +async def test_logs(): worker = {"cls": Worker, "options": {"nthreads": 1}} async with SpecCluster( asynchronous=True, scheduler=scheduler, worker=worker @@ -306,8 +301,8 @@ async def test_logs(cleanup): assert set(logs) == {w} -@pytest.mark.asyncio -async def test_scheduler_info(cleanup): +@gen_test() +async def test_scheduler_info(): async with SpecCluster( workers=worker_spec, scheduler=scheduler, asynchronous=True ) as cluster: @@ -330,21 +325,18 @@ async def test_scheduler_info(cleanup): assert len(cluster.scheduler_info["workers"]) == len(cluster.workers) -@pytest.mark.asyncio -async def test_dashboard_link(cleanup): +@gen_test() +async def test_dashboard_link(): async with SpecCluster( workers=worker_spec, - scheduler={ - "cls": Scheduler, - "options": {"port": 0, "dashboard_address": ":12345"}, - }, + scheduler={"cls": Scheduler, "options": {"dashboard_address": ":12345"}}, asynchronous=True, ) as cluster: assert "12345" in cluster.dashboard_link -@pytest.mark.asyncio -async def test_widget(cleanup): +@gen_test() +async def test_widget(): async with SpecCluster( workers=worker_spec, scheduler=scheduler, @@ -361,8 +353,8 @@ async def test_widget(cleanup): assert "3 / 5" in cluster._scaling_status() -@pytest.mark.asyncio -async def test_scale_cores_memory(cleanup): +@gen_test() +async def test_scale_cores_memory(): async with SpecCluster( scheduler=scheduler, worker={"cls": Worker, "options": {"nthreads": 1}}, @@ -376,8 +368,8 @@ async def test_scale_cores_memory(cleanup): assert "memory" in str(info.value) -@pytest.mark.asyncio -async def test_ProcessInterfaceValid(cleanup): +@gen_test() +async def test_ProcessInterfaceValid(): async with SpecCluster( scheduler=scheduler, worker={"cls": ProcessInterface}, asynchronous=True ) as cluster: @@ -416,8 +408,8 @@ async def close(self): await asyncio.gather(*[w.close() for w in self.workers]) -@pytest.mark.asyncio -async def test_MultiWorker(cleanup): +@gen_test() +async def test_MultiWorker(): async with SpecCluster( scheduler=scheduler, worker={ @@ -468,22 +460,17 @@ async def test_MultiWorker(cleanup): assert len(cluster.workers) == 1 -@pytest.mark.asyncio -async def test_run_spec(cleanup): - async with Scheduler(port=0) as s: - workers = await run_spec(worker_spec, s.address) - async with Client(s.address, asynchronous=True) as c: - await c.wait_for_workers(len(worker_spec)) - - await asyncio.gather(*[w.close() for w in workers.values()]) - - assert not s.workers - - await asyncio.gather(*[w.finished() for w in workers.values()]) +@gen_cluster(client=True, nthreads=[]) +async def test_run_spec(c, s): + workers = await run_spec(worker_spec, s.address) + await c.wait_for_workers(len(worker_spec)) + await asyncio.gather(*[w.close() for w in workers.values()]) + assert not s.workers + await asyncio.gather(*[w.finished() for w in workers.values()]) -@pytest.mark.asyncio -async def test_run_spec_cluster_worker_names(cleanup): +@gen_test() +async def test_run_spec_cluster_worker_names(): worker = {"cls": Worker, "options": {"nthreads": 1}} class MyCluster(SpecCluster): @@ -509,8 +496,8 @@ def _new_worker_name(self, worker_number): assert sorted(list(cluster.workers)) == worker_names -@pytest.mark.asyncio -async def test_bad_close(cleanup): +@gen_test() +async def test_bad_close(): with warnings.catch_warnings(record=True) as record: cluster = SpecCluster( workers=worker_spec, scheduler=scheduler, asynchronous=True diff --git a/distributed/deploy/tests/test_ssh.py b/distributed/deploy/tests/test_ssh.py index 16cb100b8ff..33baf565d6c 100644 --- a/distributed/deploy/tests/test_ssh.py +++ b/distributed/deploy/tests/test_ssh.py @@ -32,7 +32,7 @@ async def test_basic(): ["127.0.0.1"] * 3, connect_options=dict(known_hosts=None), asynchronous=True, - scheduler_options={"port": 0, "idle_timeout": "5s"}, + scheduler_options={"idle_timeout": "5s"}, worker_options={"death_timeout": "5s"}, ) as cluster: assert len(cluster.workers) == 2 @@ -56,7 +56,7 @@ async def test_keywords(): "memory_limit": "2 GiB", "death_timeout": "5s", }, - scheduler_options={"idle_timeout": "10s", "port": 0}, + scheduler_options={"idle_timeout": "10s"}, ) as cluster: async with Client(cluster, asynchronous=True) as client: assert ( @@ -109,7 +109,7 @@ def f(x): ["127.0.0.1"] * 2, connect_options=dict(known_hosts=None), asynchronous=True, - scheduler_options={"port": 0, "idle_timeout": "5s"}, + scheduler_options={"idle_timeout": "5s"}, worker_options={"death_timeout": "5s"}, ) as cluster: async with Client(cluster, asynchronous=True) as client: @@ -130,7 +130,7 @@ async def test_unimplemented_options(): "death_timeout": "5s", "unimplemented_option": 2, }, - scheduler_kwargs={"idle_timeout": "5s", "port": 0}, + scheduler_kwargs={"idle_timeout": "5s"}, ) as cluster: assert cluster @@ -141,7 +141,7 @@ async def test_list_of_connect_options(): ["127.0.0.1"] * 3, connect_options=[dict(known_hosts=None)] * 3, asynchronous=True, - scheduler_options={"port": 0, "idle_timeout": "5s"}, + scheduler_options={"idle_timeout": "5s"}, worker_options={"death_timeout": "5s"}, ) as cluster: assert len(cluster.workers) == 2 @@ -160,7 +160,7 @@ async def test_list_of_connect_options_raises(): ["127.0.0.1"] * 3, connect_options=[dict(known_hosts=None)] * 4, # Mismatch in length 4 != 3 asynchronous=True, - scheduler_options={"port": 0, "idle_timeout": "5s"}, + scheduler_options={"idle_timeout": "5s"}, worker_options={"death_timeout": "5s"}, ) as _: pass @@ -172,7 +172,7 @@ async def test_remote_python(): ["127.0.0.1"] * 3, connect_options=[dict(known_hosts=None)] * 3, asynchronous=True, - scheduler_options={"port": 0, "idle_timeout": "5s"}, + scheduler_options={"idle_timeout": "5s"}, worker_options={"death_timeout": "5s"}, remote_python=sys.executable, ) as cluster: @@ -185,7 +185,7 @@ async def test_remote_python_as_dict(): ["127.0.0.1"] * 3, connect_options=[dict(known_hosts=None)] * 3, asynchronous=True, - scheduler_options={"port": 0, "idle_timeout": "5s"}, + scheduler_options={"idle_timeout": "5s"}, worker_options={"death_timeout": "5s"}, remote_python=[sys.executable] * 3, ) as cluster: @@ -199,7 +199,7 @@ async def test_list_of_remote_python_raises(): ["127.0.0.1"] * 3, connect_options=[dict(known_hosts=None)] * 3, asynchronous=True, - scheduler_options={"port": 0, "idle_timeout": "5s"}, + scheduler_options={"idle_timeout": "5s"}, worker_options={"death_timeout": "5s"}, remote_python=[sys.executable] * 4, # Mismatch in length 4 != 3 ) as _: diff --git a/distributed/diagnostics/tests/test_progressbar.py b/distributed/diagnostics/tests/test_progressbar.py index d358c9a494a..ef14e20a1f3 100644 --- a/distributed/diagnostics/tests/test_progressbar.py +++ b/distributed/diagnostics/tests/test_progressbar.py @@ -1,8 +1,5 @@ from time import sleep -import pytest - -from distributed import Scheduler, Worker from distributed.diagnostics.progressbar import TextProgressBar, progress from distributed.metrics import time from distributed.utils_test import div, gen_cluster, inc @@ -39,18 +36,13 @@ async def test_TextProgressBar_error(c, s, a, b): assert progress.comm.closed() -@pytest.mark.asyncio -async def test_TextProgressBar_empty(capsys): - async with Scheduler(port=0) as s: - async with Worker(s.address, nthreads=1): - async with Worker(s.address, nthreads=1): - progress = TextProgressBar( - [], scheduler=s.address, start=False, interval=0.01 - ) - await progress.listen() +@gen_cluster() +async def test_TextProgressBar_empty(s, a, b, capsys): + progress = TextProgressBar([], scheduler=s.address, start=False, interval=0.01) + await progress.listen() - assert progress.status == "finished" - check_bar_completed(capsys) + assert progress.status == "finished" + check_bar_completed(capsys) def check_bar_completed(capsys, width=40): diff --git a/distributed/diagnostics/tests/test_scheduler_plugin.py b/distributed/diagnostics/tests/test_scheduler_plugin.py index 111ad5e24d9..75cd31f22f6 100644 --- a/distributed/diagnostics/tests/test_scheduler_plugin.py +++ b/distributed/diagnostics/tests/test_scheduler_plugin.py @@ -1,7 +1,7 @@ import pytest from distributed import Scheduler, SchedulerPlugin, Worker -from distributed.utils_test import gen_cluster, inc +from distributed.utils_test import gen_cluster, gen_test, inc @gen_cluster(client=True) @@ -106,8 +106,8 @@ async def remove_worker(self, worker, scheduler): assert events == [] -@pytest.mark.asyncio -async def test_lifecycle(cleanup): +@gen_test() +async def test_lifecycle(): class LifeCycle(SchedulerPlugin): def __init__(self): self.history = [] @@ -120,7 +120,7 @@ async def close(self): self.history.append("closed") plugin = LifeCycle() - async with Scheduler(plugins=[plugin]) as s: + async with Scheduler(plugins=[plugin], dashboard_address=":0") as s: pass assert plugin.history == ["started", "closed"] diff --git a/distributed/node.py b/distributed/node.py index a7f9b8d31ac..527218c187b 100644 --- a/distributed/node.py +++ b/distributed/node.py @@ -158,9 +158,7 @@ def start_http_server( ): if expected != actual and expected > 0: warnings.warn( - "Port {} is already in use.\n" + f"Port {expected} is already in use.\n" "Perhaps you already have a cluster running?\n" - "Hosting the HTTP server on port {} instead".format( - expected, actual - ) + f"Hosting the HTTP server on port {actual} instead" ) diff --git a/distributed/tests/test_actor.py b/distributed/tests/test_actor.py index 52808ca03af..b89f4aa3013 100644 --- a/distributed/tests/test_actor.py +++ b/distributed/tests/test_actor.py @@ -474,8 +474,9 @@ def f(block, ps=None): print(format_time(end - start)) +@pytest.mark.slow @pytest.mark.flaky(reruns=10, reruns_delay=5) -@gen_cluster(client=True) +@gen_cluster(client=True, timeout=120) async def test_compute(c, s, a, b): @dask.delayed def f(n, counter): diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index 7305ab1fdd8..5d43577dea8 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -1979,13 +1979,13 @@ async def test_badly_serialized_input_stderr(capsys, c): def test_repr(loop): funcs = [str, repr, lambda x: x._repr_html_()] - with cluster(nworkers=3, worker_kwargs={"memory_limit": "2 GB"}) as (s, [a, b, c]): + with cluster(nworkers=3, worker_kwargs={"memory_limit": "2 GiB"}) as (s, [a, b, c]): with Client(s["address"], loop=loop) as c: for func in funcs: text = func(c) assert c.scheduler.address in text assert "threads=3" in text or "Total threads: " in text - assert "6.00 GB" in text or "5.59 GiB" in text + assert "6.00 GiB" in text if " 0 @@ -125,10 +125,10 @@ def test_cancellation(client): assert fs[3] in res.done assert fs[3].cancelled() - # With as_completed() + +def test_cancellation_as_completed(client): with client.get_executor(pure=False) as e: - N = 10 - fs = [e.submit(slowinc, i, delay=0.02) for i in range(N)] + fs = [e.submit(slowinc, i, delay=0.1) for i in range(10)] fs[3].cancel() fs[8].cancel() diff --git a/distributed/tests/test_client_loop.py b/distributed/tests/test_client_loop.py index 63a08cb1639..46cd2ec6b11 100644 --- a/distributed/tests/test_client_loop.py +++ b/distributed/tests/test_client_loop.py @@ -11,7 +11,7 @@ def test_close_loop_sync(with_own_loop): # Setup simple cluster with one threaded worker. # Complex setup is not required here since we test only IO loop teardown. - cluster_params = dict(n_workers=1, dashboard_address=None, processes=False) + cluster_params = dict(n_workers=1, dashboard_address=":0", processes=False) loops_before = LoopRunner._all_loops.copy() diff --git a/distributed/tests/test_collections.py b/distributed/tests/test_collections.py index 373160d5f7c..4ab2730736b 100644 --- a/distributed/tests/test_collections.py +++ b/distributed/tests/test_collections.py @@ -48,8 +48,7 @@ async def test_dataframes(c, s, a, b): ) ldf = dd.from_pandas(df, npartitions=10) - rdf = c.persist(ldf) - + rdf = await c.persist(ldf) assert rdf.divisions == ldf.divisions remote = c.compute(rdf) diff --git a/distributed/tests/test_failed_workers.py b/distributed/tests/test_failed_workers.py index 2b3e229c38d..f07d3c075c2 100644 --- a/distributed/tests/test_failed_workers.py +++ b/distributed/tests/test_failed_workers.py @@ -65,6 +65,7 @@ async def test_submit_after_failed_worker(c, s, a, b): assert result == sum(map(inc, range(10))) +@pytest.mark.slow def test_gather_after_failed_worker(loop): with cluster() as (s, [a, b]): with Client(s["address"], loop=loop) as c: @@ -75,7 +76,8 @@ def test_gather_after_failed_worker(loop): assert result == list(map(inc, range(10))) -@gen_cluster(client=True, Worker=Nanny, nthreads=[("127.0.0.1", 1)] * 4) +@pytest.mark.slow +@gen_cluster(client=True, Worker=Nanny, nthreads=[("127.0.0.1", 1)] * 4, timeout=60) async def test_gather_then_submit_after_failed_workers(c, s, w, x, y, z): L = c.map(inc, range(20)) await wait(L) diff --git a/distributed/tests/test_locks.py b/distributed/tests/test_locks.py index 99bd5ebdca5..a20a298b808 100644 --- a/distributed/tests/test_locks.py +++ b/distributed/tests/test_locks.py @@ -4,7 +4,7 @@ import pytest -from distributed import Client, Lock, get_client +from distributed import Lock, get_client from distributed.metrics import time from distributed.utils_test import gen_cluster @@ -130,11 +130,10 @@ def f(x, lock=None): assert lock2.client is lock.client -@pytest.mark.asyncio -async def test_locks(): - async with Client(processes=False, asynchronous=True) as c: - assert c.asynchronous - async with Lock("x"): - lock2 = Lock("x") - result = await lock2.acquire(timeout=0.1) - assert result is False +@gen_cluster(client=True, nthreads=[]) +async def test_locks(c, s): + async with Lock("x") as l1: + l2 = Lock("x") + assert l1.client is c + assert l2.client is c + assert await l2.acquire(timeout=0.01) is False diff --git a/distributed/tests/test_multi_locks.py b/distributed/tests/test_multi_locks.py index 9ece4e9c2c2..ff46d40b0ed 100644 --- a/distributed/tests/test_multi_locks.py +++ b/distributed/tests/test_multi_locks.py @@ -1,6 +1,8 @@ import asyncio from time import sleep +import pytest + from distributed import MultiLock, get_client from distributed.metrics import time from distributed.multi_lock import MultiLockExtension @@ -53,34 +55,25 @@ async def test_timeout(c, s, a, b): await lock1.release() -@gen_cluster(client=True) -async def test_timeout_wake_waiter(c, s, a, b): - ext: MultiLockExtension = s.extensions["multi_locks"] +@gen_cluster() +async def test_timeout_wake_waiter(s, a, b): l1 = MultiLock(names=["x"]) l2 = MultiLock(names=["x", "y"]) l3 = MultiLock(names=["y"]) await l1.acquire() - l2_acquire = asyncio.ensure_future(l2.acquire(timeout=1)) - try: + l2_acquire = asyncio.ensure_future(l2.acquire(timeout=0.5)) + with pytest.raises(asyncio.TimeoutError): await asyncio.wait_for(asyncio.shield(l2_acquire), 0.1) - except asyncio.TimeoutError: - pass - else: - assert False l3_acquire = asyncio.ensure_future(l3.acquire()) - try: + with pytest.raises(asyncio.TimeoutError): await asyncio.wait_for(asyncio.shield(l3_acquire), 0.1) - except asyncio.TimeoutError: - pass - else: - assert False assert await l2_acquire is False assert await l3_acquire - l1.release() - l3.release() + await l1.release() + await l3.release() @gen_cluster(client=True) diff --git a/distributed/tests/test_nanny.py b/distributed/tests/test_nanny.py index 87ea7b9bc53..0da4d7e742d 100644 --- a/distributed/tests/test_nanny.py +++ b/distributed/tests/test_nanny.py @@ -17,7 +17,7 @@ import dask -from distributed import Client, Nanny, Scheduler, Worker, rpc, wait, worker +from distributed import Nanny, Scheduler, Worker, rpc, wait, worker from distributed.compatibility import LINUX, WINDOWS from distributed.core import CommClosedError, Status from distributed.diagnostics import SchedulerPlugin @@ -27,8 +27,8 @@ from distributed.utils_test import captured_logger, gen_cluster, gen_test, inc -# FIXME why does this leave behind unclosed Comm objects? -@gen_cluster(nthreads=[], allow_unclosed=True) +@pytest.mark.slow +@gen_cluster(nthreads=[], timeout=120) async def test_nanny(s): async with Nanny(s.address, nthreads=2, loop=s.loop) as n: async with rpc(n.address) as nn: @@ -124,7 +124,7 @@ async def test_run(s): @pytest.mark.slow -@gen_cluster(config={"distributed.comm.timeouts.connect": "1s"}) +@gen_cluster(config={"distributed.comm.timeouts.connect": "1s"}, timeout=120) async def test_no_hang_when_scheduler_closes(s, a, b): # https://github.com/dask/distributed/issues/2880 with captured_logger("tornado.application", logging.ERROR) as logger: @@ -235,7 +235,7 @@ def func(dask_worker): @gen_test() async def test_scheduler_file(): with tmpfile() as fn: - s = await Scheduler(scheduler_file=fn, port=8008) + s = await Scheduler(scheduler_file=fn, dashboard_address=":0") w = await Nanny(scheduler_file=fn) assert set(s.workers) == {w.worker_address} await w.close() @@ -441,21 +441,19 @@ def pool_worker(world_size): await c.submit(pool_worker, 4) -@pytest.mark.asyncio -async def test_nanny_closes_cleanly(cleanup): - async with Scheduler() as s: - n = await Nanny(s.address) +@gen_cluster(nthreads=[]) +async def test_nanny_closes_cleanly(s): + async with Nanny(s.address) as n: assert n.process.pid proc = n.process.process - await n.close() - assert not n.process - assert not proc.is_alive() - assert proc.exitcode == 0 + assert not n.process + assert not proc.is_alive() + assert proc.exitcode == 0 @pytest.mark.slow -@pytest.mark.asyncio -async def test_lifetime(cleanup): +@gen_cluster(nthreads=[], timeout=60) +async def test_lifetime(s): counter = 0 event = asyncio.Event() @@ -469,63 +467,52 @@ def remove_worker(self, **kwargs): if counter == 2: # wait twice, then trigger closing event event.set() - async with Scheduler() as s: - s.add_plugin(Plugin()) - async with Nanny(s.address) as a: - async with Nanny(s.address, lifetime="500 ms", lifetime_restart=True) as b: - await event.wait() + s.add_plugin(Plugin()) + async with Nanny(s.address): + async with Nanny(s.address, lifetime="500 ms", lifetime_restart=True): + await event.wait() -@pytest.mark.asyncio -async def test_nanny_closes_cleanly_2(cleanup): - async with Scheduler() as s: - async with Nanny(s.address) as n: - async with Client(s.address, asynchronous=True) as client: - with client.rpc(n.worker_address) as w: - IOLoop.current().add_callback(w.terminate) - start = time() - while n.status != Status.closed: - await asyncio.sleep(0.01) - assert time() < start + 5 +@gen_cluster(client=True, nthreads=[]) +async def test_nanny_closes_cleanly_2(c, s): + async with Nanny(s.address) as n: + with c.rpc(n.worker_address) as w: + IOLoop.current().add_callback(w.terminate) + start = time() + while n.status != Status.closed: + await asyncio.sleep(0.01) + assert time() < start + 5 - assert n.status == Status.closed + assert n.status == Status.closed -@pytest.mark.asyncio -async def test_config(cleanup): - async with Scheduler() as s: - async with Nanny(s.address, config={"foo": "bar"}) as n: - async with Client(s.address, asynchronous=True) as client: - config = await client.run(dask.config.get, "foo") - assert config[n.worker_address] == "bar" +@gen_cluster(client=True, nthreads=[]) +async def test_config(c, s): + async with Nanny(s.address, config={"foo": "bar"}) as n: + config = await c.run(dask.config.get, "foo") + assert config[n.worker_address] == "bar" -@pytest.mark.asyncio -async def test_nanny_port_range(cleanup): - async with Scheduler() as s: - async with Client(s.address, asynchronous=True) as client: - nanny_port = "9867:9868" - worker_port = "9869:9870" - async with Nanny(s.address, port=nanny_port, worker_port=worker_port) as n1: - assert n1.port == 9867 # Selects first port in range - async with Nanny( - s.address, port=nanny_port, worker_port=worker_port - ) as n2: - assert n2.port == 9868 # Selects next port in range - with pytest.raises( - ValueError, match="Could not start Nanny" - ): # No more ports left - async with Nanny( - s.address, port=nanny_port, worker_port=worker_port - ): - pass - - # Ensure Worker ports are in worker_port range - def get_worker_port(dask_worker): - return dask_worker.port - - worker_ports = await client.run(get_worker_port) - assert list(worker_ports.values()) == parse_ports(worker_port) +@gen_cluster(client=True, nthreads=[]) +async def test_nanny_port_range(c, s): + nanny_port = "9867:9868" + worker_port = "9869:9870" + async with Nanny(s.address, port=nanny_port, worker_port=worker_port) as n1: + assert n1.port == 9867 # Selects first port in range + async with Nanny(s.address, port=nanny_port, worker_port=worker_port) as n2: + assert n2.port == 9868 # Selects next port in range + with pytest.raises( + ValueError, match="Could not start Nanny" + ): # No more ports left + async with Nanny(s.address, port=nanny_port, worker_port=worker_port): + pass + + # Ensure Worker ports are in worker_port range + def get_worker_port(dask_worker): + return dask_worker.port + + worker_ports = await c.run(get_worker_port) + assert list(worker_ports.values()) == parse_ports(worker_port) class KeyboardInterruptWorker(worker.Worker): @@ -544,7 +531,7 @@ async def test_nanny_closed_by_keyboard_interrupt(cleanup, protocol): if protocol == "ucx": # Skip if UCX isn't available pytest.importorskip("ucp") - async with Scheduler(protocol=protocol) as s: + async with Scheduler(protocol=protocol, dashboard_address=":0") as s: async with Nanny( s.address, nthreads=1, worker_class=KeyboardInterruptWorker ) as n: @@ -571,14 +558,13 @@ async def test_worker_start_exception(s): pass -@pytest.mark.asyncio -async def test_failure_during_worker_initialization(cleanup): +@gen_cluster(nthreads=[]) +async def test_failure_during_worker_initialization(s): with captured_logger(logger="distributed.nanny", level=logging.WARNING) as logs: - async with Scheduler() as s: - with pytest.raises(Exception): - async with Nanny(s.address, foo="bar") as n: - await n - assert "Restarting worker" not in logs.getvalue() + with pytest.raises(Exception): + async with Nanny(s.address, foo="bar") as n: + await n + assert "Restarting worker" not in logs.getvalue() @gen_cluster(client=True, Worker=Nanny, timeout=10000000) diff --git a/distributed/tests/test_preload.py b/distributed/tests/test_preload.py index 75946676d5c..1fc74ee346c 100644 --- a/distributed/tests/test_preload.py +++ b/distributed/tests/test_preload.py @@ -9,7 +9,7 @@ import dask from distributed import Client, Nanny, Scheduler, Worker -from distributed.utils_test import captured_logger, cluster +from distributed.utils_test import captured_logger, cluster, gen_cluster, gen_test PRELOAD_TEXT = """ _worker_info = {} @@ -45,20 +45,20 @@ def check_worker(): shutil.rmtree(tmpdir) -@pytest.mark.asyncio -async def test_worker_preload_text(cleanup): +@gen_test() +async def test_worker_preload_text(): text = """ def dask_setup(worker): worker.foo = 'setup' """ - async with Scheduler(port=0, preload=text) as s: + async with Scheduler(dashboard_address=":0", preload=text) as s: assert s.foo == "setup" async with Worker(s.address, preload=[text]) as w: assert w.foo == "setup" -@pytest.mark.asyncio -async def test_worker_preload_config(cleanup): +@gen_cluster(nthreads=[]) +async def test_worker_preload_config(s): text = """ def dask_setup(worker): worker.foo = 'setup' @@ -69,13 +69,12 @@ def dask_teardown(worker): with dask.config.set( {"distributed.worker.preload": text, "distributed.nanny.preload": text} ): - async with Scheduler(port=0) as s: - async with Nanny(s.address) as w: - assert w.foo == "setup" - async with Client(s.address, asynchronous=True) as c: - d = await c.run(lambda dask_worker: dask_worker.foo) - assert d == {w.worker_address: "setup"} - assert w.foo == "teardown" + async with Nanny(s.address) as w: + assert w.foo == "setup" + async with Client(s.address, asynchronous=True) as c: + d = await c.run(lambda dask_worker: dask_worker.foo) + assert d == {w.worker_address: "setup"} + assert w.foo == "teardown" def test_worker_preload_module(loop): @@ -104,34 +103,33 @@ def check_worker(): shutil.rmtree(tmpdir) -@pytest.mark.asyncio -async def test_worker_preload_click(cleanup, tmpdir): - CLICK_PRELOAD_TEXT = """ +@gen_cluster(nthreads=[]) +async def test_worker_preload_click(s): + text = """ import click @click.command() def dask_setup(worker): worker.foo = 'setup' """ - async with Scheduler(port=0) as s: - async with Worker(s.address, preload=CLICK_PRELOAD_TEXT) as w: - assert w.foo == "setup" + async with Worker(s.address, preload=text) as w: + assert w.foo == "setup" -@pytest.mark.asyncio -async def test_worker_preload_click_async(cleanup, tmpdir): + +@gen_cluster(nthreads=[]) +async def test_worker_preload_click_async(s, tmpdir): # Ensure we allow for click commands wrapping coroutines # https://github.com/dask/distributed/issues/4169 - CLICK_PRELOAD_TEXT = """ + text = """ import click @click.command() async def dask_setup(worker): worker.foo = 'setup' """ - async with Scheduler(port=0) as s: - async with Worker(s.address, preload=CLICK_PRELOAD_TEXT) as w: - assert w.foo == "setup" + async with Worker(s.address, preload=text) as w: + assert w.foo == "setup" @pytest.mark.asyncio @@ -143,7 +141,7 @@ async def test_preload_import_time(cleanup): backends["foo"] = TCPBackend() """.strip() try: - async with Scheduler(port=0, preload=text, protocol="foo") as s: + async with Scheduler(dashboard_address=":0", preload=text, protocol="foo") as s: async with Nanny(s.address, preload=text, protocol="foo") as n: async with Client(s.address, asynchronous=True) as c: await c.wait_for_workers(1) @@ -168,30 +166,31 @@ def dask_setup(dask_server): server = app.listen(12345) try: with captured_logger("distributed.preloading") as log: - async with Scheduler(preload=["http://localhost:12345/preload"]) as s: + async with Scheduler( + dashboard_address=":0", + preload=["http://localhost:12345/preload"], + ) as s: assert s.foo == 1 assert "12345/preload" in log.getvalue() finally: server.stop() -@pytest.mark.asyncio -async def test_scheduler_startup(cleanup): - async with Scheduler(port=0) as s: - text = f""" +@gen_cluster(nthreads=[]) +async def test_scheduler_startup(s): + text = f""" import dask dask.config.set(scheduler_address="{s.address}") """ - async with Worker(preload=text) as w: - assert w.scheduler.address == s.address + async with Worker(preload=text) as w: + assert w.scheduler.address == s.address -@pytest.mark.asyncio -async def test_scheduler_startup_nanny(cleanup): - async with Scheduler(port=0) as s: - text = f""" +@gen_cluster(nthreads=[]) +async def test_scheduler_startup_nanny(s): + text = f""" import dask dask.config.set(scheduler_address="{s.address}") """ - async with Nanny(preload_nanny=text) as w: - assert w.scheduler.address == s.address + async with Nanny(preload_nanny=text) as w: + assert w.scheduler.address == s.address diff --git a/distributed/tests/test_queues.py b/distributed/tests/test_queues.py index a948a8229b3..202aa928d84 100644 --- a/distributed/tests/test_queues.py +++ b/distributed/tests/test_queues.py @@ -110,7 +110,7 @@ def f(x): @pytest.mark.slow -@gen_cluster(client=True, nthreads=[("127.0.0.1", 2)] * 5, Worker=Nanny) +@gen_cluster(client=True, nthreads=[("127.0.0.1", 2)] * 5, Worker=Nanny, timeout=60) async def test_race(c, s, *workers): def f(i): with worker_client() as c: diff --git a/distributed/tests/test_resources.py b/distributed/tests/test_resources.py index 6adb9d2dd08..6f74279297f 100644 --- a/distributed/tests/test_resources.py +++ b/distributed/tests/test_resources.py @@ -348,7 +348,7 @@ async def test_dont_optimize_out(c, s, a, b): assert "executing" in str(a.story(key)) -@pytest.mark.xfail(reason="atop fusion seemed to break this") +@pytest.mark.skip(reason="atop fusion seemed to break this") @gen_cluster( client=True, nthreads=[ diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index cb848075d59..e17ab010b1f 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -420,9 +420,10 @@ def func(scheduler): await comm.close() -def test_scheduler_init_pulls_blocked_handlers_from_config(): - with dask.config.set({"distributed.scheduler.blocked-handlers": ["test-handler"]}): - s = Scheduler() +@gen_cluster( + nthreads=[], config={"distributed.scheduler.blocked-handlers": ["test-handler"]} +) +def test_scheduler_init_pulls_blocked_handlers_from_config(s): assert s.blocked_handlers == ["test-handler"] @@ -673,9 +674,8 @@ async def test_broadcast_nanny(s, a, b): assert result1 == result3 -@gen_test() -async def test_worker_name(): - s = await Scheduler(validate=True, port=0) +@gen_cluster(nthreads=[]) +async def test_worker_name(s): w = await Worker(s.address, name="alice") assert s.workers[w.address].name == "alice" assert s.aliases["alice"] == w.address @@ -685,60 +685,53 @@ async def test_worker_name(): await w2.close() await w.close() - await s.close() -@gen_test() -async def test_coerce_address(): - with dask.config.set({"distributed.comm.timeouts.connect": "100ms"}): - s = await Scheduler(validate=True, port=0) - print("scheduler:", s.address, s.listen_address) - a = Worker(s.address, name="alice") - b = Worker(s.address, name=123) - c = Worker("127.0.0.1", s.port, name="charlie") - await asyncio.gather(a, b, c) - - assert s.coerce_address("127.0.0.1:8000") == "tcp://127.0.0.1:8000" - assert s.coerce_address("[::1]:8000") == "tcp://[::1]:8000" - assert s.coerce_address("tcp://127.0.0.1:8000") == "tcp://127.0.0.1:8000" - assert s.coerce_address("tcp://[::1]:8000") == "tcp://[::1]:8000" - assert s.coerce_address("localhost:8000") in ( - "tcp://127.0.0.1:8000", - "tcp://[::1]:8000", - ) - assert s.coerce_address("localhost:8000") in ( - "tcp://127.0.0.1:8000", - "tcp://[::1]:8000", - ) - assert s.coerce_address(a.address) == a.address - # Aliases - assert s.coerce_address("alice") == a.address - assert s.coerce_address(123) == b.address - assert s.coerce_address("charlie") == c.address +@gen_cluster(nthreads=[]) +async def test_coerce_address(s): + print("scheduler:", s.address, s.listen_address) + a = Worker(s.address, name="alice") + b = Worker(s.address, name=123) + c = Worker("127.0.0.1", s.port, name="charlie") + await asyncio.gather(a, b, c) + + assert s.coerce_address("127.0.0.1:8000") == "tcp://127.0.0.1:8000" + assert s.coerce_address("[::1]:8000") == "tcp://[::1]:8000" + assert s.coerce_address("tcp://127.0.0.1:8000") == "tcp://127.0.0.1:8000" + assert s.coerce_address("tcp://[::1]:8000") == "tcp://[::1]:8000" + assert s.coerce_address("localhost:8000") in ( + "tcp://127.0.0.1:8000", + "tcp://[::1]:8000", + ) + assert s.coerce_address("localhost:8000") in ( + "tcp://127.0.0.1:8000", + "tcp://[::1]:8000", + ) + assert s.coerce_address(a.address) == a.address + # Aliases + assert s.coerce_address("alice") == a.address + assert s.coerce_address(123) == b.address + assert s.coerce_address("charlie") == c.address - assert s.coerce_hostname("127.0.0.1") == "127.0.0.1" - assert s.coerce_hostname("alice") == a.ip - assert s.coerce_hostname(123) == b.ip - assert s.coerce_hostname("charlie") == c.ip - assert s.coerce_hostname("jimmy") == "jimmy" + assert s.coerce_hostname("127.0.0.1") == "127.0.0.1" + assert s.coerce_hostname("alice") == a.ip + assert s.coerce_hostname(123) == b.ip + assert s.coerce_hostname("charlie") == c.ip + assert s.coerce_hostname("jimmy") == "jimmy" - assert s.coerce_address("zzzt:8000", resolve=False) == "tcp://zzzt:8000" + assert s.coerce_address("zzzt:8000", resolve=False) == "tcp://zzzt:8000" + await asyncio.gather(a.close(), b.close(), c.close()) - await s.close() - await asyncio.gather(a.close(), b.close(), c.close()) +@gen_cluster(nthreads=[], config={"distributed.scheduler.work-stealing": True}) +async def test_config_stealing(s): + """Regression test for https://github.com/dask/distributed/issues/3409""" + assert "stealing" in s.extensions -@pytest.mark.asyncio -async def test_config_stealing(cleanup): - # Regression test for https://github.com/dask/distributed/issues/3409 - - with dask.config.set({"distributed.scheduler.work-stealing": True}): - async with Scheduler(port=0) as s: - assert "stealing" in s.extensions - with dask.config.set({"distributed.scheduler.work-stealing": False}): - async with Scheduler(port=0) as s: - assert "stealing" not in s.extensions +@gen_cluster(nthreads=[], config={"distributed.scheduler.work-stealing": False}) +async def test_config_no_stealing(s): + assert "stealing" not in s.extensions @pytest.mark.skipif(WINDOWS, reason="num_fds not supported on windows") @@ -771,7 +764,7 @@ async def test_update_graph_culls(s, a, b): def test_io_loop(loop): - s = Scheduler(loop=loop, validate=True) + s = Scheduler(loop=loop, dashboard_address=":0", validate=True) assert s.io_loop is loop @@ -922,7 +915,7 @@ async def test_retire_workers_no_suspicious_tasks(c, s, a, b): @pytest.mark.slow @pytest.mark.skipif(WINDOWS, reason="num_fds not supported on windows") -@gen_cluster(client=True, nthreads=[], timeout=60) +@gen_cluster(client=True, nthreads=[], timeout=120) async def test_file_descriptors(c, s): await asyncio.sleep(0.1) da = pytest.importorskip("dask.array") @@ -1284,7 +1277,7 @@ async def test_fifo_submission(c, s, w): @gen_test() async def test_scheduler_file(): with tmpfile() as fn: - s = await Scheduler(scheduler_file=fn, port=0) + s = await Scheduler(scheduler_file=fn, dashboard_address=":0") with open(fn) as f: data = json.load(f) assert data["address"] == s.address @@ -1484,10 +1477,10 @@ async def test_get_task_status(c, s, a, b): assert result == {future.key: "memory"} -def test_deque_handler(): +@gen_cluster(nthreads=[]) +async def test_deque_handler(s): from distributed.scheduler import logger - s = Scheduler() deque_handler = s._deque_handler logger.info("foo123") assert len(deque_handler.deque) >= 1 @@ -1801,7 +1794,7 @@ async def test_close_workers(s, a, b): @pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost") @gen_test() async def test_host_address(): - s = await Scheduler(host="127.0.0.2", port=0) + s = await Scheduler(host="127.0.0.2", dashboard_address=":0") assert "127.0.0.2" in s.address await s.close() @@ -1809,25 +1802,20 @@ async def test_host_address(): @gen_test() async def test_dashboard_address(): pytest.importorskip("bokeh") - s = await Scheduler(dashboard_address="127.0.0.1:8901", port=0) - assert s.services["dashboard"].port == 8901 - await s.close() + async with Scheduler(dashboard_address="127.0.0.1:8901") as s: + assert s.services["dashboard"].port == 8901 - s = await Scheduler(dashboard_address="127.0.0.1", port=0) - assert s.services["dashboard"].port - await s.close() + async with Scheduler(dashboard_address="127.0.0.1") as s: + assert s.services["dashboard"].port - s = await Scheduler(dashboard_address="127.0.0.1:8901,127.0.0.1:8902", port=0) - assert s.services["dashboard"].port == 8901 - await s.close() + async with Scheduler(dashboard_address="127.0.0.1:8901,127.0.0.1:8902") as s: + assert s.services["dashboard"].port == 8901 - s = await Scheduler(dashboard_address=":8901,:8902", port=0) - assert s.services["dashboard"].port == 8901 - await s.close() + async with Scheduler(dashboard_address=":8901,:8902") as s: + assert s.services["dashboard"].port == 8901 - s = await Scheduler(dashboard_address=[8901, 8902], port=0) - assert s.services["dashboard"].port == 8901 - await s.close() + async with Scheduler(dashboard_address=[8901, 8902]) as s: + assert s.services["dashboard"].port == 8901 @gen_cluster(client=True) @@ -1856,9 +1844,9 @@ async def test_adaptive_target(c, s, a, b): assert s.adaptive_target(target_duration=".1s") == 0 -@pytest.mark.asyncio -async def test_async_context_manager(cleanup): - async with Scheduler(port=0) as s: +@gen_test() +async def test_async_context_manager(): + async with Scheduler(dashboard_address=":0") as s: assert s.status == Status.running async with Worker(s.address) as w: assert w.status == Status.running @@ -1866,23 +1854,23 @@ async def test_async_context_manager(cleanup): assert not s.workers -@pytest.mark.asyncio -async def test_allowed_failures_config(cleanup): - async with Scheduler(port=0, allowed_failures=10) as s: +@gen_test() +async def test_allowed_failures_config(): + async with Scheduler(dashboard_address=":0", allowed_failures=10) as s: assert s.allowed_failures == 10 with dask.config.set({"distributed.scheduler.allowed_failures": 100}): - async with Scheduler(port=0) as s: + async with Scheduler(dashboard_address=":0") as s: assert s.allowed_failures == 100 with dask.config.set({"distributed.scheduler.allowed_failures": 0}): - async with Scheduler(port=0) as s: + async with Scheduler(dashboard_address=":0") as s: assert s.allowed_failures == 0 -@pytest.mark.asyncio +@gen_test() async def test_finished(): - async with Scheduler(port=0) as s: + async with Scheduler(dashboard_address=":0") as s: async with Worker(s.address) as w: pass @@ -1890,48 +1878,45 @@ async def test_finished(): await w.finished() -@pytest.mark.asyncio -async def test_retire_names_str(cleanup): - async with Scheduler(port=0) as s: - async with Worker(s.address, name="0") as a: - async with Worker(s.address, name="1") as b: - async with Client(s.address, asynchronous=True) as c: - futures = c.map(inc, range(10)) - await wait(futures) - assert a.data and b.data - await s.retire_workers(names=[0]) - assert all(f.done() for f in futures) - assert len(b.data) == 10 +@gen_cluster(nthreads=[], client=True) +async def test_retire_names_str(c, s): + async with Worker(s.address, name="0") as a, Worker(s.address, name="1") as b: + futures = c.map(inc, range(10)) + await wait(futures) + assert a.data and b.data + await s.retire_workers(names=[0]) + assert all(f.done() for f in futures) + assert len(b.data) == 10 -@gen_cluster(client=True) +@gen_cluster( + client=True, config={"distributed.scheduler.default-task-durations": {"inc": 100}} +) async def test_get_task_duration(c, s, a, b): - with dask.config.set( - {"distributed.scheduler.default-task-durations": {"inc": 100}} - ): - future = c.submit(inc, 1) - await future - assert 10 < s.task_prefixes["inc"].duration_average < 100 + future = c.submit(inc, 1) + await future + assert 10 < s.task_prefixes["inc"].duration_average < 100 - ts_pref1 = s.new_task("inc-abcdefab", None, "released") - assert 10 < s.get_task_duration(ts_pref1) < 100 + ts_pref1 = s.new_task("inc-abcdefab", None, "released") + assert 10 < s.get_task_duration(ts_pref1) < 100 - # make sure get_task_duration adds TaskStates to unknown dict - assert len(s.unknown_durations) == 0 - x = c.submit(slowinc, 1, delay=0.5) - while len(s.tasks) < 3: - await asyncio.sleep(0.01) + # make sure get_task_duration adds TaskStates to unknown dict + assert len(s.unknown_durations) == 0 + x = c.submit(slowinc, 1, delay=0.5) + while len(s.tasks) < 3: + await asyncio.sleep(0.01) - ts = s.tasks[x.key] - assert s.get_task_duration(ts) == 0.5 # default - assert len(s.unknown_durations) == 1 - assert len(s.unknown_durations["slowinc"]) == 1 + ts = s.tasks[x.key] + assert s.get_task_duration(ts) == 0.5 # default + assert len(s.unknown_durations) == 1 + assert len(s.unknown_durations["slowinc"]) == 1 @gen_cluster(client=True) async def test_default_task_duration_splits(c, s, a, b): - """This test ensures that the default task durations for shuffle split tasks are, by default, aligned with the task names of dask.dask""" - + """Ensure that the default task durations for shuffle split tasks are, by default, + aligned with the task names of dask.dask + """ pd = pytest.importorskip("pandas") dd = pytest.importorskip("dask.dataframe") @@ -1956,13 +1941,13 @@ async def test_default_task_duration_splits(c, s, a, b): assert default_time <= 1e-6 -@pytest.mark.asyncio -async def test_no_danglng_asyncio_tasks(cleanup): +@gen_test() +async def test_no_danglng_asyncio_tasks(): start = asyncio.all_tasks() - async with Scheduler(port=0) as s: - async with Worker(s.address, name="0") as a: + async with Scheduler(dashboard_address=":0") as s: + async with Worker(s.address, name="0"): async with Client(s.address, asynchronous=True) as c: - await asyncio.sleep(0.01) + await c.submit(lambda: 1) tasks = asyncio.all_tasks() assert tasks == start @@ -2276,10 +2261,10 @@ async def test_too_many_groups(c, s, a, b): assert len(s.task_groups) < 3 -@pytest.mark.asyncio -async def test_multiple_listeners(cleanup): +@gen_test() +async def test_multiple_listeners(): with captured_logger(logging.getLogger("distributed.scheduler")) as log: - async with Scheduler(port=0, protocol=["inproc", "tcp"]) as s: + async with Scheduler(dashboard_address=":0", protocol=["inproc", "tcp"]) as s: async with Worker(s.listeners[0].contact_address) as a: async with Worker(s.listeners[1].contact_address) as b: assert a.address.startswith("inproc") @@ -2339,13 +2324,15 @@ async def test_retire_state_change(c, s, a, b): np = pytest.importorskip("numpy") y = c.map(lambda x: x ** 2, range(10)) await c.scatter(y) + coros = [] for x in range(2): v = c.map(lambda i: i * np.random.randint(1000), y) k = c.map(lambda i: i * np.random.randint(1000), v) foo = c.map(lambda j: j * 6, k) step = c.compute(foo) - c.gather(step) + coros.append(c.gather(step)) await c.retire_workers(workers=[a.address]) + await asyncio.gather(*coros) @gen_cluster(client=True, config={"distributed.scheduler.events-log-length": 3}) diff --git a/distributed/tests/test_semaphore.py b/distributed/tests/test_semaphore.py index 0cc4bc9b155..bf0f8181e17 100644 --- a/distributed/tests/test_semaphore.py +++ b/distributed/tests/test_semaphore.py @@ -183,7 +183,8 @@ def f(x, release=True): assert result.count(False) == 9 -@gen_cluster(client=True) +@pytest.mark.slow +@gen_cluster(client=True, timeout=120) async def test_close_async(c, s, a, b): sem = await Semaphore(name="test") @@ -515,7 +516,9 @@ def test_threadpoolworkers_pick_correct_ioloop(cleanup): "distributed.scheduler.locks.lease-timeout": 0.1, } ): - with Client(processes=False, threads_per_worker=4) as client: + with Client( + processes=False, dashboard_address=":0", threads_per_worker=4 + ) as client: sem = Semaphore(max_leases=1, name="database") protected_resource = [] diff --git a/distributed/tests/test_stress.py b/distributed/tests/test_stress.py index dba1daa721f..3781e7c38e2 100644 --- a/distributed/tests/test_stress.py +++ b/distributed/tests/test_stress.py @@ -89,7 +89,7 @@ def test_cancel_stress_sync(loop): @gen_cluster( nthreads=[], client=True, - timeout=120, + timeout=180, scheduler_kwargs={"allowed_failures": 100_000}, ) async def test_stress_creation_and_deletion(c, s): @@ -220,7 +220,7 @@ async def test_stress_steal(c, s, *workers): @pytest.mark.slow -@gen_cluster(nthreads=[("127.0.0.1", 1)] * 10, client=True, timeout=120) +@gen_cluster(nthreads=[("127.0.0.1", 1)] * 10, client=True, timeout=180) async def test_close_connections(c, s, *workers): da = pytest.importorskip("dask.array") x = da.random.random(size=(1000, 1000), chunks=(1000, 1)) diff --git a/distributed/tests/test_tls_functional.py b/distributed/tests/test_tls_functional.py index 54a1202400e..afd7ad96e41 100644 --- a/distributed/tests/test_tls_functional.py +++ b/distributed/tests/test_tls_functional.py @@ -4,13 +4,12 @@ """ import asyncio -import pytest - from distributed import Client, Nanny, Queue, Scheduler, Worker, wait, worker_client from distributed.core import Status from distributed.metrics import time from distributed.utils_test import ( double, + gen_test, gen_tls_cluster, inc, slowadd, @@ -202,17 +201,17 @@ async def test_retire_workers(c, s, a, b): assert time() < start + 5 -@pytest.mark.asyncio -async def test_security_dict_input_no_security(cleanup): - async with Scheduler(security={}) as s: - async with Worker(s.address, security={}) as w: +@gen_test() +async def test_security_dict_input_no_security(): + async with Scheduler(dashboard_address=":0", security={}) as s: + async with Worker(s.address, security={}): async with Client(s.address, security={}, asynchronous=True) as c: result = await c.submit(inc, 1) assert result == 2 -@pytest.mark.asyncio -async def test_security_dict_input(cleanup): +@gen_test() +async def test_security_dict_input(): conf = tls_config() ca_file = conf["distributed"]["comm"]["tls"]["ca-file"] client = conf["distributed"]["comm"]["tls"]["client"]["cert"] @@ -221,6 +220,7 @@ async def test_security_dict_input(cleanup): async with Scheduler( host="localhost", + dashboard_address=":0", security={"tls_ca_file": ca_file, "tls_scheduler_cert": scheduler}, ) as s: assert s.address.startswith("tls://") diff --git a/distributed/tests/test_utils_perf.py b/distributed/tests/test_utils_perf.py index 82dfac5653e..5bf54daf178 100644 --- a/distributed/tests/test_utils_perf.py +++ b/distributed/tests/test_utils_perf.py @@ -81,6 +81,7 @@ def enable_gc_diagnosis_and_log(diag, level="INFO"): gc.enable() +@pytest.mark.slow def test_gc_diagnosis_cpu_time(): diag = GCDiagnosis(warn_over_frac=0.75) diag.N_SAMPLES = 3 # shorten tests diff --git a/distributed/tests/test_utils_test.py b/distributed/tests/test_utils_test.py index 3dca5766f85..1b02e6a4251 100755 --- a/distributed/tests/test_utils_test.py +++ b/distributed/tests/test_utils_test.py @@ -265,5 +265,7 @@ def test_tls_cluster(tls_client): @pytest.mark.asyncio async def test_tls_scheduler(security, cleanup): - async with Scheduler(security=security, host="localhost") as s: + async with Scheduler( + security=security, host="localhost", dashboard_address=":0" + ) as s: assert s.address.startswith("tls") diff --git a/distributed/tests/test_variable.py b/distributed/tests/test_variable.py index 46eb8604058..3319d92f9d5 100644 --- a/distributed/tests/test_variable.py +++ b/distributed/tests/test_variable.py @@ -191,7 +191,7 @@ async def test_timeout_get(c, s, a, b): @pytest.mark.slow -@gen_cluster(client=True, nthreads=[("127.0.0.1", 2)] * 5, Worker=Nanny) +@gen_cluster(client=True, nthreads=[("127.0.0.1", 2)] * 5, Worker=Nanny, timeout=60) async def test_race(c, s, *workers): NITERS = 50 diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index ca2b5a70d13..96b65bcec25 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -30,7 +30,7 @@ ) from distributed.comm.registry import backends from distributed.comm.tcp import TCPBackend -from distributed.compatibility import LINUX, MACOS, WINDOWS +from distributed.compatibility import LINUX, WINDOWS from distributed.core import CommClosedError, Status, rpc from distributed.diagnostics.plugin import PipInstall from distributed.metrics import time @@ -52,11 +52,10 @@ from distributed.worker import Worker, error_message, logger, parse_memory_limit -@pytest.mark.asyncio -async def test_worker_nthreads(cleanup): - async with Scheduler() as s: - async with Worker(s.address) as w: - assert w.executor._max_workers == CPU_COUNT +@gen_cluster(nthreads=[]) +async def test_worker_nthreads(s): + async with Worker(s.address) as w: + assert w.executor._max_workers == CPU_COUNT @gen_cluster() @@ -68,15 +67,14 @@ async def test_str(s, a, b): assert str(a.executing_count) in repr(a) -@pytest.mark.asyncio -async def test_identity(cleanup): - async with Scheduler() as s: - async with Worker(s.address) as w: - ident = w.identity(None) - assert "Worker" in ident["type"] - assert ident["scheduler"] == s.address - assert isinstance(ident["nthreads"], int) - assert isinstance(ident["memory_limit"], Number) +@gen_cluster(nthreads=[]) +async def test_identity(s): + async with Worker(s.address) as w: + ident = w.identity(None) + assert "Worker" in ident["type"] + assert ident["scheduler"] == s.address + assert isinstance(ident["nthreads"], int) + assert isinstance(ident["memory_limit"], Number) @gen_cluster(client=True) @@ -280,34 +278,30 @@ async def test_broadcast(s, a, b): assert results == {a.address: b"pong", b.address: b"pong"} -@gen_test() -async def test_worker_with_port_zero(): - s = await Scheduler(port=8007) - w = await Worker(s.address) - assert isinstance(w.port, int) - assert w.port > 1024 - - await w.close() +@gen_cluster(nthreads=[]) +async def test_worker_with_port_zero(s): + async with Worker(s.address) as w: + assert isinstance(w.port, int) + assert w.port > 1024 -@pytest.mark.asyncio -async def test_worker_port_range(cleanup): - async with Scheduler() as s: - port = "9867:9868" - async with Worker(s.address, port=port) as w1: - assert w1.port == 9867 # Selects first port in range - async with Worker(s.address, port=port) as w2: - assert w2.port == 9868 # Selects next port in range - with pytest.raises( - ValueError, match="Could not start Worker" - ): # No more ports left - async with Worker(s.address, port=port): - pass +@gen_cluster(nthreads=[]) +async def test_worker_port_range(s): + port = "9867:9868" + async with Worker(s.address, port=port) as w1: + assert w1.port == 9867 # Selects first port in range + async with Worker(s.address, port=port) as w2: + assert w2.port == 9868 # Selects next port in range + with pytest.raises( + ValueError, match="Could not start Worker" + ): # No more ports left + async with Worker(s.address, port=port): + pass @pytest.mark.slow -@pytest.mark.asyncio -async def test_worker_waits_for_scheduler(cleanup): +@gen_test(timeout=60) +async def test_worker_waits_for_scheduler(): w = Worker("127.0.0.1:8724") try: await asyncio.wait_for(w, 3) @@ -436,11 +430,10 @@ async def test_gather_missing_workers_replicated(c, s, a, b, missing_first): assert a.data[x.key] == b.data[x.key] == "x" -@pytest.mark.asyncio -async def test_io_loop(cleanup): - async with Scheduler(port=0) as s: - async with Worker(s.address, loop=s.loop) as w: - assert w.io_loop is s.loop +@gen_cluster(nthreads=[]) +async def test_io_loop(s): + async with Worker(s.address, loop=s.loop) as w: + assert w.io_loop is s.loop @gen_cluster(client=True, nthreads=[]) @@ -545,21 +538,18 @@ async def test_close_on_disconnect(s, w): assert time() < start + 5 -@pytest.mark.asyncio -async def test_memory_limit_auto(): - async with Scheduler() as s: - async with Worker(s.address, nthreads=1) as a, Worker( - s.address, nthreads=2 - ) as b, Worker(s.address, nthreads=100) as c, Worker( - s.address, nthreads=200 - ) as d: - assert isinstance(a.memory_limit, Number) - assert isinstance(b.memory_limit, Number) +@gen_cluster(nthreads=[]) +async def test_memory_limit_auto(s): + async with Worker(s.address, nthreads=1) as a, Worker( + s.address, nthreads=2 + ) as b, Worker(s.address, nthreads=100) as c, Worker(s.address, nthreads=200) as d: + assert isinstance(a.memory_limit, Number) + assert isinstance(b.memory_limit, Number) - if CPU_COUNT > 1: - assert a.memory_limit < b.memory_limit + if CPU_COUNT > 1: + assert a.memory_limit < b.memory_limit - assert c.memory_limit == d.memory_limit + assert c.memory_limit == d.memory_limit @gen_cluster(client=True) @@ -797,19 +787,16 @@ async def test_hold_onto_dependents(c, s, a, b): await asyncio.sleep(0.1) +# Normally takes >2s but it has been observed to take >30s occasionally @pytest.mark.slow -@gen_cluster(nthreads=[]) -async def test_worker_death_timeout(s): - with dask.config.set({"distributed.comm.timeouts.connect": "1s"}): - await s.close() - w = Worker(s.address, death_timeout=1) - +@gen_test(timeout=120) +async def test_worker_death_timeout(): + w = Worker("tcp://127.0.0.1:12345", death_timeout=0.1) with pytest.raises(TimeoutError) as info: await w assert "Worker" in str(info.value) assert "timed out" in str(info.value) or "failed to start" in str(info.value) - assert w.status == Status.closed @@ -1062,11 +1049,9 @@ async def test_start_services(s): @gen_test() async def test_scheduler_file(): with tmpfile() as fn: - s = await Scheduler(scheduler_file=fn, port=8009) - w = await Worker(scheduler_file=fn) - assert set(s.workers) == {w.address} - await w.close() - s.stop() + async with Scheduler(scheduler_file=fn, dashboard_address=":0") as s: + async with Worker(scheduler_file=fn) as w: + assert set(s.workers) == {w.address} @gen_cluster(client=True) @@ -1224,18 +1209,17 @@ def f(x): assert all(f.key in b.data for f in futures) -@pytest.mark.asyncio -async def test_deque_handler(cleanup): +@gen_cluster(nthreads=[]) +async def test_deque_handler(s): from distributed.worker import logger - async with Scheduler() as s: - async with Worker(s.address) as w: - deque_handler = w._deque_handler - logger.info("foo456") - assert deque_handler.deque - msg = deque_handler.deque[-1] - assert "distributed.worker" in deque_handler.format(msg) - assert any(msg.msg == "foo456" for msg in deque_handler.deque) + async with Worker(s.address) as w: + deque_handler = w._deque_handler + logger.info("foo456") + assert deque_handler.deque + msg = deque_handler.deque[-1] + assert "distributed.worker" in deque_handler.format(msg) + assert any(msg.msg == "foo456" for msg in deque_handler.deque) @gen_cluster(nthreads=[], client=True) @@ -1491,7 +1475,7 @@ def test_resource_limit(monkeypatch): @pytest.mark.asyncio @pytest.mark.parametrize("Worker", [Worker, Nanny]) -async def test_interface_async(loop, Worker): +async def test_interface_async(cleanup, loop, Worker): from distributed.utils import get_ip_interface psutil = pytest.importorskip("psutil") @@ -1510,7 +1494,7 @@ async def test_interface_async(loop, Worker): "Available interfaces are: %s." % (if_names,) ) - async with Scheduler(interface=if_name) as s: + async with Scheduler(dashboard_address=":0", interface=if_name) as s: assert s.address.startswith("tcp://127.0.0.1") async with Worker(s.address, interface=if_name) as w: assert w.address.startswith("tcp://127.0.0.1") @@ -1523,10 +1507,10 @@ async def test_interface_async(loop, Worker): @pytest.mark.asyncio @pytest.mark.parametrize("Worker", [Worker, Nanny]) -async def test_protocol_from_scheduler_address(Worker): - ucp = pytest.importorskip("ucp") +async def test_protocol_from_scheduler_address(cleanup, Worker): + pytest.importorskip("ucp") - async with Scheduler(protocol="ucx") as s: + async with Scheduler(protocol="ucx", dashboard_address=":0") as s: assert s.address.startswith("ucx://") async with Worker(s.address) as w: assert w.address.startswith("ucx://") @@ -1547,8 +1531,8 @@ def get_address_host(self, loc): monkeypatch.setitem(backends, "foo", BadBackend()) with dask.config.set({"distributed.comm.default-scheme": "foo"}): - async with Scheduler(protocol="tcp") as s: - async with Worker(s.address) as w: + async with Scheduler(protocol="tcp", dashboard_address=":0") as s: + async with Worker(s.address): # Ensure that worker is able to properly start up # without BadBackend.get_address_host raising a ValueError pass @@ -1556,8 +1540,8 @@ def get_address_host(self, loc): @pytest.mark.asyncio @pytest.mark.parametrize("Worker", [Worker, Nanny]) -async def test_worker_listens_on_same_interface_by_default(Worker): - async with Scheduler(host="localhost") as s: +async def test_worker_listens_on_same_interface_by_default(cleanup, Worker): + async with Scheduler(host="localhost", dashboard_address=":0") as s: assert s.ip in {"127.0.0.1", "localhost"} async with Worker(s.address) as w: assert s.ip == w.ip @@ -1582,46 +1566,41 @@ async def test_close_gracefully(c, s, a, b): @pytest.mark.slow -@pytest.mark.asyncio -async def test_lifetime(cleanup): - async with Scheduler() as s: - async with Worker(s.address) as a, Worker(s.address, lifetime="1 seconds") as b: - async with Client(s.address, asynchronous=True) as c: - futures = c.map(slowinc, range(200), delay=0.1, worker=[b.address]) - await asyncio.sleep(1.5) - assert b.status != Status.running - await b.finished() - - assert set(b.data) == set(a.data) # successfully moved data over +@gen_cluster(client=True, nthreads=[]) +async def test_lifetime(c, s): + async with Worker(s.address) as a, Worker(s.address, lifetime="1 seconds") as b: + futures = c.map(slowinc, range(200), delay=0.1, worker=[b.address]) + await asyncio.sleep(1.5) + assert b.status != Status.running + await b.finished() + assert set(b.data) == set(a.data) # successfully moved data over -@gen_cluster(client=True, worker_kwargs={"lifetime": "10s", "lifetime_stagger": "2s"}) -async def test_lifetime_stagger(c, s, a, b): +@gen_cluster(worker_kwargs={"lifetime": "10s", "lifetime_stagger": "2s"}) +async def test_lifetime_stagger(s, a, b): assert a.lifetime != b.lifetime assert 8 <= a.lifetime <= 12 assert 8 <= b.lifetime <= 12 -@pytest.mark.asyncio -async def test_bad_metrics(cleanup): +@gen_cluster(nthreads=[]) +async def test_bad_metrics(s): def bad_metric(w): raise Exception("Hello") - async with Scheduler() as s: - async with Worker(s.address, metrics={"bad": bad_metric}) as w: - assert "bad" not in s.workers[w.address].metrics + async with Worker(s.address, metrics={"bad": bad_metric}) as w: + assert "bad" not in s.workers[w.address].metrics -@pytest.mark.asyncio -async def test_bad_startup(cleanup): +@gen_cluster(nthreads=[]) +async def test_bad_startup(s): def bad_startup(w): raise Exception("Hello") - async with Scheduler() as s: - try: - w = await Worker(s.address, startup_information={"bad": bad_startup}) - except Exception: - pytest.fail("Startup exception was raised") + try: + await Worker(s.address, startup_information={"bad": bad_startup}) + except Exception: + pytest.fail("Startup exception was raised") @gen_cluster(client=True) @@ -1671,97 +1650,87 @@ async def test_pip_install_fails(c, s, a, b): # assert args[1:] == ["-m", "pip", "--upgrade", "install", "requests"] -@pytest.mark.asyncio -async def test_update_latency(cleanup): - async with await Scheduler() as s: - async with await Worker(s.address) as w: - original = w.latency - await w.heartbeat() - assert original != w.latency +@gen_cluster(nthreads=[]) +async def test_update_latency(s): + async with await Worker(s.address) as w: + original = w.latency + await w.heartbeat() + assert original != w.latency - if w.digests is not None: - assert w.digests["latency"].size() > 0 + if w.digests is not None: + assert w.digests["latency"].size() > 0 -@pytest.mark.skipif(MACOS, reason="frequently hangs") -@pytest.mark.asyncio -async def test_workerstate_executing(cleanup): - async with await Scheduler() as s: - async with await Worker(s.address) as w: - async with Client(s.address, asynchronous=True) as c: - ws = s.workers[w.address] - # Initially there are no active tasks - assert not ws.executing - # Submit a task and ensure the WorkerState is updated with the task - # it's executing - f = c.submit(slowinc, 1, delay=1) - while not ws.executing: - await asyncio.sleep(0.01) - assert s.tasks[f.key] in ws.executing - await f +@pytest.mark.slow +@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)]) +async def test_workerstate_executing(c, s, a): + ws = s.workers[a.address] + # Initially there are no active tasks + assert not ws.executing + # Submit a task and ensure the WorkerState is updated with the task + # it's executing + f = c.submit(slowinc, 1, delay=3) + while not ws.executing: + assert f.status == "pending" + await asyncio.sleep(0.01) + assert s.tasks[f.key] in ws.executing + await f -@pytest.mark.asyncio @pytest.mark.parametrize("reconnect", [True, False]) -async def test_heartbeat_comm_closed(cleanup, monkeypatch, reconnect): +@gen_cluster(nthreads=[]) +async def test_heartbeat_comm_closed(s, monkeypatch, reconnect): with captured_logger("distributed.worker", level=logging.WARNING) as logger: - async with await Scheduler() as s: - def bad_heartbeat_worker(*args, **kwargs): - raise CommClosedError() + def bad_heartbeat_worker(*args, **kwargs): + raise CommClosedError() - async with await Worker(s.address, reconnect=reconnect) as w: - # Trigger CommClosedError during worker heartbeat - monkeypatch.setattr( - w.scheduler, "heartbeat_worker", bad_heartbeat_worker - ) + async with await Worker(s.address, reconnect=reconnect) as w: + # Trigger CommClosedError during worker heartbeat + monkeypatch.setattr(w.scheduler, "heartbeat_worker", bad_heartbeat_worker) - await w.heartbeat() - if reconnect: - assert w.status == Status.running - else: - assert w.status == Status.closed + await w.heartbeat() + if reconnect: + assert w.status == Status.running + else: + assert w.status == Status.closed assert "Heartbeat to scheduler failed" in logger.getvalue() -@pytest.mark.asyncio -async def test_bad_local_directory(cleanup): - async with await Scheduler() as s: - try: - async with Worker(s.address, local_directory="/not/a/valid-directory"): - pass - except OSError: - # On Linux: [Errno 13] Permission denied: '/not' - # On MacOSX: [Errno 30] Read-only file system: '/not' +@gen_cluster(nthreads=[]) +async def test_bad_local_directory(s): + try: + async with Worker(s.address, local_directory="/not/a/valid-directory"): pass - else: - assert WINDOWS - - assert not any("error" in log for log in s.get_logs()) + except OSError: + # On Linux: [Errno 13] Permission denied: '/not' + # On MacOSX: [Errno 30] Read-only file system: '/not' + pass + else: + assert WINDOWS + assert not any("error" in log for log in s.get_logs()) -@pytest.mark.asyncio -async def test_taskstate_metadata(cleanup): - async with await Scheduler() as s: - async with await Worker(s.address) as w: - async with Client(s.address, asynchronous=True) as c: - await c.register_worker_plugin(TaskStateMetadataPlugin()) +@gen_cluster(client=True, nthreads=[]) +async def test_taskstate_metadata(c, s): + async with await Worker(s.address) as w: + await c.register_worker_plugin(TaskStateMetadataPlugin()) - f = c.submit(inc, 1) - await f + f = c.submit(inc, 1) + await f - ts = w.tasks[f.key] - assert "start_time" in ts.metadata - assert "stop_time" in ts.metadata - assert ts.metadata["stop_time"] > ts.metadata["start_time"] + ts = w.tasks[f.key] + assert "start_time" in ts.metadata + assert "stop_time" in ts.metadata + assert ts.metadata["stop_time"] > ts.metadata["start_time"] - # Check that Scheduler TaskState.metadata was also updated - assert s.tasks[f.key].metadata == ts.metadata + # Check that Scheduler TaskState.metadata was also updated + assert s.tasks[f.key].metadata == ts.metadata -@pytest.mark.asyncio -async def test_executor_offload(cleanup, monkeypatch): +@gen_cluster(client=True, nthreads=[]) +async def test_executor_offload(c, s, monkeypatch): class SameThreadClass: def __getstate__(self): return () @@ -1772,19 +1741,17 @@ def __setstate__(self, state): monkeypatch.setattr("distributed.worker.OFFLOAD_THRESHOLD", 1) - async with Scheduler() as s: - async with Worker(s.address, executor="offload") as w: - from distributed.utils import _offload_executor + async with Worker(s.address, executor="offload") as w: + from distributed.utils import _offload_executor - assert w.executor is _offload_executor + assert w.executor is _offload_executor - async with Client(s.address, asynchronous=True) as c: - x = SameThreadClass() + x = SameThreadClass() - def f(x): - return threading.get_ident() == x._thread_ident + def f(x): + return threading.get_ident() == x._thread_ident - assert await c.submit(f, x) + assert await c.submit(f, x) @gen_cluster(client=True, nthreads=[("127.0.0.1", 1)]) @@ -2001,28 +1968,24 @@ def get_worker_client_id(): default_client() -@pytest.mark.asyncio -async def test_multiple_executors(cleanup): +@gen_cluster(client=True, nthreads=[]) +async def test_multiple_executors(c, s): def get_thread_name(): return threading.current_thread().name - async with Scheduler() as s: - async with Worker( - s.address, - nthreads=2, - executor={ - "GPU": ThreadPoolExecutor(1, thread_name_prefix="Dask-GPU-Threads") - }, - ) as w: - async with Client(s.address, asynchronous=True) as c: - futures = [] - with dask.annotate(executor="default"): - futures.append(c.submit(get_thread_name, pure=False)) - with dask.annotate(executor="GPU"): - futures.append(c.submit(get_thread_name, pure=False)) - default_result, gpu_result = await c.gather(futures) - assert "Dask-Default-Threads" in default_result - assert "Dask-GPU-Threads" in gpu_result + async with Worker( + s.address, + nthreads=2, + executor={"GPU": ThreadPoolExecutor(1, thread_name_prefix="Dask-GPU-Threads")}, + ): + futures = [] + with dask.annotate(executor="default"): + futures.append(c.submit(get_thread_name, pure=False)) + with dask.annotate(executor="GPU"): + futures.append(c.submit(get_thread_name, pure=False)) + default_result, gpu_result = await c.gather(futures) + assert "Dask-Default-Threads" in default_result + assert "Dask-GPU-Threads" in gpu_result @gen_cluster(client=True) diff --git a/distributed/tests/test_worker_client.py b/distributed/tests/test_worker_client.py index 5fcc01877ef..64b5da6d037 100644 --- a/distributed/tests/test_worker_client.py +++ b/distributed/tests/test_worker_client.py @@ -201,7 +201,7 @@ def f(x): b2 = b.map(f) with Client( - loop=loop, processes=False, set_as_default=True, dashboard_address=None + loop=loop, processes=False, set_as_default=True, dashboard_address=":0" ) as c: assert dask.base.get_scheduler() == c.get for i in range(2): diff --git a/distributed/utils_test.py b/distributed/utils_test.py index cc21c7db753..bc02239188b 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -821,10 +821,10 @@ async def start_cluster( comm.comm is None for comm in s.stream_comms.values() ): await asyncio.sleep(0.01) - if time() - start > 5: + if time() > start + 30: await asyncio.gather(*[w.close(timeout=1) for w in workers]) await s.close(fast=True) - raise Exception("Cluster creation timeout") + raise TimeoutError("Cluster creation timeout") return s, workers @@ -885,6 +885,9 @@ async def test_foo(scheduler, worker1, worker2, pytest_fixture_a, pytest_fixture warnings.warn("ncores= has moved to nthreads=", stacklevel=2) nthreads = ncores + scheduler_kwargs = merge( + {"dashboard": False, "dashboard_address": ":0"}, scheduler_kwargs + ) worker_kwargs = merge( {"memory_limit": system.MEMORY_LIMIT, "death_timeout": 15}, worker_kwargs ) @@ -1030,7 +1033,7 @@ def terminate_process(proc): else: proc.send_signal(signal.SIGINT) try: - proc.wait(10) + proc.wait(30) finally: # Make sure we don't leave the process lingering around with suppress(OSError):