game_generation_processes.py
import logging
import multiprocessing as mp
import time
from multiprocessing import Process, current_process, shared_memory

import numpy as np
import torch

from ia.janggi_network import JanggiNetwork
from ia.trainer import ModelSaver, run_episode_raw
from janggi.utils import BOARD_HEIGHT, BOARD_WIDTH, DEVICE

# Enable multiprocessing logging to stderr; this can be turned off.
logger = mp.log_to_stderr()
logger.setLevel(logging.INFO)
BASE_DIR = "model"
# Number of residual layers in the network
N_RESIDUAL = 20
# Number of parallel simulations
N_POOLS = 30
# Number of simulation per round
N_SIMULATIONS = 800
# Number of round per game
ITER_MAX = 200
# Number of episodes between two saving and model reload
N_EPISODES = N_POOLS
# Parameters for parallel processing
# Maximum batch size processed at once
BATCH_SIZE = 16
# Number of features for the board
N_FEATURES = 16
# Number of features to represent a policy
N_FEATURES_POLICY = 58
# Number of buffers, where waiting data can be stored
N_BUFFERS = 2
# First axis of the shared block: one control plane (current write index plus
# per-slot ready/occupied flags), N_BUFFERS * BATCH_SIZE input slots,
# N_BUFFERS * BATCH_SIZE policy slots, and one plane for the values.
DIMS = (BATCH_SIZE * N_BUFFERS * 2 + 1 + 1,
        N_FEATURES_POLICY,
        BOARD_HEIGHT,
        BOARD_WIDTH)
# Position of the shared write index inside the control plane
CURRENT_INDEX = (0, 0, 0, 0)
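# With the defaults above, the first axis has 16 * 2 * 2 + 2 = 66 planes:
# plane 0 is the control plane, planes 1-32 the input slots, planes 33-64 the
# policy slots, and the last plane holds the values.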

def create_shared_block():
    """Allocate the shared-memory block and zero-initialise it."""
    block = np.zeros(DIMS, dtype=np.float32)
    shm = shared_memory.SharedMemory(create=True, size=block.nbytes)
    shm_block = np.ndarray(block.shape, dtype=np.float32, buffer=shm.buf)
    shm_block[:] = block[:]
    return shm
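
# A minimal sketch of how another process attaches to the block, assuming its
# name is passed around (only the name travels between processes, not the
# handle; `shm_name` is illustrative):
#
#     shm = shared_memory.SharedMemory(name=shm_name)
#     view = np.ndarray(DIMS, dtype=np.float32, buffer=shm.buf)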

def send_tensor(shr_name, tensor, lock):
    """Copy a feature tensor into the next free input slot and mark it pending.

    Returns the slot index so the caller can poll for the matching result.
    """
    existing_shm = shared_memory.SharedMemory(name=shr_name)
    np_array = np.ndarray(DIMS, dtype=np.float32, buffer=existing_shm.buf)
    while True:
        with lock:
            current_index = int(np_array[CURRENT_INDEX])
            # Occupied flag: ~0 means the slot is free, ~1 means still in use.
            free_status = np_array[0, current_index + 1, 0, 1]
            if free_status < 0.1:
                # Advance the shared write index, store the features, and
                # flag the slot as occupied.
                np_array[CURRENT_INDEX] = (current_index + 1) % (BATCH_SIZE * N_BUFFERS)
                np_array[1 + current_index, :N_FEATURES, :, :] = tensor.numpy()
                np_array[0, current_index + 1, 0, 1] = 1
                break
    existing_shm.close()
    return current_index

def read_result(shr_name, current_index, lock):
    """Poll the given slot until the predictor has written a result.

    Clears both the ready flag and the occupied flag, which frees the slot
    for the next request.
    """
    existing_shm = shared_memory.SharedMemory(name=shr_name)
    np_array = np.ndarray(DIMS, dtype=np.float32, buffer=existing_shm.buf)
    while True:
        with lock:
            if np_array[0, current_index + 1, 0, 0] > 0.1:
                # torch.tensor copies the data out of the shared block.
                policy = torch.tensor(np_array[1 + N_BUFFERS * BATCH_SIZE + current_index:
                                               2 + N_BUFFERS * BATCH_SIZE + current_index])
                value = torch.tensor(np_array[-1, current_index:current_index + 1, 0:1, 0])
                np_array[0, current_index + 1, 0, 0] = 0
                np_array[0, current_index + 1, 0, 1] = 0
                break
    existing_shm.close()
    return policy, value
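
# A minimal sketch of the worker-side round trip (names illustrative):
#
#     idx = send_tensor(shm_name, features, lock)       # enqueue one position
#     policy, value = read_result(shm_name, idx, lock)  # poll until answered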

def get_policy_value(model, features):
    """Run one forward pass of the network without tracking gradients."""
    features = features.to(DEVICE)
    with torch.no_grad():
        policy, value = model(features)
    return policy, value

def write_result(shr_name, previous_index, model, lock):
    """Batch every pending input slot, run the network, and write back results.

    Handles the wrap-around of the circular buffer: when the shared write
    index has wrapped past the end, the pending slots are gathered from both
    the head and the tail of the buffer with np.r_.
    """
    existing_shm = shared_memory.SharedMemory(name=shr_name)
    np_array = np.ndarray(DIMS, dtype=np.float32, buffer=existing_shm.buf)
    # Busy-wait until at least one new slot is pending.
    while True:
        with lock:
            current_index = int(np_array[CURRENT_INDEX])
            if current_index > previous_index:
                # Contiguous range of pending slots.
                features = np_array[1 + previous_index:1 + current_index, :N_FEATURES, :, :]
                break
            if current_index < previous_index:
                # The index wrapped around: gather the head and the tail.
                features = np_array[np.r_[1:1 + current_index,
                                          1 + previous_index:1 + BATCH_SIZE * N_BUFFERS],
                                    :N_FEATURES, :, :]
                break
    # torch.tensor copies the data; the slots stay flagged as occupied until
    # the results are written below, so no worker can overwrite them meanwhile.
    features = torch.tensor(features)
    policy, value = get_policy_value(model, features)
    with lock:
        if current_index > previous_index:
            np_array[1 + previous_index + N_BUFFERS * BATCH_SIZE:
                     1 + current_index + N_BUFFERS * BATCH_SIZE, :N_FEATURES_POLICY, :, :] = policy.cpu()
            np_array[-1, previous_index:current_index, 0, 0] = value.view(-1).cpu()
            # Mark the results as ready for the waiting workers.
            np_array[0, previous_index + 1:current_index + 1, 0, 0] = 1.0
        if current_index < previous_index:
            # Same writes, head and tail gathered in the same order as above.
            np_array[np.r_[1 + N_BUFFERS * BATCH_SIZE:1 + current_index + N_BUFFERS * BATCH_SIZE,
                           1 + previous_index + N_BUFFERS * BATCH_SIZE:1 + 2 * BATCH_SIZE * N_BUFFERS],
                     :N_FEATURES_POLICY, :, :] = policy.cpu()
            np_array[-1, np.r_[0:current_index, previous_index:BATCH_SIZE * N_BUFFERS], 0, 0] = value.view(-1).cpu()
            np_array[0, np.r_[1:current_index + 1, previous_index + 1:1 + BATCH_SIZE * N_BUFFERS], 0, 0] = 1.0
    existing_shm.close()
    return current_index
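
# np.r_ concatenates index ranges, e.g. np.r_[1:3, 7:9] == array([1, 2, 7, 8]),
# which is what lets write_result address a wrapped range of slots in a single
# fancy-indexing operation.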

def get_model():
    """Build the network, load the latest saved weights, and switch to eval mode."""
    model = JanggiNetwork(N_RESIDUAL)
    model_saver = ModelSaver(BASE_DIR)
    model_saver.load_latest_model(model)
    model.to(DEVICE)
    model.eval()
    return model

def predictor_loop(shr_name, lock):
    """Entry point of the predictor process: load the model once, then serve forever."""
    current_index = 0
    model = get_model()
    while True:
        current_index = write_result(shr_name, current_index, model, lock)

class ProcessPredictor:
    """Picklable callable handed to the worker processes.

    It stores only the shared-memory name and the lock, so it can be sent
    through a multiprocessing Pool; calling it performs one round trip to
    the predictor process.
    """

    def __init__(self, shr_name, lock):
        self.shr_name = shr_name
        self.lock = lock

    def __call__(self, features):
        current_index = send_tensor(self.shr_name, features, self.lock)
        result = read_result(self.shr_name, current_index, self.lock)
        return result
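
# A minimal usage sketch, assuming `shm_name` and `lock` come from the main
# block below and `features` is a (N_FEATURES, BOARD_HEIGHT, BOARD_WIDTH)
# float tensor (names illustrative):
#
#     predictor = ProcessPredictor(shm_name, lock)
#     policy, value = predictor(features)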

if __name__ == "__main__":
    if current_process().name == "MainProcess":
        mp.set_start_method("spawn", force=True)
        while True:
            with mp.Manager() as manager:
                print("Creating shared block")
                shr = create_shared_block()
                lock = manager.Lock()
                predictor = ProcessPredictor(shr.name, lock)
                model_saver = ModelSaver(BASE_DIR)
                print("Start Predictor Process")
                predictor_process = Process(target=predictor_loop, args=(shr.name, lock))
                predictor_process.start()
                # Give the predictor time to load the model before the
                # workers start sending positions.
                time.sleep(5)
                begin_time = time.time()
                with mp.Pool(N_POOLS) as pool:
                    episodes = pool.map(run_episode_raw,
                                        [(predictor, N_SIMULATIONS, ITER_MAX) for _ in range(N_EPISODES)])
                model_saver.save_episodes_raw(episodes)
                print("Total time:", time.time() - begin_time)
                # Tear everything down so the next iteration reloads the
                # latest model and starts from a clean shared block.
                predictor_process.terminate()
                predictor_process.join()
                predictor_process.close()
                shr.close()
                shr.unlink()