Skip to content

Commit

Permalink
[Streaming Generator] Fix a reference leak when a stream is deleted w…
Browse files Browse the repository at this point in the history
…ith out of order writes. (ray-project#35591)

Currently, when we delete the ObjectRefStream, we have the following logics.

Keeps reading the next references
If it reaches EoF, finish reading it.
If there's no next index, finish reading it.
Since all these unconsumed objects have a local reference we call RemoveLocalReferences to remove references.
This doesn't work well when the items are written in out of order, For example, see the following example.

Write (index 1) -> Delete -> Write (index 0). In this case, Delete thinks there's no references in the stream because it checks index 0 and found there's no references. It stops reading the stream. This works when the ordering of write is ensured, but not when it is not.

This PR fixes the issue by instead reading all unconsumed references via GetItemsUnconsumed API. This API will return every references that are not read yet.

When the references are written out of order,
  • Loading branch information
rkooo567 committed May 24, 2023
1 parent f6410b0 commit 6594e14
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 20 deletions.
38 changes: 18 additions & 20 deletions src/ray/core_worker/task_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,22 @@ const int64_t kTaskFailureThrottlingThreshold = 50;
// Throttle task failure logs to once this interval.
const int64_t kTaskFailureLoggingFrequencyMillis = 5000;

std::vector<ObjectID> ObjectRefStream::GetItemsUnconsumed() const {
std::vector<ObjectID> result;
if (next_index_ == end_of_stream_index_) {
return {};
}

for (const auto &it : item_index_to_refs_) {
const auto &index = it.first;
const auto &object_id = it.second;
if (index >= next_index_) {
result.push_back(object_id);
}
}
return result;
}

Status ObjectRefStream::TryReadNextItem(ObjectID *object_id_out) {
bool is_eof_set = end_of_stream_index_ != -1;
if (is_eof_set && next_index_ >= end_of_stream_index_) {
Expand Down Expand Up @@ -388,25 +404,8 @@ void TaskManager::DelObjectRefStream(const ObjectID &generator_id) {
return;
}

while (true) {
ObjectID object_id;
const auto &status = TryReadObjectRefStreamInternal(generator_id, &object_id);

// keyError means the stream reaches to EoF.
if (status.IsObjectRefStreamEoF()) {
break;
}

if (object_id == ObjectID::Nil()) {
// No more objects to obtain. Stop iteration.
break;
} else {
// It means the object hasn't been consumed.
// We should remove references since we have 1 reference to this object.
object_ids_unconsumed.push_back(object_id);
}
}

const auto &stream = it->second;
object_ids_unconsumed = stream.GetItemsUnconsumed();
object_ref_streams_.erase(generator_id);
}

Expand Down Expand Up @@ -454,7 +453,6 @@ bool TaskManager::HandleReportGeneratorItemReturns(
absl::MutexLock lock(&mu_);
auto stream_it = object_ref_streams_.find(generator_id);
if (stream_it == object_ref_streams_.end()) {
// SANG-TODO add an unit test.
// Stream has been already deleted. Do not handle it.
return false;
}
Expand Down
5 changes: 5 additions & 0 deletions src/ray/core_worker/task_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ class ObjectRefStream {
/// \param[in] The last item index that means the end of stream.
void MarkEndOfStream(int64_t item_index);

/// Get all the ObjectIDs that are not read yet via TryReadNextItem.
///
/// \return A list of object IDs that are not read yet.
std::vector<ObjectID> GetItemsUnconsumed() const;

private:
const ObjectID generator_id_;

Expand Down
56 changes: 56 additions & 0 deletions src/ray/core_worker/test/task_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1672,6 +1672,62 @@ TEST_F(TaskManagerTest, TestObjectRefStreamOutofOrder) {
manager_.DelObjectRefStream(generator_id);
}

TEST_F(TaskManagerTest, TestObjectRefStreamDelOutOfOrder) {
/**
* Verify there's no leak when we delete a ObjectRefStream
* that has out of order WRITEs.
* WRITE index 1 -> Del -> Write index 0. Both 0 and 1 have to be
* deleted.
*/
// Submit a generator task.
rpc::Address caller_address;
auto spec = CreateTaskHelper(1, {}, /*dynamic_returns=*/true);
auto generator_id = spec.ReturnId(0);
manager_.AddPendingTask(caller_address, spec, "", /*num_retries=*/0);
manager_.MarkDependenciesResolved(spec.TaskId());
manager_.MarkTaskWaitingForExecution(
spec.TaskId(), NodeID::FromRandom(), WorkerID::FromRandom());

// CREATE
manager_.CreateObjectRefStream(generator_id);

// WRITE to index 1
auto dynamic_return_id_index_1 = ObjectID::FromIndex(spec.TaskId(), 3);
auto data = GenerateRandomBuffer();
auto req = GetIntermediateTaskReturn(
/*idx*/ 1,
/*finished*/ false,
generator_id,
/*dynamic_return_id*/ dynamic_return_id_index_1,
/*data*/ data,
/*set_in_plasma*/ false);
ASSERT_TRUE(manager_.HandleReportGeneratorItemReturns(req));
ASSERT_TRUE(reference_counter_->HasReference(dynamic_return_id_index_1));

// Delete the stream. This should remove references from ^.
manager_.DelObjectRefStream(generator_id);
ASSERT_FALSE(reference_counter_->HasReference(dynamic_return_id_index_1));

// WRITE to index 0. It should fail cuz the stream has been removed.
auto dynamic_return_id_index_0 = ObjectID::FromIndex(spec.TaskId(), 2);
data = GenerateRandomBuffer();
req = GetIntermediateTaskReturn(
/*idx*/ 0,
/*finished*/ false,
generator_id,
/*dynamic_return_id*/ dynamic_return_id_index_0,
/*data*/ data,
/*set_in_plasma*/ false);
ASSERT_FALSE(manager_.HandleReportGeneratorItemReturns(req));
ASSERT_FALSE(reference_counter_->HasReference(dynamic_return_id_index_0));

rpc::PushTaskReply reply;
manager_.CompletePendingTask(spec.TaskId(), reply, caller_address, false);

// There must be only a generator ID.
ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 1);
}

} // namespace core
} // namespace ray

Expand Down

0 comments on commit 6594e14

Please sign in to comment.