simple_nn.py

import random
import time

import numpy as np


class Network(object):
    """Create a feed-forward neural network.

    Example: Network([784, 30, 10], 'sigmoid') creates a network with
    input dimension 784, a hidden layer of dimension 30, and an output
    layer of dimension 10 with sigmoid activations.
    """

    def __init__(self, sizes: list, activation: str):
        """
        sizes: list of input, hidden and output dimensions
        activation: choice of activation function ('sigmoid' or 'relu')
        """
        self.sizes = sizes
        self.initialize_weights_biases()
        if activation == 'sigmoid':
            self.activation = sigmoid
            self.activation_deriv = sigmoid_deriv
            self.activation_output_layer = sigmoid
            self.cost_fun = cross_entropy_cost
        elif activation == 'relu':
            self.activation = relu
            self.activation_deriv = relu_deriv
            self.activation_output_layer = softmax
            self.cost_fun = log_likelihood_cost
            # Shrink the initial parameters: ReLU activations are unbounded,
            # so large initial weights can blow up the softmax inputs.
            self.weights = [w * 0.001 for w in self.weights]
            self.biases = [b * 0.001 for b in self.biases]
        else:
            raise ValueError("Unknown activation function: " + repr(activation) +
                             ", use 'sigmoid' or 'relu'.")
        self.accuracies_test = []
        self.accuracies_train = []

    def initialize_weights_biases(self):
        """Initialize the starting weights and biases for the Network."""
        self.weights = [np.random.randn(y, x) / np.sqrt(x)
                        for x, y in zip(self.sizes[:-1], self.sizes[1:])]
        self.biases = [np.random.randn(y, 1) for y in self.sizes[1:]]
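
    # Note on the initialization above: each weight is drawn from a normal
    # distribution with standard deviation 1/sqrt(x), where x is the layer's
    # fan-in, so the weighted input z = w.a + b of a neuron starts out with
    # roughly unit variance instead of growing with the number of inputs;
    # the biases are standard normal.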

    def feedforward(self, a):
        """Calculate the prediction of the network for input 'a'."""
        for b, w in zip(self.biases[:-1], self.weights[:-1]):
            z = np.dot(w, a) + b
            a = self.activation(z)
        b, w = self.biases[-1], self.weights[-1]
        z = np.dot(w, a) + b
        a = self.activation_output_layer(z)
        return a

    def SGD(self, training_data, test_data, epochs, mini_batch_size,
            eta, lmbda=0, dropout_rate=0, monitor_accuracy=True):
        """Train the neural network using mini-batch stochastic
        gradient descent. The 'training_data' is a list of tuples
        (x, y) representing the training inputs and labels. (See the
        documentation in load_mist.py for details of the data format.)
        Eta is the learning rate, lmbda the parameter for L2
        regularization. If monitor_accuracy == True, the network is
        evaluated against the test and training data after each epoch,
        and partial progress is printed and saved.
        """
        n = len(training_data)
        for j in range(epochs):
            start_time = time.time()
            random.shuffle(training_data)
            mini_batches = [training_data[k:k+mini_batch_size]
                            for k in range(0, n, mini_batch_size)]
            for batch in mini_batches:
                # X and Y are matrices where each column is a single
                # input x or label y. So
                #     X.shape == (784, mini_batch_size)
                #     Y.shape == (10 , mini_batch_size)
                X = np.column_stack(tuple(x for (x, _) in batch))
                Y = np.column_stack(tuple(y for (_, y) in batch))
                nabla_b, nabla_w = self.backprop(X, Y, dropout_rate,
                                                 mini_batch_size)
                self.weights = [(1 - eta * lmbda / n) * w - eta * nw
                                for w, nw in zip(self.weights, nabla_w)]
                self.biases = [b - eta * nb
                               for b, nb in zip(self.biases, nabla_b)]
            self.progress_and_monitoring(training_data, test_data, epochs,
                                         monitor_accuracy, start_time, j+1)
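
    # The parameter update in SGD above implements L2-regularized gradient
    # descent:
    #     w <- (1 - eta*lmbda/n) * w - eta * nabla_w
    #     b <- b - eta * nabla_b
    # nabla_w and nabla_b are already averaged over the mini-batch inside
    # backprop; the factor (1 - eta*lmbda/n) is the weight decay produced by
    # an L2 penalty of (lmbda/(2n)) * sum(w**2) in the cost function.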

    def SGD_adam(self, training_data, test_data, epochs, mini_batch_size,
                 alpha, lmbda=0, dropout_rate=0, monitor_accuracy=True):
        """A variation of the SGD method using the Adam optimization
        algorithm as described in arXiv:1412.6980v9. The learning
        rate is renamed to alpha; the other parameters are as in the
        SGD method.
        """
        n = len(training_data)
        # parameters for the algorithm as suggested in the original paper
        beta1 = 0.9
        beta2 = 0.999
        epsilon = 1e-8
        print('Starting with training')
        ms_w = [np.zeros(w.shape) for w in self.weights]
        vs_w = [np.zeros(w.shape) for w in self.weights]
        ms_b = [np.zeros(b.shape) for b in self.biases]
        vs_b = [np.zeros(b.shape) for b in self.biases]
        for j in range(epochs):
            start_time = time.time()
            random.shuffle(training_data)
            mini_batches = [training_data[k:k+mini_batch_size]
                            for k in range(0, n, mini_batch_size)]
            for batch in mini_batches:
                X = np.column_stack(tuple(x for (x, _) in batch))
                Y = np.column_stack(tuple(y for (_, y) in batch))
                nabla_b, nabla_w = self.backprop(X, Y, dropout_rate,
                                                 mini_batch_size)
                ms_w = [beta1 * mw + (1-beta1) * nw
                        for mw, nw in zip(ms_w, nabla_w)]
                ms_b = [beta1 * mb + (1-beta1) * nb
                        for mb, nb in zip(ms_b, nabla_b)]
                vs_w = [beta2 * vw + (1-beta2) * nw * nw
                        for vw, nw in zip(vs_w, nabla_w)]
                vs_b = [beta2 * vb + (1-beta2) * nb * nb
                        for vb, nb in zip(vs_b, nabla_b)]
                # bias correction for the first iterations of Adam not implemented
                self.weights = [
                    (1 - alpha * lmbda/n) * w - alpha * mw / (np.sqrt(vw) + epsilon)
                    for w, mw, vw in zip(self.weights, ms_w, vs_w)]
                self.biases = [b - alpha * mb / (np.sqrt(vb) + epsilon)
                               for b, mb, vb in zip(self.biases, ms_b, vs_b)]
            self.progress_and_monitoring(training_data, test_data, epochs,
                                         monitor_accuracy, start_time, j+1)
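
    # Adam update as implemented above, per parameter array, with gradient g:
    #     m <- beta1*m + (1 - beta1)*g        (first moment estimate)
    #     v <- beta2*v + (1 - beta2)*g*g      (second moment estimate)
    #     w <- (1 - alpha*lmbda/n)*w - alpha * m / (sqrt(v) + epsilon)
    # The paper additionally divides m and v by (1 - beta1**t) and
    # (1 - beta2**t) to correct their bias towards zero during the first
    # steps; as noted in the comment above, that correction is omitted here.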

    def backprop(self, X, Y, dropout_rate, mini_batch_size):
        """Return a tuple (nabla_b, nabla_w) representing the
        gradient of the cost function. nabla_b and nabla_w are
        layer-by-layer lists of numpy arrays, similar to self.biases
        and self.weights.
        """
        # -- Forward pass --
        As = []  # list to store all the activations, layer by layer
        Zs = []  # list to store all the Z matrices, layer by layer
        A = X
        As.append(X)
        p = 1 - dropout_rate  # keep probability
        for w, b in zip(self.weights[:-1], self.biases[:-1]):
            Z = np.dot(w, A) + b  # broadcasting takes care of b's dimensions
            A = self.activation(Z)
            if dropout_rate:
                mask = np.random.rand(*Z.shape) < p
                A *= mask / p  # rescale here, not at prediction time
            Zs.append(Z)
            As.append(A)
        w, b = self.weights[-1], self.biases[-1]
        Z = np.dot(w, A) + b
        # apply_along_axis because softmax needs to be applied per column
        A = np.apply_along_axis(self.activation_output_layer, 0, Z)
        Zs.append(Z)
        As.append(A)
        # -- Backward pass --
        num_layers = len(self.sizes)
        nabla_w = [None] * (num_layers - 1)
        nabla_b = [None] * (num_layers - 1)
        m = mini_batch_size
        # For a sigmoid output layer with cross-entropy loss or a softmax
        # output layer with log-likelihood loss, the first step of backprop
        # is identical.
        Delta = (As[-1] - Y)
        nabla_b[-1] = (1/m) * np.sum(Delta, axis=1, keepdims=True)  # !
        nabla_w[-1] = (1/m) * np.dot(Delta, As[-2].transpose())  # !
        # For nabla_w the sum of the dot product is over the training
        # examples in the mini-batch:
        #     nabla_w[-1][j][k] = sum( Delta[j][:] * A[k][:] )
        # The loop is over the layers of the network, l = 1 means
        # the last layer, l = 2 is the second-to-last layer, and so on.
        for l in range(2, num_layers):
            Z = Zs[-l]
            Ad = self.activation_deriv(Z)
            Delta = np.dot(self.weights[-l+1].transpose(), Delta) * Ad
            nabla_b[-l] = (1/m) * np.sum(Delta, axis=1, keepdims=True)  # !
            nabla_w[-l] = (1/m) * np.dot(Delta, As[-l-1].transpose())  # !
        return (nabla_b, nabla_w)
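
    # Backprop equations used above, with m the mini-batch size:
    #     Delta_L = A_L - Y                                (output layer)
    #     Delta_l = W_(l+1)^T . Delta_(l+1) * f'(Z_l)      (hidden layers)
    #     nabla_b_l = (1/m) * sum of Delta_l over the batch (axis=1)
    #     nabla_w_l = (1/m) * Delta_l . A_(l-1)^T
    # The simple form of Delta_L holds because the derivatives of the
    # sigmoid/cross-entropy pair and of the softmax/log-likelihood pair
    # both reduce to (activation - label).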

    def progress_and_monitoring(self, training_data, test_data, epochs,
                                monitor_accuracy, start_time, current_epoch):
        """Print epoch, time and accuracy information."""
        n = len(training_data)
        n_test = len(test_data)

        def progress_str(now):
            k = int(np.log10(epochs)) + 1
            return "Epoch {0:{pad}d}/{1:{pad}d} ({2:.1f}s)".format(
                current_epoch, epochs, now - start_time, pad=k)

        if monitor_accuracy:
            accuracy_test = self.evaluate(test_data) / n_test * 100
            self.accuracies_test.append(accuracy_test)
            # Vectorization over a batch is only implemented for the
            # training method, not for prediction. For speed, the accuracy
            # on the training data is therefore calculated on part of it.
            random.shuffle(training_data)
            training_data_part = training_data[:n_test]
            accuracy_train = self.evaluate(training_data_part) / n_test * 100
            self.accuracies_train.append(accuracy_train)
            accuracy_str = ", train / test accuracy: {0:.1f}% / {1:.1f}%".format(
                accuracy_train, accuracy_test)
            t = time.time()
            print(progress_str(t) + accuracy_str, end='\n')
        else:
            t = time.time()
            print(progress_str(t), end='\r')
        if current_epoch == epochs:
            print("\n" +
                  "Accuracy on training and test data: {0:.1f}% / {1:.1f}%".format(
                      self.evaluate(training_data) / n * 100,
                      self.evaluate(test_data) / n_test * 100))

    def evaluate(self, eval_data):
        """Return the number of cases in eval_data for which the
        neural network predicts the correct result.
        """
        t = data_type(eval_data)
        if t == 'training':
            results = [(np.argmax(self.feedforward(x)), np.argmax(y))
                       for (x, y) in eval_data]
        elif t == 'test':
            results = [(np.argmax(self.feedforward(x)), y)
                       for (x, y) in eval_data]
        else:
            raise TypeError("Data to evaluate the network's accuracy "
                            "has the wrong format.")
        correct_predictions = sum(int(x == y) for (x, y) in results)
        return correct_predictions

    def evaluate_cost(self, eval_data):
        """Calculate the total cost of eval_data.
        Caution: this only works if data_type(eval_data) == 'training'.
        """
        costs = [self.cost_fun(self.feedforward(x), y)
                 for x, y in eval_data]
        return sum(costs)


def data_type(data_set):
    """Training data and test data store the output labels in different
    formats: either as a one-hot vector or as an integer. This function
    detects which format is used by its input data_set.
    """
    first_img, first_label = data_set[0]
    if isinstance(first_label, np.ndarray):
        return 'training'
    elif isinstance(first_label, np.ubyte):
        return 'test'
    else:
        return 'unknown'


def sigmoid(z):
    return 1.0 / (1.0 + np.exp(-z))


def sigmoid_deriv(z):
    return sigmoid(z) * (1 - sigmoid(z))


def cross_entropy_cost(a, y):
    # np.nan_to_num turns the nan from np.log(0)*0 into 0.0
    c = -y * np.log(a) - (1 - y) * np.log(1 - a)
    return np.sum(np.nan_to_num(c))


def relu(z):
    return (abs(z) + z) / 2


def relu_deriv(z):
    return (np.sign(z) + 1) / 2


def softmax(z):
    # Subtracting the maximum before exponentiating avoids overflow and
    # does not change the result of the softmax.
    x = np.exp(z - np.max(z))
    s = np.sum(x)
    return x / s


def log_likelihood_cost(a, y):
    # np.nan_to_num turns the nan from np.log(0)*0 into 0.0
    c = -y * np.log(a)
    return np.sum(np.nan_to_num(c))
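

# Minimal usage sketch with synthetic data (an assumption for illustration:
# the real MNIST loader referred to in the SGD docstring is not part of this
# file). Training labels are one-hot (10, 1) arrays and test labels are plain
# np.ubyte class indices, matching what data_type() above expects.
if __name__ == '__main__':
    rng = np.random.default_rng(0)

    def one_hot(label):
        y = np.zeros((10, 1))
        y[label] = 1.0
        return y

    training_data = [(rng.random((784, 1)), one_hot(lab))
                     for lab in rng.integers(0, 10, size=200)]
    test_data = [(rng.random((784, 1)), np.ubyte(lab))
                 for lab in rng.integers(0, 10, size=50)]

    net = Network([784, 30, 10], 'sigmoid')
    net.SGD(training_data, test_data, epochs=2, mini_batch_size=10, eta=0.5)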