Increase default work-stealing interval by 10x (#8997)
hendrikmakait authored Jan 30, 2025
1 parent fd3722d · commit 5589049
Showing 4 changed files with 90 additions and 22 deletions.
distributed/dashboard/tests/test_scheduler_bokeh.py (3 additions & 1 deletion)

@@ -137,7 +137,9 @@ async def test_counters(c, s, a, b):
await asyncio.sleep(0.01)


-@gen_cluster(client=True)
+@gen_cluster(
+    client=True, config={"distributed.scheduler.work-stealing-interval": "100ms"}
+)
async def test_stealing_events(c, s, a, b):
se = StealingEvents(s)

distributed/distributed.yaml (1 addition & 1 deletion)

@@ -21,7 +21,7 @@ distributed:
idle-timeout: null # Shut down after this duration, like "1h" or "30 minutes"
no-workers-timeout: null # If a task remains unrunnable for longer than this, it fails.
work-stealing: True # workers should steal tasks from each other
-work-stealing-interval: 100ms # Callback time for work stealing
+work-stealing-interval: 1s # Callback time for work stealing
worker-saturation: 1.1 # Send this fraction of nthreads root tasks to workers
rootish-taskgroup: 5 # number of dependencies of a rootish tg
rootish-taskgroup-dependencies: 5 # number of dependencies of the dependencies of the rootish tg
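The default above moves from 100ms to 1s, so the scheduler's stealing callback now runs ten times less often; the test changes below pass the same key through gen_cluster(config=...) to keep the old cadence in tests that rely on prompt stealing. Outside the test suite, a deployment that wants the previous behaviour can pin it through Dask's configuration system. A minimal sketch, assuming a LocalCluster started from Python; the cluster sizing is illustrative only:

import dask
from distributed import Client, LocalCluster

# Override the new 1s default back to the previous 100ms for this cluster.
# The key matches the distributed.yaml entry changed above; the scheduler
# reads it when it starts, so the cluster is created inside the context.
with dask.config.set({"distributed.scheduler.work-stealing-interval": "100ms"}):
    cluster = LocalCluster(n_workers=2, threads_per_worker=1)
    client = Client(cluster)
    print(dask.config.get("distributed.scheduler.work-stealing-interval"))  # "100ms"
    client.close()
    cluster.close()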
distributed/http/scheduler/tests/test_stealing_http.py (6 additions & 2 deletions)

@@ -25,7 +25,9 @@ async def test_prometheus(c, s, a, b):
assert active_metrics == expected_metrics


-@gen_cluster(client=True)
+@gen_cluster(
+    client=True, config={"distributed.scheduler.work-stealing-interval": "100ms"}
+)
async def test_prometheus_collect_count_total_by_cost_multipliers(c, s, a, b):
pytest.importorskip("prometheus_client")

@@ -58,7 +60,9 @@ async def fetch_metrics_by_cost_multipliers():
assert count == expected_count


-@gen_cluster(client=True)
+@gen_cluster(
+    client=True, config={"distributed.scheduler.work-stealing-interval": "100ms"}
+)
async def test_prometheus_collect_cost_total_by_cost_multipliers(c, s, a, b):
pytest.importorskip("prometheus_client")

distributed/tests/test_steal.py (80 additions & 18 deletions)

@@ -72,7 +72,11 @@ async def test_work_stealing(c, s, a, b):
assert len(b.data) > 10


-@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 2)
+@gen_cluster(
+    client=True,
+    nthreads=[("127.0.0.1", 1)] * 2,
+    config={"distributed.scheduler.work-stealing-interval": "100ms"},
+)
async def test_dont_steal_expensive_data_fast_computation(c, s, a, b):
np = pytest.importorskip("numpy")

@@ -91,7 +95,11 @@ async def test_dont_steal_expensive_data_fast_computation(c, s, a, b):
assert len(a.data) == 12


-@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 2)
+@gen_cluster(
+    client=True,
+    nthreads=[("127.0.0.1", 1)] * 2,
+    config={"distributed.scheduler.work-stealing-interval": "100ms"},
+)
async def test_steal_cheap_data_slow_computation(c, s, a, b):
x = c.submit(slowinc, 100, delay=0.1) # learn that slowinc is slow
await wait(x)
@@ -104,7 +112,11 @@ async def test_steal_cheap_data_slow_computation(c, s, a, b):


@pytest.mark.slow
-@gen_cluster(client=True, nthreads=[("", 1)] * 2, config=NO_AMM)
+@gen_cluster(
+    client=True,
+    nthreads=[("", 1)] * 2,
+    config={"distributed.scheduler.work-stealing-interval": "100ms", **NO_AMM},
+)
async def test_steal_expensive_data_slow_computation(c, s, a, b):
np = pytest.importorskip("numpy")

@@ -121,7 +133,11 @@ async def test_steal_expensive_data_slow_computation(c, s, a, b):
assert b.data # not empty


-@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 10, config=NO_AMM)
+@gen_cluster(
+    client=True,
+    nthreads=[("127.0.0.1", 1)] * 10,
+    config={"distributed.scheduler.work-stealing-interval": "100ms", **NO_AMM},
+)
async def test_worksteal_many_thieves(c, s, *workers):
x = c.submit(slowinc, -1, delay=0.1)
await x
@@ -283,7 +299,11 @@ async def test_eventually_steal_unknown_functions(c, s, a, b):


@pytest.mark.skip(reason="")
-@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 3)
+@gen_cluster(
+    client=True,
+    nthreads=[("127.0.0.1", 1)] * 3,
+    config={"distributed.scheduler.work-stealing-interval": "100ms"},
+)
async def test_steal_related_tasks(e, s, a, b, c):
futures = e.map(
slowinc, range(20), delay=0.05, workers=a.address, allow_other_workers=True
@@ -299,7 +319,11 @@ async def test_steal_related_tasks(e, s, a, b, c):
assert nearby > 10


-@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 10)
+@gen_cluster(
+    client=True,
+    nthreads=[("127.0.0.1", 1)] * 10,
+    config={"distributed.scheduler.work-stealing-interval": "100ms"},
+)
async def test_dont_steal_fast_tasks_compute_time(c, s, *workers):
def do_nothing(x, y=None):
pass
@@ -315,7 +339,11 @@ def do_nothing(x, y=None):
assert len(s.workers[workers[0].address].has_what) == len(xs) + len(futures)


-@gen_cluster(client=True, nthreads=[("", 1)])
+@gen_cluster(
+    client=True,
+    nthreads=[("", 1)],
+    config={"distributed.scheduler.work-stealing-interval": "100ms"},
+)
async def test_dont_steal_fast_tasks_blocklist(c, s, a):
async with BlockedGetData(s.address) as b:
# create a dependency
@@ -358,7 +386,11 @@ def fast_blocked(i, x):
assert ts.who_has == {ws_a}


-@gen_cluster(client=True, nthreads=[("", 1)], config=NO_AMM)
+@gen_cluster(
+    client=True,
+    nthreads=[("", 1)],
+    config={"distributed.scheduler.work-stealing-interval": "100ms", **NO_AMM},
+)
async def test_new_worker_steals(c, s, a):
await wait(c.submit(slowinc, 1, delay=0.01))

@@ -380,7 +412,10 @@ async def test_new_worker_steals(c, s, a):
assert b.data


-@gen_cluster(client=True)
+@gen_cluster(
+    client=True,
+    config={"distributed.scheduler.work-stealing-interval": "100ms"},
+)
async def test_work_steal_allow_other_workers(c, s, a, b):
# Note: this test also verifies the baseline for all other tests below
futures = c.map(
@@ -397,7 +432,10 @@ async def test_work_steal_allow_other_workers(c, s, a, b):
assert result == sum(map(inc, range(100)))


-@gen_cluster(client=True, nthreads=[("127.0.0.1", 1), ("127.0.0.1", 2)])
+@gen_cluster(
+    client=True,
+    nthreads=[("127.0.0.1", 1), ("127.0.0.1", 2)],
+)
async def test_dont_steal_worker_restrictions(c, s, a, b):
futures = c.map(slowinc, range(100), delay=0.05, workers=a.address)

@@ -504,7 +542,11 @@ async def test_steal_resource_restrictions(c, s, a):
assert 20 < len(a.state.tasks) < 80


-@gen_cluster(client=True, nthreads=[("", 1, {"resources": {"A": 2, "C": 1}})])
+@gen_cluster(
+    client=True,
+    nthreads=[("", 1, {"resources": {"A": 2, "C": 1}})],
+    config={"distributed.scheduler.work-stealing-interval": "100ms"},
+)
async def test_steal_resource_restrictions_asym_diff(c, s, a):
# See https://github.com/dask/distributed/issues/5565
future = c.submit(slowinc, 1, delay=0.1, workers=a.address)
@@ -541,7 +583,11 @@ def slow(x):
assert max(durations) / min(durations) < 3


-@gen_cluster(client=True, nthreads=[("127.0.0.1", 4)] * 2)
+@gen_cluster(
+    client=True,
+    nthreads=[("127.0.0.1", 4)] * 2,
+    config={"distributed.scheduler.work-stealing-interval": "100ms"},
+)
async def test_dont_steal_executing_tasks(c, s, a, b):
futures = c.map(
slowinc, range(4), delay=0.1, workers=a.address, allow_other_workers=True
@@ -797,7 +843,10 @@ async def test_restart(c, s, a, b):
assert not any(x for L in steal.stealable.values() for x in L)


-@gen_cluster(client=True)
+@gen_cluster(
+    client=True,
+    config={"distributed.scheduler.work-stealing-interval": "100ms"},
+)
async def test_steal_twice(c, s, a, b):
x = c.submit(inc, 1, workers=a.address)
await wait(x)
@@ -841,7 +890,10 @@ async def test_steal_twice(c, s, a, b):
@gen_cluster(
client=True,
nthreads=[("", 1)] * 3,
config={"distributed.worker.memory.pause": False},
config={
"distributed.worker.memory.pause": False,
"distributed.scheduler.work-stealing-interval": "100ms",
},
)
async def test_paused_workers_must_not_steal(c, s, w1, w2, w3):
w2.status = Status.paused
@@ -859,7 +911,10 @@ async def test_paused_workers_must_not_steal(c, s, w1, w2, w3):
assert w3.data


-@gen_cluster(client=True)
+@gen_cluster(
+    client=True,
+    config={"distributed.scheduler.work-stealing-interval": "100ms"},
+)
async def test_dont_steal_already_released(c, s, a, b):
future = c.submit(slowinc, 1, delay=0.05, workers=a.address)
key = future.key
@@ -886,7 +941,11 @@ async def test_dont_steal_already_released(c, s, a, b):
await asyncio.sleep(0.05)


-@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 2)
+@gen_cluster(
+    client=True,
+    nthreads=[("127.0.0.1", 1)] * 2,
+    config={"distributed.scheduler.work-stealing-interval": "100ms"},
+)
async def test_dont_steal_long_running_tasks(c, s, a, b):
def long(delay):
with worker_client() as c:
@@ -974,7 +1033,7 @@ async def test_lose_task(c, s, a, b):
assert "Error" not in out


@pytest.mark.parametrize("interval, expected", [(None, 100), ("500ms", 500), (2, 2)])
@pytest.mark.parametrize("interval, expected", [(None, 1000), ("500ms", 500), (2, 2)])
@gen_cluster(nthreads=[], config={"distributed.scheduler.work-stealing": False})
async def test_parse_stealing_interval(s, interval, expected):
from distributed.scheduler import WorkStealing
@@ -991,7 +1050,10 @@ async def test_parse_stealing_interval(s, interval, expected):
assert s.periodic_callbacks["stealing"].callback_time == expected


-@gen_cluster(client=True)
+@gen_cluster(
+    client=True,
+    config={"distributed.scheduler.work-stealing-interval": "100ms"},
+)
async def test_balance_with_longer_task(c, s, a, b):
np = pytest.importorskip("numpy")

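The test_parse_stealing_interval change above tracks the new default: with no explicit interval, the stealing PeriodicCallback now fires every 1000ms instead of 100ms. A small sketch of the value normalisation those parametrized cases imply, using dask.utils.parse_timedelta; the helper function is hypothetical and only mirrors the test's expectations:

from dask.utils import parse_timedelta

def stealing_callback_time_ms(interval=None):
    # Hypothetical helper mirroring test_parse_stealing_interval:
    # None falls back to the configured default ("1s" after this commit),
    # strings are parsed as timedeltas, and bare numbers mean milliseconds.
    if interval is None:
        interval = "1s"
    return parse_timedelta(interval, default="ms") * 1000

assert stealing_callback_time_ms(None) == 1000
assert stealing_callback_time_ms("500ms") == 500
assert stealing_callback_time_ms(2) == 2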
