-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathrnn_rbm.py
More file actions
298 lines (191 loc) · 8.69 KB
/
rnn_rbm.py
File metadata and controls
298 lines (191 loc) · 8.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
# http://github.com/timestocome
# convert example rnn-rbm from Lisa Labs that learns and generates music
# to do so with text from Alice in Wonderland
# started with this code:
# Author: Nicolas Boulanger-Lewandowski
# University of Montreal (2012)
# RNN-RBM deep learning tutorial
# More information at http://deeplearning.net/tutorial/rnnrbm.html
#import glob
import os
import sys
import pickle
import numpy as np
import pylab
import theano
import theano.tensor as T
from theano.sandbox.rng_mrg import MRG_RandomStreams as RandomStreams
# set up: fixed seed so runs are reproducible; MRG stream drives RBM sampling
np.random.seed(27)
rng = RandomStreams(seed=np.random.randint(1 << 30))

# setup theano
GPU = True
if GPU:
    print("Device set to GPU")
    try:
        theano.config.device = 'gpu'
    except Exception:
        # device can only be set before theano initializes; if it is already
        # set this raises and we keep whatever device is active.
        # (was a bare `except:`, which also swallowed KeyboardInterrupt/SystemExit)
        pass
    theano.config.floatX = 'float32'  # GPU kernels require float32
else:
    print("Running with CPU")

# constants
n_chain = 15    # gibbs chain length
n_rnn = 64      # rnn nodes
n_rbm = 64      # rbm nodes
n_epochs = 100  # times to loop over data
#####################################################################################
# load data
# chars are 1-49 inclusive
# data is broken into phrases, shortest is 2 chars, longest is 319 chars
#####################################################################################
# read in char phrase files and convert each one to a list of one-hot vectors
working_dir = os.getcwd()
data_directory = os.getcwd() + '/char_sentences'
os.chdir(data_directory)                # move to data file dir
file_list = os.listdir(os.getcwd())     # list data files
# Filter out hidden files such as .DS_Store by name.
# (The old `file_list.pop(0)` removed an *arbitrary* entry: os.listdir order
# is unspecified, so .DS_Store is not guaranteed to be first.)
file_list = [f for f in file_list if not f.startswith('.')]
n_samples = len(file_list)
batch_size = 50  # highest and lowest possible values ( # of chars )
                 # 49 uniques but we start at 1 so add 1

# read in files: one saved integer-code array per phrase
x = []
for f in file_list:
    file_in = np.asarray(np.load(f), dtype=theano.config.floatX)
    x.append(file_in)

# convert each input vector into a matrix of one hot vectors
dataset = []
for i in x:                 # read in a phrase
    phrase = []
    for j in i:             # for each char in phrase
        z = np.zeros(batch_size)
        # j is floatX after the load above; modern NumPy rejects float indices
        z[int(j)] = 1.
        phrase.append(z)
    dataset.append(phrase)

# return to our project home dir
os.chdir(working_dir)
print("data loaded", len(x))
###### utilities #############
# convert ints back into chars
# use a context manager so the pickle file handle is closed (was leaked)
with open('char_index_dictionary.pkl', 'rb') as pkl_file:
    char_dict = pickle.load(pkl_file)


def chars_to_string(chars):
    """Translate a sequence of integer char codes to text; print and return it.

    Codes missing from char_dict map to '_'.  Returns the decoded string
    (previously returned None, which made generate()'s return value useless).
    """
    translation = [char_dict.get(k) for k in chars]
    text = []
    for i in translation:
        if i is None:            # `is None`, not `== None`
            text.append('_')
        else:
            text.append(i[0])    # dict values are sequences; take first char
    # join directly instead of the removed np .astype('|S1').tostring() dance
    result = ''.join(text)
    print(result)
    return result
##########################################################################################
# RBM
##########################################################################################
# Construct a k-step Gibbs chain starting at v for an RBM.
#
# v     symbolic matrix of visible units (one row per time step)
# W     visible-hidden weight matrix (shared variable)
# bv    visible bias (may be a per-timestep matrix in the RNN-RBM case)
# bh    hidden bias
# k     length of Gibbs chain
#
# returns (v_sample, cost, monitor, updates):
#   v_sample  final sample of the chain
#   monitor   reconstruction log-likelihood proxy, for progress reporting
#   cost      free-energy difference between data and sample (its gradient
#             approximates the RBM log-likelihood gradient)
#   updates   scan updates that must be attached to the compiled function
def build_rbm(v, W, bv, bh, k=n_chain):
    # obtain an approximate sequence from a probability distribution
    def gibbs_step(v):
        # hidden nodes: h ~ Bernoulli(sigmoid(v.W + bh))
        mean_h = T.nnet.sigmoid(T.dot(v, W) + bh)
        h = rng.binomial(size=mean_h.shape, n=1, p=mean_h, dtype=theano.config.floatX)
        # visible/input nodes: v ~ Bernoulli(sigmoid(h.W^T + bv))
        mean_v = T.nnet.sigmoid(T.dot(h, W.T) + bv)
        v = rng.binomial(size=mean_v.shape, n=1, p=mean_v, dtype=theano.config.floatX)
        return mean_v, v
    # create the sequence from the gibbs steps (keep only the sampled v each step)
    chain, updates = theano.scan(lambda v: gibbs_step(v)[1], outputs_info=[v], n_steps=k)
    v_sample = chain[-1]
    # one extra half-step to get the reconstruction mean for monitoring
    mean_v = gibbs_step(v_sample)[0]
    # v*log(mean_v) + (1-v)*log(1-mean_v), averaged over rows (xlogy0 handles 0*log0)
    monitor = T.xlogx.xlogy0(v, mean_v) + T.xlogx.xlogy0(1 - v, 1 - mean_v)
    monitor = monitor.sum() / v.shape[0]
    # cost
    def free_energy(v):
        return -(v * bv).sum() - T.log(1 + T.exp(T.dot(v, W) + bh)).sum()
    cost = (free_energy(v) - free_energy(v_sample)) / v.shape[0]
    return v_sample, cost, monitor, updates
# Utility to initialize a matrix shared variable with normally distributed values
def shared_normal(num_rows, num_cols, scale=1):
    """Return a theano shared matrix drawn from N(0, scale), cast to floatX."""
    init = np.random.normal(scale=scale, size=(num_rows, num_cols))
    return theano.shared(init.astype(theano.config.floatX))
# Utility to initialize a vector shared variable with zero elements
def shared_zeros(*shape):
    """Return a theano shared array of zeros with the given shape, in floatX."""
    zeros = np.zeros(shape, dtype=theano.config.floatX)
    return theano.shared(zeros)
# Build the full RNN-RBM symbolic graph.
#
# n_visible           possible chars (50 in this case)
# n_hidden            RBM hidden units
# n_hidden_recurrent  RNN hidden units
#
# returns (v, v_sample, cost, monitor, params, updates_train, v_t, updates_generate)
# where `v` is the symbolic input sequence, `cost`/`monitor` drive training,
# and `v_t` + updates_generate sample a new sequence of 200 steps.
def build_rnnrbm(n_visible, n_hidden, n_hidden_recurrent):
    # RBM parameters, shared across all time steps
    W = shared_normal(n_visible, n_hidden, 0.01)
    bv = shared_zeros(n_visible)
    bh = shared_zeros(n_hidden)
    # weights coupling the RNN state u to the RBM biases, and v/u to the next u
    Wuh = shared_normal(n_hidden_recurrent, n_hidden, 0.0001)
    Wuv = shared_normal(n_hidden_recurrent, n_visible, 0.0001)
    Wvu = shared_normal(n_visible, n_hidden_recurrent, 0.0001)
    Wuu = shared_normal(n_hidden_recurrent, n_hidden_recurrent, 0.0001)
    bu = shared_zeros(n_hidden_recurrent)
    params = W, bv, bh, Wuh, Wuv, Wvu, Wuu, bu  # learned parameters as shared variables
    v = T.matrix()                       # a training sequence
    u0 = T.zeros((n_hidden_recurrent,))  # initial value for the RNN hidden units

    # One time step.  With v_t given (training) it returns the new RNN state and
    # the time-dependent RBM biases; with v_t=None (generation) it first samples
    # v_t from the conditional RBM (longer chain, k=25).
    def recurrence(v_t, u_tm1):
        bv_t = bv + T.dot(u_tm1, Wuv)
        bh_t = bh + T.dot(u_tm1, Wuh)
        generate = v_t is None
        if generate:
            v_t, _, _, updates = build_rbm(T.zeros((n_visible,)), W, bv_t, bh_t, k=25)
        u_t = T.tanh(bu + T.dot(v_t, Wvu) + T.dot(u_tm1, Wuu))
        return ([v_t, u_t], updates) if generate else [u_t, bv_t, bh_t]

    # training pass: scan the input sequence to get per-step RBM biases
    (u_t, bv_t, bh_t), updates_train = theano.scan(
        lambda v_t, u_tm1, *_: recurrence(v_t, u_tm1),
        sequences=v, outputs_info=[u0, None, None], non_sequences=params)
    # single batched RBM over the whole sequence, using the per-step biases
    v_sample, cost, monitor, updates_rbm = build_rbm(v, W, bv_t[:], bh_t[:], k=15)
    updates_train.update(updates_rbm)
    # symbolic loop for sequence generation (200 sampled steps, no input)
    (v_t, u_t), updates_generate = theano.scan(
        lambda u_tm1, *_: recurrence(None, u_tm1),
        outputs_info=[None, u0], non_sequences=params, n_steps=200)
    return (v, v_sample, cost, monitor, params, updates_train, v_t, updates_generate)
# Simple class to train an RNN-RBM and generate sample sequences.
class RnnRbm:
    # n_hidden            RBM hidden units
    # n_hidden_recurrent  RNN hidden units
    # lr                  SGD learning rate
    # r                   (low, high) range of char codes; r[1]-r[0] visible units
    def __init__( self, n_hidden=n_rbm, n_hidden_recurrent=n_rnn, lr=0.001, r=(0, 50) ):
        self.r = r
        # build the symbolic graph and compile train/generate functions
        (v, v_sample, cost, monitor, params, updates_train, v_t, updates_generate) = build_rnnrbm( r[1] - r[0], n_hidden, n_hidden_recurrent)
        # consider_constant: do not backprop through the Gibbs sample itself
        gradient = T.grad(cost, params, consider_constant=[v_sample])
        # plain SGD: p <- p - lr * dcost/dp
        updates_train.update( ((p, p - lr * g) for p, g in zip(params, gradient)))
        self.train_function = theano.function( [v], monitor, updates=updates_train)
        self.generate_function = theano.function( [], v_t, updates=updates_generate)

    # loop over full data set once per epoch; prints mean cost and a text
    # sample after each epoch.  Ctrl-C stops training cleanly.
    def train(self, input, batch_size=batch_size, num_epochs=n_epochs):
        # NOTE(review): np.random.shuffle below reorders the caller's list
        # in place — the passed-in dataset is mutated.
        dataset = input
        try:
            for epoch in range(num_epochs):
                np.random.shuffle(dataset)
                costs = []
                for s, sequence in enumerate(dataset):          # for each input vector
                    for i in range(0, len(sequence), batch_size):  # 0...sequence length, batch_size = step
                        cost = self.train_function(sequence[i:i + batch_size])
                        costs.append(cost)
                print('Epoch %i/%i Cost %.2f' % (epoch + 1, num_epochs, np.mean(costs)))
                self.generate()  # print a sample so progress is visible
                print("*********************************************************************")
                sys.stdout.flush()
        except KeyboardInterrupt:
            print('Interrupted by user.')

    # Sample one sequence from the model and render it as text.
    def generate(self):
        sample = np.array(self.generate_function().astype(int))
        # convert sample to ints: each row is (approximately) one-hot
        ints = []
        for i in sample:
            ints.append(i.argmax())
        # convert sample to chars
        text = chars_to_string(ints)
        return text
def test_rnnrbm(batch_size=batch_size, num_epochs=n_epochs):
    """Build an RnnRbm, train it on the module-level dataset, return the model."""
    rnn_rbm = RnnRbm()
    rnn_rbm.train(dataset, batch_size=batch_size, num_epochs=num_epochs)
    return rnn_rbm
##########################################################################
# train network
################################################################
# runs the full training loop at import/execution time
model = test_rnnrbm()