forked from Brown-SciML/pbj
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmodel.py
151 lines (128 loc) · 6.42 KB
/
model.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from torchvision.models import resnet34, resnet50
# Module-wide default device: the first CUDA GPU when one is available, else the CPU.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
class ConvNet(nn.Module):
    """Three-stage convolutional feature extractor for single-channel images.

    Every stage applies convolution -> batch norm -> ReLU -> 2x2 max pooling,
    and the final feature map is flattened to one vector per sample.

    Architecture taken from the y0ast/deterministic-uncertainty-quantification
    repository (utils/cnn_duq.py).
    """

    def __init__(self):
        super().__init__()
        # Stage 1: 1 -> 64 channels, spatial size preserved by the padding.
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=64, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(num_features=64)
        # Stage 2: 64 -> 128 channels, spatial size preserved by the padding.
        self.conv2 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(num_features=128)
        # Stage 3: 128 -> 128 channels, no padding (each spatial dim shrinks by 2).
        self.conv3 = nn.Conv2d(in_channels=128, out_channels=128, kernel_size=3)
        self.bn3 = nn.BatchNorm2d(num_features=128)

    def forward(self, x):
        """Run the three stages on ``x`` and return the flattened features."""
        stages = (
            (self.conv1, self.bn1),
            (self.conv2, self.bn2),
            (self.conv3, self.bn3),
        )
        for conv, norm in stages:
            activated = F.relu(norm(conv(x)))
            x = F.max_pool2d(activated, kernel_size=2, stride=2)
        return torch.flatten(x, start_dim=1)
class ResNet34(nn.Module):
    """torchvision ResNet-34 backbone with its last child module removed.

    The network is created with the ``ResNet34_Weights.DEFAULT`` weights and
    every child except the final one (the fully connected classifier in
    torchvision's ResNet) is kept, so the output is a feature vector per sample.
    """

    def __init__(self):
        super().__init__()
        backbone = resnet34(weights='ResNet34_Weights.DEFAULT')
        kept_layers = list(backbone.children())[:-1]
        # Stored under the name ``ResNet34``, which prefixes the state_dict keys.
        self.ResNet34 = nn.Sequential(*kept_layers)

    def forward(self, x):
        """Return the backbone features of ``x`` flattened from dim 1 onward."""
        features = self.ResNet34(x)
        return features.flatten(1)
class ResNet50(nn.Module):
    """torchvision ResNet-50 backbone with its last child module removed.

    The network is created with the ``ResNet50_Weights.DEFAULT`` weights and
    every child except the final one (the fully connected classifier in
    torchvision's ResNet) is kept, so the output is a feature vector per sample.
    """

    def __init__(self):
        super().__init__()
        backbone = resnet50(weights='ResNet50_Weights.DEFAULT')
        kept_layers = list(backbone.children())[:-1]
        # Stored under the name ``ResNet50``, which prefixes the state_dict keys.
        self.ResNet50 = nn.Sequential(*kept_layers)

    def forward(self, x):
        """Return the backbone features of ``x`` flattened from dim 1 onward."""
        features = self.ResNet50(x)
        return features.flatten(1)
class FCNet(nn.Module):
    """Single fully connected layer (2 -> 256 features) followed by ReLU."""

    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(in_features=2, out_features=256)

    def forward(self, x):
        """Project ``x`` (last dim of size 2) to 256 features and rectify."""
        projected = self.fc1(x)
        return torch.relu(projected)
class DistNet(nn.Module):
    """Bias-free linear head that turns features into per-class scores.

    With ``regular=True`` the head maps ``latent_dim`` features to
    ``num_classes`` outputs using PyTorch's default initialisation.

    Otherwise the head is a ``num_classes`` x ``num_classes`` layer whose
    weight starts with ``init_weight`` on the diagonal and
    ``-init_weight / (num_classes - 1)`` everywhere else, so every row sums
    to zero.
    """

    def __init__(self, latent_dim=512, num_classes=10, init_weight=100, regular=False):
        super().__init__()
        if regular:
            self.fc1 = nn.Linear(latent_dim, num_classes, bias=False)
            return

        self.fc1 = nn.Linear(num_classes, num_classes, bias=False)
        diag_scale = num_classes / (num_classes - 1)
        shift = (1 / (num_classes - 1)) * init_weight
        # Scale the identity first, then shift every entry down by ``shift``.
        start_weight = torch.eye(num_classes) * diag_scale * init_weight - shift
        self.fc1.weight = nn.Parameter(start_weight)

    def forward(self, x):
        """Apply the linear head to ``x``."""
        scores = self.fc1(x)
        return scores
class PredictionNet(nn.Module):
    """Classifier that scores a sample by its latent distance to class examples.

    A backbone (``convnet``) followed by a bias-free linear projection and a
    1-D batch norm produces a latent vector. In ``regular`` mode the latent is
    fed directly to ``distnet``. Otherwise each sample's latent is compared
    with one latent per class (sampled examples or supplied centroids) and the
    resulting per-class distance scores are fed to ``distnet``.

    Args:
        convnet: backbone module; its output's last dim must equal
            ``inter_dim`` because it is passed to ``Linear(inter_dim, latent_dim)``.
        distnet: head module applied to the latent (regular mode) or to the
            per-class distance scores.
        inter_dim: feature size produced by ``convnet``.
        latent_dim: size of the latent vector.
        n_classes: number of classes used when sampling class examples.
        regular: when true, skip the distance computation entirely.
    """

    def __init__(self, convnet, distnet, inter_dim=128, latent_dim=512, n_classes=10, regular=False):
        super().__init__()
        self.regular = regular
        self.convnet = convnet
        self.fc1 = nn.Linear(inter_dim, latent_dim, bias=False)
        self.bn1 = nn.BatchNorm1d(latent_dim)
        self.distnet = distnet
        self.n_classes = n_classes

    def calc_latent(self, x):
        """Embed ``x``: backbone -> linear projection -> batch norm."""
        features = self.convnet(x)
        return self.bn1(self.fc1(features))

    def get_class_examples(self, x, y, n_ex):
        """Sample ``n_ex`` rows of ``x`` (with replacement) for every class.

        Args:
            x: tensor whose first dim is aligned with ``y``.
            y: label tensor; row ``j`` of ``x`` belongs to class ``y[j]``.
            n_ex: number of rows to draw per class.

        Returns:
            Tensor with the per-class draws stacked on dim 1, i.e. shaped
            ``(n_ex, n_classes, *x.shape[1:])``.

        Raises:
            ValueError: from ``np.random.choice`` when a class in
                ``range(self.n_classes)`` has no row in ``y`` and ``n_ex > 0``.
        """
        # Convert the labels once instead of once per class.
        labels = y.detach().cpu().numpy()
        per_class = []
        for cls in range(self.n_classes):
            candidates = np.where(labels == cls)[0]
            chosen = np.random.choice(candidates, n_ex, replace=True)
            per_class.append(x[chosen])
        return torch.stack(per_class, 1)

    def check_labels(self, X, y, dataset):
        """Append one example for every class of ``dataset`` missing from ``y``.

        Args:
            X: batch of samples, first dim aligned with ``y``.
            y: 1-D label tensor.
            dataset: object exposing ``n_classes`` and
                ``get_example_from_class(i)``.

        Returns:
            ``(X, y)`` extended with one row per missing class, added in
            ascending class order. New rows are placed on the devices of
            ``X`` and ``y`` respectively.
        """
        present = set(y.detach().cpu().tolist())
        # Sorted so the order of the appended rows is deterministic.
        missing = sorted(set(range(dataset.n_classes)) - present)
        for cls in missing:
            example = dataset.get_example_from_class(cls)
            try:
                extended = torch.cat([X, example.unsqueeze(0).to(X.device)], 0)
            except (AttributeError, TypeError, RuntimeError):
                # The dataset returned something other than a bare sample
                # tensor (e.g. a (sample, label) pair): use its first element.
                extended = torch.cat([X, example[0].unsqueeze(0).to(X.device)], 0)
            X = extended
            y = torch.cat([y, torch.tensor([cls], device=y.device)], 0)
        return X, y

    def get_latent_set(self, x, y=None, dataset=None, centroids=None):
        """Build, per sample, the stack ``[own latent, one latent per class]``.

        Modes:
            * ``centroids is None`` and ``dataset.centroid_dl_iter`` is set:
              class examples come from the next batch of that iterator.
            * ``centroids is None`` and ``dataset.centroid_dl_iter`` is None:
              class examples are sampled from the batch itself; ``x`` and
              ``y`` are first extended so that every class is present.
            * ``centroids`` sums to zero: ``x`` is treated as already holding
              the sample and its examples along dim 1; everything is embedded
              in one pass. NOTE(review): any centroid tensor whose sum is
              exactly 0 takes this path - confirm that is intended.
            * otherwise: ``centroids`` (one row per class, already in latent
              space) is appended to every sample's latent.

        Returns:
            ``(latent_set, y)`` where ``latent_set`` has the anchor at index 0
            of dim 1, and ``y`` is the (possibly extended) label tensor.
        """
        if centroids is None:
            if dataset.centroid_dl_iter is not None:
                n_anchor = len(x)
                ex, ex_y = next(dataset.centroid_dl_iter)
                # Keep the examples on the same device as the batch they join.
                ex = ex.to(x.device)
                ex_y = ex_y.to(x.device)
                ex, ex_y = self.check_labels(ex, ex_y, dataset)
                # Embed anchors and examples together in a single pass.
                latent = self.calc_latent(torch.cat([x, ex], 0))
                class_examples = self.get_class_examples(latent[n_anchor:], ex_y, n_anchor)
                x = torch.cat([latent[:n_anchor].unsqueeze(1), class_examples], 1)
            else:
                x, y = self.check_labels(x, y, dataset)
                x = self.calc_latent(x)
                class_examples = self.get_class_examples(x, y, len(x))
                x = torch.cat([x.unsqueeze(1), class_examples], 1)
        elif torch.sum(centroids) == 0:
            n = x.shape[1]
            x = x.reshape(-1, *x.shape[2:])   # fold dim 1 into the batch dim
            x = self.calc_latent(x)           # (batch * n, latent_dim)
            x = x.view(-1, n, x.shape[-1])    # (batch, n, latent_dim)
        else:
            centroids = centroids.unsqueeze(0).repeat(x.shape[0], 1, 1)
            x = self.calc_latent(x)
            x = torch.cat([x.unsqueeze(1), centroids], 1)
        return x, y

    def calc_dist(self, x):
        """Score each sample's anchor latent against its class latents.

        Args:
            x: tensor shaped ``(batch, 1 + k, dim)`` with the anchor at
                index 0 of dim 1.

        Returns:
            ``(batch, k)`` tensor of ``log((d^2 + 1) / (d^2 + 1e-10))`` where
            ``d`` is the Euclidean distance; larger values mean closer.
        """
        anchor = x[:, 0, :].unsqueeze(1)   # (batch, 1, dim)
        others = x[:, 1:, :]               # (batch, k, dim)
        # Squeeze only the anchor axis so a batch of one keeps its batch dim.
        dist = torch.cdist(anchor, others).squeeze(1)
        sq_dist = torch.square(dist)
        return torch.log((sq_dist + 1.0) / (sq_dist + 1e-10))

    def forward(self, x, y=None, dataset=None, centroids=None):
        """Return ``(scores, distances, latent_set, y)``.

        In ``regular`` mode ``distances`` and ``latent_set`` are ``None`` and
        ``y`` is passed through unchanged. Otherwise ``y`` may have been
        extended with labels of classes that were missing from the batch.
        """
        if self.regular:
            latent = self.calc_latent(x)
            return self.distnet(latent), None, None, y

        latent_set, y = self.get_latent_set(x, y=y, dataset=dataset, centroids=centroids)
        distances = self.calc_dist(latent_set)
        scores = self.distnet(distances)
        return scores, distances, latent_set, y