-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathvariational_autoencoder.py
142 lines (110 loc) · 4.35 KB
/
variational_autoencoder.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
"""
Code for Information Retrieval course project @ University of Trieste, MSc in Data Science & Scientific Computing A.Y. 2023/2024.
Author: Michele Alessi
This file contains the code to define and train the Variational AutoEncoder model.
"""
import torch
from torch import nn
from tqdm import tqdm
from torch.utils.data import DataLoader
class VariationalAutoEncoder(nn.Module):
    """
    Variational AutoEncoder with a two-hidden-layer encoder and a single
    linear decoder followed by a softmax.

    Args:
        input_dim: (int) Input dimension.
        h_dim_1: (int) Dimension of the first hidden layer.
        h_dim_2: (int) Dimension of the second hidden layer.
        z_dim: (int) Dimension of the latent space.
    """

    def __init__(self, input_dim, h_dim_1, h_dim_2, z_dim):
        super().__init__()
        self.input_dim = input_dim

        # encoder: input -> h_dim_1 -> h_dim_2 -> (mu, sigma)
        self.img_2hid = nn.Linear(input_dim, h_dim_1)
        self.hid_2hid_en = nn.Linear(h_dim_1, h_dim_2)
        self.hid_2mu = nn.Linear(h_dim_2, z_dim)
        self.hid_2sigma = nn.Linear(h_dim_2, z_dim)

        # decoder: one linear map from the latent space back to input_dim
        self.R = nn.Linear(z_dim, input_dim)

        self.relu = nn.ReLU()
        # Not used by encode/decode/forward; kept so the attribute still exists.
        self.tanh = nn.Tanh()
        # Normalise over the feature (last) axis so that every sample gets its
        # own distribution over input_dim. With dim=0 a batched input of shape
        # (batch, input_dim) was normalised across the batch, which made each
        # reconstruction depend on the other samples in the batch. For a
        # single 1-D vector dim=-1 and dim=0 are the same axis.
        self.softmax = nn.Softmax(dim=-1)

    def encode(self, x):
        """
        Map an input to the parameters of the approximate posterior.

        Args:
            x: Tensor whose last dimension is input_dim.
        Returns:
            mu: Mean of the latent Gaussian.
            sigma: Standard deviation of the latent Gaussian. It is obtained
                as exp() of a linear layer, so it is always positive.
        """
        h1 = self.relu(self.img_2hid(x))
        h2 = self.relu(self.hid_2hid_en(h1))
        mu = self.hid_2mu(h2)
        sigma = torch.exp(self.hid_2sigma(h2))
        return mu, sigma

    def decode(self, z):
        """
        Map a latent vector to a softmax distribution over the input features.

        Args:
            z: Tensor whose last dimension is z_dim.
        Returns:
            Tensor whose last dimension is input_dim and sums to 1 along it.
        """
        E = self.R(z)
        # The linear output is negated before the softmax, so larger values
        # of E receive lower probability.
        return self.softmax(-E)

    def forward(self, x):
        """
        Encode x, sample a latent vector and decode it.

        Returns:
            x_bin: Decoded output.
            mu: Latent mean.
            sigma: Latent standard deviation.
            z_new: The sampled latent vector that was decoded.
        """
        mu, sigma = self.encode(x)
        # reparametrization trick: z = mu + sigma * eps with eps ~ N(0, I),
        # which keeps the sampling step differentiable w.r.t. mu and sigma
        epsilon = torch.randn_like(sigma)
        z_new = mu + sigma * epsilon
        x_bin = self.decode(z_new)
        return x_bin, mu, sigma, z_new
def weights_init(m):
    """
    Function to initialize the weights of the model.

    Modules whose class name contains 'Linear' get Xavier-uniform weights and
    a zero bias; every other module is left untouched. Intended to be passed
    to model.apply(), which calls it once per sub-module.

    Args:
        m: A nn.Module object.
    """
    classname = m.__class__.__name__
    if 'Linear' not in classname:
        return
    torch.nn.init.xavier_uniform_(m.weight)
    # A layer built with bias=False has bias set to None; zeros_ would fail
    # on it, so only reset the bias when one actually exists.
    if getattr(m, 'bias', None) is not None:
        torch.nn.init.zeros_(m.bias)
def build_train_loader(matrix, BATCH_SIZE):
    """
    Wrap an embedding matrix in a shuffling DataLoader.

    The matrix is copied into a float32 tensor and used directly as the
    dataset, so each batch is a plain tensor of rows (there are no labels).

    Args:
        matrix: A numpy array of shape (n_samples, n_features).
        BATCH_SIZE: (int) Number of rows per batch.
    Returns:
        A DataLoader that yields shuffled batches of rows.
    """
    samples = torch.tensor(matrix)
    samples = samples.float()
    return DataLoader(samples, shuffle=True, batch_size=BATCH_SIZE)
def train(NUM_EPOCHS, train_loader, model, loss_fn, lr, DEVICE, INPUT_DIM, scheduler):
    """
    Function to train the model.

    Each batch is pushed through the model, and the sum of the reconstruction
    loss and a KL term is minimised with Adam.

    Args:
        NUM_EPOCHS: (int) Number of epochs to train the model.
        train_loader: A DataLoader object yielding batches of input rows.
        model: The model to train. Calling it on a batch must return a
            4-tuple (reconstruction, mu, sigma, extra); the last item is ignored.
        loss_fn: A torch loss function, called as loss_fn(reconstruction, x).
            nn.MSELoss() for tfidf, nn.BCELoss() for wc embedding were used.
        lr: (float) Learning rate.
        DEVICE: Device to train the model on. Typically, 'cuda' or 'cpu'.
        INPUT_DIM: (int) Input dimension. This is matrix.shape[1], where matrix
            is the embedding matrix used to build the train_loader.
        scheduler: A scheduler object whose step() accepts the loss value, or
            None to keep the learning rate constant. It is stepped once per
            epoch with the loss of the last batch.
    Returns:
        lossess: A list with one loss value per epoch (the loss of the last
            batch of that epoch). Epochs that produced no batch add nothing.
    """
    # Only the batches were moved to DEVICE before; move the parameters too so
    # that model and data cannot end up on different devices.
    model.to(DEVICE)
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    # NOTE(review): the optimizer is created here, so a scheduler built by the
    # caller is presumably bound to a different optimizer instance and its
    # step() may not affect the learning rate used below - confirm at call site.

    # Training
    model.train()
    lossess = []
    for epoch in range(NUM_EPOCHS):
        print(f"Epoch {epoch + 1} of {NUM_EPOCHS}")
        # Wrapping the loader directly lets tqdm see its length.
        loop = tqdm(train_loader)
        epoch_loss = None  # stays None when the loader yields no batch
        for x in loop:
            # forward pass
            x = x.to(DEVICE)
            x = x.view(x.shape[0], INPUT_DIM)
            x_reconstructed, mu, sigma, _ = model(x)

            # loss
            reconstruction_loss = loss_fn(x_reconstructed, x)
            # Summed over batch and latent dimensions.
            # NOTE(review): the closed-form Gaussian KL carries a factor 0.5
            # that is not applied here, so the KL term is weighted twice as
            # much as in the standard ELBO. Left as is to keep results
            # comparable with earlier runs.
            kl_div = -torch.sum(1 + torch.log(sigma.pow(2)) - mu.pow(2) - sigma.pow(2))

            # backprop
            loss = reconstruction_loss + kl_div
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            epoch_loss = loss.item()
            loop.set_postfix(loss=epoch_loss)

        if epoch_loss is None:
            # Empty loader: nothing to report, and no loss to step the scheduler with.
            continue
        if scheduler is not None:
            scheduler.step(epoch_loss)
            loop.set_postfix(loss=epoch_loss, lr=optimizer.param_groups[0]['lr'])
        lossess.append(epoch_loss)
    return lossess