From 9a57b30c40bef164db8dbf11ba9500aae1d16353 Mon Sep 17 00:00:00 2001 From: s5u13b Date: Wed, 15 Jan 2025 09:39:21 +0000 Subject: [PATCH] Refine waiting pending instance --- llumnix/manager.py | 14 ++++++++++---- tests/unit_test/global_scheduler/test_manager.py | 2 +- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/llumnix/manager.py b/llumnix/manager.py index 3fb18ff4..74dcfd06 100644 --- a/llumnix/manager.py +++ b/llumnix/manager.py @@ -614,10 +614,16 @@ async def _check_deployment_states_loop(self, interval: float) -> None: async def watch_instance_deployment_states(instance_id: str): # There might be some delays of calling _init_server_and_instance, so sleep first. await asyncio.sleep(WATCH_DEPLOYMENT_INTERVAL) - instance_state = list_actors(filters=[("name", "=", get_instance_name(instance_id))]) - instance_pending_creation = len(instance_state) == 1 and instance_state[0]["state"] == "PENDING_CREATION" - if instance_pending_creation: - await asyncio.sleep(WATCH_DEPLOYMENT_INTERVAL_PENDING_INSTANCE) + wait_pending_instance_time = 0.0 + while True: + instance_state = list_actors(filters=[("name", "=", get_instance_name(instance_id))]) + instance_pending_creation = len(instance_state) == 1 and instance_state[0]["state"] == "PENDING_CREATION" + if not instance_pending_creation: + break + await asyncio.sleep(WATCH_DEPLOYMENT_INTERVAL) + wait_pending_instance_time += WATCH_DEPLOYMENT_INTERVAL + if wait_pending_instance_time >= WATCH_DEPLOYMENT_INTERVAL_PENDING_INSTANCE: + break pg_created, server_alive, instance_alive = self._get_instance_deployment_states(instance_id) if pg_created and (not server_alive or not instance_alive): logger.warning("instance {} deployment states incorrect, states: (pg {}, server {}, instance {})" diff --git a/tests/unit_test/global_scheduler/test_manager.py b/tests/unit_test/global_scheduler/test_manager.py index c96299f3..9cbbe2df 100644 --- a/tests/unit_test/global_scheduler/test_manager.py +++ b/tests/unit_test/global_scheduler/test_manager.py @@ -388,7 +388,7 @@ def test_check_deployment_states_loop_and_auto_scale_up_loop(ray_env, request_ou kill_server(instance_ids[1]) kill_instance(instance_ids[2]) # Wait for check deployment states, scale down instance and auto scale up. - time.sleep(120.0) + time.sleep(90.0) num_instances = ray.get(manager.scale_up.remote([], [])) assert num_instances == 4 curr_pgs, curr_servers, curr_instances = ray.get(manager._get_cluster_deployment.remote())