From 98eda4f3146ddbeda7c6ad0a1251e67563c347e0 Mon Sep 17 00:00:00 2001 From: Cindy Zhang Date: Thu, 22 Jun 2023 08:53:13 -0700 Subject: [PATCH] [serve] Don't change deployment status when autoscaling (#36520) The deployment state UPDATING should only be used during redeployment. Right now the state is updating during autoscaling, which can be confusing for users. This PR makes it so that the state doesn't change during autoscaling. This usually means that a deployment's status will remain HEALTHY while it's autoscaling. Signed-off-by: e428265 --- python/ray/_private/test_utils.py | 10 +- python/ray/serve/_private/deployment_state.py | 26 +- .../serve/tests/test_autoscaling_policy.py | 396 +++++++++--------- .../tests/test_config_files/get_signal.py | 12 + 4 files changed, 248 insertions(+), 196 deletions(-) create mode 100644 python/ray/serve/tests/test_config_files/get_signal.py diff --git a/python/ray/_private/test_utils.py b/python/ray/_private/test_utils.py index 916e7d0de8ec6..dbda58bd8844e 100644 --- a/python/ray/_private/test_utils.py +++ b/python/ray/_private/test_utils.py @@ -509,7 +509,11 @@ def kill_actor_and_wait_for_failure(actor, timeout=10, retry_interval_ms=100): def wait_for_condition( - condition_predictor, timeout=10, retry_interval_ms=100, **kwargs: Any + condition_predictor, + timeout=10, + retry_interval_ms=100, + raise_exceptions=False, + **kwargs: Any, ): """Wait until a condition is met or time out with an exception. @@ -517,6 +521,8 @@ def wait_for_condition( condition_predictor: A function that predicts the condition. timeout: Maximum timeout in seconds. retry_interval_ms: Retry interval in milliseconds. + raise_exceptions: If true, exceptions that occur while executing + condition_predictor won't be caught and instead will be raised. Raises: RuntimeError: If the condition is not met before the timeout expires. @@ -528,6 +534,8 @@ def wait_for_condition( if condition_predictor(**kwargs): return except Exception: + if raise_exceptions: + raise last_ex = ray._private.utils.format_error_message(traceback.format_exc()) time.sleep(retry_interval_ms / 1000.0) message = "The condition wasn't met before the timeout expired." diff --git a/python/ray/serve/_private/deployment_state.py b/python/ray/serve/_private/deployment_state.py index 7f9807898fd5d..20195d96fb98a 100644 --- a/python/ray/serve/_private/deployment_state.py +++ b/python/ray/serve/_private/deployment_state.py @@ -1286,6 +1286,24 @@ def _set_target_state(self, target_info: DeploymentInfo) -> None: logger.info(f"Deploying new version of deployment {self._name}.") + def _set_target_state_autoscaling(self, num_replicas: int) -> None: + """Update the target number of replicas based on an autoscaling decision. + + This differs from _set_target_state because we are updating the + target number of replicas base on an autoscaling decision and + not a redeployment. This only changes the target num_replicas, + and doesn't change the current deployment status. + """ + + new_info = copy(self._target_state.info) + new_info.set_autoscaled_num_replicas(num_replicas) + new_info.version = self._target_state.version.code_version + + target_state = DeploymentTargetState.from_deployment_info(new_info) + + self._save_checkpoint_func(writeahead_checkpoints={self._name: target_state}) + self._target_state = target_state + def deploy(self, deployment_info: DeploymentInfo) -> bool: """Deploy the deployment. @@ -1345,7 +1363,6 @@ def autoscale( if self._target_state.deleting: return - curr_info = self._target_state.info autoscaling_policy = self._target_state.info.autoscaling_policy decision_num_replicas = autoscaling_policy.get_decision_num_replicas( curr_target_num_replicas=self._target_state.num_replicas, @@ -1362,12 +1379,7 @@ def autoscale( f"current handle queued queries: {current_handle_queued_queries}." ) - new_config = copy(curr_info) - new_config.set_autoscaled_num_replicas(decision_num_replicas) - if new_config.version is None: - new_config.version = self._target_state.version.code_version - - self._set_target_state(new_config) + self._set_target_state_autoscaling(decision_num_replicas) def delete(self) -> None: if not self._target_state.deleting: diff --git a/python/ray/serve/tests/test_autoscaling_policy.py b/python/ray/serve/tests/test_autoscaling_policy.py index efe330e8769d2..5c764630f34d6 100644 --- a/python/ray/serve/tests/test_autoscaling_policy.py +++ b/python/ray/serve/tests/test_autoscaling_policy.py @@ -14,8 +14,15 @@ BasicAutoscalingPolicy, calculate_desired_num_replicas, ) -from ray.serve._private.common import DeploymentInfo -from ray.serve._private.common import ReplicaState +from ray.serve._private.common import ( + DeploymentStatus, + DeploymentStatusInfo, + ReplicaState, + DeploymentInfo, +) +from ray.serve.generated.serve_pb2 import ( + DeploymentStatusInfo as DeploymentStatusInfoProto, +) from ray.serve.config import AutoscalingConfig from ray.serve._private.constants import ( CONTROL_LOOP_PERIOD_S, @@ -23,9 +30,8 @@ DEPLOYMENT_NAME_PREFIX_SEPARATOR, ) from ray.serve.controller import ServeController -from ray.serve.deployment import Deployment +from ray.serve.schema import ServeDeploySchema import ray.util.state as state_api -from ray.dashboard.modules.serve.sdk import ServeSubmissionClient import ray from ray import serve @@ -112,39 +118,35 @@ def test_smoothing_factor(self): assert 5 <= desired_num_replicas <= 8 # 10 + 0.5 * (2.5 - 10) = 6.25 -def get_running_replicas( - controller: ServeController, deployment: Deployment, app_name -) -> List: +def get_deployment_status(controller, name) -> DeploymentStatus: + ref = ray.get(controller.get_deployment_status.remote(f"default_{name}")) + info = DeploymentStatusInfo.from_proto(DeploymentStatusInfoProto.FromString(ref)) + return info.status + + +def get_running_replicas(controller: ServeController, name: str) -> List: """Get the replicas currently running for given deployment""" - if app_name: - deployment_name = app_name + DEPLOYMENT_NAME_PREFIX_SEPARATOR + deployment.name - else: - deployment_name = deployment.name replicas = ray.get( - controller._dump_replica_states_for_testing.remote(deployment_name) + controller._dump_replica_states_for_testing.remote(f"default_{name}") ) running_replicas = replicas.get([ReplicaState.RUNNING]) return running_replicas -def get_running_replica_tags( - controller: ServeController, - deployment: Deployment, - app_name: str = SERVE_DEFAULT_APP_NAME, -) -> List: +def get_running_replica_tags(controller: ServeController, name: str) -> List: """Get the replica tags of running replicas for given deployment""" - running_replicas = get_running_replicas(controller, deployment, app_name) + running_replicas = get_running_replicas(controller, name) return [replica.replica_tag for replica in running_replicas] -def get_num_running_replicas( - controller: ServeController, - deployment: Deployment, - app_name: str = SERVE_DEFAULT_APP_NAME, -) -> int: - """Get the amount of replicas currently running for given deployment""" - running_replicas = get_running_replicas(controller, deployment, app_name) - return len(running_replicas) +def check_autoscale_num_replicas(controller: ServeController, name: str) -> int: + """Check the number of replicas currently running for given deployment. + + This should only be called if the deployment has already transitioned + to HEALTHY, and this function will check that it remains healthy. + """ + assert get_deployment_status(controller, name) == DeploymentStatus.HEALTHY + return len(get_running_replicas(controller, name)) def assert_no_replicas_deprovisioned( @@ -181,11 +183,7 @@ def test_assert_no_replicas_deprovisioned(): assert_no_replicas_deprovisioned(replica_tags_2, replica_tags_1) -def get_deployment_start_time( - controller: ServeController, - deployment: Deployment, - app_name: str = SERVE_DEFAULT_APP_NAME, -): +def get_deployment_start_time(controller: ServeController, name: str): """Return start time for given deployment""" deployment_route_list = DeploymentRouteList.FromString( ray.get(controller.list_deployments.remote()) @@ -197,11 +195,7 @@ def get_deployment_start_time( ) for deployment_route in deployment_route_list.deployment_routes } - if app_name: - deployment_name = app_name + DEPLOYMENT_NAME_PREFIX_SEPARATOR + deployment.name - else: - deployment_name = deployment.name - deployment_info, _route_prefix = deployments[deployment_name] + deployment_info, _route_prefix = deployments[f"default_{name}"] return deployment_info.start_time_ms @@ -209,6 +203,7 @@ def get_deployment_start_time( def test_e2e_basic_scale_up_down(min_replicas, serve_instance): """Send 100 requests and check that we autoscale up, and then back down.""" + controller = serve_instance._controller signal = SignalActor.remote() @serve.deployment( @@ -231,29 +226,35 @@ def __call__(self): ray.get(signal.wait.remote()) handle = serve.run(A.bind()) - - controller = serve_instance._controller - start_time = get_deployment_start_time(controller, A) + wait_for_condition( + lambda: get_deployment_status(controller, "A") == DeploymentStatus.HEALTHY + ) + start_time = get_deployment_start_time(controller, "A") [handle.remote() for _ in range(100)] # scale up one more replica from min_replicas wait_for_condition( - lambda: get_num_running_replicas(controller, A) >= min_replicas + 1 + lambda: check_autoscale_num_replicas(controller, "A") >= min_replicas + 1, + raise_exceptions=True, ) signal.send.remote() # As the queue is drained, we should scale back down. - wait_for_condition(lambda: get_num_running_replicas(controller, A) <= min_replicas) + wait_for_condition( + lambda: check_autoscale_num_replicas(controller, "A") <= min_replicas, + raise_exceptions=True, + ) # Make sure start time did not change for the deployment - assert get_deployment_start_time(controller, A) == start_time + assert get_deployment_start_time(controller, "A") == start_time @pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.") def test_e2e_basic_scale_up_down_with_0_replica(serve_instance): """Send 100 requests and check that we autoscale up, and then back down.""" + controller = serve_instance._controller signal = SignalActor.remote() @serve.deployment( @@ -276,21 +277,28 @@ def __call__(self): ray.get(signal.wait.remote()) handle = serve.run(A.bind()) - - controller = serve_instance._controller - start_time = get_deployment_start_time(controller, A) + wait_for_condition( + lambda: get_deployment_status(controller, "A") == DeploymentStatus.HEALTHY + ) + start_time = get_deployment_start_time(controller, "A") [handle.remote() for _ in range(100)] # scale up one more replica from min_replicas - wait_for_condition(lambda: get_num_running_replicas(controller, A) >= 1) + wait_for_condition( + lambda: check_autoscale_num_replicas(controller, "A") >= 1, + raise_exceptions=True, + ) signal.send.remote() # As the queue is drained, we should scale back down. - wait_for_condition(lambda: get_num_running_replicas(controller, A) <= 0) + wait_for_condition( + lambda: check_autoscale_num_replicas(controller, "A") <= 0, + raise_exceptions=True, + ) # Make sure start time did not change for the deployment - assert get_deployment_start_time(controller, A) == start_time + assert get_deployment_start_time(controller, "A") == start_time @mock.patch.object(ServeController, "run_control_loop") @@ -316,7 +324,7 @@ def __call__(self): serve.run(A.bind()) controller = serve_instance._controller - assert get_num_running_replicas(controller, A) == 2 + assert len(get_running_replicas(controller, "A")) == 2 def test_upscale_downscale_delay(): @@ -612,6 +620,7 @@ def test_e2e_bursty(serve_instance): Sends 100 requests in bursts. Uses delays for smooth provisioning. """ + controller = serve_instance._controller signal = SignalActor.remote() @serve.deployment( @@ -634,14 +643,18 @@ def __call__(self): ray.get(signal.wait.remote()) handle = serve.run(A.bind()) - - controller = serve_instance._controller - start_time = get_deployment_start_time(controller, A) + wait_for_condition( + lambda: get_deployment_status(controller, "A") == DeploymentStatus.HEALTHY + ) + start_time = get_deployment_start_time(controller, "A") [handle.remote() for _ in range(100)] - wait_for_condition(lambda: get_num_running_replicas(controller, A) >= 2) - num_replicas = get_num_running_replicas(controller, A) + wait_for_condition( + lambda: check_autoscale_num_replicas(controller, "A") >= 2, + raise_exceptions=True, + ) + num_replicas = check_autoscale_num_replicas(controller, "A") signal.send.remote() # Execute a bursty workload that issues 100 requests every 0.05 seconds @@ -652,15 +665,18 @@ def __call__(self): # parameters. for _ in range(5): time.sleep(0.05) - assert get_num_running_replicas(controller, A) == num_replicas + assert check_autoscale_num_replicas(controller, "A") == num_replicas [handle.remote() for _ in range(100)] signal.send.remote() # As the queue is drained, we should scale back down. - wait_for_condition(lambda: get_num_running_replicas(controller, A) <= 1) + wait_for_condition( + lambda: check_autoscale_num_replicas(controller, "A") <= 1, + raise_exceptions=True, + ) # Make sure start time did not change for the deployment - assert get_deployment_start_time(controller, A) == start_time + assert get_deployment_start_time(controller, "A") == start_time @pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.") @@ -669,6 +685,7 @@ def test_e2e_intermediate_downscaling(serve_instance): Scales up, then down, and up again. """ + controller = serve_instance._controller signal = SignalActor.remote() @serve.deployment( @@ -691,31 +708,44 @@ def __call__(self): ray.get(signal.wait.remote()) handle = serve.run(A.bind()) - - controller = serve_instance._controller - start_time = get_deployment_start_time(controller, A) + wait_for_condition( + lambda: get_deployment_status(controller, "A") == DeploymentStatus.HEALTHY + ) + start_time = get_deployment_start_time(controller, "A") [handle.remote() for _ in range(50)] wait_for_condition( - lambda: get_num_running_replicas(controller, A) >= 20, timeout=30 + lambda: check_autoscale_num_replicas(controller, "A") >= 20, + timeout=30, + raise_exceptions=True, ) signal.send.remote() - wait_for_condition(lambda: get_num_running_replicas(controller, A) <= 1, timeout=30) + wait_for_condition( + lambda: check_autoscale_num_replicas(controller, "A") <= 1, + timeout=30, + raise_exceptions=True, + ) signal.send.remote(clear=True) [handle.remote() for _ in range(50)] wait_for_condition( - lambda: get_num_running_replicas(controller, A) >= 20, timeout=30 + lambda: check_autoscale_num_replicas(controller, "A") >= 20, + timeout=30, + raise_exceptions=True, ) signal.send.remote() # As the queue is drained, we should scale back down. - wait_for_condition(lambda: get_num_running_replicas(controller, A) < 1, timeout=30) + wait_for_condition( + lambda: check_autoscale_num_replicas(controller, "A") < 1, + timeout=30, + raise_exceptions=True, + ) # Make sure start time did not change for the deployment - assert get_deployment_start_time(controller, A) == start_time + assert get_deployment_start_time(controller, "A") == start_time @pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.") @@ -723,66 +753,64 @@ def __call__(self): def test_e2e_update_autoscaling_deployment(serve_instance): # See https://github.com/ray-project/ray/issues/21017 for details - signal = SignalActor.remote() + controller = serve_instance._controller + signal = SignalActor.options(name="signal123").remote() - @serve.deployment( - autoscaling_config={ - "metrics_interval_s": 0.1, - "min_replicas": 0, - "max_replicas": 10, - "look_back_period_s": 0.2, - "downscale_delay_s": 0.2, - "upscale_delay_s": 0.2, - }, - # We will send over a lot of queries. This will make sure replicas are - # killed quickly during cleanup. - graceful_shutdown_timeout_s=1, - max_concurrent_queries=1000, - version="v1", - ) - class A: - def __call__(self): - ray.get(signal.wait.remote()) + app_config = { + "import_path": "ray.serve.tests.test_config_files.get_signal.app", + "deployments": [ + { + "name": "A", + "autoscaling_config": { + "metrics_interval_s": 0.1, + "min_replicas": 0, + "max_replicas": 10, + "look_back_period_s": 0.2, + "downscale_delay_s": 0.2, + "upscale_delay_s": 0.2, + }, + "graceful_shutdown_timeout_s": 1, + "max_concurrent_queries": 1000, + } + ], + } - handle = serve.run(A.bind()) + serve_instance.deploy_apps(ServeDeploySchema(**{"applications": [app_config]})) print("Deployed A with min_replicas 1 and max_replicas 10.") + wait_for_condition( + lambda: get_deployment_status(controller, "A") == DeploymentStatus.HEALTHY + ) + handle = serve.get_deployment("A").get_handle() + start_time = get_deployment_start_time(controller, "A") - controller = serve_instance._controller - start_time = get_deployment_start_time(controller, A) - - assert get_num_running_replicas(controller, A) == 0 - + assert check_autoscale_num_replicas(controller, "A") == 0 [handle.remote() for _ in range(400)] print("Issued 400 requests.") - wait_for_condition(lambda: get_num_running_replicas(controller, A) >= 10) + wait_for_condition( + lambda: check_autoscale_num_replicas(controller, "A") >= 10, + raise_exceptions=True, + ) print("Scaled to 10 replicas.") - first_deployment_replicas = get_running_replica_tags(controller, A) + first_deployment_replicas = get_running_replica_tags(controller, "A") - assert get_num_running_replicas(controller, A) < 20 + assert check_autoscale_num_replicas(controller, "A") < 20 [handle.remote() for _ in range(458)] time.sleep(3) print("Issued 458 requests. Request routing in-progress.") - serve.run( - A.options( - autoscaling_config={ - "metrics_interval_s": 0.1, - "min_replicas": 2, - "max_replicas": 20, - "look_back_period_s": 0.2, - "downscale_delay_s": 0.2, - "upscale_delay_s": 0.2, - }, - version="v1", - ).bind() - ) + app_config["deployments"][0]["autoscaling_config"]["min_replicas"] = 2 + app_config["deployments"][0]["autoscaling_config"]["max_replicas"] = 20 + serve_instance.deploy_apps(ServeDeploySchema(**{"applications": [app_config]})) print("Redeployed A.") - wait_for_condition(lambda: get_num_running_replicas(controller, A) >= 20) + wait_for_condition( + lambda: check_autoscale_num_replicas(controller, "A") >= 20, + raise_exceptions=True, + ) print("Scaled up to 20 requests.") - second_deployment_replicas = get_running_replica_tags(controller, A) + second_deployment_replicas = get_running_replica_tags(controller, "A") # Confirm that none of the original replicas were de-provisioned assert_no_replicas_deprovisioned( @@ -792,104 +820,95 @@ def __call__(self): signal.send.remote() # As the queue is drained, we should scale back down. - wait_for_condition(lambda: get_num_running_replicas(controller, A) <= 2) - assert get_num_running_replicas(controller, A) > 1 + wait_for_condition( + lambda: check_autoscale_num_replicas(controller, "A") <= 2, + raise_exceptions=True, + ) + assert check_autoscale_num_replicas(controller, "A") > 1 # Make sure start time did not change for the deployment - assert get_deployment_start_time(controller, A) == start_time + assert get_deployment_start_time(controller, "A") == start_time # scale down to 0 - serve.run( - A.options( - autoscaling_config={ - "metrics_interval_s": 0.1, - "min_replicas": 0, - "max_replicas": 20, - "look_back_period_s": 0.2, - "downscale_delay_s": 0.2, - "upscale_delay_s": 0.2, - }, - version="v1", - ).bind() - ) + app_config["deployments"][0]["autoscaling_config"]["min_replicas"] = 0 + serve_instance.deploy_apps(ServeDeploySchema(**{"applications": [app_config]})) print("Redeployed A.") + wait_for_condition( + lambda: get_deployment_status(controller, "A") == DeploymentStatus.HEALTHY + ) - wait_for_condition(lambda: get_num_running_replicas(controller, A) < 1) - assert get_num_running_replicas(controller, A) == 0 + wait_for_condition( + lambda: check_autoscale_num_replicas(controller, "A") < 1, raise_exceptions=True + ) + assert check_autoscale_num_replicas(controller, "A") == 0 # scale up [handle.remote() for _ in range(400)] - wait_for_condition(lambda: get_num_running_replicas(controller, A) > 0) + wait_for_condition( + lambda: check_autoscale_num_replicas(controller, "A") > 0, raise_exceptions=True + ) signal.send.remote() - wait_for_condition(lambda: get_num_running_replicas(controller, A) < 1) + wait_for_condition( + lambda: check_autoscale_num_replicas(controller, "A") < 1, raise_exceptions=True + ) @pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.") def test_e2e_raise_min_replicas(serve_instance): - signal = SignalActor.remote() + controller = serve_instance._controller + signal = SignalActor.options(name="signal123").remote() - @serve.deployment( - autoscaling_config={ - "metrics_interval_s": 0.1, - "min_replicas": 0, - "max_replicas": 10, - "look_back_period_s": 0.2, - "downscale_delay_s": 0.2, - "upscale_delay_s": 0.2, - }, - # We will send over a lot of queries. This will make sure replicas are - # killed quickly during cleanup. - graceful_shutdown_timeout_s=1, - max_concurrent_queries=1000, - version="v1", - ) - class A: - def __call__(self): - ray.get(signal.wait.remote()) + app_config = { + "import_path": "ray.serve.tests.test_config_files.get_signal.app", + "deployments": [ + { + "name": "A", + "autoscaling_config": { + "metrics_interval_s": 0.1, + "min_replicas": 0, + "max_replicas": 10, + "look_back_period_s": 0.2, + "downscale_delay_s": 0.2, + "upscale_delay_s": 0.2, + }, + "graceful_shutdown_timeout_s": 1, + "max_concurrent_queries": 1000, + } + ], + } - A.deploy() + serve_instance.deploy_apps(ServeDeploySchema(**{"applications": [app_config]})) print("Deployed A.") + wait_for_condition( + lambda: get_deployment_status(controller, "A") == DeploymentStatus.HEALTHY + ) + start_time = get_deployment_start_time(controller, "A") - controller = serve_instance._controller - start_time = get_deployment_start_time(controller, A, app_name=None) - - assert get_num_running_replicas(controller, A, app_name=None) == 0 + assert check_autoscale_num_replicas(controller, "A") == 0 - handle = A.get_handle() + handle = serve.get_deployment("default_A").get_handle() [handle.remote() for _ in range(1)] print("Issued one request.") time.sleep(2) - assert get_num_running_replicas(controller, A, app_name=None) == 1 + assert check_autoscale_num_replicas(controller, "A") == 1 print("Scale up to 1 replica.") - first_deployment_replicas = get_running_replica_tags(controller, A, app_name=None) + first_deployment_replicas = get_running_replica_tags(controller, "A") - A.options( - autoscaling_config={ - "metrics_interval_s": 0.1, - "min_replicas": 2, - "max_replicas": 10, - "look_back_period_s": 0.2, - "downscale_delay_s": 0.2, - "upscale_delay_s": 0.2, - }, - graceful_shutdown_timeout_s=1, - max_concurrent_queries=1000, - version="v1", - ).deploy() + app_config["deployments"][0]["autoscaling_config"]["min_replicas"] = 2 + serve_instance.deploy_apps(ServeDeploySchema(**{"applications": [app_config]})) print("Redeployed A with min_replicas set to 2.") - wait_for_condition( - lambda: get_num_running_replicas(controller, A, app_name=None) >= 2 + lambda: get_deployment_status(controller, "A") == DeploymentStatus.HEALTHY ) - time.sleep(5) # Confirm that autoscaler doesn't scale above 2 even after waiting - assert get_num_running_replicas(controller, A, app_name=None) == 2 + time.sleep(5) + assert check_autoscale_num_replicas(controller, "A") == 2 print("Autoscaled to 2 without issuing any new requests.") - second_deployment_replicas = get_running_replica_tags(controller, A, app_name=None) + second_deployment_replicas = get_running_replica_tags(controller, "A") # Confirm that none of the original replicas were de-provisioned assert_no_replicas_deprovisioned( @@ -902,13 +921,14 @@ def __call__(self): # As the queue is drained, we should scale back down. wait_for_condition( - lambda: get_num_running_replicas(controller, A, app_name=None) <= 2 + lambda: check_autoscale_num_replicas(controller, "A") <= 2, + raise_exceptions=True, ) - assert get_num_running_replicas(controller, A, app_name=None) > 1 + assert check_autoscale_num_replicas(controller, "A") > 1 print("Stayed at 2 replicas.") # Make sure start time did not change for the deployment - assert get_deployment_start_time(controller, A, app_name=None) == start_time + assert get_deployment_start_time(controller, "A") == start_time @pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.") @@ -1067,6 +1087,7 @@ def check_num_replicas(live: int, dead: int): @pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.") def test_e2e_preserve_prev_replicas_rest_api(serve_instance): + client = serve_instance signal = SignalActor.options(name="signal", namespace="serve").remote() # Step 1: Prepare the script in a zip file so it can be submitted via REST API. @@ -1091,7 +1112,7 @@ def g(): ) # Step 2: Deploy it with max_replicas=1 - payload = { + app_config = { "import_path": "app:app", "runtime_env": {"working_dir": f"file://{tmp_path.name}"}, "deployments": [ @@ -1109,9 +1130,8 @@ def g(): ], } - client = ServeSubmissionClient("http://localhost:52365") - client.deploy_application(payload) - wait_for_condition(lambda: client.get_status()["app_status"]["status"] == "RUNNING") + client.deploy_apps(ServeDeploySchema(**{"applications": [app_config]})) + wait_for_condition(lambda: client.get_serve_status().app_status.status == "RUNNING") # Step 3: Verify that it can scale from 0 to 1. @ray.remote @@ -1140,9 +1160,9 @@ def check_num_replicas(num: int): existing_pid = ray.get(ref) # Step 4: Change the max replicas to 2 - payload["deployments"][0]["autoscaling_config"]["max_replicas"] = 2 - client.deploy_application(payload) - wait_for_condition(lambda: client.get_status()["app_status"]["status"] == "RUNNING") + app_config["deployments"][0]["autoscaling_config"]["max_replicas"] = 2 + client.deploy_apps(ServeDeploySchema(**{"applications": [app_config]})) + wait_for_condition(lambda: client.get_serve_status().app_status.status == "RUNNING") wait_for_condition(check_num_replicas, retry_interval_ms=1000, timeout=20, num=1) # Step 5: Make sure it is the same replica (lightweight change). @@ -1151,11 +1171,11 @@ def check_num_replicas(num: int): assert other_pid == existing_pid # Step 6: Make sure initial_replicas overrides previous replicas - payload["deployments"][0]["autoscaling_config"]["max_replicas"] = 5 - payload["deployments"][0]["autoscaling_config"]["initial_replicas"] = 3 - payload["deployments"][0]["autoscaling_config"]["upscale_delay"] = 600 - client.deploy_application(payload) - wait_for_condition(lambda: client.get_status()["app_status"]["status"] == "RUNNING") + app_config["deployments"][0]["autoscaling_config"]["max_replicas"] = 5 + app_config["deployments"][0]["autoscaling_config"]["initial_replicas"] = 3 + app_config["deployments"][0]["autoscaling_config"]["upscale_delay"] = 600 + client.deploy_apps(ServeDeploySchema(**{"applications": [app_config]})) + wait_for_condition(lambda: client.get_serve_status().app_status.status == "RUNNING") wait_for_condition(check_num_replicas, retry_interval_ms=1000, timeout=20, num=3) # Step 7: Make sure original replica is still running (lightweight change) diff --git a/python/ray/serve/tests/test_config_files/get_signal.py b/python/ray/serve/tests/test_config_files/get_signal.py new file mode 100644 index 0000000000000..7ed9c2f4b9bb9 --- /dev/null +++ b/python/ray/serve/tests/test_config_files/get_signal.py @@ -0,0 +1,12 @@ +import ray +from ray import serve + + +@serve.deployment +class A: + def __call__(self): + signal = ray.get_actor("signal123") + ray.get(signal.wait.remote()) + + +app = A.bind()