From 9d1c878728cdc442b2eb3605669f754fa64924cb Mon Sep 17 00:00:00 2001
From: Avnish Narayan <38871737+avnishn@users.noreply.github.com>
Date: Fri, 30 Jun 2023 13:34:44 -0700
Subject: [PATCH] [RLlib] Change placement group strategy for learner (#36929)

Signed-off-by: avnishn
Signed-off-by: e428265
---
 ...lti_node_checkpointing_compute_config.yaml |  6 +-
 ...node_checkpointing_compute_config_gce.yaml |  6 +-
 rllib/algorithms/algorithm.py                 | 73 ++++++++++++-------
 rllib/algorithms/impala/impala.py             | 29 +-------
 4 files changed, 54 insertions(+), 60 deletions(-)

diff --git a/release/rllib_tests/multi_node_checkpointing_compute_config.yaml b/release/rllib_tests/multi_node_checkpointing_compute_config.yaml
index aabba297def25..aea63ff74dfa2 100644
--- a/release/rllib_tests/multi_node_checkpointing_compute_config.yaml
+++ b/release/rllib_tests/multi_node_checkpointing_compute_config.yaml
@@ -9,9 +9,9 @@ head_node_type:
 
 worker_node_types:
   - name: worker_node
-    instance_type: g4dn.xlarge
-    min_workers: 2
-    max_workers: 2
+    instance_type: g3.8xlarge
+    min_workers: 1
+    max_workers: 1
     use_spot: false
 
     aws:
diff --git a/release/rllib_tests/multi_node_checkpointing_compute_config_gce.yaml b/release/rllib_tests/multi_node_checkpointing_compute_config_gce.yaml
index 1bcac0141dee1..d955a2e77b158 100644
--- a/release/rllib_tests/multi_node_checkpointing_compute_config_gce.yaml
+++ b/release/rllib_tests/multi_node_checkpointing_compute_config_gce.yaml
@@ -11,7 +11,7 @@ head_node_type:
 
 worker_node_types:
   - name: worker_node
-    instance_type: n1-standard-4-nvidia-tesla-t4-1 # g4dn.xlarge
-    min_workers: 2
-    max_workers: 2
+    instance_type: n1-standard-4-nvidia-t4-16gb-2
+    min_workers: 1
+    max_workers: 1
     use_spot: false
diff --git a/rllib/algorithms/algorithm.py b/rllib/algorithms/algorithm.py
index f6131ee57a09b..211eef34aaeb7 100644
--- a/rllib/algorithms/algorithm.py
+++ b/rllib/algorithms/algorithm.py
@@ -131,6 +131,48 @@
 from ray.util.timer import _Timer
 from ray.tune.registry import get_trainable_cls
 
+
+try:
+    from ray.rllib.extensions import AlgorithmBase
+except ImportError:
+
+    class AlgorithmBase:
+        @staticmethod
+        def _get_learner_bundles(cf: AlgorithmConfig) -> List[Dict[str, int]]:
+            """Selects the right resource bundles for learner workers based off of cf.
+
+            Args:
+                cf: The algorithm config.
+
+            Returns:
+                A list of resource bundles for the learner workers.
+            """
+            if cf.num_learner_workers > 0:
+                if cf.num_gpus_per_learner_worker:
+                    learner_bundles = [
+                        {"GPU": cf.num_learner_workers * cf.num_gpus_per_learner_worker}
+                    ]
+                elif cf.num_cpus_per_learner_worker:
+                    learner_bundles = [
+                        {
+                            "CPU": cf.num_cpus_per_learner_worker
+                            * cf.num_learner_workers,
+                        }
+                    ]
+            else:
+                learner_bundles = [
+                    {
+                        # sampling and training is not done concurrently when local is
+                        # used, so pick the max.
+                        "CPU": max(
+                            cf.num_cpus_per_learner_worker, cf.num_cpus_for_local_worker
+                        ),
+                        "GPU": cf.num_gpus_per_learner_worker,
+                    }
+                ]
+            return learner_bundles
+
+
 tf1, tf, tfv = try_import_tf()
 
 logger = logging.getLogger(__name__)
@@ -146,7 +188,7 @@ def with_common_config(*args, **kwargs):
 
 
 @PublicAPI
-class Algorithm(Trainable):
+class Algorithm(Trainable, AlgorithmBase):
     """An RLlib algorithm responsible for optimizing one or more Policies.
 
     Algorithms contain a WorkerSet under `self.workers`. A WorkerSet is
@@ -2161,22 +2203,12 @@ def default_resource_request(
             if cf.num_learner_workers == 0:
                 # in this case local_worker only does sampling and training is done on
                 # local learner worker
-                driver = {
-                    # sampling and training is not done concurrently when local is
-                    # used, so pick the max.
-                    "CPU": max(
-                        cf.num_cpus_per_learner_worker, cf.num_cpus_for_local_worker
-                    ),
-                    "GPU": cf.num_gpus_per_learner_worker,
-                }
+                driver = cls._get_learner_bundles(cf)[0]
             else:
                 # in this case local_worker only does sampling and training is done on
                 # remote learner workers
                 driver = {"CPU": cf.num_cpus_for_local_worker, "GPU": 0}
         else:
-            # Without Learner API, the local_worker can do both sampling and training.
-            # So, we need to allocate the same resources for the driver as for the
-            # local_worker.
             driver = {
                 "CPU": cf.num_cpus_for_local_worker,
                 "GPU": 0 if cf._fake_gpus else cf.num_gpus,
             }
@@ -2223,22 +2255,7 @@ def default_resource_request(
         # resources for remote learner workers
         learner_bundles = []
         if cf._enable_learner_api and cf.num_learner_workers > 0:
-            # can't specify cpus for learner workers at the same
-            # time as gpus
-            if cf.num_gpus_per_learner_worker:
-                learner_bundles = [
-                    {
-                        "GPU": cf.num_gpus_per_learner_worker,
-                    }
-                    for _ in range(cf.num_learner_workers)
-                ]
-            elif cf.num_cpus_per_learner_worker:
-                learner_bundles = [
-                    {
-                        "CPU": cf.num_cpus_per_learner_worker,
-                    }
-                    for _ in range(cf.num_learner_workers)
-                ]
+            learner_bundles = cls._get_learner_bundles(cf)
 
         bundles = [driver] + rollout_bundles + evaluation_bundles + learner_bundles
 
diff --git a/rllib/algorithms/impala/impala.py b/rllib/algorithms/impala/impala.py
index db4ed7e0d5690..c44c194bb4d86 100644
--- a/rllib/algorithms/impala/impala.py
+++ b/rllib/algorithms/impala/impala.py
@@ -839,32 +839,9 @@ def default_resource_request(
         # factories.
         if cf._enable_learner_api:
             # Resources for the Algorithm.
-            if cf.num_learner_workers == 0:
-                # if num_learner_workers is 0, then we need to allocate one gpu if
-                # num_gpus_per_learner_worker is greater than 0.
-                trainer_bundle = [
-                    {
-                        "CPU": cf.num_cpus_per_learner_worker,
-                        "GPU": cf.num_gpus_per_learner_worker,
-                    }
-                ]
-            else:
-                if cf.num_gpus_per_learner_worker:
-                    trainer_bundle = [
-                        {
-                            "GPU": cf.num_gpus_per_learner_worker,
-                        }
-                        for _ in range(cf.num_learner_workers)
-                    ]
-                elif cf.num_cpus_per_learner_worker:
-                    trainer_bundle = [
-                        {
-                            "CPU": cf.num_cpus_per_learner_worker,
-                        }
-                        for _ in range(cf.num_learner_workers)
-                    ]
-
-            bundles += trainer_bundle
+            learner_bundles = cls._get_learner_bundles(cf)
+
+            bundles += learner_bundles
 
         # Return PlacementGroupFactory containing all needed resources
         # (already properly defined as device bundles).
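
The patch consolidates learner resource selection into a single `_get_learner_bundles` helper shared by `Algorithm.default_resource_request` and `Impala.default_resource_request`. Remote learner workers are now packed into one placement-group bundle (all their GPUs or all their CPUs together) instead of one bundle per worker, which asks Ray to co-locate the learners on a single node; this is presumably why the release-test compute configs above move from two single-GPU workers to one multi-GPU worker. The following is a minimal, self-contained sketch of that logic, not part of the patch: `FakeConfig` is a hypothetical stand-in for the `AlgorithmConfig` fields the helper reads, and unlike the patched method, the sketch returns an explicit empty list when a remote learner requests neither CPUs nor GPUs (a branch the patch leaves unassigned).

    from dataclasses import dataclass
    from typing import Dict, List


    @dataclass
    class FakeConfig:
        # Hypothetical stand-in for the AlgorithmConfig fields the helper reads.
        num_learner_workers: int = 0
        num_gpus_per_learner_worker: int = 0
        num_cpus_per_learner_worker: int = 1
        num_cpus_for_local_worker: int = 1


    def get_learner_bundles(cf: FakeConfig) -> List[Dict[str, int]]:
        """Sketch of the patched bundle selection (_get_learner_bundles)."""
        if cf.num_learner_workers > 0:
            # Remote learners: one combined bundle rather than one per worker.
            if cf.num_gpus_per_learner_worker:
                return [{"GPU": cf.num_learner_workers * cf.num_gpus_per_learner_worker}]
            if cf.num_cpus_per_learner_worker:
                return [{"CPU": cf.num_cpus_per_learner_worker * cf.num_learner_workers}]
            return []  # explicit fallback; the patch leaves this branch unassigned
        # Local learner: sampling and training never run concurrently, so the
        # bundle takes the max of the two CPU requirements, not their sum.
        return [
            {
                "CPU": max(cf.num_cpus_per_learner_worker, cf.num_cpus_for_local_worker),
                "GPU": cf.num_gpus_per_learner_worker,
            }
        ]


    # Two 1-GPU learners now yield a single co-located bundle:
    assert get_learner_bundles(FakeConfig(2, 1)) == [{"GPU": 2}]
    # A purely local learner takes the max CPU over sampling and training:
    assert get_learner_bundles(FakeConfig()) == [{"CPU": 1, "GPU": 0}]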