Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[1/N] Streaming Generator. Cpp interfaces and implementation #35291

Merged
merged 20 commits into from
May 18, 2023
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -599,8 +599,7 @@ cdef store_task_errors(
CTaskType task_type,
proctitle,
c_vector[c_pair[CObjectID, shared_ptr[CRayObject]]] *returns,
c_string* application_error,
):
c_string* application_error):
cdef:
CoreWorker core_worker = worker.core_worker

Expand Down Expand Up @@ -655,6 +654,7 @@ cdef store_task_errors(
raise RayActorError.from_task_error(failure_object)
return num_errors_stored


cdef execute_dynamic_generator_and_store_task_outputs(
generator,
const CObjectID &generator_id,
Expand Down Expand Up @@ -995,7 +995,6 @@ cdef void execute_task(

# Store the outputs in the object store.
with core_worker.profile_event(b"task:store_outputs"):
num_returns = returns[0].size()
if dynamic_returns != NULL:
if not inspect.isgenerator(outputs):
raise ValueError(
Expand Down
9 changes: 7 additions & 2 deletions python/ray/includes/libcoreworker.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
c_bool PinExistingReturnObject(
const CObjectID& return_id,
shared_ptr[CRayObject] *return_object,
const CObjectID& generator_id
)
const CObjectID& generator_id)
CObjectID AllocateDynamicReturnId()

CJobID GetCurrentJobId()
Expand Down Expand Up @@ -236,6 +235,12 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
int64_t timeout_ms,
c_vector[shared_ptr[CObjectLocation]] *results)
CRayStatus TriggerGlobalGC()
CRayStatus ReportIntermediateTaskReturn(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be used by execution path to report intermediate result later.

const pair[CObjectID, shared_ptr[CRayObject]] &dynamic_return_object,
const CObjectID &generator_id,
const CAddress &caller_address,
int64_t idx,
c_bool finished)
c_string MemoryUsageString()

CWorkerContext &GetWorkerContext()
Expand Down
1 change: 1 addition & 0 deletions python/ray/remote_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ def _remote(self, args=None, kwargs=None, **task_options):
num_returns = task_options["num_returns"]
if num_returns == "dynamic":
num_returns = -1

max_retries = task_options["max_retries"]
retry_exceptions = task_options["retry_exceptions"]
if isinstance(retry_exceptions, (list, tuple)):
Expand Down
8 changes: 5 additions & 3 deletions python/ray/tests/test_generators.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ def read(gen):
gen = ray.get(
remote_generator_fn.options(num_returns="dynamic").remote(0, store_in_plasma)
)
assert len(gen) == 0
assert len(list(gen)) == 0
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can ignore every change from this file.


# Check that passing as task arg.
gen = remote_generator_fn.options(num_returns="dynamic").remote(10, store_in_plasma)
Expand All @@ -284,7 +284,9 @@ def static(num_returns):
return list(range(num_returns))

with pytest.raises(ray.exceptions.RayTaskError):
ray.get(static.remote(3))
gen = ray.get(static.remote(3))
for ref in gen:
ray.get(ref)


def test_dynamic_generator_distributed(ray_start_cluster):
Expand Down Expand Up @@ -535,7 +537,7 @@ def maybe_empty_generator(exec_counter):

@ray.remote
def check(empty_generator):
return len(empty_generator) == 0
return len(list(empty_generator)) == 0

exec_counter = ExecutionCounter.remote()
gen = maybe_empty_generator.remote(exec_counter)
Expand Down
8 changes: 7 additions & 1 deletion src/ray/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ enum class StatusCode : char {
OutOfDisk = 28,
ObjectUnknownOwner = 29,
RpcError = 30,
OutOfResource = 31
OutOfResource = 31,
ObjectRefStreamEoF = 32
};

#if defined(__clang__)
Expand Down Expand Up @@ -146,6 +147,10 @@ class RAY_EXPORT Status {
return Status(StatusCode::KeyError, msg);
}

static Status ObjectRefStreamEoF(const std::string &msg) {
return Status(StatusCode::ObjectRefStreamEoF, msg);
}

static Status TypeError(const std::string &msg) {
return Status(StatusCode::TypeError, msg);
}
Expand Down Expand Up @@ -254,6 +259,7 @@ class RAY_EXPORT Status {
bool IsOutOfMemory() const { return code() == StatusCode::OutOfMemory; }
bool IsOutOfDisk() const { return code() == StatusCode::OutOfDisk; }
bool IsKeyError() const { return code() == StatusCode::KeyError; }
bool IsObjectRefStreamEoF() const { return code() == StatusCode::ObjectRefStreamEoF; }
bool IsInvalid() const { return code() == StatusCode::Invalid; }
bool IsIOError() const { return code() == StatusCode::IOError; }
bool IsTypeError() const { return code() == StatusCode::TypeError; }
Expand Down
30 changes: 30 additions & 0 deletions src/ray/core_worker/common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,35 @@ std::string GenerateCachedActorName(const std::string &ns,
return ns + "-" + actor_name;
}

void SerializeReturnObject(const ObjectID &object_id,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is just a copy/paste from other file.

const std::shared_ptr<RayObject> &return_object,
rpc::ReturnObject *return_object_proto) {
return_object_proto->set_object_id(object_id.Binary());

if (!return_object) {
// This should only happen if the local raylet died. Caller should
// retry the task.
RAY_LOG(WARNING) << "Failed to create task return object " << object_id
<< " in the object store, exiting.";
QuickExit();
}
return_object_proto->set_size(return_object->GetSize());
if (return_object->GetData() != nullptr && return_object->GetData()->IsPlasmaBuffer()) {
return_object_proto->set_in_plasma(true);
} else {
if (return_object->GetData() != nullptr) {
return_object_proto->set_data(return_object->GetData()->Data(),
return_object->GetData()->Size());
}
if (return_object->GetMetadata() != nullptr) {
return_object_proto->set_metadata(return_object->GetMetadata()->Data(),
return_object->GetMetadata()->Size());
}
}
for (const auto &nested_ref : return_object->GetNestedRefs()) {
return_object_proto->add_nested_inlined_refs()->CopyFrom(nested_ref);
}
}

} // namespace core
} // namespace ray
5 changes: 5 additions & 0 deletions src/ray/core_worker/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "ray/common/task/task_spec.h"
#include "ray/raylet_client/raylet_client.h"
#include "ray/util/util.h"
#include "src/ray/protobuf/common.pb.h"

namespace ray {
namespace core {
Expand All @@ -37,6 +38,10 @@ std::string LanguageString(Language language);
// `namespace-[job_id-]actor_name`
std::string GenerateCachedActorName(const std::string &ns, const std::string &actor_name);

void SerializeReturnObject(const ObjectID &object_id,
const std::shared_ptr<RayObject> &return_object,
rpc::ReturnObject *return_object_proto);

/// Information about a remote function.
class RayFunction {
public:
Expand Down
53 changes: 53 additions & 0 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2599,6 +2599,9 @@ Status CoreWorker::ExecuteTask(
dynamic_return_objects = NULL;
} else if (task_spec.AttemptNumber() > 0) {
for (const auto &dynamic_return_id : task_spec.DynamicReturnIds()) {
// Increase the put index so that when the generator creates a new obj
// the object id won't conflict.
worker_context_.GetNextPutIndex();
dynamic_return_objects->push_back(
std::make_pair<>(dynamic_return_id, std::shared_ptr<RayObject>()));
RAY_LOG(DEBUG) << "Re-executed task " << task_spec.TaskId()
Expand Down Expand Up @@ -2815,6 +2818,56 @@ ObjectID CoreWorker::AllocateDynamicReturnId() {
return return_id;
}

Status CoreWorker::ReportIntermediateTaskReturn(
const std::pair<ObjectID, std::shared_ptr<RayObject>> &dynamic_return_object,
const ObjectID &generator_id,
const rpc::Address &caller_address,
int64_t idx,
bool finished) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it true that when finished is True, dynamic_return_object must be empty?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can remove finished flag and replying on dynamic_return_objects being Nil?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline. I decided to just add else that checks finish == true when object id == nil.

RAY_LOG(DEBUG) << "Write the object ref stream, index: " << idx
<< " finished: " << finished << ", id: " << dynamic_return_object.first;
rpc::ReportIntermediateTaskReturnRequest request;
request.mutable_worker_addr()->CopyFrom(rpc_address_);
request.set_idx(idx);
request.set_finished(finished);
request.set_generator_id(generator_id.Binary());
auto client = core_worker_client_pool_->GetOrConnect(caller_address);

if (!dynamic_return_object.first.IsNil()) {
RAY_CHECK_EQ(finished, false);
auto return_object_proto = request.add_dynamic_return_objects();
SerializeReturnObject(
dynamic_return_object.first, dynamic_return_object.second, return_object_proto);
std::vector<ObjectID> deleted;
// When we allocate a dynamic return ID (AllocateDynamicReturnId),
// we borrow the object. When the object value is allocatd, the
// memory store is updated. We should clear borrowers and memory store
// here.
ReferenceCounter::ReferenceTableProto borrowed_refs;
reference_counter_->PopAndClearLocalBorrowers(
{dynamic_return_object.first}, &borrowed_refs, &deleted);
memory_store_->Delete(deleted);
}

client->ReportIntermediateTaskReturn(
request,
[](const Status &status, const rpc::ReportIntermediateTaskReturnReply &reply) {
if (!status.ok()) {
// TODO(sang): Handle network error more gracefully.
RAY_LOG(ERROR) << "Failed to send the object ref.";
}
});
return Status::OK();
}

void CoreWorker::HandleReportIntermediateTaskReturn(
rpc::ReportIntermediateTaskReturnRequest request,
rpc::ReportIntermediateTaskReturnReply *reply,
rpc::SendReplyCallback send_reply_callback) {
task_manager_->HandleReportIntermediateTaskReturn(request);
send_reply_callback(Status::OK(), nullptr, nullptr);
}

std::vector<rpc::ObjectReference> CoreWorker::ExecuteTaskLocalMode(
const TaskSpecification &task_spec, const ActorID &actor_id) {
auto resource_ids = std::make_shared<ResourceMappingType>();
Expand Down
41 changes: 41 additions & 0 deletions src/ray/core_worker/core_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,8 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
std::vector<ObjectID> deleted;
reference_counter_->RemoveLocalReference(object_id, &deleted);
// TOOD(ilr): better way of keeping an object from being deleted
// TODO(sang): This seems bad... We should delete the memory store
// properly from reference counter.
if (!options_.is_local_mode) {
memory_store_->Delete(deleted);
}
Expand Down Expand Up @@ -704,6 +706,45 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// Trigger garbage collection on each worker in the cluster.
void TriggerGlobalGC();

/// Report the task caller at caller_address that the intermediate
/// task return. It means if this API is used, the caller will be notified
/// the task return before the current task is terminated. The caller must
/// implement HandleReportIntermediateTaskReturn API endpoint
/// to handle the intermediate result report.
/// This API makes sense only for a generator task
/// (task that can return multiple intermediate
/// result before the task terminates).
///
/// NOTE: The API doesn't guarantee the ordering of the report. The
/// caller is supposed to reorder the report based on the idx.
///
/// \param[in] dynamic_return_object A intermediate ray object to report
/// to the caller before the task terminates. This object must have been
/// created dynamically from this worker via AllocateReturnObject.
/// \param[in] generator_id The return object ref ID from a current generator
/// task.
/// \param[in] caller_address The address of the caller of the current task
/// that created a generator_id.
/// \param[in] idx The index of the task return. It is used to reorder the
/// report from the caller side.
/// \param[in] finished True indicates there's going to be no more intermediate
/// task return. When finished is provided dynamic_return_object input will be
/// ignored.
Status ReportIntermediateTaskReturn(
rkooo567 marked this conversation as resolved.
Show resolved Hide resolved
const std::pair<ObjectID, std::shared_ptr<RayObject>> &dynamic_return_object,
const ObjectID &generator_id,
const rpc::Address &caller_address,
int64_t idx,
rkooo567 marked this conversation as resolved.
Show resolved Hide resolved
bool finished);

/// Implements gRPC server handler.
/// If an executor can generator task return before the task is finished,
/// it invokes this endpoint via ReportIntermediateTaskReturn RPC.
void HandleReportIntermediateTaskReturn(
rpc::ReportIntermediateTaskReturnRequest request,
rpc::ReportIntermediateTaskReturnReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

/// Get a string describing object store memory usage for debugging purposes.
///
/// \return std::string The string describing memory usage.
Expand Down
45 changes: 40 additions & 5 deletions src/ray/core_worker/reference_count.cc
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,36 @@ void ReferenceCounter::AddDynamicReturn(const ObjectID &object_id,
AddNestedObjectIdsInternal(generator_id, {object_id}, owner_address);
}

void ReferenceCounter::OwnDynamicStreamingTaskReturnRef(const ObjectID &object_id,
rkooo567 marked this conversation as resolved.
Show resolved Hide resolved
const ObjectID &generator_id) {
absl::MutexLock lock(&mutex_);
// NOTE: The upper layer (the layer that manges the object ref stream)
// should make sure the generator ref is not GC'ed when the
rkooo567 marked this conversation as resolved.
Show resolved Hide resolved
//
auto outer_it = object_id_refs_.find(generator_id);
if (outer_it == object_id_refs_.end()) {
// Generator object already went out of scope.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add warning?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I think we don't need a warning here. It can happen under normal circumstance (e.g., generator went out of scope before task finishes). I instead add a debug message!

// It means the generator is already GC'ed. No need to
// update the reference.
return;
}
RAY_LOG(DEBUG) << "Adding dynamic return " << object_id
<< " contained in generator object " << generator_id;
RAY_CHECK(outer_it->second.owned_by_us);
RAY_CHECK(outer_it->second.owner_address.has_value());
rpc::Address owner_address(outer_it->second.owner_address.value());
// We add a local reference here. The ref removal will be handled
// by the ObjectRefStream.
RAY_UNUSED(AddOwnedObjectInternal(object_id,
{},
owner_address,
outer_it->second.call_site,
/*object_size=*/-1,
outer_it->second.is_reconstructable,
/*add_local_ref=*/true,
absl::optional<NodeID>()));
}

bool ReferenceCounter::AddOwnedObjectInternal(
const ObjectID &object_id,
const std::vector<ObjectID> &inner_ids,
Expand Down Expand Up @@ -382,7 +412,7 @@ void ReferenceCounter::UpdateSubmittedTaskReferences(
std::vector<ObjectID> *deleted) {
absl::MutexLock lock(&mutex_);
for (const auto &return_id : return_ids) {
UpdateObjectPendingCreation(return_id, true);
UpdateObjectPendingCreationInternal(return_id, true);
}
for (const ObjectID &argument_id : argument_ids_to_add) {
RAY_LOG(DEBUG) << "Increment ref count for submitted task argument " << argument_id;
Expand Down Expand Up @@ -411,7 +441,7 @@ void ReferenceCounter::UpdateResubmittedTaskReferences(
const std::vector<ObjectID> return_ids, const std::vector<ObjectID> &argument_ids) {
absl::MutexLock lock(&mutex_);
for (const auto &return_id : return_ids) {
UpdateObjectPendingCreation(return_id, true);
UpdateObjectPendingCreationInternal(return_id, true);
}
for (const ObjectID &argument_id : argument_ids) {
auto it = object_id_refs_.find(argument_id);
Expand All @@ -433,7 +463,7 @@ void ReferenceCounter::UpdateFinishedTaskReferences(
std::vector<ObjectID> *deleted) {
absl::MutexLock lock(&mutex_);
for (const auto &return_id : return_ids) {
UpdateObjectPendingCreation(return_id, false);
UpdateObjectPendingCreationInternal(return_id, false);
}
// Must merge the borrower refs before decrementing any ref counts. This is
// to make sure that for serialized IDs, we increment the borrower count for
Expand Down Expand Up @@ -1278,8 +1308,8 @@ void ReferenceCounter::RemoveObjectLocationInternal(ReferenceTable::iterator it,
PushToLocationSubscribers(it);
}

void ReferenceCounter::UpdateObjectPendingCreation(const ObjectID &object_id,
bool pending_creation) {
void ReferenceCounter::UpdateObjectPendingCreationInternal(const ObjectID &object_id,
bool pending_creation) {
auto it = object_id_refs_.find(object_id);
bool push = false;
if (it != object_id_refs_.end()) {
Expand Down Expand Up @@ -1439,6 +1469,11 @@ bool ReferenceCounter::IsObjectReconstructable(const ObjectID &object_id,
return it->second.is_reconstructable;
}

void ReferenceCounter::UpdateObjectReady(const ObjectID &object_id) {
absl::MutexLock lock(&mutex_);
UpdateObjectPendingCreationInternal(object_id, /*pending_creation*/ false);
}

bool ReferenceCounter::IsObjectPendingCreation(const ObjectID &object_id) const {
absl::MutexLock lock(&mutex_);
auto it = object_id_refs_.find(object_id);
Expand Down
Loading