Skip to content

Commit

Permalink
[serve] Add Java support for power of two choices routing (`RAY_SERVE…
Browse files Browse the repository at this point in the history
…_ENABLE_NEW_ROUTING=1`) (#36865)

This is required to turn the new routing on by default. Also adds a build to test this condition.

Only required adding the same method for fetching the queue length to the Java replica.
  • Loading branch information
edoakes authored Jun 28, 2023
1 parent 9e716a4 commit ca8c76d
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 7 deletions.
6 changes: 6 additions & 0 deletions .buildkite/pipeline.build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@
commands:
- ./java/test.sh

- label: ":java: Java (RAY_SERVE_ENABLE_NEW_ROUTING=1)"
conditions: ["RAY_CI_JAVA_AFFECTED"]
instance_size: medium
commands:
- export RAY_SERVE_ENABLE_NEW_ROUTING=1 && ./java/test.sh

- label: ":serverless: Dashboard Tests"
conditions:
[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,8 @@ default boolean checkHealth() {
default boolean prepareForShutdown() {
return true;
}

default int getNumOngoingRequests() {
return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,11 @@ public synchronized boolean prepareForShutdown() {
return true;
}

@Override
public int getNumOngoingRequests() {
return numOngoingRequests.get();
}

@Override
public DeploymentVersion reconfigure(byte[] deploymentConfigBytes) {
config = DeploymentConfig.fromProtoBytes(deploymentConfigBytes);
Expand Down
63 changes: 56 additions & 7 deletions python/ray/serve/_private/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,32 +101,81 @@ class ActorReplicaWrapper:
def __init__(self, replica_info: RunningReplicaInfo):
self.replica_info = replica_info

if replica_info.is_cross_language:
self.actor_handle = JavaActorHandleProxy(replica_info.actor_handle)
else:
self.actor_handle = replica_info.actor_handle

@property
def replica_id(self) -> str:
return self.replica_info.replica_tag

async def get_queue_state(self) -> Tuple[str, int, bool]:
queue_len = (
await self.replica_info.actor_handle.get_num_ongoing_requests.remote()
)
# NOTE(edoakes): the `get_num_ongoing_requests` method name is shared by
# the Python and Java replica implementations. If you change it, you need to
# change both (or introduce a branch here).
queue_len = await self.actor_handle.get_num_ongoing_requests.remote()
accepted = queue_len < self.replica_info.max_concurrent_queries
return self.replica_id, queue_len, accepted

def send_query(
def _send_query_java(self, query: Query) -> ray.ObjectRef:
"""Send the query to a Java replica.
Does not currently support streaming.
"""
if query.metadata.is_streaming:
raise RuntimeError("Streaming not supported for Java.")

# Java only supports a single argument.
arg = query.args[0]

# Convert HTTP requests to Java-accepted format (single string).
if query.metadata.is_http_request:
assert isinstance(arg, bytes)
loaded_http_input = pickle.loads(arg)
query_string = loaded_http_input.scope.get("query_string")
if query_string:
arg = query_string.decode().split("=", 1)[1]
elif loaded_http_input.body:
arg = loaded_http_input.body.decode()

# Default call method in java is "call," not "__call__" like Python.
call_method = query.metadata.call_method
if call_method == "__call__":
call_method = "call"

return self.actor_handle.handle_request.remote(
RequestMetadataProto(
request_id=query.metadata.request_id,
endpoint=query.metadata.endpoint,
call_method=call_method,
).SerializeToString(),
[arg],
)

def _send_query_python(
self, query: Query
) -> Union[ray.ObjectRef, "ray._raylet.StreamingObjectRefGenerator"]:
actor = self.replica_info.actor_handle
"""Send the query to a Python replica."""
if query.metadata.is_streaming:
obj_ref = actor.handle_request_streaming.options(
obj_ref = self.actor_handle.handle_request_streaming.options(
num_returns="streaming"
).remote(pickle.dumps(query.metadata), *query.args, **query.kwargs)
else:
_, obj_ref = actor.handle_request.remote(
_, obj_ref = self.actor_handle.handle_request.remote(
pickle.dumps(query.metadata), *query.args, **query.kwargs
)

return obj_ref

def send_query(
self, query: Query
) -> Union[ray.ObjectRef, "ray._raylet.StreamingObjectRefGenerator"]:
if self.replica_info.is_cross_language:
return self._send_query_java(query)
else:
return self._send_query_python(query)


class ReplicaScheduler(ABC):
"""Abstract interface for a replica scheduler (how the router calls it)."""
Expand Down

0 comments on commit ca8c76d

Please sign in to comment.