From 31c34ac990b12ddd5c4953008fcf3d60d8e9efdf Mon Sep 17 00:00:00 2001 From: Kai Fricke Date: Fri, 9 Dec 2022 16:00:29 -0800 Subject: [PATCH 1/2] [tune/internal] Move signal handling into separate method Signed-off-by: Kai Fricke --- python/ray/tune/tune.py | 136 +++++++++++++++++++++------------------- 1 file changed, 72 insertions(+), 64 deletions(-) diff --git a/python/ray/tune/tune.py b/python/ray/tune/tune.py index d400411df6a46..bf0944c7fa9c0 100644 --- a/python/ray/tune/tune.py +++ b/python/ray/tune/tune.py @@ -127,6 +127,39 @@ def _report_progress( reporter.report(trials, done, sched_debug_str, executor_debug_str) +def _setup_signal_catching() -> threading.Event: + original_handler = signal.getsignal(signal.SIGINT) + stop_event = threading.Event() + + def signal_interrupt_tune_run(sig: int, frame): + logger.warning( + "Stop signal received (e.g. via SIGINT/Ctrl+C), ending Ray Tune run. " + "This will try to checkpoint the experiment state one last time. " + "Press CTRL+C (or send SIGINT/SIGKILL/SIGTERM) " + "to skip. " + ) + stop_event.set() + # Restore original signal handler to react to future SIGINT signals + signal.signal(signal.SIGINT, original_handler) + + # We should only install the handler when it is safe to do so. + # When tune.run() is called from worker thread, signal.signal will + # fail. + allow_signal_catching = True + if threading.current_thread() != threading.main_thread(): + allow_signal_catching = False + + if allow_signal_catching: + if not int(os.getenv("TUNE_DISABLE_SIGINT_HANDLER", "0")): + signal.signal(signal.SIGINT, signal_interrupt_tune_run) + + # Always register SIGUSR1 if available (not available e.g. on Windows) + if hasattr(signal, "SIGUSR1"): + signal.signal(signal.SIGUSR1, signal_interrupt_tune_run) + + return stop_event + + @PublicAPI def run( run_or_experiment: Union[str, Callable, Type], @@ -507,11 +540,6 @@ class and registered trainables. "well as implementing `reset_config` for Trainable." ) - trial_executor = trial_executor or RayTrialExecutor( - reuse_actors=reuse_actors, - result_buffer_length=result_buffer_length, - chdir_to_trial_dir=chdir_to_trial_dir, - ) if isinstance(run_or_experiment, list): experiments = run_or_experiment else: @@ -627,6 +655,36 @@ class and registered trainables. callbacks, sync_config, metric=metric, progress_metrics=progress_metrics ) + # User Warning for GPUs + if ray.cluster_resources().get("GPU", 0): + if _check_gpus_in_resources(resources=resources_per_trial): + # "gpu" is manually set. + pass + elif _check_default_resources_override(experiments[0].run_identifier): + # "default_resources" is manually overridden. + pass + else: + logger.warning( + "Tune detects GPUs, but no trials are using GPUs. " + "To enable trials to use GPUs, wrap `train_func` with " + "`tune.with_resources(train_func, resources_per_trial={'gpu': 1})` " + "which allows Tune to expose 1 GPU to each trial. " + "For Ray AIR Trainers, you can specify GPU resources " + "through `ScalingConfig(use_gpu=True)`. " + "You can also override " + "`Trainable.default_resource_request` if using the " + "Trainable API." + ) + + stop_event = _setup_signal_catching() + + progress_reporter = progress_reporter or _detect_reporter() + + trial_executor = trial_executor or RayTrialExecutor( + reuse_actors=reuse_actors, + result_buffer_length=result_buffer_length, + chdir_to_trial_dir=chdir_to_trial_dir, + ) runner = TrialRunner( search_alg=search_alg, scheduler=scheduler, @@ -662,58 +720,6 @@ class and registered trainables. experiments=experiments, total_num_samples=search_alg.total_samples ) - # User Warning for GPUs - if trial_executor.has_gpus(): - if _check_gpus_in_resources(resources=resources_per_trial): - # "gpu" is manually set. - pass - elif _check_default_resources_override(experiments[0].run_identifier): - # "default_resources" is manually overridden. - pass - else: - logger.warning( - "Tune detects GPUs, but no trials are using GPUs. " - "To enable trials to use GPUs, wrap `train_func` with " - "`tune.with_resources(train_func, resources_per_trial={'gpu': 1})` " - "which allows Tune to expose 1 GPU to each trial. " - "For Ray AIR Trainers, you can specify GPU resources " - "through `ScalingConfig(use_gpu=True)`. " - "You can also override " - "`Trainable.default_resource_request` if using the " - "Trainable API." - ) - - original_handler = signal.getsignal(signal.SIGINT) - state = {"signal": None} - - def signal_interrupt_tune_run(sig: int, frame): - logger.warning( - "Stop signal received (e.g. via SIGINT/Ctrl+C), ending Ray Tune run. " - "This will try to checkpoint the experiment state one last time. " - "Press CTRL+C (or send SIGINT/SIGKILL/SIGTERM) " - "to skip. " - ) - state["signal"] = sig - # Restore original signal handler to react to future SIGINT signals - signal.signal(signal.SIGINT, original_handler) - - # We should only install the handler when it is safe to do so. - # When tune.run() is called from worker thread, signal.signal will - # fail. - allow_signal_catching = True - if threading.current_thread() != threading.main_thread(): - allow_signal_catching = False - - if allow_signal_catching: - if not int(os.getenv("TUNE_DISABLE_SIGINT_HANDLER", "0")): - signal.signal(signal.SIGINT, signal_interrupt_tune_run) - - # Always register SIGUSR1 if available (not available e.g. on Windows) - if hasattr(signal, "SIGUSR1"): - signal.signal(signal.SIGUSR1, signal_interrupt_tune_run) - - progress_reporter = progress_reporter or _detect_reporter() - tune_start = time.time() progress_reporter.setup( @@ -722,7 +728,7 @@ def signal_interrupt_tune_run(sig: int, frame): metric=metric, mode=mode, ) - while not runner.is_finished() and not state["signal"]: + while not runner.is_finished() and not stop_event.is_set(): runner.step() if has_verbosity(Verbosity.V1_EXPERIMENT): _report_progress(runner, progress_reporter) @@ -736,6 +742,9 @@ def signal_interrupt_tune_run(sig: int, frame): if has_verbosity(Verbosity.V1_EXPERIMENT): _report_progress(runner, progress_reporter, done=True) + all_trials = runner.get_trials() + experiment_checkpoint = runner.checkpoint_file + # Wait for syncing to finish for callback in callbacks: if isinstance(callback, SyncerCallback): @@ -747,12 +756,12 @@ def signal_interrupt_tune_run(sig: int, frame): runner.cleanup() incomplete_trials = [] - for trial in runner.get_trials(): + for trial in all_trials: if trial.status != Trial.TERMINATED: incomplete_trials += [trial] if incomplete_trials: - if raise_on_failed_trial and not state["signal"]: + if raise_on_failed_trial and not stop_event.is_set(): raise TuneError("Trials did not complete", incomplete_trials) else: logger.error("Trials did not complete: %s", incomplete_trials) @@ -764,17 +773,16 @@ def signal_interrupt_tune_run(sig: int, frame): f"({tune_taken:.2f} seconds for the tuning loop)." ) - if state["signal"]: + if stop_event.is_set(): logger.warning( "Experiment has been interrupted, but the most recent state was " "saved. You can continue running this experiment by passing " "`resume=True` to `tune.run()`" ) - trials = runner.get_trials() return ExperimentAnalysis( - runner.checkpoint_file, - trials=trials, + experiment_checkpoint, + trials=all_trials, default_metric=metric, default_mode=mode, sync_config=sync_config, From 2d4f359820552a9711f1532a44863278fe38ddd9 Mon Sep 17 00:00:00 2001 From: Kai Fricke Date: Fri, 9 Dec 2022 17:29:08 -0800 Subject: [PATCH 2/2] Rename event Signed-off-by: Kai Fricke --- python/ray/tune/tune.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/python/ray/tune/tune.py b/python/ray/tune/tune.py index bf0944c7fa9c0..feb7584115090 100644 --- a/python/ray/tune/tune.py +++ b/python/ray/tune/tune.py @@ -129,7 +129,7 @@ def _report_progress( def _setup_signal_catching() -> threading.Event: original_handler = signal.getsignal(signal.SIGINT) - stop_event = threading.Event() + experiment_interrupted_event = threading.Event() def signal_interrupt_tune_run(sig: int, frame): logger.warning( @@ -138,7 +138,7 @@ def signal_interrupt_tune_run(sig: int, frame): "Press CTRL+C (or send SIGINT/SIGKILL/SIGTERM) " "to skip. " ) - stop_event.set() + experiment_interrupted_event.set() # Restore original signal handler to react to future SIGINT signals signal.signal(signal.SIGINT, original_handler) @@ -157,7 +157,7 @@ def signal_interrupt_tune_run(sig: int, frame): if hasattr(signal, "SIGUSR1"): signal.signal(signal.SIGUSR1, signal_interrupt_tune_run) - return stop_event + return experiment_interrupted_event @PublicAPI @@ -676,7 +676,7 @@ class and registered trainables. "Trainable API." ) - stop_event = _setup_signal_catching() + experiment_interrupted_event = _setup_signal_catching() progress_reporter = progress_reporter or _detect_reporter() @@ -728,7 +728,7 @@ class and registered trainables. metric=metric, mode=mode, ) - while not runner.is_finished() and not stop_event.is_set(): + while not runner.is_finished() and not experiment_interrupted_event.is_set(): runner.step() if has_verbosity(Verbosity.V1_EXPERIMENT): _report_progress(runner, progress_reporter) @@ -761,7 +761,7 @@ class and registered trainables. incomplete_trials += [trial] if incomplete_trials: - if raise_on_failed_trial and not stop_event.is_set(): + if raise_on_failed_trial and not experiment_interrupted_event.is_set(): raise TuneError("Trials did not complete", incomplete_trials) else: logger.error("Trials did not complete: %s", incomplete_trials) @@ -773,7 +773,7 @@ class and registered trainables. f"({tune_taken:.2f} seconds for the tuning loop)." ) - if stop_event.is_set(): + if experiment_interrupted_event.is_set(): logger.warning( "Experiment has been interrupted, but the most recent state was " "saved. You can continue running this experiment by passing "