diff --git a/rllib/connectors/action/pipeline.py b/rllib/connectors/action/pipeline.py
index dc10c60ce8bab..740b2b5f849bf 100644
--- a/rllib/connectors/action/pipeline.py
+++ b/rllib/connectors/action/pipeline.py
@@ -28,7 +28,6 @@ def __call__(self, ac_data: ActionConnectorDataType) -> ActionConnectorDataType:
             timer = self.timers[str(c)]
             with timer:
                 ac_data = c(ac_data)
-            timer.push_units_processed(1)
         return ac_data
 
     def to_state(self):
diff --git a/rllib/connectors/agent/pipeline.py b/rllib/connectors/agent/pipeline.py
index a34afdd04613d..480161bc5a413 100644
--- a/rllib/connectors/agent/pipeline.py
+++ b/rllib/connectors/agent/pipeline.py
@@ -39,7 +39,6 @@ def __call__(
             timer = self.timers[str(c)]
             with timer:
                 ret = c(ret)
-            timer.push_units_processed(1)
         return ret
 
     def to_state(self):
diff --git a/rllib/evaluation/collectors/agent_collector.py b/rllib/evaluation/collectors/agent_collector.py
index d8d02381e114c..afeda3b09d46f 100644
--- a/rllib/evaluation/collectors/agent_collector.py
+++ b/rllib/evaluation/collectors/agent_collector.py
@@ -1,14 +1,14 @@
+import copy
 import logging
-
-from copy import deepcopy
-from gymnasium.spaces import Space
 import math
+from typing import Any, Dict, List, Optional
+
 import numpy as np
 import tree  # pip install dm_tree
-from typing import Any, Dict, List, Optional
+from gymnasium.spaces import Space
 
-from ray.rllib.policy.view_requirement import ViewRequirement
 from ray.rllib.policy.sample_batch import SampleBatch
+from ray.rllib.policy.view_requirement import ViewRequirement
 from ray.rllib.utils.framework import try_import_tf, try_import_torch
 from ray.rllib.utils.spaces.space_utils import (
     flatten_to_single_ndarray,
@@ -20,7 +20,6 @@
     TensorType,
     ViewRequirementsDict,
 )
-
 from ray.util.annotations import PublicAPI
 
 logger = logging.getLogger(__name__)
@@ -38,6 +37,17 @@ def _to_float_np_array(v: List[Any]) -> np.ndarray:
     return arr
 
 
+def _get_buffered_slice_with_paddings(d, inds):
+    element_at_t = []
+    for index in inds:
+        if index < len(d):
+            element_at_t.append(d[index])
+        else:
+            # zero pad similar to the last element.
+            element_at_t.append(tree.map_structure(np.zeros_like, d[-1]))
+    return element_at_t
+
+
 @PublicAPI
 class AgentCollector:
     """Collects samples for one agent in one trajectory (episode).
@@ -230,8 +240,7 @@ def add_action_reward_next_obs(self, input_values: Dict[str, TensorType]) -> Non
             AgentCollector._next_unroll_id += 1
 
         # Next obs -> obs.
-        # TODO @kourosh: remove the in-place operations and get rid of this deepcopy.
-        values = deepcopy(input_values)
+        values = copy.copy(input_values)
         assert SampleBatch.OBS not in values
         values[SampleBatch.OBS] = values[SampleBatch.NEXT_OBS]
         del values[SampleBatch.NEXT_OBS]
@@ -356,8 +365,10 @@ def build_for_inference(self) -> SampleBatch:
                 # before the last one (len(d) - 2) and so on.
                 element_at_t = d[view_req.shift_arr + len(d) - 1]
                 if element_at_t.shape[0] == 1:
-                    # squeeze to remove the T dimension if it is 1.
-                    element_at_t = element_at_t.squeeze(0)
+                    # We'd normally squeeze here to remove the time dim, but we'll
+                    # simply use the time dim as the batch dim.
+                    data.append(element_at_t)
+                    continue
 
                 # add the batch dimension with [None]
                 data.append(element_at_t[None])
@@ -458,20 +469,20 @@ def build_for_training(
                 # handle the case where the inds are out of bounds from the end.
                 # if during the indexing any of the indices are out of bounds, we
                 # need to use padding on the end to fill in the missing indices.
-                element_at_t = []
-                for index in inds:
-                    if index < len(d):
-                        element_at_t.append(d[index])
-                    else:
-                        # zero pad similar to the last element.
-                        element_at_t.append(
-                            tree.map_structure(np.zeros_like, d[-1])
-                        )
-                element_at_t = np.stack(element_at_t)
+                # Only create padding when we actually have to.
+                if max(inds) < len(d):
+                    # Simple case: we can directly pick slices from the buffer.
+                    element_at_t = d[inds]
+                else:
+                    # The buffer is not long enough, so we have to pad. This
+                    # branch takes more time than simply picking slices, so we
+                    # try to avoid it.
+                    element_at_t = _get_buffered_slice_with_paddings(d, inds)
+                    element_at_t = np.stack(element_at_t)
 
                 if element_at_t.shape[0] == 1:
-                    # squeeze to remove the T dimension if it is 1.
-                    element_at_t = element_at_t.squeeze(0)
+                    # Remove the T dimension if it is 1.
+                    element_at_t = element_at_t[0]
                 shifted_data.append(element_at_t)
 
             # in some multi-agent cases shifted_data may be an empty list.
diff --git a/rllib/evaluation/episode_v2.py b/rllib/evaluation/episode_v2.py
index 1dea86071253c..f0574366c9f4c 100644
--- a/rllib/evaluation/episode_v2.py
+++ b/rllib/evaluation/episode_v2.py
@@ -1,5 +1,6 @@
 import random
 from collections import defaultdict
+import numpy as np
 from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple
 
 from ray.rllib.env.base_env import _DUMMY_AGENT_ID
@@ -295,7 +296,7 @@ def postprocess_episode(
 
         if (
             not pre_batch.is_single_trajectory()
-            or len(set(pre_batch[SampleBatch.EPS_ID])) > 1
+            or len(np.unique(pre_batch[SampleBatch.EPS_ID])) > 1
         ):
             raise ValueError(
                 "Batches sent to postprocessing must only contain steps "