[RLlib] AlgorithmConfig: Replace more occurrences of old config dicts; Make all Algorithms use the non-dict lookup for config properties. (#30096)
sven1977 authored Nov 10, 2022
1 parent 093c8dc commit e715a8b
Showing 41 changed files with 476 additions and 447 deletions.
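
The theme of the diff below is a switch from dict-style config lookups to attribute-style lookups on AlgorithmConfig objects. A minimal before/after sketch of the two access styles (the A2CConfig usage and the example values are assumptions for illustration, not part of this commit):

from ray.rllib.algorithms.a2c import A2CConfig

# Build a config the fluent way; the values here are just examples.
config = A2CConfig().training(train_batch_size=4000, microbatch_size=500)

# Old pattern (being removed throughout this commit): dict lookup.
old_style = config["train_batch_size"]

# New pattern: plain attribute access on the AlgorithmConfig object.
new_style = config.train_batch_size

assert old_style == new_style == 4000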
6 changes: 3 additions & 3 deletions doc/source/rllib/rllib-algorithms.rst
@@ -932,7 +932,7 @@ in the main Algorithm config and inheriting the `RE3UpdateCallbacks` as shown in
config["exploration_config"] = {
"type": "RE3",
# the dimensionality of the observation embedding vectors in latent space.
"embeds_dim": 128,
"embeds_dim": 128,
"rho": 0.1, # Beta decay factor, used for on-policy algorithm.
"k_nn": 50, # Number of neighbours to set for K-NN entropy estimation.
# Configuration for the encoder network, producing embedding vectors from observations.
@@ -943,11 +943,11 @@ in the main Algorithm config and inheriting the `RE3UpdateCallbacks` as shown in
"fcnet_activation": "relu",
},
# Hyperparameter to choose between exploration and exploitation. A higher value of beta adds
- # more importance to the intrinsic reward, as per the following equation
+ # more importance to the intrinsic reward, as per the following equation
# `reward = r + beta * intrinsic_reward`
"beta": 0.2,
# Schedule to use for beta decay, one of constant" or "linear_decay".
"beta_schedule": 'constant',
"beta_schedule": 'constant',
# Specify, which exploration sub-type to use (usually, the algo's "default"
# exploration, e.g. EpsilonGreedy for DQN, StochasticSampling for PG/SAC).
"sub_exploration": {
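
For reference, the `beta` hyperparameter in the snippet above mixes extrinsic and intrinsic reward via `reward = r + beta * intrinsic_reward`; a minimal sketch of that weighting (the helper function is illustrative only, not RLlib API):

def mix_rewards(extrinsic_reward, intrinsic_reward, beta=0.2):
    # Higher beta gives more weight to the RE3 intrinsic (exploration) bonus.
    return extrinsic_reward + beta * intrinsic_reward

mix_rewards(1.0, 0.5)  # -> 1.1 with the beta of 0.2 used above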
10 changes: 5 additions & 5 deletions rllib/algorithms/a2c/a2c.py
@@ -159,7 +159,7 @@ def setup(self, config: PartialAlgorithmConfigDict):
# Create a microbatch variable for collecting gradients on microbatches'.
# These gradients will be accumulated on-the-fly and applied at once (once train
# batch size has been collected) to the model.
- if self.config["microbatch_size"]:
+ if self.config.microbatch_size:
self._microbatches_grads = None
self._microbatches_counts = self._num_microbatches = 0

@@ -169,7 +169,7 @@ def training_step(self) -> ResultDict:
# W/o microbatching: Identical to Algorithm's default implementation.
# Only difference to a default Algorithm being the value function loss term
# and its value computations alongside each action.
- if self.config["microbatch_size"] is None:
+ if self.config.microbatch_size is None:
return Algorithm.training_step(self)

# In microbatch mode, we want to compute gradients on experience
@@ -179,11 +179,11 @@ def training_step(self) -> ResultDict:
# used.
if self.config.count_steps_by == "agent_steps":
train_batch = synchronous_parallel_sample(
- worker_set=self.workers, max_agent_steps=self.config["microbatch_size"]
+ worker_set=self.workers, max_agent_steps=self.config.microbatch_size
)
else:
train_batch = synchronous_parallel_sample(
- worker_set=self.workers, max_env_steps=self.config["microbatch_size"]
+ worker_set=self.workers, max_env_steps=self.config.microbatch_size
)
self._counters[NUM_ENV_STEPS_SAMPLED] += train_batch.env_steps()
self._counters[NUM_AGENT_STEPS_SAMPLED] += train_batch.agent_steps()
@@ -204,7 +204,7 @@ def training_step(self) -> ResultDict:

# If `train_batch_size` reached: Accumulate gradients and apply.
num_microbatches = math.ceil(
- self.config["train_batch_size"] / self.config["microbatch_size"]
+ self.config.train_batch_size / self.config.microbatch_size
)
if self._num_microbatches >= num_microbatches:
# Update counters.
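
The A2C microbatch logic above accumulates gradients until roughly one full train batch has been collected; the number of microbatches per update comes from the ceil division shown in the diff. A small worked example (the batch sizes are assumed, not defaults):

import math

train_batch_size = 4000   # assumed example value
microbatch_size = 300     # assumed example value

# Gradients are accumulated over this many microbatches, then applied once.
num_microbatches = math.ceil(train_batch_size / microbatch_size)  # -> 14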
2 changes: 1 addition & 1 deletion rllib/algorithms/algorithm.py
@@ -480,7 +480,7 @@ def setup(self, config: AlgorithmConfig) -> None:

# Set Algorithm's seed after we have - if necessary - enabled
# tf eager-execution.
- update_global_seed_if_necessary(self.config["framework"], self.config["seed"])
+ update_global_seed_if_necessary(self.config.framework_str, self.config.seed)

self._record_usage(self.config)

22 changes: 9 additions & 13 deletions rllib/algorithms/alpha_star/alpha_star.py
@@ -344,18 +344,9 @@ def setup(self, config: AlphaStarConfig):
# one or more GPU nodes.
# - On each such node, also locate one replay buffer shard.

- # By default, set max_num_policies_to_train to the number of policy IDs
- # provided in the multiagent config.
- if self.config["max_num_policies_to_train"] is None:
- self.config["max_num_policies_to_train"] = len(
- self.workers.local_worker().get_policies_to_train()
- )
-
# Single CPU replay shard (co-located with GPUs so we can place the
# policies on the same machine(s)).
- num_gpus = (
- 0.01 if (self.config["num_gpus"] and not self.config["_fake_gpus"]) else 0
- )
+ num_gpus = 0.01 if (self.config.num_gpus and not self.config._fake_gpus) else 0
ReplayActor = ray.remote(
num_cpus=1,
num_gpus=num_gpus,
@@ -372,7 +363,12 @@ def setup(self, config: AlphaStarConfig):
# the initial first n learnable policies (found in the config).
distributed_learners = DistributedLearners(
config=self.config,
- max_num_policies_to_train=self.config["max_num_policies_to_train"],
+ # By default, set max_num_policies_to_train to the number of policy IDs
+ # provided in the multiagent config.
+ max_num_policies_to_train=(
+ self.config.max_num_policies_to_train
+ or len(self.workers.local_worker().get_policies_to_train())
+ ),
replay_actor_class=ReplayActor,
replay_actor_args=replay_actor_args,
)
@@ -406,15 +402,15 @@ def _set_policy_learners(worker):
max_remote_requests_in_flight_per_worker=self.config[
"max_requests_in_flight_per_sampler_worker"
],
- ray_wait_timeout_s=self.config["timeout_s_sampler_manager"],
+ ray_wait_timeout_s=self.config.timeout_s_sampler_manager,
)
policy_actors = [policy_actor for _, policy_actor, _ in distributed_learners]
self._learner_worker_manager = AsyncRequestsManager(
workers=policy_actors,
max_remote_requests_in_flight_per_worker=self.config[
"max_requests_in_flight_per_learner_worker"
],
- ray_wait_timeout_s=self.config["timeout_s_learner_manager"],
+ ray_wait_timeout_s=self.config.timeout_s_learner_manager,
)

@override(Algorithm)
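
The AlphaStar change above folds the old explicit `is None` check into an inline `or` fallback when passing `max_num_policies_to_train` to DistributedLearners. A minimal sketch of that idiom (the function and policy names are hypothetical):

def resolve_max_policies_to_train(configured_value, policies_to_train):
    # `or` falls back whenever the configured value is None (or 0),
    # mirroring `config.max_num_policies_to_train or len(...)` above.
    return configured_value or len(policies_to_train)

resolve_max_policies_to_train(None, ["main_0", "main_1", "league_exploiter_0"])  # -> 3
resolve_max_policies_to_train(2, ["main_0", "main_1", "league_exploiter_0"])     # -> 2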
2 changes: 1 addition & 1 deletion rllib/algorithms/alpha_star/distributed_learners.py
@@ -47,7 +47,7 @@ def __init__(
actor's constructor.
"""
self.config = config
- self.num_gpus = self.config["num_gpus"]
+ self.num_gpus = self.config.num_gpus
self.max_num_policies_to_train = max_num_policies_to_train
self.replay_actor_class = replay_actor_class
self.replay_actor_args = replay_actor_args
4 changes: 2 additions & 2 deletions rllib/algorithms/alpha_zero/alpha_zero.py
@@ -364,9 +364,9 @@ def training_step(self) -> ResultDict:
else NUM_ENV_STEPS_SAMPLED
]

- if cur_ts > self.config["num_steps_sampled_before_learning_starts"]:
+ if cur_ts > self.config.num_steps_sampled_before_learning_starts:
train_batch = self.local_replay_buffer.sample(
- self.config["train_batch_size"]
+ self.config.train_batch_size
)
else:
train_batch = None
31 changes: 14 additions & 17 deletions rllib/algorithms/apex_dqn/apex_dqn.py
@@ -348,13 +348,13 @@ def setup(self, config: PartialAlgorithmConfigDict):
-len(self.workers.remote_workers()) // 3 :
]

- num_replay_buffer_shards = self.config["optimizer"]["num_replay_buffer_shards"]
+ num_replay_buffer_shards = self.config.optimizer["num_replay_buffer_shards"]

# Create copy here so that we can modify without breaking other logic
- replay_actor_config = copy.deepcopy(self.config["replay_buffer_config"])
+ replay_actor_config = copy.deepcopy(self.config.replay_buffer_config)

replay_actor_config["capacity"] = (
- self.config["replay_buffer_config"]["capacity"] // num_replay_buffer_shards
+ self.config.replay_buffer_config["capacity"] // num_replay_buffer_shards
)

ReplayActor = ray.remote(num_cpus=0)(replay_actor_config["type"])
@@ -386,14 +386,14 @@ def setup(self, config: PartialAlgorithmConfigDict):
max_remote_requests_in_flight_per_worker=self.config[
"max_requests_in_flight_per_replay_worker"
],
- ray_wait_timeout_s=self.config["timeout_s_replay_manager"],
+ ray_wait_timeout_s=self.config.timeout_s_replay_manager,
)
self._sampling_actor_manager = AsyncRequestsManager(
self.workers.remote_workers(),
max_remote_requests_in_flight_per_worker=self.config[
"max_requests_in_flight_per_sampler_worker"
],
- ray_wait_timeout_s=self.config["timeout_s_sampler_manager"],
+ ray_wait_timeout_s=self.config.timeout_s_sampler_manager,
)
self.learner_thread = LearnerThread(self.workers.local_worker())
self.learner_thread.start()
@@ -432,7 +432,7 @@ def training_step(self) -> ResultDict:
else NUM_ENV_STEPS_SAMPLED
]

- if cur_ts > self.config["num_steps_sampled_before_learning_starts"]:
+ if cur_ts > self.config.num_steps_sampled_before_learning_starts:
# trigger a sample from the replay actors and enqueue operation to the
# learner thread.
self.sample_from_replay_buffer_place_on_learner_queue_non_blocking(
@@ -497,7 +497,7 @@ def update_workers(self, _num_samples_ready: Dict[ActorHandle, int]) -> int:
Returns:
The number of remote workers whose weights were updated.
"""
- max_steps_weight_sync_delay = self.config["optimizer"]["max_weight_sync_delay"]
+ max_steps_weight_sync_delay = self.config.optimizer["max_weight_sync_delay"]
# Update our local copy of the weights if the learner thread has updated
# the learner worker's weights
policy_ids_updated = self.learner_thread.policy_ids_updated.copy()
@@ -566,17 +566,17 @@ def wait_on_replay_actors() -> List[Tuple[ActorHandle, SampleBatchType]]:
num_samples_collected = sum(num_worker_samples_collected.values())
self.curr_num_samples_collected += num_samples_collected
replay_sample_batches = wait_on_replay_actors()
- if self.curr_num_samples_collected >= self.config["train_batch_size"]:
- training_intensity = int(self.config["training_intensity"] or 1)
+ if self.curr_num_samples_collected >= self.config.train_batch_size:
+ training_intensity = int(self.config.training_intensity or 1)
num_requests_to_launch = (
- self.curr_num_samples_collected / self.config["train_batch_size"]
+ self.curr_num_samples_collected / self.config.train_batch_size
) * training_intensity
num_requests_to_launch = max(1, round(num_requests_to_launch))
self.curr_num_samples_collected = 0
for _ in range(num_requests_to_launch):
self._replay_actor_manager.call(
lambda actor, num_items: actor.sample(num_items),
- fn_args=[self.config["train_batch_size"]],
+ fn_args=[self.config.train_batch_size],
)
replay_sample_batches.extend(wait_on_replay_actors())

@@ -603,10 +603,7 @@ def update_replay_sample_priority(self) -> None:
env_steps,
agent_steps,
) = self.learner_thread.outqueue.get(timeout=0.001)
- if (
- self.config["replay_buffer_config"].get("prioritized_replay_alpha")
- > 0
- ):
+ if self.config.replay_buffer_config.get("prioritized_replay_alpha") > 0:
replay_actor.update_priorities.remote(priority_dict)
num_samples_trained_this_itr += env_steps
self.update_target_networks(env_steps)
@@ -627,7 +624,7 @@ def update_target_networks(self, num_new_trained_samples) -> None:
self._num_ts_trained_since_last_target_update += num_new_trained_samples
if (
self._num_ts_trained_since_last_target_update
- >= self.config["target_network_update_freq"]
+ >= self.config.target_network_update_freq
):
self._num_ts_trained_since_last_target_update = 0
with self._timers[TARGET_NET_UPDATE_TIMER]:
@@ -662,7 +659,7 @@ def on_worker_failures(
def _compile_iteration_results(self, *args, **kwargs):
result = super()._compile_iteration_results(*args, **kwargs)
replay_stats = ray.get(
- self._replay_actors[0].stats.remote(self.config["optimizer"].get("debug"))
+ self._replay_actors[0].stats.remote(self.config.optimizer.get("debug"))
)
exploration_infos_list = self.workers.foreach_policy_to_train(
lambda p, pid: {pid: p.get_exploration_state()}
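
In the Ape-X changes above, the number of replay-sample requests launched per iteration scales with `training_intensity` once at least one train batch worth of samples has arrived. A small worked sketch of that arithmetic (the helper name and the numbers are illustrative):

def num_replay_requests(samples_collected, train_batch_size, training_intensity=None):
    # At least one request is launched; more if several train batches' worth of
    # samples were collected or training_intensity > 1, as in the diff above.
    intensity = int(training_intensity or 1)
    return max(1, round((samples_collected / train_batch_size) * intensity))

num_replay_requests(1000, 500, training_intensity=2)  # -> 4
num_replay_requests(200, 500)                         # -> 1 (never fewer than one)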
6 changes: 3 additions & 3 deletions rllib/algorithms/appo/appo.py
@@ -191,7 +191,7 @@ def __call__(self, fetches):
lambda p, _: p.update_target()
)
# Also update KL Coeff
- if self.config["use_kl_loss"]:
+ if self.config.use_kl_loss:
self.update_kl(fetches)


@@ -231,7 +231,7 @@ def after_train_step(self, train_results: ResultDict) -> None:
]
last_update = self._counters[LAST_TARGET_UPDATE_TS]
target_update_freq = (
- self.config["num_sgd_iter"] * self.config["minibatch_buffer_size"]
+ self.config.num_sgd_iter * self.config.minibatch_buffer_size
)
if cur_ts - last_update > target_update_freq:
self._counters[NUM_TARGET_UPDATES] += 1
@@ -243,7 +243,7 @@ def after_train_step(self, train_results: ResultDict) -> None:
)

# Also update the KL-coefficient for the APPO loss, if necessary.
- if self.config["use_kl_loss"]:
+ if self.config.use_kl_loss:

def update(pi, pi_id):
assert LEARNER_STATS_KEY not in train_results, (
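
APPO's target-network update cadence above is the product of two config values, and the KL coefficient is only adjusted when `use_kl_loss` is set. A minimal sketch of the update trigger (the values are assumed examples, not defaults):

num_sgd_iter = 2             # assumed example values
minibatch_buffer_size = 4

target_update_freq = num_sgd_iter * minibatch_buffer_size  # -> 8

def should_update_target(cur_ts, last_update_ts):
    # Update the target network once enough train timesteps have passed
    # since the last update, as in `after_train_step()` above.
    return cur_ts - last_update_ts > target_update_freq

should_update_target(cur_ts=20, last_update_ts=10)  # -> True (10 > 8)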