[tune] Turn on new execution path per default [no_early_kickoff] #34840

Merged (17 commits) on May 2, 2023

20 changes: 10 additions & 10 deletions .buildkite/pipeline.ml.yml
@@ -326,10 +326,10 @@
- bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only python/ray/tests/horovod/...
- bazel test --config=ci $(./ci/run/bazel_export_options) python/ray/tests/ray_lightning/...

### NEW EXECUTION PATH
### OLD EXECUTION PATH COMPAT


- label: ":octopus: :sunny: New execution path: Tune tests and examples (small)"
- label: ":octopus: :last_quarter_moon_with_face: Old execution path: Tune tests and examples (small)"
conditions: ["NO_WHEELS_REQUIRED", "RAY_CI_TUNE_AFFECTED"]
instance_size: small
parallelism: 3
@@ -339,45 +339,45 @@
- ./ci/env/env_info.sh
- ./ci/run/run_bazel_test_with_sharding.sh
--config=ci $(./ci/run/bazel_export_options) --build_tests_only
--test_env=TUNE_NEW_EXECUTION=1
--test_env=TUNE_NEW_EXECUTION=0
--test_tag_filters=-medium_instance,-py37,-soft_imports,-gpu_only,-rllib,-multinode,-exclude_new_execution
python/ray/tune/...

- label: ":octopus: :sunny: New execution path:Tune tests and examples (medium)"
- label: ":octopus: :last_quarter_moon_with_face: Old execution path: Tune tests and examples (medium)"
conditions: ["NO_WHEELS_REQUIRED", "RAY_CI_TUNE_AFFECTED"]
instance_size: medium
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT
- TUNE_TESTING=1 DATA_PROCESSING_TESTING=1 ./ci/env/install-dependencies.sh
- ./ci/env/env_info.sh
- bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only
--test_env=TUNE_NEW_EXECUTION=1
--test_env=TUNE_NEW_EXECUTION=0
--test_tag_filters=medium_instance,-py37,-soft_imports,-gpu_only,-rllib,-multinode,-exclude_new_execution
python/ray/tune/...

- label: ":octopus: :brain: :sunny: New execution path: Tune tests and examples {using RLlib}"
- label: ":octopus: :brain: :last_quarter_moon_with_face: Old execution path: Tune tests and examples {using RLlib}"
conditions: ["NO_WHEELS_REQUIRED", "RAY_CI_TUNE_AFFECTED", "RAY_CI_RLLIB_AFFECTED"]
instance_size: large
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT
- TUNE_TESTING=1 ./ci/env/install-dependencies.sh
- ./ci/env/env_info.sh
- bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only
--test_env=TUNE_NEW_EXECUTION=1
--test_env=TUNE_NEW_EXECUTION=0
--test_tag_filters=-gpu_only,rllib,-exclude_new_execution python/ray/tune/...

- label: ":octopus: :sunny: New execution path: Tune tests and examples. Python 3.7"
- label: ":octopus: :last_quarter_moon_with_face: Old execution path: Tune tests and examples. Python 3.7"
conditions: ["NO_WHEELS_REQUIRED", "RAY_CI_TUNE_AFFECTED"]
instance_size: small
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT
- TUNE_TESTING=1 INSTALL_HOROVOD=1 ./ci/env/install-dependencies.sh
- ./ci/env/env_info.sh
- bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only
--test_env=TUNE_NEW_EXECUTION=1
--test_env=TUNE_NEW_EXECUTION=0
--test_tag_filters=py37,-client python/ray/tune/...

- label: ":octopus: :sunny: New execution path: ML library integrations tests and examples. Python 3.7"
- label: ":octopus: :last_quarter_moon_with_face: Old execution path: ML library integrations tests and examples. Python 3.7"
conditions: ["NO_WHEELS_REQUIRED", "RAY_CI_TUNE_AFFECTED"]
instance_size: small
commands:
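The pipeline changes above flip the roles of the CI jobs: the default Tune jobs now run on the new execution path, while the renamed "Old execution path" compat jobs pin `TUNE_NEW_EXECUTION=0`. A minimal sketch of the gating convention these jobs rely on, inferred from the test changes further down in this PR; the test name and body are hypothetical:

```python
import os

import pytest

# Compat jobs export TUNE_NEW_EXECUTION=0; default jobs leave it unset, which
# now selects the new execution path.
runs_on_legacy_path = os.environ.get("TUNE_NEW_EXECUTION") == "0"


@pytest.mark.skipif(
    not runs_on_legacy_path,
    reason="Uses the legacy TrialRunner directly; only runs in the compat jobs.",
)
def test_legacy_trial_runner_behavior():
    ...  # hypothetical test body
```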
2 changes: 1 addition & 1 deletion python/ray/air/BUILD
@@ -76,7 +76,7 @@ py_test(

py_test(
name = "test_experiment_restore",
size = "medium",
size = "large",
srcs = [
"tests/test_experiment_restore.py",
"tests/_test_experiment_restore_run.py"
19 changes: 10 additions & 9 deletions python/ray/air/execution/_internal/actor_manager.py
@@ -360,6 +360,13 @@ def _try_start_actors(self, max_actors: Optional[int] = None) -> int:
# Start Ray actor
actor = remote_actor_cls.remote(**kwargs)

# Track
self._live_actors_to_ray_actors_resources[tracked_actor] = (
actor,
acquired_resources,
)
self._live_resource_cache = None

# Schedule ready future
future = actor.__ray_ready__.remote()

@@ -392,12 +399,6 @@ def on_error(exception: Exception):
on_error=on_error,
)

self._live_actors_to_ray_actors_resources[tracked_actor] = (
actor,
acquired_resources,
)
self._live_resource_cache = None

self._enqueue_cached_actor_tasks(tracked_actor=tracked_actor)

return started_actors
@@ -698,6 +699,9 @@ def schedule_actor_task(
args = args or tuple()
kwargs = kwargs or {}

if tracked_actor.actor_id in self._failed_actor_ids:
return

tracked_actor_task = TrackedActorTask(
tracked_actor=tracked_actor, on_result=on_result, on_error=on_error
)
@@ -874,6 +878,3 @@ def cleanup(self):
self._resource_manager.clear()

self.__init__(resource_manager=self._resource_manager)

def __del__(self):
self.cleanup()
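The actor-manager changes above move the live-actor bookkeeping ahead of scheduling the `__ray_ready__` future, drop task scheduling for actors that already failed, and remove the implicit cleanup in `__del__`. A toy sketch of the ordering and the new guard, using simplified names rather than the real `RayActorManager` internals:

```python
class ToyActorManager:
    """Illustrative only: mirrors the ordering and the guard, not the real class."""

    def __init__(self):
        self.live_actors = {}      # tracked_actor -> (ray_actor, acquired_resources)
        self.failed_actors = set()

    def start_actor(self, tracked_actor, ray_actor, resources):
        # Register the actor *before* scheduling its readiness future, so any
        # callback that fires for this actor can already resolve it in the map.
        self.live_actors[tracked_actor] = (ray_actor, resources)
        # ... then schedule ray_actor.__ray_ready__.remote() and track the future.

    def schedule_actor_task(self, tracked_actor, task):
        # New guard: tasks targeting an actor that already failed are dropped
        # instead of being scheduled against a dead actor.
        if tracked_actor in self.failed_actors:
            return
        # ... otherwise enqueue the task as before.
```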
22 changes: 14 additions & 8 deletions python/ray/air/tests/test_experiment_restore.py
@@ -1,4 +1,6 @@
import json
import os

import numpy as np
import pandas as pd
from pathlib import Path
@@ -58,7 +60,6 @@ def test_experiment_restore(tmp_path, runner_type):
- The test will stop the script with a SIGINT at a random time between
4-8 iterations after each restore.


Requirements:
- Req 1: Reasonable runtime
- The experiment should finish within 2 * 16 = 32 seconds.
@@ -112,11 +113,14 @@ def test_experiment_restore(tmp_path, runner_type):
"NUM_TRIALS": str(num_trials),
"MAX_CONCURRENT_TRIALS": str(max_concurrent),
"CSV_DATA_FILE": csv_file,
"TUNE_NEW_EXECUTION": os.environ.get("TUNE_NEW_EXECUTION", "1"),
}

# Pass criteria
no_interrupts_runtime = 16.0
passing_factor = 2
# Todo(krfricke): See if we can improve the actor startup/shutdown time
# to reduce the passing factor again.
passing_factor = 2.5
passing_runtime = no_interrupts_runtime * passing_factor
_print_message(
"Experiment should finish with a total runtime of\n"
@@ -197,17 +201,19 @@
)
test_end_time = time.monotonic()

# Req 1: runtime
assert total_runtime <= passing_runtime, (
f"Expected runtime to be <= {passing_runtime}, but ran for: {total_runtime}. "
f"This means the experiment did not finish (iterations still running). Are "
f"there any performance regressions or expensive failure recoveries??"
)

# The script shouldn't have errored. (It should have finished by this point.)
assert return_code == 0, (
f"The script errored with return code: {return_code}.\n"
f"Check the `{_RUN_SCRIPT_FILENAME}` script for any issues."
f"Check the `{_RUN_SCRIPT_FILENAME}` script for any issues. "
)

# Req 1: runtime
assert (
total_runtime <= passing_runtime
), f"Expected runtime to be <= {passing_runtime}, but ran for: {total_runtime}"

# Req 2: training progress persisted
# Check that progress increases monotonically (we never go backwards/start from 0)
assert np.all(np.diff(progress_history) >= 0), (
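The restore test now forwards the execution-path setting to the spawned run script and relaxes the runtime criterion. The resulting budget works out as below; this is a sketch of the arithmetic and env handling drawn from the diff, not test code:

```python
import os

# New pass criterion: 2.5x the uninterrupted runtime instead of 2x.
no_interrupts_runtime = 16.0   # seconds, as in the test
passing_factor = 2.5           # raised from 2 in this PR
passing_runtime = no_interrupts_runtime * passing_factor  # 40.0 seconds

# The spawned run script inherits whichever path the CI job selected and
# defaults to the new execution path when the variable is unset.
env = {"TUNE_NEW_EXECUTION": os.environ.get("TUNE_NEW_EXECUTION", "1")}
```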
27 changes: 24 additions & 3 deletions python/ray/tune/execution/tune_controller.py
@@ -14,6 +14,7 @@
from ray.air._internal.checkpoint_manager import CheckpointStorage, _TrackedCheckpoint
from ray.air.execution import ResourceManager, PlacementGroupResourceManager
from ray.air.execution._internal import RayActorManager, TrackedActor
from ray.exceptions import RayActorError
from ray.tune.error import _AbortTrialExecution
from ray.tune.execution.ray_trial_executor import _class_cache
from ray.tune.execution.trial_runner import _TuneControllerBase, TrialRunnerWrapper
@@ -215,6 +216,11 @@ def _cleanup_stopping_actors(self, force_all: bool = False):
continue

_, tracked_actor = times.popleft()

if tracked_actor not in self._stopping_actors:
# Actor stopping has been handled by the block above
continue

if self._actor_manager.is_actor_started(tracked_actor=tracked_actor):
logger.debug(f"Forcefully killing actor: {tracked_actor}")
self._actor_manager.remove_actor(tracked_actor=tracked_actor, kill=True)
@@ -376,8 +382,6 @@ def _maybe_update_trial_queue(self):
def _cleanup_trials(self):
logger.debug("CLEANING UP all trials")

self._cleanup_cached_actors(force_all=True)

for tracked_actor in list(self._actor_to_trial):
trial = self._actor_to_trial[tracked_actor]
logger.debug(
@@ -386,6 +390,9 @@ def _cleanup_trials(self):
)
self._schedule_trial_stop(trial)

# Clean up cached actors now
self._cleanup_cached_actors(force_all=True)

start = time.monotonic()
while time.monotonic() - start < 5 and self._actor_manager.num_total_actors:
logger.debug("Waiting for actor manager to clean up final state")
@@ -518,6 +525,7 @@ def _maybe_reuse_cached_actor(self, trial: Trial) -> bool:
if trial in self._trial_to_actor:
original_actor = self._trial_to_actor.pop(trial)
self._actor_to_trial.pop(original_actor)

logger.debug(f"Removing ORIGINAL ACTOR for trial {trial}: {original_actor}")
self._remove_actor(tracked_actor=original_actor)

@@ -742,6 +750,14 @@ def _actor_failed(self, tracked_actor: TrackedActor, exception: Exception):
self._unstage_trial_with_resources(trial)
self._trial_task_failure(trial, exception=exception)

self._actor_manager.clear_actor_task_futures(tracked_actor)

# Clean up actor
tracked_actor.set_on_stop(None)
tracked_actor.set_on_error(None)
self._actor_manager.remove_actor(tracked_actor, kill=False)

# Trigger actor stopped callback
self._actor_stopped(tracked_actor)

def _schedule_trial_task(
@@ -794,7 +810,12 @@ def _on_result(tracked_actor: TrackedActor, *args, **kwargs):
if on_error:

def _on_error(tracked_actor: TrackedActor, exception: Exception):
assert trial == self._actor_to_trial[tracked_actor]
# If the actor failed, it has already been cleaned up.
if tracked_actor not in self._actor_to_trial:
assert isinstance(exception, RayActorError), type(exception)
else:
assert trial == self._actor_to_trial[tracked_actor]

logger.debug(
f"Future {method_name.upper()} FAILED for trial {trial}: "
f"{exception}"
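In the controller, `_actor_failed` now clears the actor's task futures, detaches its callbacks, and removes the actor right away, so a late task-error callback can still arrive for an actor that is no longer tracked. A condensed sketch of the resulting guard in the error handler, simplified from the method above and not the full implementation:

```python
from ray.exceptions import RayActorError


def handle_task_error(actor_to_trial, tracked_actor, trial, exception):
    # After a hard actor failure the actor is removed from the mapping right
    # away, so a late task-error callback may refer to an untracked actor. In
    # that case only an actor-death error is expected.
    if tracked_actor not in actor_to_trial:
        assert isinstance(exception, RayActorError), type(exception)
    else:
        assert trial == actor_to_trial[tracked_actor]
    # ... log the failed future and continue with the usual failure handling.
```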
2 changes: 1 addition & 1 deletion python/ray/tune/tests/test_cluster_searcher.py
@@ -40,7 +40,7 @@ def start_connected_cluster():


@pytest.mark.skipif(
os.environ.get("TUNE_NEW_EXECUTION") == "1",
os.environ.get("TUNE_NEW_EXECUTION") != "0",
reason=(
"This test uses the TrialRunner directly and needs to be rewritten "
"for the new execution backend."
14 changes: 7 additions & 7 deletions python/ray/tune/tests/test_progress_reporter.py
@@ -688,7 +688,7 @@ def testEndToEndReporting(self):
output = run_string_as_driver(END_TO_END_COMMAND)
try:
# New execution path is too fast, trials are already terminated
if os.environ.get("TUNE_NEW_EXECUTION") != "1":
if os.environ.get("TUNE_NEW_EXECUTION") == "0":
assert EXPECTED_END_TO_END_START in output
assert EXPECTED_END_TO_END_END in output
assert "(raylet)" not in output, "Unexpected raylet log messages"
@@ -713,7 +713,7 @@ def testVerboseReporting(self):
self.assertIsNone(re.search(VERBOSE_TRIAL_NORM_2_PATTERN, output))
self.assertNotIn(VERBOSE_TRIAL_NORM_3, output)
self.assertNotIn(VERBOSE_TRIAL_NORM_4, output)
if os.environ.get("TUNE_NEW_EXECUTION") != "1":
if os.environ.get("TUNE_NEW_EXECUTION") == "0":
self.assertNotIn(VERBOSE_TRIAL_DETAIL, output)
except Exception:
print("*** BEGIN OUTPUT ***")
@@ -725,14 +725,14 @@ def testVerboseReporting(self):
output = run_string_as_driver(verbose_1_cmd)
try:
# New execution path is too fast, trials are already terminated
if os.environ.get("TUNE_NEW_EXECUTION") != "1":
if os.environ.get("TUNE_NEW_EXECUTION") == "0":
self.assertIn(VERBOSE_EXP_OUT_1, output)
self.assertIn(VERBOSE_EXP_OUT_2, output)
self.assertNotIn(VERBOSE_TRIAL_NORM_1, output)
self.assertIsNone(re.search(VERBOSE_TRIAL_NORM_2_PATTERN, output))
self.assertNotIn(VERBOSE_TRIAL_NORM_3, output)
self.assertNotIn(VERBOSE_TRIAL_NORM_4, output)
if os.environ.get("TUNE_NEW_EXECUTION") != "1":
if os.environ.get("TUNE_NEW_EXECUTION") == "0":
self.assertNotIn(VERBOSE_TRIAL_DETAIL, output)
except Exception:
print("*** BEGIN OUTPUT ***")
@@ -743,7 +743,7 @@ def testVerboseReporting(self):
verbose_2_cmd = VERBOSE_CMD + "verbose=2)"
output = run_string_as_driver(verbose_2_cmd)
try:
if os.environ.get("TUNE_NEW_EXECUTION") != "1":
if os.environ.get("TUNE_NEW_EXECUTION") == "0":
self.assertIn(VERBOSE_EXP_OUT_1, output)
self.assertIn(VERBOSE_EXP_OUT_2, output)
self.assertIn(VERBOSE_TRIAL_NORM_1, output)
@@ -760,14 +760,14 @@ def testVerboseReporting(self):
verbose_3_cmd = VERBOSE_CMD + "verbose=3)"
output = run_string_as_driver(verbose_3_cmd)
try:
if os.environ.get("TUNE_NEW_EXECUTION") != "1":
if os.environ.get("TUNE_NEW_EXECUTION") == "0":
self.assertIn(VERBOSE_EXP_OUT_1, output)
self.assertIn(VERBOSE_EXP_OUT_2, output)
self.assertNotIn(VERBOSE_TRIAL_NORM_1, output)
self.assertIsNone(re.search(VERBOSE_TRIAL_NORM_2_PATTERN, output))
self.assertNotIn(VERBOSE_TRIAL_NORM_3, output)
self.assertNotIn(VERBOSE_TRIAL_NORM_4, output)
if os.environ.get("TUNE_NEW_EXECUTION") != "1":
if os.environ.get("TUNE_NEW_EXECUTION") == "0":
self.assertIn(VERBOSE_TRIAL_DETAIL, output)
# Check that we don't print duplicate results at the end
self.assertTrue(output.count(VERBOSE_TRIAL_WITH_ONCE_RESULT) == 1)
2 changes: 1 addition & 1 deletion python/ray/tune/tests/test_trial_scheduler.py
@@ -1899,7 +1899,7 @@ def testFastPerturb(self):
shutil.rmtree(tmpdir)

@pytest.mark.skipif(
os.environ.get("TUNE_NEW_EXECUTION") == "1",
os.environ.get("TUNE_NEW_EXECUTION") != "0",
reason=(
"This test is generally flaky: The print after writing `Cleanup` "
"to the file is printed, but the data is not always written. "
2 changes: 1 addition & 1 deletion python/ray/tune/tune.py
@@ -909,7 +909,7 @@ class and registered trainables.
trial_checkpoint_config=experiments[0].checkpoint_config,
)

if bool(int(os.environ.get("TUNE_NEW_EXECUTION", "0"))):
if bool(int(os.environ.get("TUNE_NEW_EXECUTION", "1"))):
trial_runner_cls = TuneController
runner_kwargs.pop("trial_executor")
runner_kwargs["reuse_actors"] = reuse_actors
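The one-line change in `tune.py` is what actually flips the default: with `TUNE_NEW_EXECUTION` unset, Tune now constructs a `TuneController`, and only an explicit `0` selects the legacy `TrialRunner` path. A sketch of the selection logic and of how a user could temporarily opt back out; the script name below is just an example:

```python
import os

# Mirrors the gating logic above: unset, or any non-zero value, selects the
# new execution path (TuneController); an explicit "0" keeps the legacy
# TrialRunner-based path.
use_new_execution = bool(int(os.environ.get("TUNE_NEW_EXECUTION", "1")))

# To fall back to the legacy path for a single run, e.g. while bisecting a
# regression, export the variable before launching the script:
#
#   TUNE_NEW_EXECUTION=0 python my_tune_script.py
```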