[core] Support generators to allow tasks to return a dynamic number of objects #28291
Signed-off-by: Stephanie Wang <swang@cs.berkeley.edu>
- nondeterministic recovery test - ref counting bug? Signed-off-by: Stephanie Wang <swang@cs.berkeley.edu>
Still have some TODOs, but the main changes are ready for review.
Signed-off-by: Stephanie Wang <swang@cs.berkeley.edu>
Sweet, general approach LGTM but I'll defer to others for a more thorough review.
In addition to the object eviction subscription, is there a potential race between the pinned object location update and the push task reply for the dynamic return objects? IIRC both of those RPCs are async with no synchronization barrier in-between. Maybe this is a non-issue because we always add the pinned object location when processing the push task reply?
ray/src/ray/core_worker/task_manager.cc
Lines 249 to 252 in 775ff3e
// NOTE(swang): We need to add the location of the object before marking
// it as local in the in-memory store so that the data locality policy
// will choose the right raylet for any queued dependent tasks.
reference_counter_->UpdateObjectPinnedAtRaylet(object_id, worker_raylet_id);
// eviction events before we know about the object. This can happen when we
// receive the subscription request before the reply from the task that
// created the object. Add the dynamically created object to our ref
// counter so that we know that it exists.
Nice, I was wondering if this race was going to be covered.
src/ray/core_worker/core_worker.cc
for (const auto &return_id : return_ids) {
  RAY_LOG(DEBUG) << "Task " << task_spec.TaskId() << " will return object "
                 << return_id;
}
Nit: This return ID iteration + debug logging could be moved to the loop directly above this one.
@@ -343,6 +343,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
   /// \return Status.
   Status SealExisting(const ObjectID &object_id,
                       bool pin_object,
                       const ObjectID &generator_id = ObjectID::Nil(),
generator_id should be added to the docstring for this method and SealReturnObject, PinExistingReturnObject.
Hmm I think this part is okay because of what you said: it all happens locally at the task caller when processing the push task reply. I think there may be a race condition for object directory location updates, though, let me look into this.
Co-authored-by: Clark Zinzow <clarkzinzow@gmail.com>
Okay, I believe I've resolved this issue by also attaching the generator ID to spill location updates. I don't think it's necessary for in-memory locations (see added comments).
Just tried this, very cool! A couple API comments:
- When you specify num_returns, it seems that you can also yield the exact same number of values, but it is not a generator return. This seems confusing, should we disallow mixing num_returns and generators?
- Should we define __len__ on the generator return object? It seems not unreasonable to include this, and even if we decide in the future to support streaming generators, we could just raise an error trying to get the length in that case.
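The __len__ suggestion could look roughly like the sketch below. This is a hypothetical stand-in class, not Ray's actual ObjectRefGenerator: the class name, the use of plain strings as refs, and the `streaming` flag are all assumptions made up for illustration.

```python
from typing import Iterator, List


class SizedRefGenerator:
    """Hypothetical stand-in for the generator return object.

    All refs are known once the task finishes, so the length is well
    defined. Refs are plain strings in this sketch.
    """

    def __init__(self, refs: List[str], streaming: bool = False):
        self._refs = list(refs)
        # Assumed flag: a future streaming variant would not know its length.
        self._streaming = streaming

    def __iter__(self) -> Iterator[str]:
        return iter(self._refs)

    def __len__(self) -> int:
        if self._streaming:
            raise TypeError("length is unknown for a streaming generator")
        return len(self._refs)


gen = SizedRefGenerator(["ref-0", "ref-1", "ref-2"])
```

With this shape, callers can size downstream work up front with len(gen), while a streaming variant would fail loudly instead of returning a misleading count.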
We should also add docs to this I think.
Ping me when it's ready for more reviews :)
I think all the comments were addressed already, trying to fix CI now.
Thanks, LGTM! Before merging, let's please mark the num_returns="dynamic" API as experimental.
Before we declare it stable, I'm particularly curious why the num_returns="dynamic" option doesn't return a generator of ObjectRefs (from the API perspective that seems more natural to me). Are there implementation limitations for this?
My last few comments :)
    # number of objects as before.
    num_returns = returns[0].size()
else:
    # This is the first execution of the task, so we don't know how
This doesn't necessarily mean it's the first execution of the task? It can also mean the generator is empty?
Are we able to catch this case: the first execution returns an empty generator but re-execution returns a non-empty generator.
Hmm good catch, let me check. We'll probably have to resolve this case as a follow-up.
Actually it works since we don't reconstruct empty ObjectRefGenerators. Added a test.
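The re-execution case in this thread can be sketched as a consistency check. This is a toy model, not the code in the PR: `collect_dynamic_returns` and the string IDs are hypothetical, and `previous_ids` stands in for the dynamic return IDs recorded from an earlier execution. It assumes, as stated above, that empty generators are never reconstructed, so an empty record is treated as a first execution. Rejecting a re-execution that yields fewer objects is an additional assumption of this sketch.

```python
from typing import Iterable, List, Optional


def collect_dynamic_returns(values: Iterable[object],
                            previous_ids: Optional[List[str]] = None) -> List[str]:
    """Assign an ID to each yielded value, reusing IDs on re-execution."""
    ids: List[str] = []
    for i, _value in enumerate(values):
        if previous_ids:
            # Re-execution: must not yield more objects than before.
            if i >= len(previous_ids):
                raise ValueError(
                    "Task returned more than num_returns={} objects.".format(
                        len(previous_ids)))
            ids.append(previous_ids[i])
        else:
            # First execution: allocate a fresh (hypothetical) ID per value.
            ids.append("dyn-{}".format(i))
    # Assumption of this sketch: fewer objects on re-execution is also an error.
    if previous_ids and len(ids) != len(previous_ids):
        raise ValueError(
            "Task returned fewer than num_returns={} objects.".format(
                len(previous_ids)))
    return ids


first_ids = collect_dynamic_returns(range(3))
replayed_ids = collect_dynamic_returns(range(3), previous_ids=first_ids)
```

Reusing the recorded IDs on replay is what lets existing references resolve to the reconstructed objects.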
    raise ValueError(
        "Task returned more than num_returns={} objects.".format(
-            n_returns))
+            num_returns))
while i >= returns[0].size():
What about this? Is it possible that this loop will be executed more than once?
/// \param[in] owner_address Address of the owner of the object who will be contacted by
/// the raylet if the object is pinned. If not provided, defaults to this worker.
/// \return Status.
Status SealExisting(const ObjectID &object_id,
                    bool pin_object,
                    const ObjectID &generator_id = ObjectID::Nil(),
How do we decide when to use which style? What's the guideline we should follow in the future?
The main reason right now was to avoid complicating pros:
cons:
Signed-off-by: Stephanie Wang <swang@cs.berkeley.edu>
All new tests passing.
Why are these changes needed?
This adds support for tasks that need to return a dynamic number of objects. When a remote generator function is invoked and num_returns for the task is 1, the worker will dynamically allocate ray.put IDs for these objects and store an ObjectRefGenerator as its return value. This allows the worker to choose how many objects to return and to keep heap memory low, since it does not need to keep all objects in memory simultaneously.
Unlike normal ray.put(), we assign the task caller as the owner of the object. This is to improve fault tolerance, as the owner can recover dynamically generated objects through the normal lineage reconstruction codepath.
The main complication has to do with notifying the task caller that it owns these objects. We do this in two places, which is necessary because the protocols are asynchronous, so either message can arrive first.
To register the dynamic return, the owner adds the ObjectRef to the ref counter and marks that it is contained in the generator object.
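Because either message can arrive first, the registration step has to be safe to run twice. The sketch below is a toy owner-side table, not Ray's reference counter: `ToyRefCounter`, `add_dynamic_return`, and the string IDs are hypothetical names chosen for illustration.

```python
from typing import Dict, Set


class ToyRefCounter:
    """Toy owner-side table; not Ray's actual reference counter."""

    def __init__(self) -> None:
        # generator ID -> IDs of the dynamically returned objects it contains
        self.contained: Dict[str, Set[str]] = {}
        # every object ID the owner has heard about so far
        self.known: Set[str] = set()

    def add_dynamic_return(self, object_id: str, generator_id: str) -> bool:
        """Record that object_id is contained in generator_id.

        Returns True only the first time the object is seen, so calling
        this from both notification paths is harmless.
        """
        first_time = object_id not in self.known
        self.known.add(object_id)
        self.contained.setdefault(generator_id, set()).add(object_id)
        return first_time


counter = ToyRefCounter()
# The eviction subscription request arrives before the task reply...
seen_first = counter.add_dynamic_return("obj-1", "gen-A")
# ...and the task reply later reports the same object.
seen_again = counter.add_dynamic_return("obj-1", "gen-A")
```

The second call changes nothing, which is the property that makes the ordering of the two messages irrelevant in this model.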
Related issue number
Checks
- I've signed off every commit (git commit -s) in this PR.
- I've run scripts/format.sh to lint the changes in this PR.
TODO: