# lvq.py
import numpy
import theano
import theano.tensor as tensor
from blocks.bricks import Initializable
from blocks.bricks.base import application
from blocks.utils import shared_floatx_nans


class LVQ(Initializable):
    """Generalized LVQ cost brick with one prototype per class."""

    def __init__(self, n_classes, dim, nonlin=True, gamma=2.0, **kwargs):
        super(LVQ, self).__init__(**kwargs)
        self.n_classes = n_classes
        self.dim = dim
        self.gamma = gamma
        self.nonlin = nonlin

    def _allocate(self):
        W = shared_floatx_nans((self.n_classes, self.dim), name='prototypes')
        self.parameters.append(W)

    def _initialize(self):
        W, = self.parameters
        self.weights_init.initialize(W, self.rng)

    @application(inputs=['x', 'y', 'prefix'], outputs=['cost', 'misclass'])
    def apply(self, x, y, prefix):
        W, = self.parameters
        # Build a one-hot mask from the integer labels y
        mask = tensor.alloc(0, y.shape[0] * self.n_classes)
        ind = tensor.arange(y.shape[0]) * self.n_classes + y
        mask = tensor.set_subtensor(mask[ind], 1.0)
        mask = tensor.reshape(mask, (y.shape[0], self.n_classes))
        # Squared Euclidean distances between examples and prototypes
        D = ((W ** 2).sum(axis=1, keepdims=True).T
             + (x ** 2).sum(axis=1, keepdims=True)
             - 2 * tensor.dot(x, W.T))
        self.add_auxiliary_variable(D, name=prefix + '_D')
        # Adding a huge constant to the masked-out entries excludes them
        # from the minimum: d_correct is the distance to the closest
        # correct prototype, d_incorrect to the closest incorrect one.
        d_correct = (D + (1 - mask) * numpy.float32(2e30)).min(axis=1)
        d_incorrect = (D + mask * numpy.float32(2e30)).min(axis=1)
        c = d_correct - d_incorrect
        self.add_auxiliary_variable(c, name=prefix + '_cost')
        if self.nonlin:
            c = tensor.exp(self.gamma * c)
        cost = c.mean()
        # An example is misclassified when its nearest prototype
        # belongs to the wrong class.
        misclass = tensor.switch(d_correct - d_incorrect < 0, 0.0, 1.0).mean()
        return cost, misclass
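

# A minimal numpy sketch (not part of the original module) of the masked-min
# trick in LVQ.apply above; handy for sanity-checking the Theano graph.
# The function name and its use here are my own addition.
def _numpy_glvq_margin(x, y, W):
    """Return d_correct - d_incorrect per example, mirroring LVQ.apply."""
    # Squared Euclidean distances, shape (n_examples, n_classes)
    D = ((W ** 2).sum(axis=1)[None, :]
         + (x ** 2).sum(axis=1)[:, None]
         - 2 * numpy.dot(x, W.T))
    mask = numpy.zeros_like(D)
    mask[numpy.arange(len(y)), y] = 1.0
    # The huge additive constant excludes masked-out entries from the min
    d_correct = (D + (1 - mask) * 2e30).min(axis=1)
    d_incorrect = (D + mask * 2e30).min(axis=1)
    return d_correct - d_incorrect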


class SupervisedNG(Initializable):
    """Supervised neural-gas cost: rank-weighted relative distances."""

    def __init__(self, n_classes, dim, nonlin=True, gamma=2.0, lamb=1.0,
                 **kwargs):
        super(SupervisedNG, self).__init__(**kwargs)
        self.n_classes = n_classes
        self.dim = dim
        self.gamma = gamma
        self.nonlin = nonlin
        self.lamb = lamb
        # Normalized, exponentially decaying weights over the ranks of
        # the n_classes - 1 incorrect prototypes
        w = numpy.exp(-lamb * numpy.arange(n_classes - 1))
        self.weighting = w / w.sum()

    def _allocate(self):
        W = shared_floatx_nans((self.n_classes, self.dim), name='prototypes')
        self.parameters.append(W)

    def _initialize(self):
        W, = self.parameters
        self.weights_init.initialize(W, self.rng)

    @application(inputs=['x', 'y', 'prefix'], outputs=['cost', 'misclass'])
    def apply(self, x, y, prefix):
        W, = self.parameters
        # One-hot mask from the integer labels y
        mask = tensor.alloc(0, y.shape[0] * self.n_classes)
        ind = tensor.arange(y.shape[0]) * self.n_classes + y
        mask = tensor.set_subtensor(mask[ind], 1.0)
        mask = tensor.reshape(mask, (y.shape[0], self.n_classes))
        # Squared Euclidean distances between examples and prototypes
        D = ((W ** 2).sum(axis=1, keepdims=True).T
             + (x ** 2).sum(axis=1, keepdims=True)
             - 2 * tensor.dot(x, W.T))
        self.add_auxiliary_variable(D, name=prefix + '_D')
        # Distance to the correct prototype and to all incorrect ones
        d_correct = tensor.reshape(D[mask.nonzero()], (y.shape[0], 1))
        d_incorrect = tensor.reshape(D[(1.0 - mask).nonzero()],
                                     (y.shape[0], self.n_classes - 1))
        # Relative difference in [-1, 1]; negative means the correct
        # prototype is closer than that incorrect one
        c = (d_correct - d_incorrect) / (d_correct + d_incorrect)
        # Sort descending so the closest incorrect prototypes receive
        # the largest weights
        c_sorted = tensor.sort(c, axis=1)[:, ::-1]
        c = (self.weighting * c_sorted).sum(axis=1, keepdims=True)
        self.add_auxiliary_variable(c, name=prefix + '_cost')
        if self.nonlin:
            c = tensor.exp(self.gamma * c)
        cost = c.mean()
        misclass = tensor.switch(c_sorted[:, 0] < 0, 0.0, 1.0).mean()
        return cost, misclass
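

# For intuition (my example, computed from the weighting formula above):
# with n_classes = 4 and lamb = 1.0 the three incorrect prototypes receive
#     >>> w = numpy.exp(-1.0 * numpy.arange(3))
#     >>> w / w.sum()
#     array([ 0.66524096,  0.24472847,  0.09003057])
# so the closest incorrect prototype dominates the cost. lamb -> 0
# approaches uniform weighting over all incorrect prototypes, while large
# lamb recovers a GLVQ-style cost driven only by the closest incorrect one.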
"""
Initialize prototypes as class conditional means
"""
def initialize_prototypes(brick, x, embedding, data_stream, key='targets'):
protos = numpy.zeros((brick.n_classes, brick.dim)).astype('float32')
n_examples = numpy.zeros((brick.n_classes,1))
f_emb = theano.function([x], embedding)
for data in data_stream.get_epoch_iterator(as_dict=True):
emb = f_emb(data['features'])
for h, y in zip(emb, data[key]):
protos[y, :] += h
n_examples[y, :] += 1
brick.parameters[0].set_value((protos/n_examples).astype('float32'))
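

# A minimal usage sketch (my own addition, not from the original file):
# build the cost graph for an LVQ brick on raw features. The variable
# names and hyperparameters below are illustrative assumptions.
if __name__ == '__main__':
    from blocks.initialization import IsotropicGaussian

    x = tensor.matrix('features')
    y = tensor.ivector('targets')

    brick = LVQ(n_classes=10, dim=50, weights_init=IsotropicGaussian(0.1))
    brick.initialize()
    cost, misclass = brick.apply(x, y, 'train')

    # `cost` would then be handed to a Blocks main loop for training.
    # Given an embedding expression and a Fuel stream providing
    # 'features'/'targets', initialize_prototypes(brick, x, embedding,
    # data_stream) replaces the random prototype initialization with
    # class-conditional means.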