-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathobservations.py
175 lines (143 loc) · 6.2 KB
/
observations.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
"""Observations implementations for the Simulink Gym wrapper."""
from typing import Any, Callable, List, Union
import numpy as np
from gymnasium.spaces import Box
from . import logger
class Observation:
"""Class for representation of environment observations."""
def __init__(
self,
name: str,
low: float,
high: float,
parameter: str,
value_setter: Callable[[str, Union[int, float]], Any],
initial_value: Union[int, float] = None,
):
"""
Class representing environment observations.
Block parameter values can be either defined directly in the block, if the value
path is available programmatically (check MATLAB/Simulink documentation for that
matter), or indirectly by using a workspace variable. For this, define a
variable in the model workspace and set the block parameter value to this
variable. The value can then be set programmatically by setting the workspace
variable. Depending on this, set the value_setter method accordingly.
Parameters:
name: name of the observation
low: lower boundary of the observation value, can also be -numpy.inf, used
to define the observation space
high: higher boundary of the observation value, can also be numpy.inf, used
to define the observation space
parameter: path of the block parameter for setting the initial value (if
setting the block value directly) or name of the workspace variable used
for the block parameter
value_setter: method for setting the initial value, either
SimulinkEnv.set_block_parameter or SimulinkEnv.set_workspace_variable
initial_value: initial value of the observation (see BlockParam.value), the
value will be sampled from the observation space if None, default: None
"""
self.name = name
self.space = Box(low=low, high=high, shape=(1,), dtype=np.float32)
self.initial_value = initial_value if initial_value else self.space.sample()[0]
self.parameter = parameter
self._value_setter = value_setter
def _check_initial_value(self, value):
"""
Check initial value of observation.
Args:
value: initial value to be checked
Raises:
ValueError: if the initial value is not inside the observation space limits
"""
value = np.array(value, ndmin=1, dtype=np.float32)
if not self.space.contains(value):
raise ValueError(
f"Observation {self.name}: "
f"Initial value {value} not inside space limits "
f"([{self.space.low}, {self.space.high}]). "
f"{self.space.shape}, {value.shape}"
)
@property
def initial_value(self):
"""
Getter function for initial value of the observation.
Returns: initial value of the observation
"""
return self._initial_value
@initial_value.setter
def initial_value(self, value):
"""
Setter method for the initial value.
Args:
value: initial value to be set
"""
logger.debug(f"Setting {self.name} to {value}")
self._check_initial_value(value)
self._initial_value = value
def resample_initial_value(self):
"""Resample the initial value according to observation space."""
self._initial_value = self.space.sample()[0]
def reset_value(self):
"""Set the initial value in the simulation object."""
self._value_setter(self.parameter, self.initial_value)
class Observations:
"""Class representing multiple environment observations as a list."""
def __init__(self, observations: List[Observation]):
"""
Class representing multiple environment observations as a list.
Args:
observations: list of observations
"""
self._observations = observations
# Create combined observation space from single observations:
lows = np.array(
[observation.space.low[0] for observation in self._observations],
dtype=np.float32,
)
highs = np.array(
[observation.space.high[0] for observation in self._observations],
dtype=np.float32,
)
self.space = Box(low=lows, high=highs)
def __getitem__(self, index: int):
"""Method for indexing of observations list."""
return self._observations[index]
def __iter__(self):
"""Method for iterating over observations list."""
return self._observations.__iter__()
def __next__(self):
"""Method for getting next observation in list."""
return self._observations.__next__()
def __len__(self):
"""Method for determining number of observations."""
return len(self._observations)
def resample_all_initial_values(self):
"""Resampling all observations."""
self.initial_state = self.space.sample()
@property
def initial_state(self):
"""Combined initial state of all observations as numpy array."""
initial_state = [obs.initial_value for obs in self._observations]
return np.array(initial_state, ndmin=1, dtype=np.float32)
@initial_state.setter
def initial_state(self, values: np.ndarray):
"""
Set method for the initial state.
Args:
values: numpy array containing initial values for all observations
Raises:
ValueError: if the shape of the input values does not match the shape of the
observation space
"""
if values.shape == self.space.shape:
for index, observation in enumerate(self._observations):
observation.initial_value = values[index]
else:
raise ValueError(
f"Shape of values ({values.shape}) not equal to "
f"shape of observations ({self.space.shape})"
)
def reset_values(self):
"""Reset all observation values to their initial values."""
for obs in self._observations:
obs.reset_value()