Summary: let the Serve router talk to Java replicas via ActorReplicaWrapper.

CI gains a second Java test step that exports RAY_SERVE_ENABLE_NEW_ROUTING=1.
The Java replica interface gains getNumOngoingRequests() (default 0), which
RayServeReplicaImpl overrides to return its numOngoingRequests counter.
In router.py, ActorReplicaWrapper wraps cross-language actor handles in
JavaActorHandleProxy and send_query dispatches to a Java or a Python path.

diff --git a/.buildkite/pipeline.build.yml b/.buildkite/pipeline.build.yml
index 75fb6b47d699f..0d59a4945f5a7 100644
--- a/.buildkite/pipeline.build.yml
+++ b/.buildkite/pipeline.build.yml
@@ -10,6 +10,12 @@
   commands:
     - ./java/test.sh
 
+- label: ":java: Java (RAY_SERVE_ENABLE_NEW_ROUTING=1)"
+  conditions: ["RAY_CI_JAVA_AFFECTED"]
+  instance_size: medium
+  commands:
+    - export RAY_SERVE_ENABLE_NEW_ROUTING=1 && ./java/test.sh
+
 - label: ":serverless: Dashboard Tests"
   conditions:
     [
diff --git a/java/serve/src/main/java/io/ray/serve/replica/RayServeReplica.java b/java/serve/src/main/java/io/ray/serve/replica/RayServeReplica.java
index 65d2e2edcd795..b00f5f4f36608 100644
--- a/java/serve/src/main/java/io/ray/serve/replica/RayServeReplica.java
+++ b/java/serve/src/main/java/io/ray/serve/replica/RayServeReplica.java
@@ -15,4 +15,8 @@ default boolean checkHealth() {
   default boolean prepareForShutdown() {
     return true;
   }
+
+  default int getNumOngoingRequests() {
+    return 0;
+  }
 }
diff --git a/java/serve/src/main/java/io/ray/serve/replica/RayServeReplicaImpl.java b/java/serve/src/main/java/io/ray/serve/replica/RayServeReplicaImpl.java
index e605834b4e963..4e33d335b64a2 100644
--- a/java/serve/src/main/java/io/ray/serve/replica/RayServeReplicaImpl.java
+++ b/java/serve/src/main/java/io/ray/serve/replica/RayServeReplicaImpl.java
@@ -278,6 +278,11 @@ public synchronized boolean prepareForShutdown() {
     return true;
   }
 
+  @Override
+  public int getNumOngoingRequests() {
+    return numOngoingRequests.get();
+  }
+
   @Override
   public DeploymentVersion reconfigure(byte[] deploymentConfigBytes) {
     config = DeploymentConfig.fromProtoBytes(deploymentConfigBytes);
diff --git a/python/ray/serve/_private/router.py b/python/ray/serve/_private/router.py
index e9fae1d52fca9..01cea5e4860fa 100644
--- a/python/ray/serve/_private/router.py
+++ b/python/ray/serve/_private/router.py
@@ -101,32 +101,81 @@ class ActorReplicaWrapper:
     def __init__(self, replica_info: RunningReplicaInfo):
         self.replica_info = replica_info
 
+        if replica_info.is_cross_language:
+            self.actor_handle = JavaActorHandleProxy(replica_info.actor_handle)
+        else:
+            self.actor_handle = replica_info.actor_handle
+
     @property
     def replica_id(self) -> str:
         return self.replica_info.replica_tag
 
     async def get_queue_state(self) -> Tuple[str, int, bool]:
-        queue_len = (
-            await self.replica_info.actor_handle.get_num_ongoing_requests.remote()
-        )
+        # NOTE(edoakes): the `get_num_ongoing_requests` method name is shared by
+        # the Python and Java replica implementations. If you change it, you need to
+        # change both (or introduce a branch here).
+        queue_len = await self.actor_handle.get_num_ongoing_requests.remote()
         accepted = queue_len < self.replica_info.max_concurrent_queries
         return self.replica_id, queue_len, accepted
 
-    def send_query(
+    def _send_query_java(self, query: Query) -> ray.ObjectRef:
+        """Send the query to a Java replica.
+
+        Does not currently support streaming.
+        """
+        if query.metadata.is_streaming:
+            raise RuntimeError("Streaming not supported for Java.")
+
+        # Java only supports a single argument.
+        arg = query.args[0]
+
+        # Convert HTTP requests to Java-accepted format (single string).
+        if query.metadata.is_http_request:
+            assert isinstance(arg, bytes)
+            loaded_http_input = pickle.loads(arg)
+            query_string = loaded_http_input.scope.get("query_string")
+            if query_string:
+                arg = query_string.decode().split("=", 1)[1]
+            elif loaded_http_input.body:
+                arg = loaded_http_input.body.decode()
+
+        # Default call method in java is "call," not "__call__" like Python.
+        call_method = query.metadata.call_method
+        if call_method == "__call__":
+            call_method = "call"
+
+        return self.actor_handle.handle_request.remote(
+            RequestMetadataProto(
+                request_id=query.metadata.request_id,
+                endpoint=query.metadata.endpoint,
+                call_method=call_method,
+            ).SerializeToString(),
+            [arg],
+        )
+
+    def _send_query_python(
         self, query: Query
     ) -> Union[ray.ObjectRef, "ray._raylet.StreamingObjectRefGenerator"]:
-        actor = self.replica_info.actor_handle
+        """Send the query to a Python replica."""
         if query.metadata.is_streaming:
-            obj_ref = actor.handle_request_streaming.options(
+            obj_ref = self.actor_handle.handle_request_streaming.options(
                 num_returns="streaming"
             ).remote(pickle.dumps(query.metadata), *query.args, **query.kwargs)
         else:
-            _, obj_ref = actor.handle_request.remote(
+            _, obj_ref = self.actor_handle.handle_request.remote(
                 pickle.dumps(query.metadata), *query.args, **query.kwargs
             )
 
         return obj_ref
 
+    def send_query(
+        self, query: Query
+    ) -> Union[ray.ObjectRef, "ray._raylet.StreamingObjectRefGenerator"]:
+        if self.replica_info.is_cross_language:
+            return self._send_query_java(query)
+        else:
+            return self._send_query_python(query)
+
 
 class ReplicaScheduler(ABC):
     """Abstract interface for a replica scheduler (how the router calls it)."""