Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Minor update on task manager #49272

Merged
merged 1 commit into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -579,8 +579,8 @@ CoreWorker::CoreWorker(CoreWorkerOptions options, const WorkerID &worker_id)
return PushError(job_id, type, error_message, timestamp);
};
task_manager_.reset(new TaskManager(
memory_store_,
reference_counter_,
*memory_store_,
*reference_counter_,
/*put_in_local_plasma_callback=*/
[this](const RayObject &object, const ObjectID &object_id) {
RAY_CHECK_OK(PutInLocalPlasmaStore(object, object_id, /*pin_object=*/true));
Expand Down
145 changes: 75 additions & 70 deletions src/ray/core_worker/task_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -252,13 +252,13 @@ std::vector<rpc::ObjectReference> TaskManager::AddPendingTask(
// object is considered in scope before we return the ObjectRef to the
// language frontend. Note that the language bindings should set
// skip_adding_local_ref=True to avoid double referencing the object.
reference_counter_->AddOwnedObject(return_id,
/*contained_ids=*/{},
caller_address,
call_site,
-1,
is_reconstructable,
/*add_local_ref=*/true);
reference_counter_.AddOwnedObject(return_id,
/*contained_ids=*/{},
caller_address,
call_site,
-1,
is_reconstructable,
/*add_local_ref=*/true);
}

return_ids.push_back(return_id);
Expand All @@ -269,7 +269,7 @@ std::vector<rpc::ObjectReference> TaskManager::AddPendingTask(
returned_refs.push_back(std::move(ref));
}

reference_counter_->UpdateSubmittedTaskReferences(return_ids, task_deps);
reference_counter_.UpdateSubmittedTaskReferences(return_ids, task_deps);

// If it is a generator task, create an object ref stream.
// The language frontend is responsible for calling DeleteObjectRefStream.
Expand Down Expand Up @@ -304,6 +304,8 @@ std::vector<rpc::ObjectReference> TaskManager::AddPendingTask(
}

bool TaskManager::ResubmitTask(const TaskID &task_id, std::vector<ObjectID> *task_deps) {
RAY_CHECK(task_deps->empty());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add an assertion before emplace.


TaskSpecification spec;
bool resubmit = false;
{
Expand Down Expand Up @@ -335,45 +337,48 @@ bool TaskManager::ResubmitTask(const TaskID &task_id, std::vector<ObjectID> *tas
}
}

if (resubmit) {
for (size_t i = 0; i < spec.NumArgs(); i++) {
if (spec.ArgByRef(i)) {
task_deps->push_back(spec.ArgId(i));
} else {
const auto &inlined_refs = spec.ArgInlinedRefs(i);
for (const auto &inlined_ref : inlined_refs) {
task_deps->push_back(ObjectID::FromBinary(inlined_ref.object_id()));
}
}
}
if (!resubmit) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Main change here, follows best practice to early return.

return true;
}

reference_counter_->UpdateResubmittedTaskReferences(*task_deps);

for (const auto &task_dep : *task_deps) {
bool was_freed = reference_counter_->TryMarkFreedObjectInUseAgain(task_dep);
if (was_freed) {
RAY_LOG(DEBUG) << "Dependency " << task_dep << " of task " << task_id
<< " was freed";
// We do not keep around copies for objects that were freed, but now that
// they're needed for recovery, we need to generate and pin a new copy.
// Delete the old in-memory marker that indicated that the object was
// freed. Now workers that attempt to get the object will be able to get
// the reconstructed value.
in_memory_store_->Delete({task_dep});
task_deps->reserve(spec.NumArgs());
for (size_t i = 0; i < spec.NumArgs(); i++) {
if (spec.ArgByRef(i)) {
task_deps->emplace_back(spec.ArgId(i));
} else {
const auto &inlined_refs = spec.ArgInlinedRefs(i);
for (const auto &inlined_ref : inlined_refs) {
task_deps->emplace_back(ObjectID::FromBinary(inlined_ref.object_id()));
}
}
if (spec.IsActorTask()) {
const auto actor_creation_return_id = spec.ActorCreationDummyObjectId();
reference_counter_->UpdateResubmittedTaskReferences({actor_creation_return_id});
}
}

reference_counter_.UpdateResubmittedTaskReferences(*task_deps);

RAY_LOG(INFO) << "Resubmitting task that produced lost plasma object, attempt #"
<< spec.AttemptNumber() << ": " << spec.DebugString();
// We should actually detect if the actor for this task is dead, but let's just assume
// it's not for now.
retry_task_callback_(
spec, /*object_recovery*/ true, /*update_seqno=*/true, /*delay_ms*/ 0);
for (const auto &task_dep : *task_deps) {
bool was_freed = reference_counter_.TryMarkFreedObjectInUseAgain(task_dep);
if (was_freed) {
RAY_LOG(DEBUG) << "Dependency " << task_dep << " of task " << task_id
<< " was freed";
// We do not keep around copies for objects that were freed, but now that
// they're needed for recovery, we need to generate and pin a new copy.
// Delete the old in-memory marker that indicated that the object was
// freed. Now workers that attempt to get the object will be able to get
// the reconstructed value.
in_memory_store_.Delete({task_dep});
}
}
if (spec.IsActorTask()) {
const auto actor_creation_return_id = spec.ActorCreationDummyObjectId();
reference_counter_.UpdateResubmittedTaskReferences({actor_creation_return_id});
}

RAY_LOG(INFO) << "Resubmitting task that produced lost plasma object, attempt #"
<< spec.AttemptNumber() << ": " << spec.DebugString();
// We should actually detect if the actor for this task is dead, but let's just assume
// it's not for now.
retry_task_callback_(
spec, /*object_recovery*/ true, /*update_seqno=*/true, /*delay_ms*/ 0);

return true;
}
Expand Down Expand Up @@ -435,7 +440,7 @@ bool TaskManager::HandleTaskReturn(const ObjectID &object_id,
const NodeID &worker_raylet_id,
bool store_in_plasma) {
bool direct_return = false;
reference_counter_->UpdateObjectSize(object_id, return_object.size());
reference_counter_.UpdateObjectSize(object_id, return_object.size());
RAY_LOG(DEBUG) << "Task return object " << object_id << " has size "
<< return_object.size();
const auto nested_refs =
Expand All @@ -445,10 +450,10 @@ bool TaskManager::HandleTaskReturn(const ObjectID &object_id,
// NOTE(swang): We need to add the location of the object before marking
// it as local in the in-memory store so that the data locality policy
// will choose the right raylet for any queued dependent tasks.
reference_counter_->UpdateObjectPinnedAtRaylet(object_id, worker_raylet_id);
reference_counter_.UpdateObjectPinnedAtRaylet(object_id, worker_raylet_id);
// Mark it as in plasma with a dummy object.
RAY_CHECK(
in_memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), object_id));
in_memory_store_.Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), object_id));
} else {
// NOTE(swang): If a direct object was promoted to plasma, then we do not
// record the node ID that it was pinned at, which means that we will not
Expand All @@ -474,18 +479,18 @@ bool TaskManager::HandleTaskReturn(const ObjectID &object_id,
if (store_in_plasma) {
put_in_local_plasma_callback_(object, object_id);
} else {
direct_return = in_memory_store_->Put(object, object_id);
direct_return = in_memory_store_.Put(object, object_id);
}
}

rpc::Address owner_address;
if (reference_counter_->GetOwner(object_id, &owner_address) && !nested_refs.empty()) {
if (reference_counter_.GetOwner(object_id, &owner_address) && !nested_refs.empty()) {
std::vector<ObjectID> nested_ids;
nested_ids.reserve(nested_refs.size());
for (const auto &nested_ref : nested_refs) {
nested_ids.emplace_back(ObjectRefToId(nested_ref));
}
reference_counter_->AddNestedObjectIds(object_id, nested_ids, owner_address);
reference_counter_.AddNestedObjectIds(object_id, nested_ids, owner_address);
}
return direct_return;
}
Expand Down Expand Up @@ -582,8 +587,8 @@ bool TaskManager::TryDelObjectRefStreamInternal(const ObjectID &generator_id) {
// Remove any unconsumed refs from the stream metadata in-memory store.
auto unconsumed_ids = stream_it->second.PopUnconsumedItems();
std::vector<ObjectID> deleted;
reference_counter_->TryReleaseLocalRefs(unconsumed_ids, &deleted);
in_memory_store_->Delete(deleted);
reference_counter_.TryReleaseLocalRefs(unconsumed_ids, &deleted);
in_memory_store_.Delete(deleted);

int64_t num_objects_generated = stream_it->second.EofIndex();
if (num_objects_generated == -1) {
Expand All @@ -593,7 +598,7 @@ bool TaskManager::TryDelObjectRefStreamInternal(const ObjectID &generator_id) {
return false;
}

bool can_gc_lineage = reference_counter_->CheckGeneratorRefsLineageOutOfScope(
bool can_gc_lineage = reference_counter_.CheckGeneratorRefsLineageOutOfScope(
generator_id, num_objects_generated);
return can_gc_lineage;
}
Expand Down Expand Up @@ -637,12 +642,12 @@ void TaskManager::MarkEndOfStream(const ObjectID &generator_id,
<< stream_it->second.EofIndex()
<< ". Last object id: " << last_object_id;

reference_counter_->OwnDynamicStreamingTaskReturnRef(last_object_id, generator_id);
reference_counter_.OwnDynamicStreamingTaskReturnRef(last_object_id, generator_id);
RayObject error(rpc::ErrorType::END_OF_STREAMING_GENERATOR);
// Put a dummy object at the end of the stream. We don't need to check if
// the object should be stored in plasma because the end of the stream is a
// fake ObjectRef that should never be read by the application.
in_memory_store_->Put(error, last_object_id);
in_memory_store_.Put(error, last_object_id);
}
}

Expand Down Expand Up @@ -700,11 +705,11 @@ bool TaskManager::HandleReportGeneratorItemReturns(
// own the dynamically generated task return.
// NOTE: If we call this method while holding a lock, it can deadlock.
if (index_not_used_yet) {
reference_counter_->OwnDynamicStreamingTaskReturnRef(object_id, generator_id);
reference_counter_.OwnDynamicStreamingTaskReturnRef(object_id, generator_id);
num_objects_written += 1;
}
// When an object is reported, the object is ready to be fetched.
reference_counter_->UpdateObjectPendingCreation(object_id, false);
reference_counter_.UpdateObjectPendingCreation(object_id, false);
HandleTaskReturn(object_id,
return_object,
NodeID::FromBinary(request.worker_addr().raylet_id()),
Expand Down Expand Up @@ -763,7 +768,7 @@ bool TaskManager::TemporarilyOwnGeneratorReturnRefIfNeededInternal(
// We shouldn't hold a lock when calling refernece counter API.
if (inserted_to_stream) {
RAY_LOG(DEBUG) << "Added streaming ref " << object_id;
reference_counter_->OwnDynamicStreamingTaskReturnRef(object_id, generator_id);
reference_counter_.OwnDynamicStreamingTaskReturnRef(object_id, generator_id);
return true;
}

Expand All @@ -789,7 +794,7 @@ void TaskManager::CompletePendingTask(const TaskID &task_id,
for (const auto &return_object : reply.dynamic_return_objects()) {
const auto object_id = ObjectID::FromBinary(return_object.object_id());
if (first_execution) {
reference_counter_->AddDynamicReturn(object_id, generator_id);
reference_counter_.AddDynamicReturn(object_id, generator_id);
dynamic_return_ids.push_back(object_id);
}
if (!HandleTaskReturn(object_id,
Expand Down Expand Up @@ -943,7 +948,7 @@ void TaskManager::CompletePendingTask(const TaskID &task_id,
RemoveFinishedTaskReferences(spec, release_lineage, worker_addr, reply.borrowed_refs());
if (min_lineage_bytes_to_evict > 0) {
// Evict at least half of the current lineage.
auto bytes_evicted = reference_counter_->EvictLineage(min_lineage_bytes_to_evict);
auto bytes_evicted = reference_counter_.EvictLineage(min_lineage_bytes_to_evict);
RAY_LOG(INFO) << "Evicted " << bytes_evicted / 1e6 << "MB of task lineage.";
}

Expand Down Expand Up @@ -1147,12 +1152,12 @@ void TaskManager::OnTaskDependenciesInlined(
const std::vector<ObjectID> &inlined_dependency_ids,
const std::vector<ObjectID> &contained_ids) {
std::vector<ObjectID> deleted;
reference_counter_->UpdateSubmittedTaskReferences(
reference_counter_.UpdateSubmittedTaskReferences(
/*return_ids=*/{},
/*argument_ids_to_add=*/contained_ids,
/*argument_ids_to_remove=*/inlined_dependency_ids,
&deleted);
in_memory_store_->Delete(deleted);
in_memory_store_.Delete(deleted);
}

void TaskManager::RemoveFinishedTaskReferences(
Expand Down Expand Up @@ -1195,13 +1200,13 @@ void TaskManager::RemoveFinishedTaskReferences(
}

std::vector<ObjectID> deleted;
reference_counter_->UpdateFinishedTaskReferences(return_ids,
plasma_dependencies,
release_lineage,
borrower_addr,
borrowed_refs,
&deleted);
in_memory_store_->Delete(deleted);
reference_counter_.UpdateFinishedTaskReferences(return_ids,
plasma_dependencies,
release_lineage,
borrower_addr,
borrowed_refs,
&deleted);
in_memory_store_.Delete(deleted);
}

int64_t TaskManager::RemoveLineageReference(const ObjectID &object_id,
Expand Down Expand Up @@ -1312,15 +1317,15 @@ void TaskManager::MarkTaskReturnObjectsFailed(
if (store_in_plasma_ids.contains(object_id)) {
put_in_local_plasma_callback_(error, object_id);
} else {
in_memory_store_->Put(error, object_id);
in_memory_store_.Put(error, object_id);
}
}
if (spec.ReturnsDynamic()) {
for (const auto &dynamic_return_id : spec.DynamicReturnIds()) {
if (store_in_plasma_ids.contains(dynamic_return_id)) {
put_in_local_plasma_callback_(error, dynamic_return_id);
} else {
in_memory_store_->Put(error, dynamic_return_id);
in_memory_store_.Put(error, dynamic_return_id);
}
}
}
Expand All @@ -1345,7 +1350,7 @@ void TaskManager::MarkTaskReturnObjectsFailed(
if (store_in_plasma_ids.contains(generator_return_id)) {
put_in_local_plasma_callback_(error, generator_return_id);
} else {
in_memory_store_->Put(error, generator_return_id);
in_memory_store_.Put(error, generator_return_id);
}
}
}
Expand Down
15 changes: 8 additions & 7 deletions src/ray/core_worker/task_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,8 @@ class ObjectRefStream {

class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterface {
public:
TaskManager(std::shared_ptr<CoreWorkerMemoryStore> in_memory_store,
std::shared_ptr<ReferenceCounter> reference_counter,
TaskManager(CoreWorkerMemoryStore &in_memory_store,
ReferenceCounter &reference_counter,
PutInLocalPlasmaCallback put_in_local_plasma_callback,
RetryTaskCallback retry_task_callback,
PushErrorCallback push_error_callback,
Expand All @@ -235,7 +235,7 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa
{"IsRetry", std::get<2>(key) ? "1" : "0"},
{"Source", "owner"}});
});
reference_counter_->SetReleaseLineageCallback(
reference_counter_.SetReleaseLineageCallback(
[this](const ObjectID &object_id, std::vector<ObjectID> *ids_to_release) {
return RemoveLineageReference(object_id, ids_to_release);
ShutdownIfNeeded();
Expand Down Expand Up @@ -268,8 +268,9 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa
/// responsible for making sure that these dependencies become available, so
/// that the resubmitted task can run. This is only populated if the task was
/// not already pending and was successfully resubmitted.
/// \return OK if the task was successfully resubmitted or was
/// already pending, Invalid if the task spec is no longer present.
/// \return true if the task was successfully resubmitted (task or actor being
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jjyao I actually have a question on how we retry and retrieve retry result, task manager only submit task, but not blocking wait their completion; task recovery manager also doesn't block wait. Is there any concern?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline. It doesn't block, it's async operation.

/// scheduled, but no guarantee on completion), or was already pending, Invalid if the
/// task spec is no longer present.
bool ResubmitTask(const TaskID &task_id, std::vector<ObjectID> *task_deps) override;

/// Wait for all pending tasks to finish, and then shutdown.
Expand Down Expand Up @@ -817,12 +818,12 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa
ABSL_EXCLUSIVE_LOCKS_REQUIRED(object_ref_stream_ops_mu_) ABSL_LOCKS_EXCLUDED(mu_);

/// Used to store task results.
std::shared_ptr<CoreWorkerMemoryStore> in_memory_store_;
CoreWorkerMemoryStore &in_memory_store_;

/// Used for reference counting objects.
/// The task manager is responsible for managing all references related to
/// submitted tasks (dependencies and return objects).
std::shared_ptr<ReferenceCounter> reference_counter_;
ReferenceCounter &reference_counter_;

/// Mapping from a streaming generator task id -> object ref stream.
absl::flat_hash_map<ObjectID, ObjectRefStream> object_ref_streams_
Expand Down
4 changes: 2 additions & 2 deletions src/ray/core_worker/test/task_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ class TaskManagerTest : public ::testing::Test {
store_(std::shared_ptr<CoreWorkerMemoryStore>(new CoreWorkerMemoryStore(
io_context_.GetIoService(), reference_counter_.get()))),
manager_(
store_,
reference_counter_,
*store_,
*reference_counter_,
[this](const RayObject &object, const ObjectID &object_id) {
stored_in_plasma.insert(object_id);
},
Expand Down