AI.py
import os
import io
import random
import numpy as np
import torch as T
import torch.nn as nn
import torch.optim as optim
from torch.distributions.categorical import Categorical


def set_random_seed(seed=73):
    # Seed PyTorch, NumPy and Python's random module for reproducibility
    T.manual_seed(seed)
    np.random.seed(seed)
    random.seed(seed)


def state_to_tensor(state):
    # Convert an environment state into a float32 torch tensor
    state = np.array(state, dtype=np.float64)
    state = T.tensor(state, dtype=T.float)
    return state


def squeeze_vars(probability_distribution, action, critic_value):
    # Reduce each tensor to a plain Python scalar so it can be stored in memory
    probabilities = T.squeeze(probability_distribution.log_prob(action)).item()
    action = T.squeeze(action).item()
    critic_value = T.squeeze(critic_value).item()
    return [probabilities, action, critic_value]


class AgentMemory():
    def __init__(self, batch_size):
        # Each item in the lists is one piece of memory
        self.memory = {
            "states": [],          # Observed states
            "probabilities": [],   # Log probabilities of the chosen actions
            "critic_outputs": [],  # Values the critic calculates
            "actions": [],         # Actions taken
            "rewards": [],         # Rewards received
            "dones": []            # Terminal flags
        }
        self.batch_size = batch_size

    def generate_batches(self):
        n_states = self.get_memory_size()  # Current memory size
        batch_start = np.arange(0, n_states, self.batch_size)
        # Each index is one memory entry
        indices = np.arange(n_states, dtype=np.int64)
        np.random.shuffle(indices)
        # Slice the shuffled indices into batches of size batch_size
        batches = [indices[i:i + self.batch_size] for i in batch_start]
        return [self.memory, batches]

    def store_memory(self, state, action, critic_value, probs, reward, done):
        self.memory["states"].append(state)
        self.memory["actions"].append(action)
        self.memory["probabilities"].append(probs)
        self.memory["critic_outputs"].append(critic_value)
        self.memory["rewards"].append(reward)
        self.memory["dones"].append(done)

    # Clear memory at the end of a trajectory
    def clear_memory(self):
        for key in self.memory:
            self.memory[key] = []

    def get_memory_size(self) -> int:
        return len(self.memory["states"])
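
# A quick sketch of how generate_batches is meant to be consumed (illustrative values):
# with 5 stored transitions and batch_size=2, the shuffled indices 0..4 might be split
# into index batches such as [3, 0], [4, 1], [2]; the caller would then use those
# indices to look up the matching entries in the returned memory dict for each update.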


class ActorNeuralNetwork(nn.Module):
    def __init__(self, n_actions, state_dims, lr, fc1_dims=256, fc2_dims=256):
        super(ActorNeuralNetwork, self).__init__()
        self.checkpoint_folder = "ai_checkpoints"
        self.neuralNetwork = nn.Sequential(
            nn.Linear(state_dims, fc1_dims),
            nn.ReLU(),
            nn.Linear(fc1_dims, fc2_dims),
            nn.ReLU(),
            nn.Linear(fc2_dims, n_actions),
            nn.Softmax(dim=-1)
        )
        self.optimizer = optim.Adam(self.parameters(), lr=lr)
        self.device = T.device('cuda:0' if T.cuda.is_available() else 'cpu')
        self.to(self.device)

    def forward(self, state):
        # Set model to evaluation mode to make predictions
        self.neuralNetwork.eval()
        state = state.to(self.device)  # Move the state to the network's device
        dist = self.neuralNetwork(state)
        dist = Categorical(dist)  # Wrap the action probabilities in a categorical distribution
        return dist

    def save_checkpoint(self, checkpoint_file):
        path = os.path.join(self.checkpoint_folder, checkpoint_file)
        T.save(self.state_dict(), path)

    def load_checkpoint(self, checkpoint_file):
        path = os.path.join(self.checkpoint_folder, checkpoint_file)
        self.load_state_dict(T.load(path))

    # The same as save_checkpoint, but returns the parameters as an in-memory buffer
    # instead of writing them to a file.
    def save_params_mem(self):
        memory_buffer = io.BytesIO()
        T.save(self.state_dict(), memory_buffer)
        return memory_buffer

    # The same as load_checkpoint, but reads the parameters from an in-memory buffer
    # instead of a file.
    def load_params_mem(self, memory_buffer):
        memory_buffer.seek(0)  # Move the read/write position of the in-memory buffer to the start
        self.load_state_dict(T.load(memory_buffer))


class CriticNeuralNetwork(nn.Module):
    def __init__(self, state_dims, lr, fc1_dims=256, fc2_dims=256):
        super(CriticNeuralNetwork, self).__init__()
        self.checkpoint_folder = "ai_checkpoints"
        self.neuralNetwork = nn.Sequential(
            nn.Linear(state_dims, fc1_dims),
            nn.ReLU(),
            nn.Linear(fc1_dims, fc2_dims),
            nn.ReLU(),
            nn.Linear(fc2_dims, 1)
        )
        self.optimizer = optim.Adam(self.parameters(), lr=lr)
        self.device = T.device('cuda:0' if T.cuda.is_available() else 'cpu')
        self.to(self.device)

    def forward(self, state):
        # Set model to evaluation mode to make predictions
        self.neuralNetwork.eval()
        state = state.to(self.device)  # Move the state to the network's device
        value = self.neuralNetwork(state)
        return value

    def save_checkpoint(self, checkpoint_file):
        path = os.path.join(self.checkpoint_folder, checkpoint_file)
        T.save(self.state_dict(), path)

    def load_checkpoint(self, checkpoint_file):
        path = os.path.join(self.checkpoint_folder, checkpoint_file)
        self.load_state_dict(T.load(path))

    # The same as save_checkpoint, but returns the parameters as an in-memory buffer
    # instead of writing them to a file.
    def save_params_mem(self):
        memory_buffer = io.BytesIO()
        T.save(self.state_dict(), memory_buffer)
        return memory_buffer

    # The same as load_checkpoint, but reads the parameters from an in-memory buffer
    # instead of a file.
    def load_params_mem(self, memory_buffer):
        memory_buffer.seek(0)  # Move the read/write position of the in-memory buffer to the start
        self.load_state_dict(T.load(memory_buffer))
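

# A minimal usage sketch, not part of the original training loop: the state size,
# action count and learning rate below are illustrative assumptions, chosen only to
# show how the memory, actor and critic defined above fit together for one step.
if __name__ == "__main__":
    set_random_seed()

    state_dims = 4   # assumed observation size, e.g. a CartPole-like state
    n_actions = 2    # assumed number of discrete actions
    lr = 3e-4        # assumed learning rate

    memory = AgentMemory(batch_size=5)
    actor = ActorNeuralNetwork(n_actions, state_dims, lr)
    critic = CriticNeuralNetwork(state_dims, lr)

    # One fake environment step: sample an action from the actor's distribution,
    # score the state with the critic, and store the transition in memory.
    raw_state = [0.1, -0.2, 0.05, 0.0]
    state = state_to_tensor(raw_state)
    dist = actor(state)
    action = dist.sample()
    value = critic(state)
    log_prob, action, value = squeeze_vars(dist, action, value)
    memory.store_memory(raw_state, action, value, log_prob, reward=1.0, done=False)

    stored_memory, batches = memory.generate_batches()
    print("stored transitions:", memory.get_memory_size())
    print("index batches:", batches)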