Skip to content

Commit

Permalink
[4/N] Support async actor and async generator interface. (ray-project…
Browse files Browse the repository at this point in the history
…#35584)

NOTE: It is a big PR, but most of code is testing (200~300 lines I believe)

This is the fourth PR to support the streaming generator.

The detailed design and API proposal can be found from https://docs.google.com/document/d/1hAASLe2sCoay23raqxqwJdSDiJWNMcNhlTwWJXsJOU4/edit#heading=h.w91y1fgnpu0m.
The Execution plan can be found from https://docs.google.com/document/d/1hAASLe2sCoay23raqxqwJdSDiJWNMcNhlTwWJXsJOU4/edit#heading=h.kxktymq5ihf7.
There will be 4 PRs to enable streaming generator for Ray Serve (phase 1).

This PR -> introduce cpp interfaces to handle intermediate task return [1/N] Streaming Generator. Cpp interfaces and implementation ray-project#35291
Support core worker APIs + cython generator interface. [2/N] Streaming Generator. Support core worker APIs + cython generator interface. ray-project#35324
E2e integration [3/N] Streaming Generator. E2e integration ray-project#35325 (review)
Support async actors [4/N] Support async actor and async generator interface. ray-project#35382 < ---- This PR
adds an async actor execution support to the generator implementation (basically keep posting generator.anext to the event loop)
Impelements a standard async generator interface from Python. (anext and aiter)
  • Loading branch information
rkooo567 authored and scv119 committed Jun 11, 2023
1 parent 9f97fad commit 92e04cb
Show file tree
Hide file tree
Showing 22 changed files with 838 additions and 149 deletions.
7 changes: 6 additions & 1 deletion python/ray/_private/async_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,15 @@ def get_new_event_loop():
return asyncio.new_event_loop()


def is_async_func(func):
"""Return True if the function is an async or async generator method."""
return inspect.iscoroutinefunction(func) or inspect.isasyncgenfunction(func)


def sync_to_async(func):
"""Convert a blocking function to async function"""

if inspect.iscoroutinefunction(func):
if is_async_func(func):
return func

async def wrapper(*args, **kwargs):
Expand Down
1 change: 0 additions & 1 deletion python/ray/_private/ray_perf.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,6 @@ def async_actor_multi():
ray.get([async_actor_work.remote(a) for _ in range(m)])

results += timeit("n:n async-actor calls async", async_actor_multi, m * n)
ray.shutdown()

NUM_PGS = 100
NUM_BUNDLES = 1
Expand Down
10 changes: 9 additions & 1 deletion python/ray/_raylet.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ from ray.includes.libcoreworker cimport (

from ray.includes.unique_ids cimport (
CObjectID,
CActorID
CActorID,
CTaskID,
)
from ray.includes.function_descriptor cimport (
CFunctionDescriptor,
Expand Down Expand Up @@ -154,6 +155,13 @@ cdef class CoreWorker:
cdef python_scheduling_strategy_to_c(
self, python_scheduling_strategy,
CSchedulingStrategy *c_scheduling_strategy)
cdef CObjectID allocate_dynamic_return_id_for_generator(
self,
const CAddress &owner_address,
const CTaskID &task_id,
return_size,
generator_index,
is_async_actor)

cdef class FunctionDescriptor:
cdef:
Expand Down
Loading

0 comments on commit 92e04cb

Please sign in to comment.