Skip to content

Commit

Permalink
[serve] Clean up naming and unused fields left from ReplicaResult r…
Browse files Browse the repository at this point in the history
…efactoring (ray-project#48466)

Variable names and comments still partially referred to object refs,
which was highly confusing. Some dead (unused) locking code had also
been left in.

Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com>
  • Loading branch information
edoakes authored and JP-sDEV committed Nov 14, 2024
1 parent e640644 commit d999984
Showing 1 changed file with 20 additions and 37 deletions.
57 changes: 20 additions & 37 deletions python/ray/serve/handle.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,41 +354,22 @@ def __reduce__(self):


class _DeploymentResponseBase:
def __init__(self, object_ref_future: concurrent.futures.Future):
def __init__(self, replica_result_future: concurrent.futures.Future[ReplicaResult]):
self._cancelled = False
# The result of `object_ref_future` must be an ObjectRef or ObjectRefGenerator.
self._object_ref_future = object_ref_future

# Cached result of the `object_ref_future`.
# This is guarded by the below locks for async and sync methods.
# It's not expected that user code can mix async and sync methods (sync methods
# raise an exception when running in an `asyncio` loop).
# The `asyncio` lock is lazily constructed because the constructor may run on
# a different `asyncio` loop than method calls (or not run on one at all).
self._object_ref_or_gen = None
self.__lazy_object_ref_or_gen_asyncio_lock = None
self._object_ref_or_gen_sync_lock = threading.Lock()
self._replica_result_future = replica_result_future
self._replica_result: Optional[ReplicaResult] = None

@property
def _object_ref_or_gen_asyncio_lock(self) -> asyncio.Lock:
"""Lazy `asyncio.Lock` object."""
if self.__lazy_object_ref_or_gen_asyncio_lock is None:
self.__lazy_object_ref_or_gen_asyncio_lock = asyncio.Lock()

return self.__lazy_object_ref_or_gen_asyncio_lock

def _fetch_future_result_sync(
self, _timeout_s: Optional[float] = None
) -> ReplicaResult:
"""Synchronously fetch the result of the `_object_ref_future`.
"""Synchronously fetch the replica result.
Wrap the result in a ReplicaResult and store it in _result.
The result is cached in `self._replica_result`.
"""

if self._replica_result is None:
try:
self._replica_result = self._object_ref_future.result(
self._replica_result = self._replica_result_future.result(
timeout=_timeout_s
)
except concurrent.futures.TimeoutError:
Expand All @@ -397,15 +378,17 @@ def _fetch_future_result_sync(
return self._replica_result

async def _fetch_future_result_async(self) -> ReplicaResult:
"""Asynchronously fetch the result of the `_object_ref_future`.
"""Asynchronously fetch replica result.
Wrap the result in a ReplicaResult and store it in _result.
The result is cached in `self._replica_result`.
"""

if self._replica_result is None:
# Use `asyncio.wrap_future` so `self._object_ref_future` can be awaited
# Use `asyncio.wrap_future` so `self._replica_result_future` can be awaited
# safely from any asyncio loop.
self._replica_result = await asyncio.wrap_future(self._object_ref_future)
self._replica_result = await asyncio.wrap_future(
self._replica_result_future
)

return self._replica_result

Expand All @@ -414,9 +397,9 @@ def cancel(self):
This is best effort.
- If the request hasn't been assigned to a replica actor, the assignment will be
- If the request hasn't been assigned to a replica, the assignment will be
cancelled.
- If the request has been assigned to a replica actor, `ray.cancel` will be
- If the request has been assigned to a replica, `ray.cancel` will be
called on the object ref, attempting to cancel the request and any downstream
requests it makes.
Expand All @@ -433,9 +416,9 @@ def cancel(self):
return

self._cancelled = True
if not self._object_ref_future.done():
self._object_ref_future.cancel()
elif self._object_ref_future.exception() is None:
if not self._replica_result_future.done():
self._replica_result_future.cancel()
elif self._replica_result_future.exception() is None:
self._fetch_future_result_sync()
self._replica_result.cancel()

Expand Down Expand Up @@ -566,7 +549,7 @@ async def _to_object_ref(self) -> ray.ObjectRef:
actor method call.
This method is `async def` because it will block until the handle call has been
assigned to a replica actor. If there are many requests in flight and all
assigned to a replica. If there are many requests in flight and all
replicas' queues are full, this may be a slow operation.
"""

Expand All @@ -587,7 +570,7 @@ def _to_object_ref_sync(
actor method call.
This method is a *blocking* call because it will block until the handle call has
been assigned to a replica actor. If there are many requests in flight and all
been assigned to a replica. If there are many requests in flight and all
replicas' queues are full, this may be a slow operation.
From inside a deployment, `_to_object_ref` should be used instead to avoid
Expand Down Expand Up @@ -703,7 +686,7 @@ async def _to_object_ref_gen(self) -> ObjectRefGenerator:
"""Advanced API to convert the generator to a Ray `ObjectRefGenerator`.
This method is `async def` because it will block until the handle call has been
assigned to a replica actor. If there are many requests in flight and all
assigned to a replica. If there are many requests in flight and all
replicas' queues are full, this may be a slow operation.
"""

Expand All @@ -721,7 +704,7 @@ def _to_object_ref_gen_sync(
"""Advanced API to convert the generator to a Ray `ObjectRefGenerator`.
This method is a *blocking* call because it will block until the handle call has
been assigned to a replica actor. If there are many requests in flight and all
been assigned to a replica. If there are many requests in flight and all
replicas' queues are full, this may be a slow operation.
From inside a deployment, `_to_object_ref_gen` should be used instead to avoid
Expand Down

0 comments on commit d999984

Please sign in to comment.