Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Cherry-pick][Streaming Generator] Fix a reference leak when pinning requests are … #35794

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions python/ray/tests/test_streaming_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -466,9 +466,32 @@ def get_data(self):

for ref in chain_actor_4.get_data.options(num_returns="streaming").remote():
assert np.array_equal(np.ones(5 * 1024 * 1024), ray.get(ref))
print("getting the next data")
del ref


def test_generator_slow_pinning_requests(monkeypatch, shutdown_only):
"""
Verify when the Object pinning request from the raylet
is reported slowly, there's no refernece leak.
"""
with monkeypatch.context() as m:
# defer for 10s for the second node.
m.setenv(
"RAY_testing_asio_delay_us",
"CoreWorkerService.grpc_server.PubsubLongPolling=1000000:1000000",
)

@ray.remote
def f():
yield np.ones(5 * 1024 * 1024)

for ref in f.options(num_returns="streaming").remote():
del ref

print(list_objects())


@pytest.mark.parametrize("store_in_plasma", [False, True])
def test_actor_streaming_generator(shutdown_only, store_in_plasma):
"""Test actor/async actor with sync/async generator interfaces."""
Expand Down
23 changes: 10 additions & 13 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3300,10 +3300,9 @@ void CoreWorker::ProcessSubscribeForObjectEviction(
const auto generator_id = ObjectID::FromBinary(message.generator_id());
RAY_CHECK(!generator_id.IsNil());
if (task_manager_->ObjectRefStreamExists(generator_id)) {
// It is possible this reference will leak if the ObjectRefStream is
// deleted or the corresponding object ID is not reported via
// HandleReportGeneratorItemReturns. TODO(sang): Handle the edge case.
reference_counter_->OwnDynamicStreamingTaskReturnRef(object_id, generator_id);
// ObjectRefStreamExists is used to distinguigsh num_returns="dynamic" vs
// "streaming".
task_manager_->TemporarilyOwnGeneratorReturnRefIfNeeded(object_id, generator_id);
} else {
reference_counter_->AddDynamicReturn(object_id, generator_id);
}
Expand Down Expand Up @@ -3439,12 +3438,10 @@ void CoreWorker::AddSpilledObjectLocationOwner(
// object is spilled before the reply from the task that created the
// object. Add the dynamically created object to our ref counter so that we
// know that it exists.
RAY_CHECK(!generator_id->IsNil());
if (task_manager_->ObjectRefStreamExists(*generator_id)) {
// It is possible this reference will leak if the ObjectRefStream is
// deleted or the corresponding object ID is not reported via
// HandleReportGeneratorItemReturns. TODO(sang): Handle the edge case.
reference_counter_->OwnDynamicStreamingTaskReturnRef(object_id, *generator_id);
// ObjectRefStreamExists is used to distinguigsh num_returns="dynamic" vs
// "streaming".
task_manager_->TemporarilyOwnGeneratorReturnRefIfNeeded(object_id, *generator_id);
} else {
reference_counter_->AddDynamicReturn(object_id, *generator_id);
}
Expand Down Expand Up @@ -3476,10 +3473,10 @@ void CoreWorker::AddObjectLocationOwner(const ObjectID &object_id,
const auto &maybe_generator_id = task_manager_->TaskGeneratorId(object_id.TaskId());
if (!maybe_generator_id.IsNil()) {
if (task_manager_->ObjectRefStreamExists(maybe_generator_id)) {
// It is possible this reference will leak if the ObjectRefStream is
// deleted or the corresponding object ID is not reported via
// HandleReportGeneratorItemReturns. TODO(sang): Handle the edge case.
reference_counter_->OwnDynamicStreamingTaskReturnRef(object_id, maybe_generator_id);
// ObjectRefStreamExists is used to distinguigsh num_returns="dynamic" vs
// "streaming".
task_manager_->TemporarilyOwnGeneratorReturnRefIfNeeded(object_id,
maybe_generator_id);
} else {
// The task is a generator and may not have finished yet. Add the internal
// ObjectID so that we can update its location.
Expand Down
56 changes: 55 additions & 1 deletion src/ray/core_worker/task_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ std::vector<ObjectID> ObjectRefStream::GetItemsUnconsumed() const {
result.push_back(object_id);
}
}
// Temporarily owned refs are not consumed.
for (const auto &object_id : temporarily_owned_refs_) {
result.push_back(object_id);
}
return result;
}

Expand Down Expand Up @@ -80,6 +84,28 @@ Status ObjectRefStream::TryReadNextItem(ObjectID *object_id_out) {
return Status::OK();
}

bool ObjectRefStream::TemporarilyInsertToStreamIfNeeded(const ObjectID &object_id) {
// Write to a stream if the object ID is not consumed yet.
auto last_consumed_index = next_index_ - 1;
if (item_index_to_refs_.find(last_consumed_index) != item_index_to_refs_.end()) {
// Object ID from the generator task always increment. I.e., the first
// return has a lower ObjectID bytes than the second return.
// If the last conusumed object ID's index is lower than a given ObjectID,
// it means the given ref is not consumed yet, meaning we should
// write to a stream.
auto not_consumed_yet =
item_index_to_refs_[last_consumed_index].ObjectIndex() < object_id.ObjectIndex();
return not_consumed_yet;
}

if (refs_written_to_stream_.find(object_id) == refs_written_to_stream_.end()) {
temporarily_owned_refs_.insert(object_id);
return true;
}

return false;
}

bool ObjectRefStream::InsertToStream(const ObjectID &object_id, int64_t item_index) {
if (end_of_stream_index_ != -1) {
RAY_CHECK(next_index_ <= end_of_stream_index_);
Expand All @@ -90,6 +116,11 @@ bool ObjectRefStream::InsertToStream(const ObjectID &object_id, int64_t item_ind
return false;
}

if (temporarily_owned_refs_.find(object_id) != temporarily_owned_refs_.end()) {
temporarily_owned_refs_.erase(object_id);
}
refs_written_to_stream_.insert(object_id);

auto it = item_index_to_refs_.find(item_index);
if (it != item_index_to_refs_.end()) {
// It means the when a task is retried it returns a different object id
Expand Down Expand Up @@ -413,7 +444,7 @@ void TaskManager::DelObjectRefStream(const ObjectID &generator_id) {
for (const auto &object_id : object_ids_unconsumed) {
std::vector<ObjectID> deleted;
reference_counter_->RemoveLocalReference(object_id, &deleted);
RAY_CHECK_EQ(deleted.size(), 1UL);
RAY_CHECK_GE(deleted.size(), 1UL);
}
}

Expand Down Expand Up @@ -511,6 +542,29 @@ bool TaskManager::HandleReportGeneratorItemReturns(
return num_objects_written != 0;
}

bool TaskManager::TemporarilyOwnGeneratorReturnRefIfNeeded(const ObjectID &object_id,
const ObjectID &generator_id) {
bool inserted_to_stream = false;
{
absl::MutexLock lock(&mu_);
auto stream_it = object_ref_streams_.find(generator_id);
if (stream_it == object_ref_streams_.end()) {
return false;
}

auto &stream = stream_it->second;
inserted_to_stream = stream.TemporarilyInsertToStreamIfNeeded(object_id);
}

// We shouldn't hold a lock when calling refernece counter API.
if (inserted_to_stream) {
reference_counter_->OwnDynamicStreamingTaskReturnRef(object_id, generator_id);
return true;
}

return false;
}

void TaskManager::CompletePendingTask(const TaskID &task_id,
const rpc::PushTaskReply &reply,
const rpc::Address &worker_addr,
Expand Down
59 changes: 56 additions & 3 deletions src/ray/core_worker/task_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,27 @@ class ObjectRefStream {
///
/// \param[in] object_id The object id that will be read at index item_index.
/// \param[in] item_index The index where the object id will be written.
/// \return True if the idx hasn't been used. False otherwise.
/// If -1 is given, it means an index is not known yet. In this case,
/// the ref will be temporarily written until it is written with an index.
/// \return True if the ref is written to a stream. False otherwise.
bool InsertToStream(const ObjectID &object_id, int64_t item_index);

/// Mark the stream canont be used anymore.
/// Sometimes, index of the object ID is not known.
///
/// In this case, we should temporarily write the object ref to the
/// stream until it is written with an index.
///
/// In the following scenario, the API will be no-op because
/// it means the object ID was already written with an index.
/// - If the object ID is already consumed.
/// - If the object ID is already written with an index.
///
/// \param[in] object_id The temporarily written object id.
/// \return True if object ID is temporarily written. False otherwise.
bool TemporarilyInsertToStreamIfNeeded(const ObjectID &object_id);

/// Mark that after a given item_index, the stream cannot be written
/// anymore.
///
/// \param[in] The last item index that means the end of stream.
void MarkEndOfStream(int64_t item_index);
Expand All @@ -130,6 +147,11 @@ class ObjectRefStream {

/// The item_index -> object reference ids.
absl::flat_hash_map<int64_t, ObjectID> item_index_to_refs_;
/// Refs that are temporarily owned. It means a ref is
/// written to a stream, but index is not known yet.
absl::flat_hash_set<ObjectID> temporarily_owned_refs_;
// A set of refs that's already written to a stream.
absl::flat_hash_set<ObjectID> refs_written_to_stream_;
/// The last index of the stream.
/// item_index < last will contain object references.
/// If -1, that means the stream hasn't reached to EoF.
Expand Down Expand Up @@ -219,12 +241,43 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa
const rpc::Address &worker_addr,
bool is_application_error) override;

/// Handle the task return reported before the task terminates.
/// Handle the generator task return so that it will be accessible
/// via TryReadObjectRefStream.
///
/// Generator tasks can report task returns before task is finished.
/// It is the opposite of regular tasks which can only batch
/// report the task returns after the task finishes.
///
/// \return True if a task return is registered. False otherwise.
bool HandleReportGeneratorItemReturns(
const rpc::ReportGeneratorItemReturnsRequest &request);

/// Temporarily register a given generator return reference.
///
/// For a generator return, the references are not known until
/// it is reported from an executor (via HandleReportGeneratorItemReturns).
/// However, there are times when generator return references need to be
/// owned before the return values are reported.
///
/// For example, when an object is created or spilled from the object store,
/// pinning or OBOD update requests could be sent from raylets,
/// and it is possible those requests come before generator returns
/// are reported. In this case, we should own a reference temporarily,
/// otherwise, these requests will be ignored.
///
/// In the following scenario, references don't need to be owned. In this case,
/// the API will be no-op.
/// - The stream has been already deleted.
/// - The reference is already read/consumed from a stream.
/// In this case, we already owned or GC'ed the refernece.
/// - The reference is already owned via HandleReportGeneratorItemReturns.
///
/// \param object_id The object ID to temporarily owns.
/// \param generator_id The return ref ID of a generator task.
/// \return True if we temporarily owned the reference. False otherwise.
bool TemporarilyOwnGeneratorReturnRefIfNeeded(const ObjectID &object_id,
const ObjectID &generator_id);

/// Delete the object ref stream.
///
/// Once the stream is deleted, it will clean up all unconsumed
Expand Down
94 changes: 94 additions & 0 deletions src/ray/core_worker/test/task_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1732,6 +1732,100 @@ TEST_F(TaskManagerTest, TestObjectRefStreamDelOutOfOrder) {
ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 1);
}

TEST_F(TaskManagerTest, TestObjectRefStreamTemporarilyOwnGeneratorReturnRefIfNeeded) {
/**
* Test TemporarilyOwnGeneratorReturnRefIfNeeded
*/
// Submit a generator task.
rpc::Address caller_address;
auto spec = CreateTaskHelper(1, {}, /*dynamic_returns=*/true);
auto generator_id = spec.ReturnId(0);
manager_.AddPendingTask(caller_address, spec, "", /*num_retries=*/0);
manager_.MarkDependenciesResolved(spec.TaskId());
manager_.MarkTaskWaitingForExecution(
spec.TaskId(), NodeID::FromRandom(), WorkerID::FromRandom());

/**
* Test TemporarilyOwnGeneratorReturnRefIfNeeded is no-op when the stream is
* not created yet.
*/
auto dynamic_return_id_index_0 = ObjectID::FromIndex(spec.TaskId(), 2);
manager_.TemporarilyOwnGeneratorReturnRefIfNeeded(dynamic_return_id_index_0,
generator_id);
// It is no-op if the object ref stream is not created.
ASSERT_FALSE(reference_counter_->HasReference(dynamic_return_id_index_0));

/**
* Test TemporarilyOwnGeneratorReturnRefIfNeeded called before any
* HandleReportGeneratorItemReturns adds a refernece.
*/
// CREATE
manager_.CreateObjectRefStream(generator_id);
manager_.TemporarilyOwnGeneratorReturnRefIfNeeded(dynamic_return_id_index_0,
generator_id);
// We has a reference to this object before the ref is
// reported via HandleReportGeneratorItemReturns.
ASSERT_TRUE(reference_counter_->HasReference(dynamic_return_id_index_0));

/**
* Test TemporarilyOwnGeneratorReturnRefIfNeeded called after the
* ref consumed / removed will be no-op.
*/
// WRITE 0 -> WRITE 1
auto data = GenerateRandomBuffer();
auto req = GetIntermediateTaskReturn(
/*idx*/ 0,
/*finished*/ false,
generator_id,
/*dynamic_return_id*/ dynamic_return_id_index_0,
/*data*/ data,
/*set_in_plasma*/ false);
ASSERT_TRUE(manager_.HandleReportGeneratorItemReturns(req));
auto dynamic_return_id_index_1 = ObjectID::FromIndex(spec.TaskId(), 3);
data = GenerateRandomBuffer();
req = GetIntermediateTaskReturn(
/*idx*/ 1,
/*finished*/ false,
generator_id,
/*dynamic_return_id*/ dynamic_return_id_index_1,
/*data*/ data,
/*set_in_plasma*/ false);
ASSERT_TRUE(manager_.HandleReportGeneratorItemReturns(req));

// READ 0 -> READ 1
for (auto i = 0; i < 2; i++) {
ObjectID object_id;
auto status = manager_.TryReadObjectRefStream(generator_id, &object_id);
ASSERT_TRUE(status.ok());
}

std::vector<ObjectID> removed;
reference_counter_->RemoveLocalReference(dynamic_return_id_index_1, &removed);
ASSERT_EQ(removed.size(), 1UL);
ASSERT_FALSE(reference_counter_->HasReference(dynamic_return_id_index_1));
// If the ref has been already consumed and deleted,
// this shouldn't add a reference.
manager_.TemporarilyOwnGeneratorReturnRefIfNeeded(dynamic_return_id_index_1,
generator_id);
ASSERT_FALSE(reference_counter_->HasReference(dynamic_return_id_index_1));

/**
* Test TemporarilyOwnGeneratorReturnRefIfNeeded called but
* HandleReportGeneratorItemReturns is never called. In this case, when
* the stream is deleted these refs should be cleaned up.
*/
manager_.DelObjectRefStream(generator_id);
manager_.CreateObjectRefStream(generator_id);
manager_.TemporarilyOwnGeneratorReturnRefIfNeeded(dynamic_return_id_index_0,
generator_id);
ASSERT_TRUE(reference_counter_->HasReference(dynamic_return_id_index_0));
manager_.DelObjectRefStream(generator_id);
ASSERT_FALSE(reference_counter_->HasReference(dynamic_return_id_index_0));

rpc::PushTaskReply reply;
manager_.CompletePendingTask(spec.TaskId(), reply, caller_address, false);
}

} // namespace core
} // namespace ray

Expand Down