Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Serve] Modify max_concurrency in actor options to respect max_ongoing_requests (#47681) #48274

Merged
6 changes: 6 additions & 0 deletions python/ray/_private/ray_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,12 @@ def env_set_by_user(key):
# tasks.
DEFAULT_TASK_MAX_RETRIES = 3

# Default max_concurrency option in @ray.remote for threaded actors.
DEFAULT_MAX_CONCURRENCY_THREADED = 1

# Default max_concurrency option in @ray.remote for async actors.
DEFAULT_MAX_CONCURRENCY_ASYNC = 1000

# Prefix for namespaces which are used internally by ray.
# Jobs within these namespaces should be hidden from users
# and should not be considered user activity.
Expand Down
6 changes: 5 additions & 1 deletion python/ray/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -965,7 +965,11 @@ def _remote(self, args=None, kwargs=None, **actor_options):
is_asyncio = has_async_methods(meta.modified_class)

if actor_options.get("max_concurrency") is None:
actor_options["max_concurrency"] = 1000 if is_asyncio else 1
actor_options["max_concurrency"] = (
ray_constants.DEFAULT_MAX_CONCURRENCY_ASYNC
if is_asyncio
else ray_constants.DEFAULT_MAX_CONCURRENCY_THREADED
)

if client_mode_should_convert():
return client_mode_convert_actor(self, args, kwargs, **actor_options)
Expand Down
13 changes: 13 additions & 0 deletions python/ray/serve/_private/deployment_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import ray
from ray import ObjectRef, cloudpickle
from ray._private import ray_constants
from ray.actor import ActorHandle
from ray.exceptions import RayActorError, RayError, RayTaskError, RuntimeEnvSetupError
from ray.serve import metrics
Expand Down Expand Up @@ -474,6 +475,18 @@ def start(self, deployment_info: DeploymentInfo) -> ReplicaSchedulingRequest:
}
actor_options.update(deployment_info.replica_config.ray_actor_options)

# A replica's default `max_concurrency` value can prevent it from
# respecting the configured `max_ongoing_requests`. To avoid this
# unintentional behavior, use `max_ongoing_requests` to override
# the Actor's `max_concurrency` if it is larger.
if (
deployment_info.deployment_config.max_ongoing_requests
> ray_constants.DEFAULT_MAX_CONCURRENCY_ASYNC
):
actor_options[
"max_concurrency"
] = deployment_info.deployment_config.max_ongoing_requests

return ReplicaSchedulingRequest(
replica_id=self.replica_id,
actor_def=actor_def,
Expand Down
19 changes: 19 additions & 0 deletions python/ray/serve/tests/unit/test_deployment_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import pytest

from ray._private.ray_constants import DEFAULT_MAX_CONCURRENCY_ASYNC
from ray.serve._private.autoscaling_state import AutoscalingStateManager
from ray.serve._private.common import (
DeploymentHandleSource,
Expand Down Expand Up @@ -284,6 +285,7 @@ def deployment_info(
info = DeploymentInfo(
version=version,
start_time_ms=0,
actor_name="abc",
deployment_config=DeploymentConfig(
num_replicas=num_replicas, user_config=user_config, **config_opts
),
Expand Down Expand Up @@ -2914,6 +2916,23 @@ def test_default_value(self):
assert actor_replica.health_check_period_s == DEFAULT_HEALTH_CHECK_PERIOD_S
assert actor_replica.health_check_timeout_s == DEFAULT_HEALTH_CHECK_TIMEOUT_S

def test_max_concurrency_override(self):
actor_replica = ActorReplicaWrapper(
version=deployment_version("1"),
replica_id=ReplicaID(
"abc123",
deployment_id=DeploymentID(name="test_deployment", app_name="test_app"),
),
)
max_ongoing_requests = DEFAULT_MAX_CONCURRENCY_ASYNC + 1
d_info, _ = deployment_info(max_ongoing_requests=max_ongoing_requests)
replica_scheduling_request = actor_replica.start(d_info)
assert (
"max_concurrency" in replica_scheduling_request.actor_options
and replica_scheduling_request.actor_options["max_concurrency"]
== max_ongoing_requests
)


def test_get_active_node_ids(mock_deployment_state_manager):
"""Test get_active_node_ids() are collecting the correct node ids
Expand Down