test_autograd.py
#!/usr/bin/env python
import torch
import numpy as np

from utils.data_loader import *
from utils.metrics import *

# Fix the random seed so the shuffle/split below is reproducible.
np.random.seed(0)


class LabelProp(object):
    def __init__(self, data):
        self.data_name = data
        if data == 'cadata':
            self.features, self.labels = load_cadata()
            self.metric = RMSE
            self.task = 'regression'
        elif data == 'mnist':
            self.features, self.labels = load_mnist()
            self.metric = accuracy
            self.task = 'classification'
        else:
            raise ValueError(f'Unknown dataset: {data}')

    def set_train_num(self, train_num):
        self.train_num = train_num
        return self

    def set_hparam(self, gamma):
        self.gamma = gamma
        return self

    def shuffle_data(self):
        # Shuffle once before splitting into train/test.
        n_data = self.features.shape[0]
        shuffle_idx = np.random.permutation(n_data)
        # Apply the same permutation to features and labels
        # (np.take returns new arrays rather than shuffling in place).
        self.features = np.take(self.features, shuffle_idx, axis=0)
        self.labels = np.take(self.labels, shuffle_idx, axis=0)
        # Convert to PyTorch tensors on the GPU.
        self.features = torch.from_numpy(self.features).float().cuda()
        self.labels = torch.from_numpy(self.labels).float().cuda()

    def split_data(self):
        n_train = self.train_num
        # First n_train rows are labeled (train); the rest are unlabeled (test).
        self.X_tr, self.X_te = self.features[:n_train], self.features[n_train:]
        self.y_tr, self.y_te = self.labels[:n_train], self.labels[n_train:]

    @classmethod
    def similarity_matrix(cls, X, gamma):
        # Gaussian (RBF) similarity built from the Gram matrix:
        # S_ij = exp(gamma * (2 x_i.x_j - ||x_i||^2 - ||x_j||^2))
        #      = exp(-gamma * ||x_i - x_j||^2).
        gram = X @ torch.transpose(X, 0, 1)
        n_data = X.size(0)
        diag = torch.diag(gram)
        S = gamma * (2 * gram - diag.view(1, n_data) - diag.view(n_data, 1))
        return torch.exp(S)
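
    # A quick sanity check of the identity above (a sketch; gamma and the
    # shapes here are illustrative, not values from this script), using
    # torch.cdist as the reference pairwise-distance routine:
    #
    #   X = torch.randn(5, 3)
    #   S_fast = LabelProp.similarity_matrix(X, gamma=0.5)
    #   S_ref = torch.exp(-0.5 * torch.cdist(X, X) ** 2)
    #   assert torch.allclose(S_fast, S_ref, atol=1e-4)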

    @classmethod
    def diagonal(cls, similarity_matrix):
        # Degree matrix of the similarity graph: D_ii = sum_j S_ij.
        return torch.diag(torch.sum(similarity_matrix, dim=1, keepdim=False))

    def l2_loss(self, delta_x):
        LP = LabelProp
        n_tr = self.train_num
        # Perturb the inputs.
        X_ = self.features + delta_x
        S = LP.similarity_matrix(X_, self.gamma)
        D = LP.diagonal(S)
        Suu = S[n_tr:, n_tr:]
        Duu = D[n_tr:, n_tr:]
        Sul = S[n_tr:, :n_tr]
        y_tr = self.y_tr
        y_te = self.y_te
        # Harmonic label-propagation solution: y_u = (D_uu - S_uu)^{-1} S_ul y_l.
        tmp = torch.mm(torch.inverse(Duu - Suu), Sul)
        y_pred = torch.mv(tmp, y_tr)
        diff = y_pred - y_te
        # Negated, so gradient descent on this loss *increases* the test error.
        return -0.5 * torch.sum(diff * diff)
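
    # The explicit inverse above can be numerically fragile as the graph
    # grows; an equivalent formulation (a sketch, assuming a PyTorch version
    # that ships torch.linalg) solves the linear system directly:
    #
    #   y_pred = torch.linalg.solve(Duu - Suu, torch.mv(Sul, y_tr))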

    def perturb_x_regression(self, d_max, n_steps=100):
        """Find the optimal L2-constrained perturbation of the inputs by
        projected gradient descent (n_steps is the step budget; any small
        fixed budget works here).
        """
        self.split_data()
        # zeros_like keeps the device of self.features, which is already
        # on the GPU after shuffle_data().
        delta_x = torch.zeros_like(self.features, requires_grad=True)
        lr = 1.0e1
        for _ in range(n_steps):
            # Loss under the current perturbation.
            loss = self.l2_loss(delta_x)
            print(f'Loss: {loss.item()}')
            # Differentiate the loss w.r.t. the perturbation.
            g = torch.autograd.grad(loss, [delta_x])[0]
            # Gradient-descent step, done through .detach() so the in-place
            # update is not recorded by autograd.
            delta_x.detach().add_(-lr * g)
            # Project back onto the L2 ball of radius d_max.
            norm_delta_x = torch.norm(delta_x.detach())
            if norm_delta_x > d_max:
                delta_x.detach().mul_(d_max / norm_delta_x)
        return delta_x


def X_sensitivity_cadata(lp):
    lp.set_train_num(1000)
    X = lp.features
    # Allow a perturbation of up to 10% of the Frobenius norm of X.
    x_norm = torch.norm(X)
    d_max = 0.1 * x_norm
    print(f'd_max: {d_max.item()}')
    lp.set_hparam(.2)
    lp.perturb_x_regression(d_max)


if __name__ == "__main__":
    data = "cadata"
    # Suggested gamma per dataset:
    #   cpusmall: gamma = 20 (data unnormalized!)
    #   cadata:   gamma = 1
    #   mnist:    gamma = 0.6
    lp = LabelProp(data)
    lp.shuffle_data()
    X_sensitivity_cadata(lp)
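
# A minimal sketch of the analogous MNIST run, using the gamma noted above.
# This assumes load_mnist in utils.data_loader returns (features, labels)
# numpy arrays like load_cadata does; the train_num and perturbation budget
# are illustrative values, not taken from this script:
#
#   lp = LabelProp('mnist')
#   lp.shuffle_data()
#   lp.set_train_num(1000).set_hparam(0.6)
#   lp.perturb_x_regression(0.1 * torch.norm(lp.features))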