diff --git a/python/ray/tests/test_reconstruction_2.py b/python/ray/tests/test_reconstruction_2.py index bc38d844991df..f1673a55fa7ae 100644 --- a/python/ray/tests/test_reconstruction_2.py +++ b/python/ray/tests/test_reconstruction_2.py @@ -10,10 +10,10 @@ from ray._private.internal_api import memory_summary from ray._private.test_utils import Semaphore, SignalActor, wait_for_condition import ray.exceptions +from ray.util.state import list_tasks # Task status. WAITING_FOR_DEPENDENCIES = "PENDING_ARGS_AVAIL" -SCHEDULED = "PENDING_NODE_ASSIGNMENT" FINISHED = "FINISHED" WAITING_FOR_EXECUTION = "SUBMITTED_TO_WORKER" @@ -505,6 +505,57 @@ def dependent_task(x): ray.get(obj) +def test_object_reconstruction_dead_actor(config, ray_start_cluster): + # Test to make sure that if object reconstruction fails + # due to dead actor, pending_creation is set back to false. + # https://github.com/ray-project/ray/issues/47606 + cluster = ray_start_cluster + cluster.add_node(num_cpus=0, _system_config=config) + ray.init(address=cluster.address) + node1 = cluster.add_node(resources={"node1": 1}) + node2 = cluster.add_node(resources={"node2": 1}) + + @ray.remote(max_restarts=0, max_task_retries=-1, resources={"node1": 0.1}) + class Worker: + def func_in(self): + return np.random.rand(1024000) + + @ray.remote(max_retries=-1, resources={"node2": 0.1}) + def func_out(data): + return np.random.rand(1024000) + + worker = Worker.remote() + + ref_in = worker.func_in.remote() + ref_out = func_out.remote(ref_in) + + ray.wait([ref_in, ref_out], num_returns=2, timeout=None, fetch_local=False) + + def func_out_resubmitted(): + tasks = list_tasks(filters=[("name", "=", "func_out")]) + assert len(tasks) == 2 + assert ( + tasks[0]["state"] == "PENDING_NODE_ASSIGNMENT" + or tasks[1]["state"] == "PENDING_NODE_ASSIGNMENT" + ) + return True + + cluster.remove_node(node2, allow_graceful=False) + # ref_out will reconstruct, wait for the lease request to reach raylet. + wait_for_condition(func_out_resubmitted) + + cluster.remove_node(node1, allow_graceful=False) + # ref_in is lost and the reconstruction will + # fail with ActorDiedError + + node1 = cluster.add_node(resources={"node1": 1}) + node2 = cluster.add_node(resources={"node2": 1}) + + with pytest.raises(ray.exceptions.RayTaskError) as exc_info: + ray.get(ref_out) + assert "input arguments for this task could not be computed" in str(exc_info.value) + + def test_object_reconstruction_pending_creation(config, ray_start_cluster): # Test to make sure that an object being reconstructured # has pending_creation set to true. diff --git a/src/ray/core_worker/object_recovery_manager.cc b/src/ray/core_worker/object_recovery_manager.cc index 239ef51d59dd3..09b1ab7954afc 100644 --- a/src/ray/core_worker/object_recovery_manager.cc +++ b/src/ray/core_worker/object_recovery_manager.cc @@ -168,10 +168,15 @@ void ObjectRecoveryManager::ReconstructObject(const ObjectID &object_id) { // object. const auto task_id = object_id.TaskId(); std::vector task_deps; + // pending_creation needs to be set to true BEFORE calling ResubmitTask, + // since it might be set back to false inside ResubmitTask if the task is + // an actor task and the actor is dead. If we set pending_creation to true + // after ResubmitTask, then it will remain true forever. + // see https://github.com/ray-project/ray/issues/47606 for more details. + reference_counter_->UpdateObjectPendingCreation(object_id, true); auto resubmitted = task_resubmitter_->ResubmitTask(task_id, &task_deps); if (resubmitted) { - reference_counter_->UpdateObjectPendingCreation(object_id, true); // Try to recover the task's dependencies. for (const auto &dep : task_deps) { auto recovered = RecoverObject(dep); @@ -189,6 +194,7 @@ void ObjectRecoveryManager::ReconstructObject(const ObjectID &object_id) { } else { RAY_LOG(INFO) << "Failed to reconstruct object " << object_id << " because lineage has already been deleted"; + reference_counter_->UpdateObjectPendingCreation(object_id, false); recovery_failure_callback_( object_id, rpc::ErrorType::OBJECT_UNRECONSTRUCTABLE_MAX_ATTEMPTS_EXCEEDED,