From c3901c76cfd6771e444bbf2db7d7feefc749f483 Mon Sep 17 00:00:00 2001 From: Kai Fricke Date: Mon, 29 Aug 2022 10:30:24 -0700 Subject: [PATCH 01/11] Add timeout to retry fn Signed-off-by: Kai Fricke --- python/ray/tune/tests/test_utils.py | 110 ++++++++++++++++++------- python/ray/tune/trainable/trainable.py | 11 +++ python/ray/tune/utils/util.py | 34 +++++--- 3 files changed, 113 insertions(+), 42 deletions(-) diff --git a/python/ray/tune/tests/test_utils.py b/python/ray/tune/tests/test_utils.py index bca055f4c8c74..d426aca232d8b 100644 --- a/python/ray/tune/tests/test_utils.py +++ b/python/ray/tune/tests/test_utils.py @@ -1,45 +1,91 @@ -import unittest +import pytest from ray.tune.search.variant_generator import format_vars +from ray.tune.utils.util import retry_fn -class TuneUtilsTest(unittest.TestCase): - def testFormatVars(self): - # Format brackets correctly - self.assertTrue( - format_vars( - { - ("a", "b", "c"): 8.1234567, - ("a", "b", "d"): [7, 8], - ("a", "b", "e"): [[[3, 4]]], - } - ), - "c=8.12345,d=7_8,e=3_4", +def test_format_vars(): + + # Format brackets correctly + assert ( + format_vars( + { + ("a", "b", "c"): 8.1234567, + ("a", "b", "d"): [7, 8], + ("a", "b", "e"): [[[3, 4]]], + } ) - # Sorted by full keys, but only last key is reported - self.assertTrue( - format_vars( - { - ("a", "c", "x"): [7, 8], - ("a", "b", "x"): 8.1234567, - } - ), - "x=8.12345,x=7_8", + == "c=8.12345,d=7_8,e=3_4" + ) + # Sorted by full keys, but only last key is reported + assert ( + format_vars( + { + ("a", "c", "x"): [7, 8], + ("a", "b", "x"): 8.1234567, + } ) - # Filter out invalid chars. It's ok to have empty keys or values. - self.assertTrue( - format_vars( - { - ("a c?x"): " <;%$ok ", - ("some"): " ", - } - ), - "a_c_x=ok,some=", + == "x=8.12345,x=7_8" + ) + # Filter out invalid chars. It's ok to have empty keys or values. + assert ( + format_vars( + { + ("a c?x"): " <;%$ok ", + ("some"): " ", + } ) + == "a_c_x=ok,some=" + ) + + +def test_retry_fn_repeat(tmpdir): + success = tmpdir / "success" + marker = tmpdir / "marker" + + def _fail_once(): + if marker.exists(): + success.write_text(".", encoding="utf-8") + return + marker.write_text(".", encoding="utf-8") + raise RuntimeError("Failing") + + assert not success.exists() + assert not marker.exists() + + assert retry_fn( + fn=_fail_once, + exception_type=RuntimeError, + sleep_time=0, + ) + + assert success.exists() + assert marker.exists() + + +def test_retry_fn_timeout(tmpdir): + success = tmpdir / "success" + marker = tmpdir / "marker" + + def _fail_once(): + if marker.exists(): + success.write_text(".", encoding="utf-8") + return + marker.write_text(".", encoding="utf-8") + raise RuntimeError("Failing") + + assert not success.exists() + assert not marker.exists() + + assert not retry_fn( + fn=_fail_once, exception_type=RuntimeError, sleep_time=5, timeout=0.1 + ) + + assert not success.exists() + assert marker.exists() if __name__ == "__main__": - import pytest import sys sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/tune/trainable/trainable.py b/python/ray/tune/trainable/trainable.py index a63b767c585c0..b6e2dabc966b6 100644 --- a/python/ray/tune/trainable/trainable.py +++ b/python/ray/tune/trainable/trainable.py @@ -61,6 +61,14 @@ SETUP_TIME_THRESHOLD = 10 +def _sync_timeout() -> Optional[float]: + sync_timeout = float(os.environ.get("TUNE_SYNC_TIMEOUT", "600")) + if sync_timeout == 0: + return None + + return sync_timeout + + @PublicAPI class Trainable: """Abstract class for trainable models, functions, etc. 
@@ -517,6 +525,7 @@ def _maybe_save_to_cloud(self, checkpoint_dir: str) -> bool: subprocess.CalledProcessError, num_retries=3, sleep_time=1, + timeout=_sync_timeout(), ) return True @@ -551,6 +560,7 @@ def _maybe_load_from_cloud(self, checkpoint_path: str) -> bool: subprocess.CalledProcessError, num_retries=3, sleep_time=1, + timeout=_sync_timeout(), ) return True @@ -724,6 +734,7 @@ def delete_checkpoint(self, checkpoint_path: Union[str, Checkpoint]): subprocess.CalledProcessError, num_retries=3, sleep_time=1, + timeout=_sync_timeout(), ) if os.path.exists(checkpoint_dir): diff --git a/python/ray/tune/utils/util.py b/python/ray/tune/utils/util.py index 6deaa3a57f757..23e3225881b67 100644 --- a/python/ray/tune/utils/util.py +++ b/python/ray/tune/utils/util.py @@ -2,11 +2,13 @@ import glob import inspect import logging +import multiprocessing import os import threading import time from collections import defaultdict from datetime import datetime +from numbers import Number from threading import Thread from typing import Dict, List, Union, Type, Callable, Any, Optional @@ -124,18 +126,30 @@ def stop(self): @DeveloperAPI def retry_fn( fn: Callable[[], Any], - exception_type: Type[Exception], + exception_type: Type[Exception] = Exception, num_retries: int = 3, sleep_time: int = 1, -): - for i in range(num_retries): - try: - fn() - except exception_type as e: - logger.warning(e) - time.sleep(sleep_time) - else: - break + timeout: Optional[Number] = None, +) -> bool: + def _retry_fn(): + for i in range(num_retries): + try: + fn() + except exception_type as e: + logger.warning(e) + time.sleep(sleep_time) + else: + return + + proc = multiprocessing.Process(target=_retry_fn) + proc.start() + proc.join(timeout=timeout) + + if proc.exitcode is None: + proc.terminate() + return False + + return proc.exitcode == 0 @ray.remote From 8d542149f05ddeb826d4056bba0ddb98cfa06280 Mon Sep 17 00:00:00 2001 From: Kai Fricke Date: Mon, 29 Aug 2022 17:01:37 -0700 Subject: [PATCH 02/11] fix format_vars test Signed-off-by: Kai Fricke --- python/ray/tune/tests/test_utils.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/python/ray/tune/tests/test_utils.py b/python/ray/tune/tests/test_utils.py index d426aca232d8b..bb1f6e26505b6 100644 --- a/python/ray/tune/tests/test_utils.py +++ b/python/ray/tune/tests/test_utils.py @@ -15,7 +15,7 @@ def test_format_vars(): ("a", "b", "e"): [[[3, 4]]], } ) - == "c=8.12345,d=7_8,e=3_4" + == "c=8.1235,d=7_8,e=3_4" ) # Sorted by full keys, but only last key is reported assert ( @@ -25,14 +25,14 @@ def test_format_vars(): ("a", "b", "x"): 8.1234567, } ) - == "x=8.12345,x=7_8" + == "x=8.1235,x=7_8" ) # Filter out invalid chars. It's ok to have empty keys or values. 
assert ( format_vars( { - ("a c?x"): " <;%$ok ", - ("some"): " ", + ("a c?x",): " <;%$ok ", + ("some",): " ", } ) == "a_c_x=ok,some=" From 3a3e22ad4e628847385d7b454a2578cbc9d51701 Mon Sep 17 00:00:00 2001 From: Kai Fricke Date: Mon, 29 Aug 2022 17:13:52 -0700 Subject: [PATCH 03/11] pass to trainable as arg Signed-off-by: Kai Fricke --- .../ray/tune/execution/ray_trial_executor.py | 15 +++++++++++---- python/ray/tune/execution/trial_runner.py | 6 +++++- python/ray/tune/syncer.py | 6 ++++++ .../ray/tune/tests/test_ray_trial_executor.py | 2 +- python/ray/tune/trainable/trainable.py | 19 +++++++------------ 5 files changed, 30 insertions(+), 18 deletions(-) diff --git a/python/ray/tune/execution/ray_trial_executor.py b/python/ray/tune/execution/ray_trial_executor.py index fe6f886e304ad..a0eec9aeee7a0 100644 --- a/python/ray/tune/execution/ray_trial_executor.py +++ b/python/ray/tune/execution/ray_trial_executor.py @@ -217,7 +217,7 @@ def __init__( self._has_cleaned_up_pgs = False self._reuse_actors = reuse_actors - # The maxlen will be updated when `set_max_pending_trials()` is called + # The maxlen will be updated when `setup(max_pending_trials)` is called self._cached_actor_pg = deque(maxlen=1) self._pg_manager = _PlacementGroupManager(prefix=_get_tune_pg_prefix()) self._staged_trials = set() @@ -235,16 +235,20 @@ def __init__( self._buffer_max_time_s = float( os.getenv("TUNE_RESULT_BUFFER_MAX_TIME_S", 100.0) ) + self._trainable_kwargs = {} - def set_max_pending_trials(self, max_pending: int) -> None: + def setup( + self, max_pending_trials: int, trainable_kwargs: Optional[Dict] = None + ) -> None: if len(self._cached_actor_pg) > 0: logger.warning( "Cannot update maximum number of queued actors for reuse " "during a run." ) else: - self._cached_actor_pg = deque(maxlen=max_pending) - self._pg_manager.set_max_staging(max_pending) + self._cached_actor_pg = deque(maxlen=max_pending_trials) + self._pg_manager.set_max_staging(max_pending_trials) + self._trainable_kwargs = trainable_kwargs or {} def set_status(self, trial: Trial, status: str) -> None: """Sets status and checkpoints metadata if needed. @@ -377,6 +381,9 @@ def _setup_remote_runner(self, trial): kwargs["remote_checkpoint_dir"] = trial.remote_checkpoint_dir kwargs["custom_syncer"] = trial.custom_syncer + if self._trainable_kwargs: + kwargs.update(self._trainable_kwargs) + # Throw a meaningful error if trainable does not use the # new API sig = inspect.signature(trial.get_trainable_cls()) diff --git a/python/ray/tune/execution/trial_runner.py b/python/ray/tune/execution/trial_runner.py index d810729f747d8..043aa6470660a 100644 --- a/python/ray/tune/execution/trial_runner.py +++ b/python/ray/tune/execution/trial_runner.py @@ -198,6 +198,7 @@ def _serialize_and_write(): exclude = ["*/checkpoint_*"] if self._syncer: + # Todo: Implement sync_timeout if force: # Wait until previous sync command finished self._syncer.wait() @@ -341,7 +342,10 @@ def __init__( else: # Manual override self._max_pending_trials = int(max_pending_trials) - self.trial_executor.set_max_pending_trials(self._max_pending_trials) + self.trial_executor.setup( + max_pending_trials=self._max_pending_trials, + trainable_kwargs={"sync_timeout": sync_config.sync_timeout}, + ) self._metric = metric diff --git a/python/ray/tune/syncer.py b/python/ray/tune/syncer.py index a5ade393834bd..75f5c2e9e770f 100644 --- a/python/ray/tune/syncer.py +++ b/python/ray/tune/syncer.py @@ -40,6 +40,9 @@ # Syncing period for syncing checkpoints between nodes or to cloud. 
DEFAULT_SYNC_PERIOD = 300 +# Default sync timeout after which syncing processes are aborted +DEFAULT_SYNC_TIMEOUT = 3600 + _EXCLUDE_FROM_SYNC = [ "./checkpoint_-00001", "./checkpoint_tmp*", @@ -85,6 +88,8 @@ class SyncConfig: is asynchronous and best-effort. This does not affect persistent storage syncing. Defaults to True. sync_period: Syncing period for syncing between nodes. + sync_timeout: Timeout after which running sync processes are aborted. + Currently only affects trial-to-cloud syncing. """ @@ -93,6 +98,7 @@ class SyncConfig: sync_on_checkpoint: bool = True sync_period: int = DEFAULT_SYNC_PERIOD + sync_timeout: int = DEFAULT_SYNC_TIMEOUT def _repr_html_(self) -> str: """Generate an HTML representation of the SyncConfig. diff --git a/python/ray/tune/tests/test_ray_trial_executor.py b/python/ray/tune/tests/test_ray_trial_executor.py index 4615f605b2568..8e18c5476fbe9 100644 --- a/python/ray/tune/tests/test_ray_trial_executor.py +++ b/python/ray/tune/tests/test_ray_trial_executor.py @@ -499,7 +499,7 @@ def testHasResourcesForTrialWithCaching(self): executor = RayTrialExecutor(reuse_actors=True) executor._pg_manager = pgm - executor.set_max_pending_trials(1) + executor.setup(max_pending_trials=1) def train(config): yield 1 diff --git a/python/ray/tune/trainable/trainable.py b/python/ray/tune/trainable/trainable.py index b6e2dabc966b6..29e26242a0979 100644 --- a/python/ray/tune/trainable/trainable.py +++ b/python/ray/tune/trainable/trainable.py @@ -61,14 +61,6 @@ SETUP_TIME_THRESHOLD = 10 -def _sync_timeout() -> Optional[float]: - sync_timeout = float(os.environ.get("TUNE_SYNC_TIMEOUT", "600")) - if sync_timeout == 0: - return None - - return sync_timeout - - @PublicAPI class Trainable: """Abstract class for trainable models, functions, etc. @@ -109,8 +101,9 @@ def __init__( logger_creator: Callable[[Dict[str, Any]], "Logger"] = None, remote_checkpoint_dir: Optional[str] = None, custom_syncer: Optional[Syncer] = None, + sync_timeout: Optional[int] = None, ): - """Initialize an Trainable. + """Initialize a Trainable. Sets up logging and points ``self.logdir`` to a directory in which training outputs should be placed. @@ -128,6 +121,7 @@ def __init__( which is different from **per checkpoint** directory. custom_syncer: Syncer used for synchronizing data from Ray nodes to external storage. + sync_timeout: Timeout after which sync processes are aborted. 
""" self._experiment_id = uuid.uuid4().hex @@ -179,6 +173,7 @@ def __init__( self.remote_checkpoint_dir = remote_checkpoint_dir self.custom_syncer = custom_syncer + self.sync_timeout = sync_timeout @property def uses_cloud_checkpointing(self): @@ -525,7 +520,7 @@ def _maybe_save_to_cloud(self, checkpoint_dir: str) -> bool: subprocess.CalledProcessError, num_retries=3, sleep_time=1, - timeout=_sync_timeout(), + timeout=self.sync_timeout, ) return True @@ -560,7 +555,7 @@ def _maybe_load_from_cloud(self, checkpoint_path: str) -> bool: subprocess.CalledProcessError, num_retries=3, sleep_time=1, - timeout=_sync_timeout(), + timeout=self.sync_timeout, ) return True @@ -734,7 +729,7 @@ def delete_checkpoint(self, checkpoint_path: Union[str, Checkpoint]): subprocess.CalledProcessError, num_retries=3, sleep_time=1, - timeout=_sync_timeout(), + timeout=self.sync_timeout, ) if os.path.exists(checkpoint_dir): From 9f62b7919d8033f023ec097d4c9dcf0a47fb09e4 Mon Sep 17 00:00:00 2001 From: Kai Fricke Date: Mon, 29 Aug 2022 17:18:04 -0700 Subject: [PATCH 04/11] error message Signed-off-by: Kai Fricke --- python/ray/tune/trainable/trainable.py | 27 +++++++++++++++++++------- python/ray/tune/utils/util.py | 1 + 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/python/ray/tune/trainable/trainable.py b/python/ray/tune/trainable/trainable.py index 29e26242a0979..363d9cdc2fc90 100644 --- a/python/ray/tune/trainable/trainable.py +++ b/python/ray/tune/trainable/trainable.py @@ -515,13 +515,18 @@ def _maybe_save_to_cloud(self, checkpoint_dir: str) -> bool: return True checkpoint = Checkpoint.from_directory(checkpoint_dir) - retry_fn( - lambda: checkpoint.to_uri(self._storage_path(checkpoint_dir)), + checkpoint_uri = self._storage_path(checkpoint_dir) + if not retry_fn( + lambda: checkpoint.to_uri(checkpoint_uri), subprocess.CalledProcessError, num_retries=3, sleep_time=1, timeout=self.sync_timeout, - ) + ): + logger.error( + f"Could not upload checkpoint even after 3 retries: " + f"{checkpoint_uri}" + ) return True def _maybe_load_from_cloud(self, checkpoint_path: str) -> bool: @@ -550,13 +555,17 @@ def _maybe_load_from_cloud(self, checkpoint_path: str) -> bool: return True checkpoint = Checkpoint.from_uri(external_uri) - retry_fn( + if not retry_fn( lambda: checkpoint.to_directory(local_dir), subprocess.CalledProcessError, num_retries=3, sleep_time=1, timeout=self.sync_timeout, - ) + ): + logger.error( + f"Could not download checkpoint even after 3 retries: " + f"{external_uri}" + ) return True @@ -724,13 +733,17 @@ def delete_checkpoint(self, checkpoint_path: Union[str, Checkpoint]): self.custom_syncer.wait_or_retry() else: checkpoint_uri = self._storage_path(checkpoint_dir) - retry_fn( + if not retry_fn( lambda: _delete_external_checkpoint(checkpoint_uri), subprocess.CalledProcessError, num_retries=3, sleep_time=1, timeout=self.sync_timeout, - ) + ): + logger.error( + f"Could not delete checkpoint even after 3 retries: " + f"{checkpoint_uri}" + ) if os.path.exists(checkpoint_dir): shutil.rmtree(checkpoint_dir) diff --git a/python/ray/tune/utils/util.py b/python/ray/tune/utils/util.py index 23e3225881b67..c2d98c067e934 100644 --- a/python/ray/tune/utils/util.py +++ b/python/ray/tune/utils/util.py @@ -147,6 +147,7 @@ def _retry_fn(): if proc.exitcode is None: proc.terminate() + logger.debug(f"Process timed out: {getattr(fn, '__name__', None)}") return False return proc.exitcode == 0 From 042771bb4d34d59c7690af92833295be30565fe0 Mon Sep 17 00:00:00 2001 From: Kai Fricke Date: Mon, 29 Aug 2022 17:39:26 
-0700 Subject: [PATCH 05/11] Use threads instead of multiprocessing, add end to end test Signed-off-by: Kai Fricke --- python/ray/tune/tests/test_trainable.py | 36 ++++++++++++++++++++++++- python/ray/tune/utils/util.py | 9 +++---- 2 files changed, 39 insertions(+), 6 deletions(-) diff --git a/python/ray/tune/tests/test_trainable.py b/python/ray/tune/tests/test_trainable.py index 248152285f6a9..90bdf0451f044 100644 --- a/python/ray/tune/tests/test_trainable.py +++ b/python/ray/tune/tests/test_trainable.py @@ -1,14 +1,16 @@ import json import os import tempfile +import time from typing import Dict, Union +from unittest.mock import patch import pytest import ray from ray import tune from ray.air import session, Checkpoint -from ray.air._internal.remote_storage import download_from_uri +from ray.air._internal.remote_storage import download_from_uri, upload_to_uri from ray.tune.trainable import wrap_function @@ -188,6 +190,38 @@ def test_checkpoint_object_no_sync(tmpdir): trainable.restore_from_object(obj) +@pytest.mark.parametrize("hanging", [True, False]) +def test_sync_timeout(tmpdir, hanging): + orig_upload_fn = upload_to_uri + + def _hanging_upload(*args, **kwargs): + time.sleep(200 if hanging else 0) + orig_upload_fn(*args, **kwargs) + + trainable = SavingTrainable( + "object", remote_checkpoint_dir="memory:///test/location", sync_timeout=1 + ) + + with patch("ray.air.checkpoint.upload_to_uri", _hanging_upload): + trainable.save() + + check_dir = tmpdir / "check_save_obj" + + try: + download_from_uri(uri="memory:///test/location", local_path=str(check_dir)) + except FileNotFoundError: + hung = True + else: + hung = False + + assert hung == hanging + + if hanging: + assert not check_dir.exists() + else: + assert check_dir.listdir() + + if __name__ == "__main__": import sys diff --git a/python/ray/tune/utils/util.py b/python/ray/tune/utils/util.py index c2d98c067e934..b0af91580cf8d 100644 --- a/python/ray/tune/utils/util.py +++ b/python/ray/tune/utils/util.py @@ -2,7 +2,6 @@ import glob import inspect import logging -import multiprocessing import os import threading import time @@ -141,16 +140,16 @@ def _retry_fn(): else: return - proc = multiprocessing.Process(target=_retry_fn) + proc = threading.Thread(target=_retry_fn) + proc.daemon = True proc.start() proc.join(timeout=timeout) - if proc.exitcode is None: - proc.terminate() + if proc.is_alive(): logger.debug(f"Process timed out: {getattr(fn, '__name__', None)}") return False - return proc.exitcode == 0 + return True @ray.remote From 13c6cb3c59f3dbc18ec2c939f3591e17706ec391 Mon Sep 17 00:00:00 2001 From: Kai Fricke Date: Tue, 30 Aug 2022 07:37:46 -0700 Subject: [PATCH 06/11] fix sync config init Signed-off-by: Kai Fricke --- python/ray/tune/execution/trial_runner.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/python/ray/tune/execution/trial_runner.py b/python/ray/tune/execution/trial_runner.py index 043aa6470660a..89d2641a7d5bc 100644 --- a/python/ray/tune/execution/trial_runner.py +++ b/python/ray/tune/execution/trial_runner.py @@ -342,6 +342,9 @@ def __init__( else: # Manual override self._max_pending_trials = int(max_pending_trials) + + sync_config = sync_config or SyncConfig() + self.trial_executor.setup( max_pending_trials=self._max_pending_trials, trainable_kwargs={"sync_timeout": sync_config.sync_timeout}, @@ -389,7 +392,6 @@ def __init__( if self._local_checkpoint_dir: os.makedirs(self._local_checkpoint_dir, exist_ok=True) - sync_config = sync_config or SyncConfig() self._remote_checkpoint_dir = 
remote_checkpoint_dir self._syncer = get_node_to_storage_syncer(sync_config) From 0affba255cf7dd24daa8dbfb79d6c833ed6345fe Mon Sep 17 00:00:00 2001 From: Kai Fricke Date: Tue, 30 Aug 2022 11:29:34 -0700 Subject: [PATCH 07/11] Update comment Signed-off-by: Kai Fricke --- python/ray/tune/execution/trial_runner.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/ray/tune/execution/trial_runner.py b/python/ray/tune/execution/trial_runner.py index 89d2641a7d5bc..a4fd37b00208d 100644 --- a/python/ray/tune/execution/trial_runner.py +++ b/python/ray/tune/execution/trial_runner.py @@ -198,7 +198,8 @@ def _serialize_and_write(): exclude = ["*/checkpoint_*"] if self._syncer: - # Todo: Implement sync_timeout + # Todo: Implement sync_timeout for experiment-level syncing + # (it is currently only used for trainable-to-cloud syncing) if force: # Wait until previous sync command finished self._syncer.wait() From 8553d11c2e011ca9a06d56ccd56f2f99e9745246 Mon Sep 17 00:00:00 2001 From: Kai Fricke Date: Tue, 30 Aug 2022 15:01:08 -0700 Subject: [PATCH 08/11] Sync timeout per retry Signed-off-by: Kai Fricke --- python/ray/tune/syncer.py | 2 +- python/ray/tune/tests/test_trainable.py | 2 +- python/ray/tune/tests/test_utils.py | 15 ++++---- python/ray/tune/utils/util.py | 46 +++++++++++++++---------- 4 files changed, 39 insertions(+), 26 deletions(-) diff --git a/python/ray/tune/syncer.py b/python/ray/tune/syncer.py index 75f5c2e9e770f..3a18bdd34b3e1 100644 --- a/python/ray/tune/syncer.py +++ b/python/ray/tune/syncer.py @@ -41,7 +41,7 @@ DEFAULT_SYNC_PERIOD = 300 # Default sync timeout after which syncing processes are aborted -DEFAULT_SYNC_TIMEOUT = 3600 +DEFAULT_SYNC_TIMEOUT = 1800 _EXCLUDE_FROM_SYNC = [ "./checkpoint_-00001", diff --git a/python/ray/tune/tests/test_trainable.py b/python/ray/tune/tests/test_trainable.py index 90bdf0451f044..86d03b0e0d2ac 100644 --- a/python/ray/tune/tests/test_trainable.py +++ b/python/ray/tune/tests/test_trainable.py @@ -199,7 +199,7 @@ def _hanging_upload(*args, **kwargs): orig_upload_fn(*args, **kwargs) trainable = SavingTrainable( - "object", remote_checkpoint_dir="memory:///test/location", sync_timeout=1 + "object", remote_checkpoint_dir="memory:///test/location", sync_timeout=0.1 ) with patch("ray.air.checkpoint.upload_to_uri", _hanging_upload): diff --git a/python/ray/tune/tests/test_utils.py b/python/ray/tune/tests/test_utils.py index bb1f6e26505b6..21b99c6873216 100644 --- a/python/ray/tune/tests/test_utils.py +++ b/python/ray/tune/tests/test_utils.py @@ -1,3 +1,5 @@ +import time + import pytest from ray.tune.search.variant_generator import format_vars @@ -68,17 +70,18 @@ def test_retry_fn_timeout(tmpdir): marker = tmpdir / "marker" def _fail_once(): - if marker.exists(): - success.write_text(".", encoding="utf-8") - return - marker.write_text(".", encoding="utf-8") - raise RuntimeError("Failing") + if not marker.exists(): + marker.write_text(".", encoding="utf-8") + raise RuntimeError("Failing") + time.sleep(5) + success.write_text(".", encoding="utf-8") + return assert not success.exists() assert not marker.exists() assert not retry_fn( - fn=_fail_once, exception_type=RuntimeError, sleep_time=5, timeout=0.1 + fn=_fail_once, exception_type=RuntimeError, sleep_time=0, timeout=0.1 ) assert not success.exists() diff --git a/python/ray/tune/utils/util.py b/python/ray/tune/utils/util.py index b0af91580cf8d..e8108d2521717 100644 --- a/python/ray/tune/utils/util.py +++ b/python/ray/tune/utils/util.py @@ -130,26 +130,36 @@ def retry_fn( sleep_time: 
int = 1, timeout: Optional[Number] = None, ) -> bool: + errored = threading.Event() + def _retry_fn(): - for i in range(num_retries): - try: - fn() - except exception_type as e: - logger.warning(e) - time.sleep(sleep_time) - else: - return - - proc = threading.Thread(target=_retry_fn) - proc.daemon = True - proc.start() - proc.join(timeout=timeout) - - if proc.is_alive(): - logger.debug(f"Process timed out: {getattr(fn, '__name__', None)}") - return False + try: + fn() + except exception_type as e: + logger.warning(e) + errored.set() + + for i in range(num_retries): + errored.clear() + + proc = threading.Thread(target=_retry_fn) + proc.daemon = True + proc.start() + proc.join(timeout=timeout) + + if proc.is_alive(): + logger.debug( + f"Process timed out (try {i+1}/{num_retries}): " + f"{getattr(fn, '__name__', None)}" + ) + elif not errored.is_set(): + return True - return True + # Timed out, sleep and try again + time.sleep(sleep_time) + + # Timed out, so return False + return False @ray.remote From 46d3b871d1f7d23189120decc33eeb6552e9212e Mon Sep 17 00:00:00 2001 From: Kai Fricke Date: Wed, 31 Aug 2022 11:53:19 -0700 Subject: [PATCH 09/11] Better error message, rename fn Signed-off-by: Kai Fricke --- python/ray/tune/trainable/trainable.py | 6 +++++- python/ray/tune/utils/util.py | 4 ++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/python/ray/tune/trainable/trainable.py b/python/ray/tune/trainable/trainable.py index 363d9cdc2fc90..aedf56ba4acb3 100644 --- a/python/ray/tune/trainable/trainable.py +++ b/python/ray/tune/trainable/trainable.py @@ -524,7 +524,11 @@ def _maybe_save_to_cloud(self, checkpoint_dir: str) -> bool: timeout=self.sync_timeout, ): logger.error( - f"Could not upload checkpoint even after 3 retries: " + f"Could not upload checkpoint even after 3 retries. " + f"Please check if the credentials expired and that the remote " + f"filesystem is supported. For large checkpoints, consider " + f"increasing `SyncConfig(sync_timeout)` " + f"(current value: {self.sync_timeout} seconds). 
Checkpoint URI: " f"{checkpoint_uri}" ) return True diff --git a/python/ray/tune/utils/util.py b/python/ray/tune/utils/util.py index e8108d2521717..66b581b339fa8 100644 --- a/python/ray/tune/utils/util.py +++ b/python/ray/tune/utils/util.py @@ -132,7 +132,7 @@ def retry_fn( ) -> bool: errored = threading.Event() - def _retry_fn(): + def _try_fn(): try: fn() except exception_type as e: @@ -142,7 +142,7 @@ def _retry_fn(): for i in range(num_retries): errored.clear() - proc = threading.Thread(target=_retry_fn) + proc = threading.Thread(target=_try_fn) proc.daemon = True proc.start() proc.join(timeout=timeout) From ef5e35b77f736e6e13ae06382141ab76f83c129c Mon Sep 17 00:00:00 2001 From: Kai Fricke Date: Wed, 31 Aug 2022 18:56:27 -0700 Subject: [PATCH 10/11] Increase timeout Signed-off-by: Kai Fricke --- python/ray/tune/tests/test_trainable.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/tune/tests/test_trainable.py b/python/ray/tune/tests/test_trainable.py index 86d03b0e0d2ac..a64619c904d91 100644 --- a/python/ray/tune/tests/test_trainable.py +++ b/python/ray/tune/tests/test_trainable.py @@ -199,7 +199,7 @@ def _hanging_upload(*args, **kwargs): orig_upload_fn(*args, **kwargs) trainable = SavingTrainable( - "object", remote_checkpoint_dir="memory:///test/location", sync_timeout=0.1 + "object", remote_checkpoint_dir="memory:///test/location", sync_timeout=0.5 ) with patch("ray.air.checkpoint.upload_to_uri", _hanging_upload): From 873d3eba0e20bcc997d8ce5670fb994829990a75 Mon Sep 17 00:00:00 2001 From: Kai Fricke Date: Fri, 2 Sep 2022 10:17:08 +0100 Subject: [PATCH 11/11] Use unique storage path Signed-off-by: Kai Fricke --- python/ray/tune/tests/test_trainable.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/python/ray/tune/tests/test_trainable.py b/python/ray/tune/tests/test_trainable.py index a64619c904d91..254edb5c2fdb7 100644 --- a/python/ray/tune/tests/test_trainable.py +++ b/python/ray/tune/tests/test_trainable.py @@ -199,7 +199,9 @@ def _hanging_upload(*args, **kwargs): orig_upload_fn(*args, **kwargs) trainable = SavingTrainable( - "object", remote_checkpoint_dir="memory:///test/location", sync_timeout=0.5 + "object", + remote_checkpoint_dir=f"memory:///test/location_hanging_{hanging}", + sync_timeout=0.5, ) with patch("ray.air.checkpoint.upload_to_uri", _hanging_upload): @@ -208,7 +210,9 @@ def _hanging_upload(*args, **kwargs): check_dir = tmpdir / "check_save_obj" try: - download_from_uri(uri="memory:///test/location", local_path=str(check_dir)) + download_from_uri( + uri=f"memory:///test/location_hanging_{hanging}", local_path=str(check_dir) + ) except FileNotFoundError: hung = True else:
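For context, here is a minimal usage sketch of the knob this series adds. SyncConfig.sync_timeout is passed through to the trainable and used as the per-attempt timeout for trial-to-cloud checkpoint uploads, downloads, and deletions. The sketch assumes the Ray 2.0-era API shown in the diffs above; my_trainable, the bucket URI, and the 600-second value are placeholders and may differ in other versions.

from ray import tune
from ray.air import Checkpoint, session
from ray.tune.syncer import SyncConfig


def my_trainable(config):
    # Placeholder training loop: report a metric and a small checkpoint each
    # iteration; the checkpoint is what gets synced to cloud storage.
    for step in range(3):
        session.report({"score": step}, checkpoint=Checkpoint.from_dict({"step": step}))


tune.run(
    my_trainable,
    sync_config=SyncConfig(
        upload_dir="s3://my-bucket/tune-results",  # hypothetical bucket
        sync_timeout=600,  # give up on a hanging sync attempt after 10 minutes
    ),
)

Because the timeout applies to each of the up-to-three attempts made by retry_fn, a hanging upload can block a trial for at most roughly three times sync_timeout (plus the short sleeps between attempts) before the error is logged and training continues.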