From c03c81e1c439862ea4bfb1fa2a0dd254b9457e06 Mon Sep 17 00:00:00 2001 From: Cindy Zhang Date: Tue, 8 Oct 2024 12:09:27 -0700 Subject: [PATCH] [serve] Stop scheduling task early when requests have been cancelled (#47847) In `fulfill_pending_requests`, there are two nested loops: - the outer loop greedily fulfills more requests so that if backoff doesn't occur, it's not necessary for new asyncio tasks to be started to fulfill each request - the inner loop handles backoff if replicas can't be found to fulfill the next request The outer loop will be stopped if there are enough tasks to handle all pending requests. However if all replicas are at max capacity, it's possible for the inner loop to continue to loop even when the task is no longer needed (e.g. when a request has been cancelled), because the inner loop simply continues to try to find an available replica without checking if the current task is even necessary. This PR makes sure that at the end of each iteration of the inner loop, it clears out requests in `pending_requests_to_fulfill` that have been cancelled, and then breaks out of the loop if there are enough tasks to handle the remaining requests. Tests: - Added a test that tests for the scenario where a request is cancelled while it's trying to find an available replica - Also modified the tests in `test_pow_2_scheduler.py` so that the backoff sequence is small values (1ms), and the timeouts in the tests are also low `10ms`, so that the unit tests run much faster (~5s now compared to ~30s before). ## Related issue number related: https://github.com/ray-project/ray/issues/47585 --------- Signed-off-by: Cindy Zhang Signed-off-by: ujjawal-khare --- .../ray/serve/tests/unit/test_pow_2_replica_scheduler.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/python/ray/serve/tests/unit/test_pow_2_replica_scheduler.py b/python/ray/serve/tests/unit/test_pow_2_replica_scheduler.py index 237a6c046a007..e078885486c51 100644 --- a/python/ray/serve/tests/unit/test_pow_2_replica_scheduler.py +++ b/python/ray/serve/tests/unit/test_pow_2_replica_scheduler.py @@ -1253,7 +1253,7 @@ async def test_multiple_queries_with_different_model_ids(self, pow_2_scheduler): ), ] - done, _ = await asyncio.wait(tasks, timeout=0.1) + done, _ = await asyncio.wait(tasks, timeout=0.01) assert len(done) == len(tasks) assert all( @@ -1600,7 +1600,7 @@ async def test_queue_len_cache_replica_at_capacity_is_probed(pow_2_scheduler): s.replica_queue_len_cache.update(r1.replica_id, DEFAULT_MAX_ONGOING_REQUESTS) task = loop.create_task(s.choose_replica_for_request(fake_pending_request())) - done, _ = await asyncio.wait([task], timeout=0.1) + done, _ = await asyncio.wait([task], timeout=0.01) assert len(done) == 0 # 1 probe from scheduling requests # + 1 probe from when the replica set was updated with replica r1 @@ -1608,7 +1608,7 @@ async def test_queue_len_cache_replica_at_capacity_is_probed(pow_2_scheduler): # Now let the replica respond and accept the request, it should be scheduled. r1.set_queue_len_response(DEFAULT_MAX_ONGOING_REQUESTS - 1) - done, _ = await asyncio.wait([task], timeout=0.1) + done, _ = await asyncio.wait([task], timeout=0.01) assert len(done) == 1 assert (await task) == r1 @@ -1636,7 +1636,7 @@ async def test_queue_len_cache_background_probing(pow_2_scheduler): s.replica_queue_len_cache.update(r1.replica_id, 0) task = loop.create_task(s.choose_replica_for_request(fake_pending_request())) - done, _ = await asyncio.wait([task], timeout=0.1) + done, _ = await asyncio.wait([task], timeout=0.01) assert len(done) == 1 assert (await task) == r1 # 0 probes from scheduling requests