From 04296c820c3db1a188b97fb6c2ff097beb5892b0 Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Thu, 12 Sep 2024 22:24:30 -0700 Subject: [PATCH 1/5] Fix object reconstruction hang on arguments pending creation Signed-off-by: Jiajun Yao --- .../transport/actor_task_submitter.cc | 53 ++++++++++--------- 1 file changed, 29 insertions(+), 24 deletions(-) diff --git a/src/ray/core_worker/transport/actor_task_submitter.cc b/src/ray/core_worker/transport/actor_task_submitter.cc index babd1ba8dc6db..4f99010a68c56 100644 --- a/src/ray/core_worker/transport/actor_task_submitter.cc +++ b/src/ray/core_worker/transport/actor_task_submitter.cc @@ -227,30 +227,35 @@ Status ActorTaskSubmitter::SubmitTask(TaskSpecification task_spec) { }, "ActorTaskSubmitter::SubmitTask"); } else { - // Do not hold the lock while calling into task_finisher_. - task_finisher_.MarkTaskCanceled(task_id); - rpc::ErrorType error_type; - rpc::RayErrorInfo error_info; - { - absl::MutexLock lock(&mu_); - const auto queue_it = client_queues_.find(task_spec.ActorId()); - const auto &death_cause = queue_it->second.death_cause; - error_info = GetErrorInfoFromActorDeathCause(death_cause); - error_type = error_info.error_type(); - } - auto status = Status::IOError("cancelling task of dead actor"); - // No need to increment the number of completed tasks since the actor is - // dead. - bool fail_immediately = - error_info.has_actor_died_error() && - error_info.actor_died_error().has_oom_context() && - error_info.actor_died_error().oom_context().fail_immediately(); - GetTaskFinisherWithoutMu().FailOrRetryPendingTask(task_id, - error_type, - &status, - &error_info, - /*mark_task_object_failed*/ true, - fail_immediately); + io_service_.post( + [this, task_spec, task_id]() { + // Do not hold the lock while calling into task_finisher_. 
+ task_finisher_.MarkTaskCanceled(task_id); + rpc::ErrorType error_type; + rpc::RayErrorInfo error_info; + { + absl::MutexLock lock(&mu_); + const auto queue_it = client_queues_.find(task_spec.ActorId()); + const auto &death_cause = queue_it->second.death_cause; + error_info = GetErrorInfoFromActorDeathCause(death_cause); + error_type = error_info.error_type(); + } + auto status = Status::IOError("cancelling task of dead actor"); + // No need to increment the number of completed tasks since the actor is + // dead. + bool fail_immediately = + error_info.has_actor_died_error() && + error_info.actor_died_error().has_oom_context() && + error_info.actor_died_error().oom_context().fail_immediately(); + GetTaskFinisherWithoutMu().FailOrRetryPendingTask( + task_id, + error_type, + &status, + &error_info, + /*mark_task_object_failed*/ true, + fail_immediately); + }, + "ActorTaskSubmitter::SubmitTask"); } // If the task submission subsequently fails, then the client will receive From 5a0211a7bb79cdbaa7b487239ee29de6cecfbdf6 Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Wed, 25 Sep 2024 09:36:51 -0700 Subject: [PATCH 2/5] fix Signed-off-by: Jiajun Yao --- src/ray/core_worker/test/direct_actor_transport_test.cc | 2 +- src/ray/core_worker/transport/actor_task_submitter.cc | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/src/ray/core_worker/test/direct_actor_transport_test.cc b/src/ray/core_worker/test/direct_actor_transport_test.cc index 2c73107acae6a..1da502e394367 100644 --- a/src/ray/core_worker/test/direct_actor_transport_test.cc +++ b/src/ray/core_worker/test/direct_actor_transport_test.cc @@ -626,7 +626,7 @@ TEST_P(ActorTaskSubmitterTest, TestActorRestartOutOfOrderGcs) { task = CreateActorTaskHelper(actor_id, worker_id, 4); EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task.TaskId(), _, _, _, _, _)) .Times(1); - ASSERT_FALSE(CheckSubmitTask(task)); + ASSERT_TRUE(CheckSubmitTask(task)); } TEST_P(ActorTaskSubmitterTest, 
TestActorRestartFailInflightTasks) { diff --git a/src/ray/core_worker/transport/actor_task_submitter.cc b/src/ray/core_worker/transport/actor_task_submitter.cc index 4f99010a68c56..e56bc30033ea0 100644 --- a/src/ray/core_worker/transport/actor_task_submitter.cc +++ b/src/ray/core_worker/transport/actor_task_submitter.cc @@ -227,6 +227,9 @@ Status ActorTaskSubmitter::SubmitTask(TaskSpecification task_spec) { }, "ActorTaskSubmitter::SubmitTask"); } else { + // Post to the event loop to maintain the async nature of + // SubmitTask and avoid issues like + // https://github.com/ray-project/ray/issues/47606. io_service_.post( [this, task_spec, task_id]() { // Do not hold the lock while calling into task_finisher_. From eb7625ff1e3cacae2350ad273c1789d1059d64e2 Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Wed, 25 Sep 2024 11:32:12 -0700 Subject: [PATCH 3/5] test Signed-off-by: Jiajun Yao --- python/ray/tests/test_reconstruction_2.py | 53 ++++++++++++++++++++++- 1 file changed, 52 insertions(+), 1 deletion(-) diff --git a/python/ray/tests/test_reconstruction_2.py b/python/ray/tests/test_reconstruction_2.py index 2db52ca328802..8261b0edaa7ad 100644 --- a/python/ray/tests/test_reconstruction_2.py +++ b/python/ray/tests/test_reconstruction_2.py @@ -10,10 +10,10 @@ from ray._private.internal_api import memory_summary from ray._private.test_utils import Semaphore, SignalActor, wait_for_condition import ray.exceptions +from ray.util.state import list_tasks # Task status. WAITING_FOR_DEPENDENCIES = "PENDING_ARGS_AVAIL" -SCHEDULED = "PENDING_NODE_ASSIGNMENT" FINISHED = "FINISHED" WAITING_FOR_EXECUTION = "SUBMITTED_TO_WORKER" @@ -499,6 +499,57 @@ def dependent_task(x): ray.get(obj) +def test_object_reconstruction_dead_actor(config, ray_start_cluster): + # Test to make sure that if object reconstruction fails + # due to dead actor, pending_creation is set back to false. 
+ # https://github.com/ray-project/ray/issues/47606 + cluster = ray_start_cluster + cluster.add_node(num_cpus=0, _system_config=config) + ray.init(address=cluster.address) + node1 = cluster.add_node(resources={"node1": 1}) + node2 = cluster.add_node(resources={"node2": 1}) + + @ray.remote(max_restarts=0, max_task_retries=-1, resources={"node1": 0.1}) + class Worker: + def func_in(self): + return np.random.rand(1024000) + + @ray.remote(max_retries=-1, resources={"node2": 0.1}) + def func_out(data): + return np.random.rand(1024000) + + worker = Worker.remote() + + ref_in = worker.func_in.remote() + ref_out = func_out.remote(ref_in) + + ray.wait([ref_in, ref_out], num_returns=2, timeout=None, fetch_local=False) + + def func_out_resubmitted(): + tasks = list_tasks(filters=[("name", "=", "func_out")]) + assert len(tasks) == 2 + assert ( + tasks[0]["state"] == "PENDING_NODE_ASSIGNMENT" + or tasks[1]["state"] == "PENDING_NODE_ASSIGNMENT" + ) + return True + + cluster.remove_node(node2, allow_graceful=False) + # ref_out will reconstruct, wait for the lease request to reach raylet. + wait_for_condition(func_out_resubmitted) + + cluster.remove_node(node1, allow_graceful=False) + # ref_in is lost and the reconstruction will + # fail with ActorDiedError + + node1 = cluster.add_node(resources={"node1": 1}) + node2 = cluster.add_node(resources={"node2": 1}) + + with pytest.raises(ray.exceptions.RayTaskError) as exc_info: + ray.get(ref_out) + assert "input arguments for this task could not be computed" in str(exc_info.value) + + def test_object_reconstruction_pending_creation(config, ray_start_cluster): # Test to make sure that an object being reconstructured # has pending_creation set to true. 
From f69141985e2dd9abcacb756dea125c0fed83fa4f Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Thu, 26 Sep 2024 22:17:44 -0700 Subject: [PATCH 4/5] up Signed-off-by: Jiajun Yao --- .../core_worker/object_recovery_manager.cc | 3 +- .../test/direct_actor_transport_test.cc | 2 +- .../transport/actor_task_submitter.cc | 56 ++++++++----------- 3 files changed, 27 insertions(+), 34 deletions(-) diff --git a/src/ray/core_worker/object_recovery_manager.cc b/src/ray/core_worker/object_recovery_manager.cc index 239ef51d59dd3..840a57eed5bc9 100644 --- a/src/ray/core_worker/object_recovery_manager.cc +++ b/src/ray/core_worker/object_recovery_manager.cc @@ -168,10 +168,10 @@ void ObjectRecoveryManager::ReconstructObject(const ObjectID &object_id) { // object. const auto task_id = object_id.TaskId(); std::vector<ObjectID> task_deps; + reference_counter_->UpdateObjectPendingCreation(object_id, true); auto resubmitted = task_resubmitter_->ResubmitTask(task_id, &task_deps); if (resubmitted) { - reference_counter_->UpdateObjectPendingCreation(object_id, true); // Try to recover the task's dependencies. 
for (const auto &dep : task_deps) { auto recovered = RecoverObject(dep); @@ -189,6 +189,7 @@ void ObjectRecoveryManager::ReconstructObject(const ObjectID &object_id) { } else { RAY_LOG(INFO) << "Failed to reconstruct object " << object_id << " because lineage has already been deleted"; + reference_counter_->UpdateObjectPendingCreation(object_id, false); recovery_failure_callback_( object_id, rpc::ErrorType::OBJECT_UNRECONSTRUCTABLE_MAX_ATTEMPTS_EXCEEDED, diff --git a/src/ray/core_worker/test/direct_actor_transport_test.cc b/src/ray/core_worker/test/direct_actor_transport_test.cc index 1da502e394367..2c73107acae6a 100644 --- a/src/ray/core_worker/test/direct_actor_transport_test.cc +++ b/src/ray/core_worker/test/direct_actor_transport_test.cc @@ -626,7 +626,7 @@ TEST_P(ActorTaskSubmitterTest, TestActorRestartOutOfOrderGcs) { task = CreateActorTaskHelper(actor_id, worker_id, 4); EXPECT_CALL(*task_finisher_, FailOrRetryPendingTask(task.TaskId(), _, _, _, _, _)) .Times(1); - ASSERT_TRUE(CheckSubmitTask(task)); + ASSERT_FALSE(CheckSubmitTask(task)); } TEST_P(ActorTaskSubmitterTest, TestActorRestartFailInflightTasks) { diff --git a/src/ray/core_worker/transport/actor_task_submitter.cc b/src/ray/core_worker/transport/actor_task_submitter.cc index e56bc30033ea0..babd1ba8dc6db 100644 --- a/src/ray/core_worker/transport/actor_task_submitter.cc +++ b/src/ray/core_worker/transport/actor_task_submitter.cc @@ -227,38 +227,30 @@ Status ActorTaskSubmitter::SubmitTask(TaskSpecification task_spec) { }, "ActorTaskSubmitter::SubmitTask"); } else { - // Post to the event loop to maintain the async nature of - // SubmitTask and avoid issues like - // https://github.com/ray-project/ray/issues/47606. - io_service_.post( - [this, task_spec, task_id]() { - // Do not hold the lock while calling into task_finisher_. 
- task_finisher_.MarkTaskCanceled(task_id); - rpc::ErrorType error_type; - rpc::RayErrorInfo error_info; - { - absl::MutexLock lock(&mu_); - const auto queue_it = client_queues_.find(task_spec.ActorId()); - const auto &death_cause = queue_it->second.death_cause; - error_info = GetErrorInfoFromActorDeathCause(death_cause); - error_type = error_info.error_type(); - } - auto status = Status::IOError("cancelling task of dead actor"); - // No need to increment the number of completed tasks since the actor is - // dead. - bool fail_immediately = - error_info.has_actor_died_error() && - error_info.actor_died_error().has_oom_context() && - error_info.actor_died_error().oom_context().fail_immediately(); - GetTaskFinisherWithoutMu().FailOrRetryPendingTask( - task_id, - error_type, - &status, - &error_info, - /*mark_task_object_failed*/ true, - fail_immediately); - }, - "ActorTaskSubmitter::SubmitTask"); + // Do not hold the lock while calling into task_finisher_. + task_finisher_.MarkTaskCanceled(task_id); + rpc::ErrorType error_type; + rpc::RayErrorInfo error_info; + { + absl::MutexLock lock(&mu_); + const auto queue_it = client_queues_.find(task_spec.ActorId()); + const auto &death_cause = queue_it->second.death_cause; + error_info = GetErrorInfoFromActorDeathCause(death_cause); + error_type = error_info.error_type(); + } + auto status = Status::IOError("cancelling task of dead actor"); + // No need to increment the number of completed tasks since the actor is + // dead. 
+ bool fail_immediately = + error_info.has_actor_died_error() && + error_info.actor_died_error().has_oom_context() && + error_info.actor_died_error().oom_context().fail_immediately(); + GetTaskFinisherWithoutMu().FailOrRetryPendingTask(task_id, + error_type, + &status, + &error_info, + /*mark_task_object_failed*/ true, + fail_immediately); } // If the task submission subsequently fails, then the client will receive From 52b17d2486ec569a84c4d0f63dc8445d0578d6a6 Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Fri, 27 Sep 2024 09:03:55 -0700 Subject: [PATCH 5/5] comment Signed-off-by: Jiajun Yao --- src/ray/core_worker/object_recovery_manager.cc | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/ray/core_worker/object_recovery_manager.cc b/src/ray/core_worker/object_recovery_manager.cc index 840a57eed5bc9..09b1ab7954afc 100644 --- a/src/ray/core_worker/object_recovery_manager.cc +++ b/src/ray/core_worker/object_recovery_manager.cc @@ -168,6 +168,11 @@ void ObjectRecoveryManager::ReconstructObject(const ObjectID &object_id) { // object. const auto task_id = object_id.TaskId(); std::vector<ObjectID> task_deps; + // pending_creation needs to be set to true BEFORE calling ResubmitTask, + // since it might be set back to false inside ResubmitTask if the task is + // an actor task and the actor is dead. If we set pending_creation to true + // after ResubmitTask, then it will remain true forever. + // See https://github.com/ray-project/ray/issues/47606 for more details. reference_counter_->UpdateObjectPendingCreation(object_id, true); auto resubmitted = task_resubmitter_->ResubmitTask(task_id, &task_deps);