-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathVFA_Net.py
More file actions
229 lines (182 loc) · 8.6 KB
/
VFA_Net.py
File metadata and controls
229 lines (182 loc) · 8.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
import numpy as np
from collections import defaultdict
class NeuralNetwork:
    """Fully-connected feed-forward network with manual backpropagation (NumPy).

    ``nn_structure`` is a list of layer dicts, each with keys
    ``input_dim``, ``output_dim`` and ``activation`` (one of ``'none'``,
    ``'relu'``, ``'sigmoid'``, ``'tanh'``, ``'leakyRelu'``, ``'quadratic'``).
    Inputs are column vectors of shape ``(input_dim, 1)``.

    Parameters
    ----------
    nn_structure : list of dict
        Layer specification as described above.
    bias : bool
        If True, biases are drawn from N(0, 1); otherwise fixed at zero.
    double : bool
        If True, use the "doubling trick" on the last layer: each row of the
        final weight matrix is ``(h, -h)``, so the initial output is zero for
        inputs whose two halves match.  Takes precedence over ``zero``.
    zero : bool
        If True (and ``double`` is False), the last layer's weights start at 0.
    seed : int or None
        Optional ``np.random.seed`` value for reproducible initialization.
    initVar, initVarLast : float
        Initialization scale for the hidden layers / the last layer.
        NOTE: despite the names, these are passed (divided by fan-in) as the
        *scale* (standard deviation) argument of ``np.random.normal``, not as
        a variance — preserved as-is to keep behavior unchanged.
    """

    def __init__(self, nn_structure, bias=True, double=False, zero=False, seed=None, initVar = 1, initVarLast = 1):
        self.nn_structure = nn_structure
        self.num_layers = len(nn_structure)
        self.parameters = {}
        self.bias = bias
        # 'double' is checked before 'zero' during initialization, so it wins
        # when both flags are set (preserves the original precedence).
        self.double = double
        self.zero = zero
        # Init scale of all layers' parameters minus last
        self.initVar = initVar
        # Init scale of last layer parameters
        self.initVarLast = initVarLast
        # caches populated by net_forward / net_backward
        self.memory = {}
        self.grad_values = {}
        self._init_parameters(seed)

    def _init_parameters(self, seed=None):
        """(Re)draw every weight matrix and bias vector.

        Shared by ``__init__`` and ``reset`` so the two schemes cannot drift
        apart.  BUGFIX: the original ``__init__`` used ``initVarLast`` for
        *every* layer (silently ignoring ``initVar``); per the documented
        intent, ``initVar`` now scales all layers except the last.
        """
        if seed is not None:
            np.random.seed(seed)
        last = self.num_layers - 1
        for idx, layer in enumerate(self.nn_structure):
            n_in = layer["input_dim"]
            n_out = layer["output_dim"]
            if self.bias:
                self.parameters['b_' + str(idx)] = np.random.randn(n_out, 1)
            else:
                self.parameters['b_' + str(idx)] = np.zeros((n_out, 1))
            scale = self.initVarLast if idx == last else self.initVar
            self.parameters['w_' + str(idx)] = np.random.normal(0, scale / n_in, (n_out, n_in))
            if self.double and idx == last:
                if n_in % 2 != 0:
                    raise Exception('Odd number of layers in the last layer, must be even to use doubling trick')
                # Doubling trick: each row is (h, -h) so matched input halves
                # cancel at initialization.
                for i in range(n_out):
                    half = np.random.normal(0, self.initVarLast / n_in, n_in // 2)
                    self.parameters['w_' + str(idx)][i] = np.concatenate((half, np.negative(half)))
            elif self.zero and idx == last:
                # sets weights of last layer to 0
                self.parameters['w_' + str(idx)] = np.zeros((n_out, n_in))

    def __call__(self, a0):
        """Forward pass without caching intermediates; returns the output."""
        a_prev = a0
        for idx, layer in enumerate(self.nn_structure):
            w_n = self.parameters['w_' + str(idx)]
            b_n = self.parameters['b_' + str(idx)]
            a_prev, _ = self.layer_activation(a_prev, w_n, b_n, layer['activation'])
        # Returning the running activation also handles an empty structure
        # (input passes through unchanged) instead of raising NameError.
        return a_prev

    def layer_activation(self, a_prev, w, b, activation = 'relu'):
        """Compute one layer: returns ``(activation(z), z)`` with ``z = w @ a_prev + b``.

        Both values are returned because the pre-activation ``z`` is needed
        for the gradient.  Raises for unknown activation names.
        """
        z = np.matmul(w, a_prev) + b
        if activation == 'none':
            return z, z
        fns = {
            'relu': self.relu,
            'sigmoid': self.sigmoid,
            'tanh': self.tanh,
            'leakyRelu': self.leakyRelu,
            'quadratic': self.quadratic,
        }
        if activation not in fns:
            raise Exception('activation function currently not supported')
        return fns[activation](z), z

    def net_forward(self, a0):
        """Forward pass that caches ``a_n``/``z_n`` per layer for backprop."""
        self.input_batch = a0
        a_prev = a0
        for idx, layer in enumerate(self.nn_structure):
            w_n = self.parameters['w_' + str(idx)]
            b_n = self.parameters['b_' + str(idx)]
            a_n, z_n = self.layer_activation(a_prev, w_n, b_n, layer['activation'])
            self.memory['a_' + str(idx)] = a_n
            self.memory['z_' + str(idx)] = z_n
            a_prev = a_n
        return a_prev

    def gradient_backward(self, a_prev, w_n, z_n, dA, activation = 'relu'):
        """Backprop through one layer; returns ``(dA_prev, dW, dB)``."""
        if activation == 'none':
            dZ = dA
        else:
            dfns = {
                'relu': self.drelu,
                'sigmoid': self.dsigmoid,
                'tanh': self.dtanh,
                'leakyRelu': self.dleakyRelu,
                'quadratic': self.dquadratic,
            }
            if activation not in dfns:
                raise Exception('activation function currently not supported')
            dZ = dA * dfns[activation](z_n)
        dA_prev = np.matmul(w_n.T, dZ)
        dW = np.matmul(dZ, a_prev.T)
        # When bias is disabled, report a zero gradient so update_wb is a no-op
        # on b_n rather than special-casing callers.
        dB = dZ if self.bias else np.zeros(dZ.shape)
        return dA_prev, dW, dB

    def net_backward(self, dA, alpha=1):
        """Backpropagate ``dA`` (dCost/dOutput) through all layers.

        Populates and returns ``self.grad_values`` with ``dW_i``/``dB_i``.
        ``alpha`` is unused; kept for backward compatibility with callers.
        Requires a preceding ``net_forward`` call (uses cached activations).
        """
        for idx, layer in reversed(list(enumerate(self.nn_structure))):
            # Input to layer 0 is the cached network input, not a memory entry.
            a_prev = self.input_batch if idx == 0 else self.memory['a_' + str(idx - 1)]
            z_n = self.memory['z_' + str(idx)]
            w_n = self.parameters['w_' + str(idx)]
            dA, dW, dB = self.gradient_backward(a_prev, w_n, z_n, dA, layer['activation'])
            self.grad_values['dW_' + str(idx)] = dW
            self.grad_values['dB_' + str(idx)] = dB
        return self.grad_values

    def update_wb(self, step_size):
        """One plain gradient-descent step using ``self.grad_values``."""
        for idx in range(self.num_layers):
            self.parameters['w_' + str(idx)] -= step_size * self.grad_values['dW_' + str(idx)]
            self.parameters['b_' + str(idx)] -= step_size * self.grad_values['dB_' + str(idx)]
        return

    def batch_update_wb(self, step_size, grad_values):
        """Gradient step using the element-wise mean of a list of grad dicts."""
        for idx in range(self.num_layers):
            mean_dW = np.mean([g['dW_' + str(idx)] for g in grad_values], axis=0)
            mean_dB = np.mean([g['dB_' + str(idx)] for g in grad_values], axis=0)
            self.parameters['w_' + str(idx)] -= step_size * mean_dW
            self.parameters['b_' + str(idx)] -= step_size * mean_dB
        return

    def save_model(self, name):
        """Save all parameters to ``<name>.npy`` (pickled dict)."""
        np.save(name, self.parameters)
        return

    def load_model(self, name):
        """Load parameters previously written by ``save_model``.

        NOTE(review): ``allow_pickle=True`` executes arbitrary code when
        loading — only load model files from trusted sources.
        """
        npfile = np.load('{}.npy'.format(name), allow_pickle=True).item()
        for idx in range(self.num_layers):
            self.parameters['w_' + str(idx)] = npfile['w_' + str(idx)]
            self.parameters['b_' + str(idx)] = npfile['b_' + str(idx)]
        return

    def reset(self, seed=None):
        """Re-initialize all parameters using the configured scheme.

        CONSISTENCY FIX: the original reset duplicated __init__ with drifted
        semantics (it ignored ``self.zero`` and gated the doubling trick on
        ``initVarLast != 0``); both paths now share ``_init_parameters``.
        """
        self._init_parameters(seed)

    # activation functions and their derivatives
    def sigmoid(self, x):
        """Logistic sigmoid, 1 / (1 + e^-x)."""
        return 1 / (1 + np.exp(-x))

    def dsigmoid(self, x):
        """Derivative of sigmoid: s * (1 - s), computing s only once."""
        s = self.sigmoid(x)
        return s * (1 - s)

    def relu(self, x):
        return np.maximum(0, x)

    def drelu(self, x):
        # Subgradient 0 at x == 0.
        return (x > 0).astype(int)

    def tanh(self, x):
        # BUGFIX: the manual (e^x - e^-x)/(e^x + e^-x) overflowed to nan for
        # |x| beyond ~710; np.tanh is the numerically stable equivalent.
        return np.tanh(x)

    def dtanh(self, x):
        return 1 - self.tanh(x)**2

    def leakyRelu(self, x, a=0.2):
        return np.maximum(a * x, x)

    def dleakyRelu(self, x, a=0.2):
        # Subgradient 0 at x == 0 (neither branch fires), matching original.
        return (x > 0).astype(int) - a * (x < 0).astype(int)

    def quadratic(self, x):
        return np.square(x)

    def dquadratic(self, x):
        return 2 * x