# reinforcement_learning_flow.py
from metaflow import FlowSpec, step, retry
import json


class ReinforcementLearningSimulatorFlow(FlowSpec):
    '''
    Train RL agents with different input states and reward functions, then
    run the same trained agent in simulations of:
        1. a left-only environment with all-left data,
        2. a right-only environment with all-right data,
        3. the full environment with data from everywhere.
    '''
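    # Rough shape of the flow's DAG, as a reading aid (derived from the
    # steps below):
    #
    #   start
    #     foreach possible_states            -> reward_split_placeholder
    #       foreach reward_functions         -> train -> simulate_split
    #         foreach simulation_environments -> simulate
    #         join_simulations
    #       join_reward
    #     join_input_states -> end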
    @step
    def start(self):
        from models.CupboardAgents.DDPG.ddpg import DDPGArgs
        # Each entry of possible_states is [[input states], [output states]].
        self.possible_states = [
            [['joint_velocities', 'joint_positions', 'gripper_pose', 'task_low_dim_state'], ['joint_velocities']],
            [['joint_velocities', 'gripper_pose', 'task_low_dim_state'], ['joint_velocities']],
            [['joint_positions', 'gripper_pose', 'task_low_dim_state'], ['joint_velocities']],
            [['gripper_pose', 'task_low_dim_state'], ['joint_velocities']]
        ]
        self.reward_functions = [
            'MahattanReward',
            'ExponentialMahattanReward',
            'EuclideanReward',
            'ExponentialEuclideanReward',
            'HuberReward'
        ]
        self.simulation_environments = [
            'ReachTarget',
            'LeftTarget',
            'RightTarget'
        ]
        self.training_task = 'ReachTarget'
        self.model_args = DDPGArgs()
        self.episode_length = 100    # Length of each episode.
        self.warmup = 50             # Warmup steps for the RL algorithm.
        self.num_episodes = 1000     # Number of training episodes.
        self.num_sim_episodes = 500  # Number of evaluation episodes.
        self.collect_gradients = False
        self.next(self.reward_split_placeholder, foreach='possible_states')
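    # With the lists above, the chained foreach splits fan out into
    # 4 input-state sets x 5 reward functions = 20 training branches,
    # and each trained agent is then evaluated in 3 environments
    # (60 simulation tasks in total).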
    @step
    def reward_split_placeholder(self):
        '''
        Fan out the training of this input-state set over the different
        reward functions.
        '''
        self.input_state = self.input[0]
        self.next(self.train, foreach='reward_functions')
    @retry(times=4)
    @step
    def train(self):
        '''
        Train a reinforcement-learning agent for one combination of
        input state and reward function.
        `todo` : Create a docker image for training
        '''
        self.input_value = self.input_state
        self.reward_function = self.input
        from SimulationEnvironment.ReachTargetRL import ReachTargetRLEnvironment
        from models.ReachTargetRLAgent import ReachTargetRLAgent
        agent = ReachTargetRLAgent(
            warmup=self.warmup,
            ddpg_args=self.model_args,
            input_states=self.input_state,
            collect_gradients=self.collect_gradients)
        # Resolve the reward function class by name and instantiate it.
        import importlib
        reward_module = importlib.import_module('SimulationEnvironment.RewardFunctions')
        reward_function = getattr(reward_module, self.reward_function)()
        curr_env = ReachTargetRLEnvironment(
            reward_function,
            num_episodes=self.num_episodes,
            episode_length=self.episode_length)
        env_metrics = curr_env.train_rl_agent(agent)
        self.training_metrics = env_metrics
        # TODO : Add model saving code here.
        # Assigning to self persists the model object as a Metaflow artifact
        # for the downstream simulate steps.
        self.model = agent.get_model()
        self.next(self.simulate_split)
    @step
    def simulate_split(self):
        '''
        Fan out the evaluation of the trained agent over the individual
        simulation environments.
        '''
        self.next(self.simulate, foreach='simulation_environments')
    @retry(times=4)
    @step
    def simulate(self):
        '''
        Run the trained model in different versions of the task
        (left / right / everywhere).
        '''
        self.simulation_environment = self.input
        from SimulationEnvironment.ReachTargetRL import ReachTargetRLEnvironment
        from models.ReachTargetRLAgent import ReachTargetRLAgent
        # todo : Add
        agent = ReachTargetRLAgent(
            warmup=self.warmup,
            ddpg_args=self.model_args,
            input_states=self.input_state,
            collect_gradients=self.collect_gradients)
        # Resolve the reward function and the RLBench task class by name.
        import importlib
        reward_module = importlib.import_module('SimulationEnvironment.RewardFunctions')
        reward_function = getattr(reward_module, self.reward_function)()
        task_module = importlib.import_module('rlbench.tasks')
        task = getattr(task_module, self.simulation_environment)
        curr_env = ReachTargetRLEnvironment(
            reward_function,
            task=task,
            num_episodes=self.num_sim_episodes,
            episode_length=self.episode_length)
        agent.load_model_from_object(self.model)
        simulation_analytics = curr_env.train_rl_agent(agent, eval_mode=True)
        self.simulation_analytics = simulation_analytics
        self.next(self.join_simulations)
    @step
    def join_simulations(self, inputs):
        '''
        Join the per-environment simulation results of one trained agent and
        assemble its analytics and stored-model records.
        '''
        from Utilities.core import SimulationAnalytics, ModelAnalytics, RLMetrics, StoredModel
        self.model_analytics = []
        self.models = []
        simulation_analytics = []
        for task_data in inputs:
            data = SimulationAnalytics()
            data.analytics = task_data.simulation_analytics
            data.simulation_environment = task_data.simulation_environment
            simulation_analytics.append(data)
        data = ModelAnalytics()
        data.simulation_analytics = simulation_analytics
        data.reward_function = inputs[0].reward_function
        data.episode_length = inputs[0].episode_length
        data.warmup = inputs[0].warmup
        data.num_episodes = inputs[0].num_episodes
        data.collect_gradients = inputs[0].collect_gradients
        data.training_analytics.analytics = inputs[0].training_metrics
        data.training_analytics.simulation_environment = inputs[0].training_task
        data.input_states = inputs[0].input_value
        self.model_analytics.append(data)
        data = StoredModel()
        data.episode_length = inputs[0].episode_length
        data.warmup = inputs[0].warmup
        data.num_episodes = inputs[0].num_episodes
        data.reward_function = inputs[0].reward_function
        data.model_args = inputs[0].model_args
        data.collect_gradients = inputs[0].collect_gradients
        data.training_analytics.analytics = inputs[0].training_metrics
        data.training_analytics.simulation_environment = inputs[0].training_task
        data.input_states = inputs[0].input_value
        self.models.append(data)
        self.next(self.join_reward)
    @step
    def join_reward(self, inputs):
        '''
        Collect the trained models and analytics across all reward-function
        branches.
        '''
        from Utilities.core import ModelAnalytics, RLMetrics, StoredModel
        self.model_analytics = []
        self.models = []
        for task_data in inputs:
            self.models += task_data.models
            self.model_analytics += task_data.model_analytics
        self.next(self.join_input_states)
    @step
    def join_input_states(self, inputs):
        '''
        Collect the models and analytics across all input-state branches.
        '''
        from Utilities.core import ModelAnalytics, RLMetrics, StoredModel
        self.model_analytics = []
        self.models = []
        for task_data in inputs:
            self.models += task_data.models
            self.model_analytics += task_data.model_analytics
        self.next(self.end)

    @step
    def end(self):
        print("Done Computation")


if __name__ == '__main__':
    ReinforcementLearningSimulatorFlow()
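# Example usage (a sketch using the standard Metaflow CLI, assuming the
# repository's dependencies such as rlbench, the DDPG agent and the
# simulation environments are installed and importable):
#
#   python reinforcement_learning_flow.py run
#   python reinforcement_learning_flow.py run --max-workers 4
#
# Inspecting the artifacts afterwards with Metaflow's client API
# (`models` and `model_analytics` are the artifacts set in join_input_states):
#
#   from metaflow import Flow
#   run = Flow('ReinforcementLearningSimulatorFlow').latest_successful_run
#   print(len(run.data.models), len(run.data.model_analytics))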