diff --git a/python/ray/_private/test_utils.py b/python/ray/_private/test_utils.py index 916e7d0de8ec6..dbda58bd8844e 100644 --- a/python/ray/_private/test_utils.py +++ b/python/ray/_private/test_utils.py @@ -509,7 +509,11 @@ def kill_actor_and_wait_for_failure(actor, timeout=10, retry_interval_ms=100): def wait_for_condition( - condition_predictor, timeout=10, retry_interval_ms=100, **kwargs: Any + condition_predictor, + timeout=10, + retry_interval_ms=100, + raise_exceptions=False, + **kwargs: Any, ): """Wait until a condition is met or time out with an exception. @@ -517,6 +521,8 @@ def wait_for_condition( condition_predictor: A function that predicts the condition. timeout: Maximum timeout in seconds. retry_interval_ms: Retry interval in milliseconds. + raise_exceptions: If true, exceptions that occur while executing + condition_predictor won't be caught and instead will be raised. Raises: RuntimeError: If the condition is not met before the timeout expires. @@ -528,6 +534,8 @@ def wait_for_condition( if condition_predictor(**kwargs): return except Exception: + if raise_exceptions: + raise last_ex = ray._private.utils.format_error_message(traceback.format_exc()) time.sleep(retry_interval_ms / 1000.0) message = "The condition wasn't met before the timeout expired." diff --git a/python/ray/serve/_private/deployment_state.py b/python/ray/serve/_private/deployment_state.py index 7f9807898fd5d..20195d96fb98a 100644 --- a/python/ray/serve/_private/deployment_state.py +++ b/python/ray/serve/_private/deployment_state.py @@ -1286,6 +1286,24 @@ def _set_target_state(self, target_info: DeploymentInfo) -> None: logger.info(f"Deploying new version of deployment {self._name}.") + def _set_target_state_autoscaling(self, num_replicas: int) -> None: + """Update the target number of replicas based on an autoscaling decision. + + This differs from _set_target_state because we are updating the + target number of replicas based on an autoscaling decision and + not a redeployment. 
This only changes the target num_replicas, + and doesn't change the current deployment status. + """ + + new_info = copy(self._target_state.info) + new_info.set_autoscaled_num_replicas(num_replicas) + new_info.version = self._target_state.version.code_version + + target_state = DeploymentTargetState.from_deployment_info(new_info) + + self._save_checkpoint_func(writeahead_checkpoints={self._name: target_state}) + self._target_state = target_state + def deploy(self, deployment_info: DeploymentInfo) -> bool: """Deploy the deployment. @@ -1345,7 +1363,6 @@ def autoscale( if self._target_state.deleting: return - curr_info = self._target_state.info autoscaling_policy = self._target_state.info.autoscaling_policy decision_num_replicas = autoscaling_policy.get_decision_num_replicas( curr_target_num_replicas=self._target_state.num_replicas, @@ -1362,12 +1379,7 @@ def autoscale( f"current handle queued queries: {current_handle_queued_queries}." ) - new_config = copy(curr_info) - new_config.set_autoscaled_num_replicas(decision_num_replicas) - if new_config.version is None: - new_config.version = self._target_state.version.code_version - - self._set_target_state(new_config) + self._set_target_state_autoscaling(decision_num_replicas) def delete(self) -> None: if not self._target_state.deleting: diff --git a/python/ray/serve/tests/test_autoscaling_policy.py b/python/ray/serve/tests/test_autoscaling_policy.py index efe330e8769d2..5c764630f34d6 100644 --- a/python/ray/serve/tests/test_autoscaling_policy.py +++ b/python/ray/serve/tests/test_autoscaling_policy.py @@ -14,8 +14,15 @@ BasicAutoscalingPolicy, calculate_desired_num_replicas, ) -from ray.serve._private.common import DeploymentInfo -from ray.serve._private.common import ReplicaState +from ray.serve._private.common import ( + DeploymentStatus, + DeploymentStatusInfo, + ReplicaState, + DeploymentInfo, +) +from ray.serve.generated.serve_pb2 import ( + DeploymentStatusInfo as DeploymentStatusInfoProto, +) from ray.serve.config 
import AutoscalingConfig from ray.serve._private.constants import ( CONTROL_LOOP_PERIOD_S, @@ -23,9 +30,8 @@ DEPLOYMENT_NAME_PREFIX_SEPARATOR, ) from ray.serve.controller import ServeController -from ray.serve.deployment import Deployment +from ray.serve.schema import ServeDeploySchema import ray.util.state as state_api -from ray.dashboard.modules.serve.sdk import ServeSubmissionClient import ray from ray import serve @@ -112,39 +118,35 @@ def test_smoothing_factor(self): assert 5 <= desired_num_replicas <= 8 # 10 + 0.5 * (2.5 - 10) = 6.25 -def get_running_replicas( - controller: ServeController, deployment: Deployment, app_name -) -> List: +def get_deployment_status(controller, name) -> DeploymentStatus: + ref = ray.get(controller.get_deployment_status.remote(f"default_{name}")) + info = DeploymentStatusInfo.from_proto(DeploymentStatusInfoProto.FromString(ref)) + return info.status + + +def get_running_replicas(controller: ServeController, name: str) -> List: """Get the replicas currently running for given deployment""" - if app_name: - deployment_name = app_name + DEPLOYMENT_NAME_PREFIX_SEPARATOR + deployment.name - else: - deployment_name = deployment.name replicas = ray.get( - controller._dump_replica_states_for_testing.remote(deployment_name) + controller._dump_replica_states_for_testing.remote(f"default_{name}") ) running_replicas = replicas.get([ReplicaState.RUNNING]) return running_replicas -def get_running_replica_tags( - controller: ServeController, - deployment: Deployment, - app_name: str = SERVE_DEFAULT_APP_NAME, -) -> List: +def get_running_replica_tags(controller: ServeController, name: str) -> List: """Get the replica tags of running replicas for given deployment""" - running_replicas = get_running_replicas(controller, deployment, app_name) + running_replicas = get_running_replicas(controller, name) return [replica.replica_tag for replica in running_replicas] -def get_num_running_replicas( - controller: ServeController, - deployment: Deployment, - 
app_name: str = SERVE_DEFAULT_APP_NAME, -) -> int: - """Get the amount of replicas currently running for given deployment""" - running_replicas = get_running_replicas(controller, deployment, app_name) - return len(running_replicas) +def check_autoscale_num_replicas(controller: ServeController, name: str) -> int: + """Check the number of replicas currently running for given deployment. + + This should only be called if the deployment has already transitioned + to HEALTHY, and this function will check that it remains healthy. + """ + assert get_deployment_status(controller, name) == DeploymentStatus.HEALTHY + return len(get_running_replicas(controller, name)) def assert_no_replicas_deprovisioned( @@ -181,11 +183,7 @@ def test_assert_no_replicas_deprovisioned(): assert_no_replicas_deprovisioned(replica_tags_2, replica_tags_1) -def get_deployment_start_time( - controller: ServeController, - deployment: Deployment, - app_name: str = SERVE_DEFAULT_APP_NAME, -): +def get_deployment_start_time(controller: ServeController, name: str): """Return start time for given deployment""" deployment_route_list = DeploymentRouteList.FromString( ray.get(controller.list_deployments.remote()) @@ -197,11 +195,7 @@ def get_deployment_start_time( ) for deployment_route in deployment_route_list.deployment_routes } - if app_name: - deployment_name = app_name + DEPLOYMENT_NAME_PREFIX_SEPARATOR + deployment.name - else: - deployment_name = deployment.name - deployment_info, _route_prefix = deployments[deployment_name] + deployment_info, _route_prefix = deployments[f"default_{name}"] return deployment_info.start_time_ms @@ -209,6 +203,7 @@ def get_deployment_start_time( def test_e2e_basic_scale_up_down(min_replicas, serve_instance): """Send 100 requests and check that we autoscale up, and then back down.""" + controller = serve_instance._controller signal = SignalActor.remote() @serve.deployment( @@ -231,29 +226,35 @@ def __call__(self): ray.get(signal.wait.remote()) handle = serve.run(A.bind()) 
- - controller = serve_instance._controller - start_time = get_deployment_start_time(controller, A) + wait_for_condition( + lambda: get_deployment_status(controller, "A") == DeploymentStatus.HEALTHY + ) + start_time = get_deployment_start_time(controller, "A") [handle.remote() for _ in range(100)] # scale up one more replica from min_replicas wait_for_condition( - lambda: get_num_running_replicas(controller, A) >= min_replicas + 1 + lambda: check_autoscale_num_replicas(controller, "A") >= min_replicas + 1, + raise_exceptions=True, ) signal.send.remote() # As the queue is drained, we should scale back down. - wait_for_condition(lambda: get_num_running_replicas(controller, A) <= min_replicas) + wait_for_condition( + lambda: check_autoscale_num_replicas(controller, "A") <= min_replicas, + raise_exceptions=True, + ) # Make sure start time did not change for the deployment - assert get_deployment_start_time(controller, A) == start_time + assert get_deployment_start_time(controller, "A") == start_time @pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.") def test_e2e_basic_scale_up_down_with_0_replica(serve_instance): """Send 100 requests and check that we autoscale up, and then back down.""" + controller = serve_instance._controller signal = SignalActor.remote() @serve.deployment( @@ -276,21 +277,28 @@ def __call__(self): ray.get(signal.wait.remote()) handle = serve.run(A.bind()) - - controller = serve_instance._controller - start_time = get_deployment_start_time(controller, A) + wait_for_condition( + lambda: get_deployment_status(controller, "A") == DeploymentStatus.HEALTHY + ) + start_time = get_deployment_start_time(controller, "A") [handle.remote() for _ in range(100)] # scale up one more replica from min_replicas - wait_for_condition(lambda: get_num_running_replicas(controller, A) >= 1) + wait_for_condition( + lambda: check_autoscale_num_replicas(controller, "A") >= 1, + raise_exceptions=True, + ) signal.send.remote() # As the queue is drained, 
we should scale back down. - wait_for_condition(lambda: get_num_running_replicas(controller, A) <= 0) + wait_for_condition( + lambda: check_autoscale_num_replicas(controller, "A") <= 0, + raise_exceptions=True, + ) # Make sure start time did not change for the deployment - assert get_deployment_start_time(controller, A) == start_time + assert get_deployment_start_time(controller, "A") == start_time @mock.patch.object(ServeController, "run_control_loop") @@ -316,7 +324,7 @@ def __call__(self): serve.run(A.bind()) controller = serve_instance._controller - assert get_num_running_replicas(controller, A) == 2 + assert len(get_running_replicas(controller, "A")) == 2 def test_upscale_downscale_delay(): @@ -612,6 +620,7 @@ def test_e2e_bursty(serve_instance): Sends 100 requests in bursts. Uses delays for smooth provisioning. """ + controller = serve_instance._controller signal = SignalActor.remote() @serve.deployment( @@ -634,14 +643,18 @@ def __call__(self): ray.get(signal.wait.remote()) handle = serve.run(A.bind()) - - controller = serve_instance._controller - start_time = get_deployment_start_time(controller, A) + wait_for_condition( + lambda: get_deployment_status(controller, "A") == DeploymentStatus.HEALTHY + ) + start_time = get_deployment_start_time(controller, "A") [handle.remote() for _ in range(100)] - wait_for_condition(lambda: get_num_running_replicas(controller, A) >= 2) - num_replicas = get_num_running_replicas(controller, A) + wait_for_condition( + lambda: check_autoscale_num_replicas(controller, "A") >= 2, + raise_exceptions=True, + ) + num_replicas = check_autoscale_num_replicas(controller, "A") signal.send.remote() # Execute a bursty workload that issues 100 requests every 0.05 seconds @@ -652,15 +665,18 @@ def __call__(self): # parameters. 
for _ in range(5): time.sleep(0.05) - assert get_num_running_replicas(controller, A) == num_replicas + assert check_autoscale_num_replicas(controller, "A") == num_replicas [handle.remote() for _ in range(100)] signal.send.remote() # As the queue is drained, we should scale back down. - wait_for_condition(lambda: get_num_running_replicas(controller, A) <= 1) + wait_for_condition( + lambda: check_autoscale_num_replicas(controller, "A") <= 1, + raise_exceptions=True, + ) # Make sure start time did not change for the deployment - assert get_deployment_start_time(controller, A) == start_time + assert get_deployment_start_time(controller, "A") == start_time @pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.") @@ -669,6 +685,7 @@ def test_e2e_intermediate_downscaling(serve_instance): Scales up, then down, and up again. """ + controller = serve_instance._controller signal = SignalActor.remote() @serve.deployment( @@ -691,31 +708,44 @@ def __call__(self): ray.get(signal.wait.remote()) handle = serve.run(A.bind()) - - controller = serve_instance._controller - start_time = get_deployment_start_time(controller, A) + wait_for_condition( + lambda: get_deployment_status(controller, "A") == DeploymentStatus.HEALTHY + ) + start_time = get_deployment_start_time(controller, "A") [handle.remote() for _ in range(50)] wait_for_condition( - lambda: get_num_running_replicas(controller, A) >= 20, timeout=30 + lambda: check_autoscale_num_replicas(controller, "A") >= 20, + timeout=30, + raise_exceptions=True, ) signal.send.remote() - wait_for_condition(lambda: get_num_running_replicas(controller, A) <= 1, timeout=30) + wait_for_condition( + lambda: check_autoscale_num_replicas(controller, "A") <= 1, + timeout=30, + raise_exceptions=True, + ) signal.send.remote(clear=True) [handle.remote() for _ in range(50)] wait_for_condition( - lambda: get_num_running_replicas(controller, A) >= 20, timeout=30 + lambda: check_autoscale_num_replicas(controller, "A") >= 20, + timeout=30, 
+ raise_exceptions=True, ) signal.send.remote() # As the queue is drained, we should scale back down. - wait_for_condition(lambda: get_num_running_replicas(controller, A) < 1, timeout=30) + wait_for_condition( + lambda: check_autoscale_num_replicas(controller, "A") < 1, + timeout=30, + raise_exceptions=True, + ) # Make sure start time did not change for the deployment - assert get_deployment_start_time(controller, A) == start_time + assert get_deployment_start_time(controller, "A") == start_time @pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.") @@ -723,66 +753,64 @@ def __call__(self): def test_e2e_update_autoscaling_deployment(serve_instance): # See https://github.com/ray-project/ray/issues/21017 for details - signal = SignalActor.remote() + controller = serve_instance._controller + signal = SignalActor.options(name="signal123").remote() - @serve.deployment( - autoscaling_config={ - "metrics_interval_s": 0.1, - "min_replicas": 0, - "max_replicas": 10, - "look_back_period_s": 0.2, - "downscale_delay_s": 0.2, - "upscale_delay_s": 0.2, - }, - # We will send over a lot of queries. This will make sure replicas are - # killed quickly during cleanup. 
- graceful_shutdown_timeout_s=1, - max_concurrent_queries=1000, - version="v1", - ) - class A: - def __call__(self): - ray.get(signal.wait.remote()) + app_config = { + "import_path": "ray.serve.tests.test_config_files.get_signal.app", + "deployments": [ + { + "name": "A", + "autoscaling_config": { + "metrics_interval_s": 0.1, + "min_replicas": 0, + "max_replicas": 10, + "look_back_period_s": 0.2, + "downscale_delay_s": 0.2, + "upscale_delay_s": 0.2, + }, + "graceful_shutdown_timeout_s": 1, + "max_concurrent_queries": 1000, + } + ], + } - handle = serve.run(A.bind()) + serve_instance.deploy_apps(ServeDeploySchema(**{"applications": [app_config]})) print("Deployed A with min_replicas 1 and max_replicas 10.") + wait_for_condition( + lambda: get_deployment_status(controller, "A") == DeploymentStatus.HEALTHY + ) + handle = serve.get_deployment("A").get_handle() + start_time = get_deployment_start_time(controller, "A") - controller = serve_instance._controller - start_time = get_deployment_start_time(controller, A) - - assert get_num_running_replicas(controller, A) == 0 - + assert check_autoscale_num_replicas(controller, "A") == 0 [handle.remote() for _ in range(400)] print("Issued 400 requests.") - wait_for_condition(lambda: get_num_running_replicas(controller, A) >= 10) + wait_for_condition( + lambda: check_autoscale_num_replicas(controller, "A") >= 10, + raise_exceptions=True, + ) print("Scaled to 10 replicas.") - first_deployment_replicas = get_running_replica_tags(controller, A) + first_deployment_replicas = get_running_replica_tags(controller, "A") - assert get_num_running_replicas(controller, A) < 20 + assert check_autoscale_num_replicas(controller, "A") < 20 [handle.remote() for _ in range(458)] time.sleep(3) print("Issued 458 requests. 
Request routing in-progress.") - serve.run( - A.options( - autoscaling_config={ - "metrics_interval_s": 0.1, - "min_replicas": 2, - "max_replicas": 20, - "look_back_period_s": 0.2, - "downscale_delay_s": 0.2, - "upscale_delay_s": 0.2, - }, - version="v1", - ).bind() - ) + app_config["deployments"][0]["autoscaling_config"]["min_replicas"] = 2 + app_config["deployments"][0]["autoscaling_config"]["max_replicas"] = 20 + serve_instance.deploy_apps(ServeDeploySchema(**{"applications": [app_config]})) print("Redeployed A.") - wait_for_condition(lambda: get_num_running_replicas(controller, A) >= 20) + wait_for_condition( + lambda: check_autoscale_num_replicas(controller, "A") >= 20, + raise_exceptions=True, + ) print("Scaled up to 20 requests.") - second_deployment_replicas = get_running_replica_tags(controller, A) + second_deployment_replicas = get_running_replica_tags(controller, "A") # Confirm that none of the original replicas were de-provisioned assert_no_replicas_deprovisioned( @@ -792,104 +820,95 @@ def __call__(self): signal.send.remote() # As the queue is drained, we should scale back down. 
- wait_for_condition(lambda: get_num_running_replicas(controller, A) <= 2) - assert get_num_running_replicas(controller, A) > 1 + wait_for_condition( + lambda: check_autoscale_num_replicas(controller, "A") <= 2, + raise_exceptions=True, + ) + assert check_autoscale_num_replicas(controller, "A") > 1 # Make sure start time did not change for the deployment - assert get_deployment_start_time(controller, A) == start_time + assert get_deployment_start_time(controller, "A") == start_time # scale down to 0 - serve.run( - A.options( - autoscaling_config={ - "metrics_interval_s": 0.1, - "min_replicas": 0, - "max_replicas": 20, - "look_back_period_s": 0.2, - "downscale_delay_s": 0.2, - "upscale_delay_s": 0.2, - }, - version="v1", - ).bind() - ) + app_config["deployments"][0]["autoscaling_config"]["min_replicas"] = 0 + serve_instance.deploy_apps(ServeDeploySchema(**{"applications": [app_config]})) print("Redeployed A.") + wait_for_condition( + lambda: get_deployment_status(controller, "A") == DeploymentStatus.HEALTHY + ) - wait_for_condition(lambda: get_num_running_replicas(controller, A) < 1) - assert get_num_running_replicas(controller, A) == 0 + wait_for_condition( + lambda: check_autoscale_num_replicas(controller, "A") < 1, raise_exceptions=True + ) + assert check_autoscale_num_replicas(controller, "A") == 0 # scale up [handle.remote() for _ in range(400)] - wait_for_condition(lambda: get_num_running_replicas(controller, A) > 0) + wait_for_condition( + lambda: check_autoscale_num_replicas(controller, "A") > 0, raise_exceptions=True + ) signal.send.remote() - wait_for_condition(lambda: get_num_running_replicas(controller, A) < 1) + wait_for_condition( + lambda: check_autoscale_num_replicas(controller, "A") < 1, raise_exceptions=True + ) @pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.") def test_e2e_raise_min_replicas(serve_instance): - signal = SignalActor.remote() + controller = serve_instance._controller + signal = 
SignalActor.options(name="signal123").remote() - @serve.deployment( - autoscaling_config={ - "metrics_interval_s": 0.1, - "min_replicas": 0, - "max_replicas": 10, - "look_back_period_s": 0.2, - "downscale_delay_s": 0.2, - "upscale_delay_s": 0.2, - }, - # We will send over a lot of queries. This will make sure replicas are - # killed quickly during cleanup. - graceful_shutdown_timeout_s=1, - max_concurrent_queries=1000, - version="v1", - ) - class A: - def __call__(self): - ray.get(signal.wait.remote()) + app_config = { + "import_path": "ray.serve.tests.test_config_files.get_signal.app", + "deployments": [ + { + "name": "A", + "autoscaling_config": { + "metrics_interval_s": 0.1, + "min_replicas": 0, + "max_replicas": 10, + "look_back_period_s": 0.2, + "downscale_delay_s": 0.2, + "upscale_delay_s": 0.2, + }, + "graceful_shutdown_timeout_s": 1, + "max_concurrent_queries": 1000, + } + ], + } - A.deploy() + serve_instance.deploy_apps(ServeDeploySchema(**{"applications": [app_config]})) print("Deployed A.") + wait_for_condition( + lambda: get_deployment_status(controller, "A") == DeploymentStatus.HEALTHY + ) + start_time = get_deployment_start_time(controller, "A") - controller = serve_instance._controller - start_time = get_deployment_start_time(controller, A, app_name=None) - - assert get_num_running_replicas(controller, A, app_name=None) == 0 + assert check_autoscale_num_replicas(controller, "A") == 0 - handle = A.get_handle() + handle = serve.get_deployment("default_A").get_handle() [handle.remote() for _ in range(1)] print("Issued one request.") time.sleep(2) - assert get_num_running_replicas(controller, A, app_name=None) == 1 + assert check_autoscale_num_replicas(controller, "A") == 1 print("Scale up to 1 replica.") - first_deployment_replicas = get_running_replica_tags(controller, A, app_name=None) + first_deployment_replicas = get_running_replica_tags(controller, "A") - A.options( - autoscaling_config={ - "metrics_interval_s": 0.1, - "min_replicas": 2, - 
"max_replicas": 10, - "look_back_period_s": 0.2, - "downscale_delay_s": 0.2, - "upscale_delay_s": 0.2, - }, - graceful_shutdown_timeout_s=1, - max_concurrent_queries=1000, - version="v1", - ).deploy() + app_config["deployments"][0]["autoscaling_config"]["min_replicas"] = 2 + serve_instance.deploy_apps(ServeDeploySchema(**{"applications": [app_config]})) print("Redeployed A with min_replicas set to 2.") - wait_for_condition( - lambda: get_num_running_replicas(controller, A, app_name=None) >= 2 + lambda: get_deployment_status(controller, "A") == DeploymentStatus.HEALTHY ) - time.sleep(5) # Confirm that autoscaler doesn't scale above 2 even after waiting - assert get_num_running_replicas(controller, A, app_name=None) == 2 + time.sleep(5) + assert check_autoscale_num_replicas(controller, "A") == 2 print("Autoscaled to 2 without issuing any new requests.") - second_deployment_replicas = get_running_replica_tags(controller, A, app_name=None) + second_deployment_replicas = get_running_replica_tags(controller, "A") # Confirm that none of the original replicas were de-provisioned assert_no_replicas_deprovisioned( @@ -902,13 +921,14 @@ def __call__(self): # As the queue is drained, we should scale back down. 
wait_for_condition( - lambda: get_num_running_replicas(controller, A, app_name=None) <= 2 + lambda: check_autoscale_num_replicas(controller, "A") <= 2, + raise_exceptions=True, ) - assert get_num_running_replicas(controller, A, app_name=None) > 1 + assert check_autoscale_num_replicas(controller, "A") > 1 print("Stayed at 2 replicas.") # Make sure start time did not change for the deployment - assert get_deployment_start_time(controller, A, app_name=None) == start_time + assert get_deployment_start_time(controller, "A") == start_time @pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.") @@ -1067,6 +1087,7 @@ def check_num_replicas(live: int, dead: int): @pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.") def test_e2e_preserve_prev_replicas_rest_api(serve_instance): + client = serve_instance signal = SignalActor.options(name="signal", namespace="serve").remote() # Step 1: Prepare the script in a zip file so it can be submitted via REST API. @@ -1091,7 +1112,7 @@ def g(): ) # Step 2: Deploy it with max_replicas=1 - payload = { + app_config = { "import_path": "app:app", "runtime_env": {"working_dir": f"file://{tmp_path.name}"}, "deployments": [ @@ -1109,9 +1130,8 @@ def g(): ], } - client = ServeSubmissionClient("http://localhost:52365") - client.deploy_application(payload) - wait_for_condition(lambda: client.get_status()["app_status"]["status"] == "RUNNING") + client.deploy_apps(ServeDeploySchema(**{"applications": [app_config]})) + wait_for_condition(lambda: client.get_serve_status().app_status.status == "RUNNING") # Step 3: Verify that it can scale from 0 to 1. 
@ray.remote @@ -1140,9 +1160,9 @@ def check_num_replicas(num: int): existing_pid = ray.get(ref) # Step 4: Change the max replicas to 2 - payload["deployments"][0]["autoscaling_config"]["max_replicas"] = 2 - client.deploy_application(payload) - wait_for_condition(lambda: client.get_status()["app_status"]["status"] == "RUNNING") + app_config["deployments"][0]["autoscaling_config"]["max_replicas"] = 2 + client.deploy_apps(ServeDeploySchema(**{"applications": [app_config]})) + wait_for_condition(lambda: client.get_serve_status().app_status.status == "RUNNING") wait_for_condition(check_num_replicas, retry_interval_ms=1000, timeout=20, num=1) # Step 5: Make sure it is the same replica (lightweight change). @@ -1151,11 +1171,11 @@ def check_num_replicas(num: int): assert other_pid == existing_pid # Step 6: Make sure initial_replicas overrides previous replicas - payload["deployments"][0]["autoscaling_config"]["max_replicas"] = 5 - payload["deployments"][0]["autoscaling_config"]["initial_replicas"] = 3 - payload["deployments"][0]["autoscaling_config"]["upscale_delay"] = 600 - client.deploy_application(payload) - wait_for_condition(lambda: client.get_status()["app_status"]["status"] == "RUNNING") + app_config["deployments"][0]["autoscaling_config"]["max_replicas"] = 5 + app_config["deployments"][0]["autoscaling_config"]["initial_replicas"] = 3 + app_config["deployments"][0]["autoscaling_config"]["upscale_delay"] = 600 + client.deploy_apps(ServeDeploySchema(**{"applications": [app_config]})) + wait_for_condition(lambda: client.get_serve_status().app_status.status == "RUNNING") wait_for_condition(check_num_replicas, retry_interval_ms=1000, timeout=20, num=3) # Step 7: Make sure original replica is still running (lightweight change) diff --git a/python/ray/serve/tests/test_config_files/get_signal.py b/python/ray/serve/tests/test_config_files/get_signal.py new file mode 100644 index 0000000000000..7ed9c2f4b9bb9 --- /dev/null +++ 
b/python/ray/serve/tests/test_config_files/get_signal.py @@ -0,0 +1,12 @@ +import ray +from ray import serve + + +@serve.deployment +class A: + def __call__(self): + signal = ray.get_actor("signal123") + ray.get(signal.wait.remote()) + + +app = A.bind()