From dcddb2cb075592eaf71958f0dbb33217b92ba888 Mon Sep 17 00:00:00 2001 From: avnishn Date: Wed, 28 Jun 2023 16:59:16 -0700 Subject: [PATCH 1/4] [RLlib] Change placement group strategy for learner Signed-off-by: avnishn --- ...lti_node_checkpointing_compute_config.yaml | 6 ++-- ...node_checkpointing_compute_config_gce.yaml | 6 ++-- rllib/algorithms/algorithm.py | 8 ++--- rllib/algorithms/impala/impala.py | 31 +++++++++++++------ 4 files changed, 29 insertions(+), 22 deletions(-) diff --git a/release/rllib_tests/multi_node_checkpointing_compute_config.yaml b/release/rllib_tests/multi_node_checkpointing_compute_config.yaml index aabba297def2..aea63ff74dfa 100644 --- a/release/rllib_tests/multi_node_checkpointing_compute_config.yaml +++ b/release/rllib_tests/multi_node_checkpointing_compute_config.yaml @@ -9,9 +9,9 @@ head_node_type: worker_node_types: - name: worker_node - instance_type: g4dn.xlarge - min_workers: 2 - max_workers: 2 + instance_type: g3.8xlarge + min_workers: 1 + max_workers: 1 use_spot: false aws: diff --git a/release/rllib_tests/multi_node_checkpointing_compute_config_gce.yaml b/release/rllib_tests/multi_node_checkpointing_compute_config_gce.yaml index 1bcac0141dee..d955a2e77b15 100644 --- a/release/rllib_tests/multi_node_checkpointing_compute_config_gce.yaml +++ b/release/rllib_tests/multi_node_checkpointing_compute_config_gce.yaml @@ -11,7 +11,7 @@ head_node_type: worker_node_types: - name: worker_node - instance_type: n1-standard-4-nvidia-tesla-t4-1 # g4dn.xlarge - min_workers: 2 - max_workers: 2 + instance_type: n1-standard-4-nvidia-t4-16gb-2 + min_workers: 1 + max_workers: 1 use_spot: false diff --git a/rllib/algorithms/algorithm.py b/rllib/algorithms/algorithm.py index f6131ee57a09..089497a378b5 100644 --- a/rllib/algorithms/algorithm.py +++ b/rllib/algorithms/algorithm.py @@ -2227,17 +2227,13 @@ def default_resource_request( # time as gpus if cf.num_gpus_per_learner_worker: learner_bundles = [ - { - "GPU": cf.num_gpus_per_learner_worker, - } - for _ in range(cf.num_learner_workers) + {"GPU": cf.num_learner_workers * cf.num_gpus_per_learner_worker} ] elif cf.num_cpus_per_learner_worker: learner_bundles = [ { - "CPU": cf.num_cpus_per_learner_worker, + "CPU": cf.num_cpus_per_learner_worker * cf.num_learner_workers, } - for _ in range(cf.num_learner_workers) ] bundles = [driver] + rollout_bundles + evaluation_bundles + learner_bundles diff --git a/rllib/algorithms/impala/impala.py b/rllib/algorithms/impala/impala.py index db4ed7e0d569..f3ba62a25aa0 100644 --- a/rllib/algorithms/impala/impala.py +++ b/rllib/algorithms/impala/impala.py @@ -842,29 +842,40 @@ def default_resource_request( if cf.num_learner_workers == 0: # if num_learner_workers is 0, then we need to allocate one gpu if # num_gpus_per_learner_worker is greater than 0. - trainer_bundle = [ + learner_bundles = [ { "CPU": cf.num_cpus_per_learner_worker, "GPU": cf.num_gpus_per_learner_worker, } ] else: + # if cf.num_gpus_per_learner_worker: + # trainer_bundle = [ + # { + # "GPU": cf.num_gpus_per_learner_worker, + # } + # for _ in range(cf.num_learner_workers) + # ] + # elif cf.num_cpus_per_learner_worker: + # trainer_bundle = [ + # { + # "CPU": cf.num_cpus_per_learner_worker, + # } + # for _ in range(cf.num_learner_workers) + # ] if cf.num_gpus_per_learner_worker: - trainer_bundle = [ - { - "GPU": cf.num_gpus_per_learner_worker, - } - for _ in range(cf.num_learner_workers) + learner_bundles = [ + {"GPU": cf.num_learner_workers * cf.num_gpus_per_learner_worker} ] elif cf.num_cpus_per_learner_worker: - trainer_bundle = [ + learner_bundles = [ { - "CPU": cf.num_cpus_per_learner_worker, + "CPU": cf.num_cpus_per_learner_worker + * cf.num_learner_workers, } - for _ in range(cf.num_learner_workers) ] - bundles += trainer_bundle + bundles += learner_bundles # Return PlacementGroupFactory containing all needed resources # (already properly defined as device bundles). From 96d10cbb07687fede7fbfe6e41be40b1f17a3e01 Mon Sep 17 00:00:00 2001 From: avnishn Date: Thu, 29 Jun 2023 15:48:04 -0700 Subject: [PATCH 2/4] Put all the bundle stuff in a separate function Signed-off-by: avnishn --- rllib/algorithms/algorithm.py | 97 +++++++++++++++++++------------ rllib/algorithms/impala/impala.py | 36 +----------- 2 files changed, 60 insertions(+), 73 deletions(-) diff --git a/rllib/algorithms/algorithm.py b/rllib/algorithms/algorithm.py index 089497a378b5..5874e8a64697 100644 --- a/rllib/algorithms/algorithm.py +++ b/rllib/algorithms/algorithm.py @@ -131,6 +131,42 @@ from ray.util.timer import _Timer from ray.tune.registry import get_trainable_cls + +try: + from ray.rllib.extensions import AlgorithmBase +except ImportError: + + class AlgorithmBase: + + @staticmethod + def _get_learner_bundles(cf: AlgorithmConfig): + """Selects the right resource bundles for learner workers based off of cf. + + Args: + cf: The algorithm config. + """ + if cf.num_learner_workers > 0: + if cf.num_gpus_per_learner_worker: + learner_bundles = [ + {"GPU": cf.num_learner_workers * cf.num_gpus_per_learner_worker} + ] + elif cf.num_cpus_per_learner_worker: + learner_bundles = [ + { + "CPU": cf.num_cpus_per_learner_worker * cf.num_learner_workers, + } + ] + else: + learner_bundles = { + # sampling and training is not done concurrently when local is + # used, so pick the max. + "CPU": max( + cf.num_cpus_per_learner_worker, cf.num_cpus_for_local_worker + ), + "GPU": cf.num_gpus_per_learner_worker, + } + return learner_bundles + tf1, tf, tfv = try_import_tf() logger = logging.getLogger(__name__) @@ -146,7 +182,7 @@ def with_common_config(*args, **kwargs): @PublicAPI -class Algorithm(Trainable): +class Algorithm(Trainable, AlgorithmBase): """An RLlib algorithm responsible for optimizing one or more Policies. Algorithms contain a WorkerSet under `self.workers`. A WorkerSet is @@ -2157,30 +2193,26 @@ def default_resource_request( eval_cf.freeze() # resources for the driver of this trainable - if cf._enable_learner_api: - if cf.num_learner_workers == 0: - # in this case local_worker only does sampling and training is done on - # local learner worker - driver = { - # sampling and training is not done concurrently when local is - # used, so pick the max. - "CPU": max( - cf.num_cpus_per_learner_worker, cf.num_cpus_for_local_worker - ), - "GPU": cf.num_gpus_per_learner_worker, - } - else: - # in this case local_worker only does sampling and training is done on - # remote learner workers - driver = {"CPU": cf.num_cpus_for_local_worker, "GPU": 0} - else: - # Without Learner API, the local_worker can do both sampling and training. - # So, we need to allocate the same resources for the driver as for the - # local_worker. - driver = { - "CPU": cf.num_cpus_for_local_worker, - "GPU": 0 if cf._fake_gpus else cf.num_gpus, - } + # if cf._enable_learner_api: + # if cf.num_learner_workers == 0: + # # in this case local_worker only does sampling and training is done on + # # local learner worker + # driver = { + # # sampling and training is not done concurrently when local is + # # used, so pick the max. + # "CPU": max( + # cf.num_cpus_per_learner_worker, cf.num_cpus_for_local_worker + # ), + # "GPU": cf.num_gpus_per_learner_worker, + # } + # else: + # # in this case local_worker only does sampling and training is done on + # # remote learner workers + # driver = {"CPU": cf.num_cpus_for_local_worker, "GPU": 0} + driver = { + "CPU": cf.num_cpus_for_local_worker, + "GPU": 0 if cf._fake_gpus else cf.num_gpus, + } # resources for remote rollout env samplers rollout_bundles = [ @@ -2222,19 +2254,8 @@ def default_resource_request( # resources for remote learner workers learner_bundles = [] - if cf._enable_learner_api and cf.num_learner_workers > 0: - # can't specify cpus for learner workers at the same - # time as gpus - if cf.num_gpus_per_learner_worker: - learner_bundles = [ - {"GPU": cf.num_learner_workers * cf.num_gpus_per_learner_worker} - ] - elif cf.num_cpus_per_learner_worker: - learner_bundles = [ - { - "CPU": cf.num_cpus_per_learner_worker * cf.num_learner_workers, - } - ] + if cf._enable_learner_api: + learner_bundles = cls._get_learner_bundles(cf) bundles = [driver] + rollout_bundles + evaluation_bundles + learner_bundles diff --git a/rllib/algorithms/impala/impala.py b/rllib/algorithms/impala/impala.py index f3ba62a25aa0..c44c194bb4d8 100644 --- a/rllib/algorithms/impala/impala.py +++ b/rllib/algorithms/impala/impala.py @@ -839,41 +839,7 @@ def default_resource_request( # factories. if cf._enable_learner_api: # Resources for the Algorithm. - if cf.num_learner_workers == 0: - # if num_learner_workers is 0, then we need to allocate one gpu if - # num_gpus_per_learner_worker is greater than 0. - learner_bundles = [ - { - "CPU": cf.num_cpus_per_learner_worker, - "GPU": cf.num_gpus_per_learner_worker, - } - ] - else: - # if cf.num_gpus_per_learner_worker: - # trainer_bundle = [ - # { - # "GPU": cf.num_gpus_per_learner_worker, - # } - # for _ in range(cf.num_learner_workers) - # ] - # elif cf.num_cpus_per_learner_worker: - # trainer_bundle = [ - # { - # "CPU": cf.num_cpus_per_learner_worker, - # } - # for _ in range(cf.num_learner_workers) - # ] - if cf.num_gpus_per_learner_worker: - learner_bundles = [ - {"GPU": cf.num_learner_workers * cf.num_gpus_per_learner_worker} - ] - elif cf.num_cpus_per_learner_worker: - learner_bundles = [ - { - "CPU": cf.num_cpus_per_learner_worker - * cf.num_learner_workers, - } - ] + learner_bundles = cls._get_learner_bundles(cf) bundles += learner_bundles From 24905fee4c33f6150eb5df13b391ab2e6abcfa83 Mon Sep 17 00:00:00 2001 From: avnishn Date: Thu, 29 Jun 2023 17:04:49 -0700 Subject: [PATCH 3/4] Fix error Signed-off-by: avnishn --- rllib/algorithms/algorithm.py | 61 +++++++++++++++++------------------ 1 file changed, 29 insertions(+), 32 deletions(-) diff --git a/rllib/algorithms/algorithm.py b/rllib/algorithms/algorithm.py index 5874e8a64697..6b2bffc1d73f 100644 --- a/rllib/algorithms/algorithm.py +++ b/rllib/algorithms/algorithm.py @@ -137,11 +137,10 @@ except ImportError: class AlgorithmBase: - @staticmethod def _get_learner_bundles(cf: AlgorithmConfig): """Selects the right resource bundles for learner workers based off of cf. - + Args: cf: The algorithm config. """ @@ -153,20 +152,24 @@ def _get_learner_bundles(cf: AlgorithmConfig): elif cf.num_cpus_per_learner_worker: learner_bundles = [ { - "CPU": cf.num_cpus_per_learner_worker * cf.num_learner_workers, + "CPU": cf.num_cpus_per_learner_worker + * cf.num_learner_workers, } ] else: - learner_bundles = { - # sampling and training is not done concurrently when local is - # used, so pick the max. - "CPU": max( - cf.num_cpus_per_learner_worker, cf.num_cpus_for_local_worker - ), - "GPU": cf.num_gpus_per_learner_worker, - } + learner_bundles = [ + { + # sampling and training is not done concurrently when local is + # used, so pick the max. + "CPU": max( + cf.num_cpus_per_learner_worker, cf.num_cpus_for_local_worker + ), + "GPU": cf.num_gpus_per_learner_worker, + } + ] return learner_bundles + tf1, tf, tfv = try_import_tf() logger = logging.getLogger(__name__) @@ -2193,26 +2196,20 @@ def default_resource_request( eval_cf.freeze() # resources for the driver of this trainable - # if cf._enable_learner_api: - # if cf.num_learner_workers == 0: - # # in this case local_worker only does sampling and training is done on - # # local learner worker - # driver = { - # # sampling and training is not done concurrently when local is - # # used, so pick the max. - # "CPU": max( - # cf.num_cpus_per_learner_worker, cf.num_cpus_for_local_worker - # ), - # "GPU": cf.num_gpus_per_learner_worker, - # } - # else: - # # in this case local_worker only does sampling and training is done on - # # remote learner workers - # driver = {"CPU": cf.num_cpus_for_local_worker, "GPU": 0} - driver = { - "CPU": cf.num_cpus_for_local_worker, - "GPU": 0 if cf._fake_gpus else cf.num_gpus, - } + if cf._enable_learner_api: + if cf.num_learner_workers == 0: + # in this case local_worker only does sampling and training is done on + # local learner worker + driver = cls._get_learner_bundles(cf)[0] + else: + # in this case local_worker only does sampling and training is done on + # remote learner workers + driver = {"CPU": cf.num_cpus_for_local_worker, "GPU": 0} + else: + driver = { + "CPU": cf.num_cpus_for_local_worker, + "GPU": 0 if cf._fake_gpus else cf.num_gpus, + } # resources for remote rollout env samplers rollout_bundles = [ @@ -2254,7 +2251,7 @@ def default_resource_request( # resources for remote learner workers learner_bundles = [] - if cf._enable_learner_api: + if cf._enable_learner_api and cf.num_learner_workers > 0: learner_bundles = cls._get_learner_bundles(cf) bundles = [driver] + rollout_bundles + evaluation_bundles + learner_bundles From 890a58dd489fa360c7ed7410bc2e140d1f2707f5 Mon Sep 17 00:00:00 2001 From: avnishn Date: Thu, 29 Jun 2023 17:06:10 -0700 Subject: [PATCH 4/4] Address comments Signed-off-by: avnishn --- rllib/algorithms/algorithm.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/rllib/algorithms/algorithm.py b/rllib/algorithms/algorithm.py index 6b2bffc1d73f..211eef34aaeb 100644 --- a/rllib/algorithms/algorithm.py +++ b/rllib/algorithms/algorithm.py @@ -138,11 +138,14 @@ class AlgorithmBase: @staticmethod - def _get_learner_bundles(cf: AlgorithmConfig): + def _get_learner_bundles(cf: AlgorithmConfig) -> List[Dict[str, int]]: """Selects the right resource bundles for learner workers based off of cf. Args: cf: The algorithm config. + + Returns: + A list of resource bundles for the learner workers. """ if cf.num_learner_workers > 0: if cf.num_gpus_per_learner_worker: