From 0db82e31e249eac614f7c8e7da1c4f8f05c9064a Mon Sep 17 00:00:00 2001
From: Justin Yu
Date: Wed, 19 Jul 2023 10:16:45 -0700
Subject: [PATCH] [air] Fix behavior of multi-node checkpointing without an
 external `storage_path` to hard-fail (#37543) (#37567)

Signed-off-by: Justin Yu
---
 python/ray/tune/execution/trial_runner.py   | 11 +++++++----
 python/ray/tune/syncer.py                   | 13 +++++++++++--
 .../cloud_tests/workloads/run_cloud_test.py | 19 +++++++++----------
 3 files changed, 27 insertions(+), 16 deletions(-)

diff --git a/python/ray/tune/execution/trial_runner.py b/python/ray/tune/execution/trial_runner.py
index 311af0603f314..e61d7c8008382 100644
--- a/python/ray/tune/execution/trial_runner.py
+++ b/python/ray/tune/execution/trial_runner.py
@@ -47,7 +47,7 @@
 from ray.tune.schedulers import FIFOScheduler, TrialScheduler
 from ray.tune.stopper import NoopStopper, Stopper
 from ray.tune.search import BasicVariantGenerator, SearchAlgorithm
-from ray.tune.syncer import SyncConfig
+from ray.tune.syncer import _HeadNodeSyncDeprecationWarning, SyncConfig
 from ray.tune.experiment import Trial
 from ray.tune.utils import warn_if_slow, flatten_dict
 from ray.tune.utils.log import Verbosity, has_verbosity
@@ -953,12 +953,15 @@ def _process_trial_save(
             self._checkpoint_manager.on_trial_checkpoint(trial)
             if trial.checkpoint.storage_mode != CheckpointStorage.MEMORY:
                 self._mark_trial_to_checkpoint(trial)
-        except Exception:
+        except Exception as e:
+            if (
+                isinstance(e, _HeadNodeSyncDeprecationWarning)
+                or self._fail_fast == TrialRunner.RAISE
+            ):
+                raise e
             logger.exception(
                 "Trial %s: Error handling checkpoint %s", trial, checkpoint_value
             )
-            if self._fail_fast == TrialRunner.RAISE:
-                raise
 
         trial.saving_to = None
         decision = self._cached_trial_decisions.pop(trial.trial_id, None)
diff --git a/python/ray/tune/syncer.py b/python/ray/tune/syncer.py
index 47f2d53b1b2f1..5477447348c67 100644
--- a/python/ray/tune/syncer.py
+++ b/python/ray/tune/syncer.py
@@ -78,6 +78,14 @@
     f"./{LAZY_CHECKPOINT_MARKER_FILE}",
 ]
 
+
+class _HeadNodeSyncDeprecationWarning(DeprecationWarning):
+    """Error raised when trying to rely on deprecated head node syncing when
+    checkpointing across multiple nodes."""
+
+    pass
+
+
 _SYNC_TO_HEAD_DEPRECATION_MESSAGE = (
     "Ray AIR no longer supports the synchronization of checkpoints and other "
     "artifacts from worker nodes to the head node. This means that the "
@@ -92,7 +100,8 @@
     "`RunConfig(storage_path='/mnt/path/to/nfs_storage')`\n"
     "See this Github issue for more details on transitioning to cloud storage/NFS "
     "as well as an explanation on why this functionality is "
-    "being removed: https://github.com/ray-project/ray/issues/37177\n\n"
+    "being removed: https://github.com/ray-project/ray/issues/37177\n"
+    "If you are already using NFS, you can ignore this warning message.\n\n"
     "Other temporary workarounds:\n"
     "- If you want to avoid errors/warnings and continue running with "
     "syncing explicitly turned off, set `RunConfig(SyncConfig(syncer=None))`\n"
@@ -926,7 +935,7 @@ def on_checkpoint(
        # that means that it lives on some other node and would be synced to head
        # prior to Ray 2.6.
        if not os.path.exists(checkpoint.dir_or_data):
-            raise DeprecationWarning(_SYNC_TO_HEAD_DEPRECATION_MESSAGE)
+            raise _HeadNodeSyncDeprecationWarning(_SYNC_TO_HEAD_DEPRECATION_MESSAGE)
        # else:
        # No need to raise an error about syncing, since the driver can find
        # the checkpoint, because either:
diff --git a/release/tune_tests/cloud_tests/workloads/run_cloud_test.py b/release/tune_tests/cloud_tests/workloads/run_cloud_test.py
index 8783f843bbfaa..0926ddd84201a 100644
--- a/release/tune_tests/cloud_tests/workloads/run_cloud_test.py
+++ b/release/tune_tests/cloud_tests/workloads/run_cloud_test.py
@@ -48,6 +48,7 @@
 import ray.cloudpickle as pickle
 from ray import air, tune
 from ray.air import Checkpoint, session
+from ray.tune import TuneError
 from ray.tune.execution.trial_runner import _find_newest_experiment_checkpoint
 from ray.tune.utils.serialization import TuneFunctionDecoder
 
@@ -1035,17 +1036,18 @@ def test_head_node_syncing_disabled_error():
     # Raise an error for checkpointing + no storage path
     def train_fn(config):
+        time.sleep(1)
         session.report({"score": 1}, checkpoint=Checkpoint.from_dict({"dummy": 1}))
 
     tuner = tune.Tuner(
         tune.with_resources(train_fn, {"CPU": 2.0}),
-        run_config=air.RunConfig(
-            storage_path=None, failure_config=air.FailureConfig(fail_fast="raise")
-        ),
-        tune_config=tune.TuneConfig(num_samples=4),
+        run_config=air.RunConfig(storage_path=None),
+        tune_config=tune.TuneConfig(num_samples=6),
     )
-    with pytest.raises(DeprecationWarning):
+    with pytest.raises(TuneError) as error_context:
         tuner.fit()
+    # The original `_HeadNodeSyncDeprecationWarning` gets wrapped in 2 TuneError's
+    assert "_HeadNodeSyncDeprecationWarning" in str(error_context.value.__cause__)
     print("Success: checkpointing without a storage path raises an error")
 
     # Workaround: continue running, with syncing explicitly disabled
@@ -1053,7 +1055,6 @@ def train_fn(config):
         tune.with_resources(train_fn, {"CPU": 2.0}),
         run_config=air.RunConfig(
             storage_path=None,
-            failure_config=air.FailureConfig(fail_fast="raise"),
             sync_config=tune.SyncConfig(syncer=None),
         ),
         tune_config=tune.TuneConfig(num_samples=4),
@@ -1067,13 +1068,11 @@ def train_fn_no_checkpoint(config):
 
     tuner = tune.Tuner(
         tune.with_resources(train_fn_no_checkpoint, {"CPU": 2.0}),
-        run_config=air.RunConfig(
-            storage_path=None, failure_config=air.FailureConfig(fail_fast="raise")
-        ),
+        run_config=air.RunConfig(storage_path=None),
         tune_config=tune.TuneConfig(num_samples=4),
     )
     tuner.fit()
-    print("Success: a multi-node experiment without checkpoint still runs")
+    print("Success: a multi-node experiment without checkpointing still runs")
 
 
 # TODO(ml-team): [Deprecation - head node syncing]
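For reference, the storage setup that the deprecation message above points users toward looks roughly like the following sketch. It reuses only APIs that already appear in this patch (tune.Tuner, air.RunConfig, tune.SyncConfig, session.report); the "/mnt/cluster_storage" path is a placeholder for your own NFS mount or cloud URI, not something introduced by the patch.

# Minimal sketch (not part of the patch): how a user keeps multi-node
# checkpointing working after head-node syncing was removed in Ray 2.6.
from ray import air, tune
from ray.air import Checkpoint, session


def train_fn(config):
    # Report a checkpoint from whichever node the trial runs on.
    session.report({"score": 1}, checkpoint=Checkpoint.from_dict({"dummy": 1}))


# Option 1: persist trial results and checkpoints to shared storage
# (NFS or a cloud bucket). "/mnt/cluster_storage" is a placeholder path.
tuner = tune.Tuner(
    train_fn,
    run_config=air.RunConfig(storage_path="/mnt/cluster_storage"),
    tune_config=tune.TuneConfig(num_samples=2),
)
tuner.fit()

# Option 2 (temporary workaround named in the deprecation message): keep
# running with syncing explicitly turned off. Checkpoints saved on worker
# nodes then stay on those nodes and are not visible from the head node.
tuner = tune.Tuner(
    train_fn,
    run_config=air.RunConfig(
        storage_path=None,
        sync_config=tune.SyncConfig(syncer=None),
    ),
    tune_config=tune.TuneConfig(num_samples=2),
)
tuner.fit()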