MainFunctions.py
import torch
import torch.optim as optim
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
from torchvision import transforms
from torchmetrics import PearsonCorrCoef
from random import randrange
import os, os.path
import pandas as pd
import numpy as np
from numpy import random
import cv2
from PIL import Image, ImageOps
import matplotlib.pyplot as plt
from torch.utils.tensorboard import SummaryWriter
import torchmetrics
import warnings
warnings.filterwarnings("ignore")
import optuna
import MainNetDefinitions as netdefs
import math
from sklearn.preprocessing import KBinsDiscretizer
from sklearn.metrics import precision_recall_fscore_support, balanced_accuracy_score, confusion_matrix
from scipy.stats import pearsonr
from sklearn.model_selection import StratifiedKFold
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
if torch.cuda.is_available() : torch.cuda.manual_seed_all(SEED)
torch.backends.cudnn.benchmark = True
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")
size = (256,256)
path = os.path.abspath(os.getcwd())
trainingData_folder = "trainingData"
learning_rate = 0.0001
best_loss = 10000
best_r2 = -10000
saved_modelparams = pd.read_csv("neuralnets/netparams.csv") if os.path.exists("neuralnets/netparams.csv") else pd.DataFrame()
r2score = torchmetrics.R2Score().to(device)
mape = torchmetrics.MeanAbsolutePercentageError().to(device)
mse = torchmetrics.MeanSquaredError().to(device)
mae = torchmetrics.MeanAbsoluteError().to(device)
def trans_normalize(img):
"""
Function for applying column-wise pixel normalization to an image
Args:
img (np array): image whose pixels should be normalized
Returns:
np array: normalized image
"""
img = np.divide(img , img.sum(axis=0), out=np.zeros_like(img), where=img.sum(axis=0) > 0)
return img
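# Illustrative sketch (not part of the original pipeline): how the column-wise
# normalization behaves on a tiny toy array; each column is scaled to sum to 1,
# while all-zero columns would be left at zero.
# >>> toy = np.array([[1., 3.], [1., 1.]])
# >>> trans_normalize(toy)
# array([[0.5 , 0.75],
#        [0.5 , 0.25]])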
def trans_resize(img):
"""Function for resizing image with nearest neighbor interpolation using cv2
Args:
img (np array): array representing img
Returns:
np array: resized image
"""
img = cv2.resize(img, size)
return img.astype(np.float32)
def trans_padding(img):
"""Function for resizing img using padding method as described in thesis
Args:
img (np array): array representing the img
Returns:
np array: img resized by padding method
"""
width = 256 - img.shape[0]
height = 256 - img.shape[1]
# padding can only be applied if image is smaller than specified dimensions
if (height > 0) and (width > 0):
img = np.pad(img,[(math.floor(width / 2),math.ceil(width / 2)),(math.floor(height / 2),math.ceil(height / 2))])
else:
# if padding is not possible because the image is larger, simple resizing is applied
img = cv2.resize(img, size)
return img.astype(np.float32)
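# Illustrative sketch (hypothetical shapes): a 200 x 240 signal image is padded
# symmetrically up to 256 x 256, i.e. 28 rows on top and bottom and 8 columns on
# each side; images already larger than 256 in either dimension fall back to
# plain resizing.
# >>> small = np.zeros((200, 240), dtype=np.float32)
# >>> trans_padding(small).shape
# (256, 256)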
# code idea from https://stackoverflow.com/questions/46274961/removing-horizontal-lines-in-image-opencv-python-matplotlib
# morphological transf basis from https://docs.opencv.org/4.x/d9/d61/tutorial_py_morphological_ops.html
def clearstraightlines(img):
""" Function for removing dotted vertical lines using morphological transformations and otsus method
Args:
img (np array): array containing image depicting signal
Returns:
img (np array): img with removed dotted line
"""
img = img.copy()
width = img.shape[1]
height = img.shape[0]
thresh = cv2.threshold(img, 0, 255, cv2.THRESH_OTSU)[1]
kernel = np.ones((10,3),np.uint8)
closing = cv2.morphologyEx(thresh, cv2.MORPH_CLOSE, kernel)
vertical_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (1, math.floor(img.shape[0] * 0.8)))
vertical_lines = cv2.morphologyEx(closing , cv2.MORPH_OPEN, vertical_kernel , iterations=2)
img[np.nonzero(vertical_lines)] = 0
# remove empty columns generated by removing straight lines
img = np.delete(img, np.argwhere(img.mean(axis=0) == 0), 1)
# resizing to original size
img = cv2.resize(img, (width, height), interpolation=cv2.INTER_NEAREST)
return img
# defining transformation pipes for neural network approach
transform_padding = transforms.Compose( [transforms.Lambda(clearstraightlines), transforms.Lambda(trans_padding), transforms.Lambda(trans_normalize), transforms.ToTensor()])
transform_resize = transforms.Compose( [transforms.Lambda(clearstraightlines), transforms.Lambda(trans_resize), transforms.Lambda(trans_normalize), transforms.ToTensor()])
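# Illustrative sketch (hypothetical input): applying one of the pipelines to a raw
# grayscale uint8 image yields a single-channel float tensor of shape (1, 256, 256)
# that can be fed to the network.
# >>> raw = np.random.randint(0, 255, size=(300, 400)).astype(np.uint8)
# >>> transform_resize(raw).shape
# torch.Size([1, 256, 256])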
# defining dict with all relevant metrics
metrics = {"loss": 0 ,"r2" : r2score, "mse" : mse, "mae" : mae, "mape" : mape}
class CustomImageDataset(Dataset):
""" Class for creating a Dataset which the dataloader from pytroch needs to create a wrapper for iterating though the data
Args:
Dataset (dataset): inhereting from pytorch library
"""
def __init__(self, pairs, labels, images, transform, augment):
"""Constructor initializing custom dataset
Args:
pairs (np array): array containing image pair indexes
labels (np array): array containing correlation labels
images (np array): array containing imgs as np arrays
transform (pytorch transforms): transforms Compose object for transforming imgs
augment (function): function applying augmentation
"""
self.pairs = pairs
self.labels = labels
self.images = images
self.transform = transform
self.augment = augment
def __len__(self):
"""Function returning length of dataset
Returns:
int: length of dataset
"""
return len(self.labels)
def __getitem__(self, idx):
""" Function retrieving next element of dataset, which is made up of
template, match img and correlation pair for a given index
Args:
idx (int): position to retrieve element at
Returns:
np array, np array, float: template, match img and correlation label
"""
label = self.labels[idx]
template = self.images[self.pairs[idx][0]]
img = self.images[self.pairs[idx][1]]
if self.augment:
img = helperAugment(img)
template = helperAugment(template)
return self.transform(template), self.transform(img) , label.astype(np.float32)
def helperAugment(img):
""" Adding vertical straight lines with varying intensity, chance 10%
Args:
img (np array): image
Returns:
np array: augmented copy of image
"""
if randrange(10) == 1:
# copying img so no permanent alteration is done when using cv2.line
img = img.copy()
intensity = randrange(1,256)
thickness = randrange(1,3)
# vertical line
pos_x = randrange(img.shape[1])
x1,y1 = pos_x, 0
x2,y2 = pos_x, img.shape[0]
y_dash = y1
# simulate dashed line
dash_length = randrange(1,4)
while y_dash < y2:
cv2.line(img, (x1, y1), (x2, y_dash), (intensity), thickness= thickness)
y1 = y_dash + dash_length
y_dash = y_dash + ( 2 * dash_length )
return img
def loadimages(imgloc, invert):
"""Function for loading all images from specified folder
Args:
imgloc (string): location folder
invert (boolean): flag showing if image needs to be inverted since signal is black not white
Returns:
list: loaded imgs
"""
images = []
num_images = len(os.listdir(imgloc))
for i in range(num_images):
img = Image.open(f"{imgloc}/img{i}.png").convert("L")
img = ImageOps.invert(img) if invert else img
images.append(np.array(img))
return images
def loadTrainingData (datatype, invert):
"""Function for loading AI Data
Args:
datatype (string): indicating which type to load, randomized vs uniform
invert (boolean): flag showing if image needs to be inverted since signal is black not white
Returns:
list, np array, np array: images, pairs with indexes, correlation labels
"""
imgloc = "trainingimgs"
if datatype == "pearson":
pairs_url = f"{trainingData_folder}/imagePairsPearson.npy"
labels_url = f"{trainingData_folder}/imageLabelsPearson.npy"
elif datatype == "random_pearson":
pairs_url = f"{trainingData_folder}/imagePairsRandom.npy"
labels_url = f"{trainingData_folder}/imageLabelsRandom.npy"
imgloc = "trainingimgs_random"
else:
raise Exception("Wrong method")
images = loadimages( imgloc=imgloc, invert=invert)
pairs = np.load(pairs_url, allow_pickle=True ).astype(int)
labels = np.load(labels_url, allow_pickle=True ).astype(float)
return images, pairs, labels
def loadTrainLoaders( batch_size, datatype, transform, invert, percentages, reduceindex, augment, seed ):
"""Function for building AI loaders
Args:
batch_size (int): batch size of loader
datatype (string): indicating which type to load, randomized vs uniform
transform (pytorch transforms): transforms Compose object for transforming imgs
invert (boolean): flag showing if image needs to be inverted since signal is black not white
percentages (list): list of percentages used for splitting data
reduceindex (int): threshold for data reduction
augment (function): function applying augmentation
seed (int): controlling random state
Returns:
dict: dict with loaders corresponding to percentages specified
"""
imgs, pairs, lbls = loadTrainingData ( datatype=datatype, invert=invert)
labels_df = pd.DataFrame(lbls, columns=["label"])
labels_df = discretizeData(labels_df, start=-99, stop=101, step=1, seed=seed)
idxs = stratifiedtraintestsplit(df=labels_df , percentages=percentages, reduceindex=reduceindex, seed=seed)
loaders= {}
for key, idx in idxs.items():
augment = augment if key=="train" else False
shuffle = False if key == "test" else True
subpairs, sublbls = pairs[idx], lbls[idx]
dataset = CustomImageDataset(pairs=subpairs, labels=sublbls, images=imgs, transform=transform, augment=augment)
loader = DataLoader(dataset, batch_size=batch_size, shuffle=shuffle, pin_memory=True, drop_last = True)
loaders[key] = loader
return loaders
def discretizeData(df, start, stop, step, seed=SEED):
"""Function for creating bins and assigning labels to them. Bin values need to be divisble by 100
Args:
df (pd dataframe): dataframe containing labels
start (int): lowest bin value
stop (int): largest bin value
step (int): step width taken between bins
seed (int, optional): random state. Defaults to SEED.
Returns:
pd dataframe: dataframe with assigned bins and bin labels
"""
bins = math.floor((stop - start) / step)
est = KBinsDiscretizer(bins, encode='ordinal', strategy='uniform', random_state=seed)
default_bins = np.array([i / 100 for i in range(start,stop,step)]).reshape(-1, 1)
est.fit(default_bins)
df["bins"] = est.transform(df["label"].to_numpy().reshape(-1, 1)).astype(int)
intervals = est.bin_edges_[0]
df["bin_lbls"] = df["bins"].map(lambda x : f"{round(intervals[int(x)],2)}-{round(intervals[int(x+1)],2)}" )
df = df.reset_index(drop=True)
return df
def stratifiedtraintestsplit(df, percentages, reduceindex=None, seed=SEED):
"""Function for creating stratified splits of data. Since labels are continous, data needs to be discretized beforehand
Args:
df (pd dataframe): dataframe containing labels and assigned bins
percentages (list): list of percentages to use for split
reduceindex (int, optional): threshold for lowering data volume. Defaults to None.
seed (int, optional): random state. Defaults to SEED.
Returns:
dict: dict containing idx per category yielding a stratified split equal in size to passed percentages
"""
df = df.iloc[df.groupby(['bins']).head(reduceindex).index] if reduceindex else df
df["idx"] = df.index
idxs = {}
if (len(percentages) == 2):
trainidx = df.groupby("bins").sample(frac=percentages[0], random_state=seed)["idx"]
testidx = df[~ df.idx.isin(trainidx)].idx
idxs = {"train" : trainidx, "test" : testidx}
elif (len(percentages) == 3):
trainidx = df.groupby("bins").sample(frac=percentages[0], random_state=seed)["idx"]
frac = percentages[1] / percentages[1:].sum()
df = df[~ df.idx.isin(trainidx)]
validx = df.groupby("bins").sample(frac=frac, random_state=seed)["idx"]
testidx = df[~ df.idx.isin(validx)].idx
idxs = {"train" : trainidx, "val" : validx, "test" : testidx}
return idxs
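# Illustrative sketch (hypothetical values): how the two helpers above are combined
# before building loaders; the label frame is binned first and the bins are then
# used to draw stratified train/val/test indices.
# labels_df = pd.DataFrame({"label": lbls})               # lbls as loaded from disk
# labels_df = discretizeData(labels_df, start=-99, stop=101, step=1)
# idxs = stratifiedtraintestsplit(labels_df, percentages=np.array([0.70, 0.15, 0.15]))
# train_idx, val_idx, test_idx = idxs["train"], idxs["val"], idxs["test"]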
def evalModel (model, val_loader, criterion, evalmode=True):
"""Function for evaluating model on data (test/validation)
Args:
model (Pytorch NN): model containing trained NN
val_loader (Dataloader): Dataloader with test / validation data
criterion (pytorch criterion): criterion to use for loss calculation
evalmode (boolean, optional): flag controlling if the model should be put in eval mode. Defaults to True
Returns:
dict: dict containing average loss and globally predefined metrics over all batches
"""
torch.cuda.empty_cache()
model.eval() if evalmode else model.train()
total = {"loss" : 0, "r2" : 0, "mse" : 0, "mae" : 0, "mape" : 0}
with torch.no_grad():
for idx, vdata in enumerate(val_loader):
vtemplates, vimages, vtargets = vdata
vtemplates = vtemplates.to(device, non_blocking=True)
vimages = vimages.to(device, non_blocking=True)
vtargets = vtargets.to(device,non_blocking=True)
voutputs = model(vtemplates,vimages)
vloss = criterion(voutputs, vtargets)
for key, metric in metrics.items():
if key == "loss":
total[key] += vloss
else:
total[key] += metric(voutputs, vtargets.squeeze())
val_avgs = {k: v / len(val_loader) for k,v in total.items()}
return val_avgs
def train_one_epoch(model, optimizer, train_loader, criterion):
"""Function for training a model one epoch
Args:
model (Pytorch NN): model instance for training
optimizer (torch optimizer): optimizer to use during loss calculation
train_loader (Dataloader): loader with training data
criterion (torch criterion): criterion used for loss calculation
Returns:
dict, Pytorch NN: dict containing evaluation values, trained model
"""
running = {"loss" :0, "r2" : 0, "mse" : 0, "mae" : 0, "mape" : 0}
total = {"loss" : 0, "r2" : 0, "mse" : 0, "mae" : 0, "mape" : 0}
model.train()
for idx, data in enumerate(train_loader):
optimizer.zero_grad(set_to_none=True)
templates, images, targets = data
templates = templates.to(device)
images = images.to(device)
targets = targets.to(device)
outputs = model(templates, images)
loss = criterion(outputs, targets)
loss.backward()
optimizer.step()
for key, metric in metrics.items():
if key == "loss":
running[key] += loss
total[key] += loss
else:
running[key] += metric(outputs, targets.squeeze())
total[key] += metric(outputs, targets.squeeze())
total = {k: v / len(train_loader) for k,v in total.items()}
del templates
del images
del targets
torch.cuda.empty_cache()
return total, model
def trainModel (model, optimizer, loaders, training_params, saveModel = False, trial=None, logging=False, printing=False, num_epochs=5, stop=True, criterion=torch.nn.L1Loss()):
"""Main function for training model
Args:
model (Pytorch NN): model to train
optimizer (torch optimizer): optimizer used for updating weights
loaders (dict): dict containing dataloaders
training_params (dict): dict containing training params
saveModel (bool, optional): flag controlling if model should be saved. Defaults to False.
trial (optuna trial, optional): optuna trial object used for pruning a trial. Defaults to None.
logging (bool, optional): flag controlling tensorboard logging. Defaults to False.
printing (bool, optional): flag controlling printouts. Defaults to False.
num_epochs (int, optional): number of epochs to train the model for. Defaults to 5.
stop (bool, optional): flag controlling early stopping. Defaults to True.
criterion (torch criterion, optional): criterion used for loss calculation. Defaults to torch.nn.L1Loss().
Raises:
optuna.exceptions.TrialPruned: Exception used for pruning trial
Returns:
pd dataframe, Pytorch NN: dataframe containing logs from training, trained model
"""
log = pd.DataFrame()
torch.cuda.empty_cache()
global best_loss, saved_modelparams, best_r2
train_loader = loaders.pop("train")
avgs = {}
tbdir = ""
tb = SummaryWriter() if logging else None
patience = 5 # early-stopping patience counter used further down
for epoch in range(num_epochs):
avgs["train"], model = train_one_epoch(model, optimizer, train_loader=train_loader, criterion=criterion)
for dtype, loader in loaders.items():
avgs[dtype] = evalModel(model, loader, criterion=criterion)
if trial:
trial.report(avgs["real"]["mae"], epoch)
if trial.should_prune():
raise optuna.exceptions.TrialPruned()
if printing:
printstring = "EPOCH RESULTS:"
for dtype, avg in avgs.items():
printstring = f"{printstring} {dtype} MAE: {round(avg['mae'].item(),3)} {dtype} R2: {round(avg['r2'].item(),3)}"
print(printstring)
if logging:
for dtype, avg in avgs.items():
tb.add_scalars('MAE',
{ dtype : avg["mae"]}, epoch + 1)
tb.add_scalars('R2',
{ dtype : avg["r2"]}, epoch + 1)
tb.flush()
tbdir = tb.get_logdir()
log_avgs = {}
for dtype, avg in avgs.items():
log_avgs = log_avgs | {f"{dtype}_{k}" : v.item() for k,v in avg.items()}
log = log.append(
log_avgs |
{"tbdir" : tbdir},
ignore_index=True)
# Track best performance, and save the model's state
if saveModel and avgs["real"]["mae"] < best_loss:
if avgs["real"]["r2"] < 1:
best_loss = avgs["real"]["mae"]
best_r2 = avgs["real"]["r2"]
torch.save(model.state_dict(), f"neuralnets/{model.modelname}")
print(f"SAVED MODEL WITH VALUES: MAE: {best_loss} R2: {avgs['real']['r2']}")
saved_modelparams = saved_modelparams[saved_modelparams["model"] != model.modelname].append( pd.DataFrame({"model" : model.modelname} | training_params, index=[0]), ignore_index=True)
saved_modelparams.to_csv("neuralnets/netparams.csv", index=False)
elif avgs["real"]["r2"] > 1:
print("Something WEIRD is HAPPENING")
print(log)
# stop training early if the epoch-over-epoch improvement in real MAE is below 0.001 or negative
# stop only if the condition is met for 5 epochs in a row
# starting from epoch 30
evaldf = log[["real_mae"]].tail(2).apply(lambda x: (x - x.shift(-1))).head(1) if "real_mae" in log else None
if stop :
if ( (epoch > 30) & (evaldf < 0.001).all(axis="columns").item() ):
patience = patience -1
if patience == 0:
print("Stopping model training since change in loss is below threshold")
log["stopped"] = True
break
else:
patience = 5
return log, model
def pearson(tens1, tens2):
"""Function for calculating Pearson's R coefficient
Args:
tens1 (torch tensor): first time series
tens2 (torch tensor): second time series
Returns:
tensor: calculated correlation value
"""
# local metric instance, named so it does not shadow the imported scipy.stats.pearsonr
pearson_metric = PearsonCorrCoef(num_outputs=tens1.shape[0]).to(device)
res = pearson_metric(tens1.transpose(0,1), tens2.transpose(0,1))
return res
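# Illustrative sketch (hypothetical shapes): PearsonCorrCoef with num_outputs equal
# to the batch size returns one coefficient per sample, so two (batch, length)
# tensors yield a tensor of shape (batch,).
# >>> a, b = torch.randn(8, 120, device=device), torch.randn(8, 120, device=device)
# >>> pearson(a, b).shape
# torch.Size([8])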
def buildmodelFromParams(netparams, load=False, seed=SEED):
"""Function for initializing model based on passed parameters
Args:
netparams (dict): dict with parameters to use for initialization
load (bool, optional): flag controlling if saved model weights should be loaded. Defaults to False.
seed (int, optional): random state. Defaults to SEED.
Returns:
Pytorch NN: initialized model instance
"""
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
blocks = netparams["blocks"]
kernel_size = netparams["kernel_size"]
stride = netparams["stride"]
padding = netparams["padding"]
modelname = netparams["modelname"] if "modelname" in netparams else "dummy"
torch.clear_autocast_cache(), torch.cuda.empty_cache(), torch.manual_seed(seed), torch.cuda.manual_seed(seed)
model = netdefs.HazelNet( blocks = blocks, kernel_size=kernel_size, padding=padding, stride=stride, seed=seed, modelname=modelname).to(device, non_blocking=True)
if load:
model.load_state_dict(torch.load(f"neuralnets/{modelname}"))
return model
def buildDataFromParams(datatype, transform, batch_size, percentages=np.array([0.70,0.15,0.15]), percentages_real = np.array([0.6,0.4]), reduceindex=50, seed=SEED , realloc="EGMcutouts/realimgs", oversamp=True, augment=False):
"""Main function for building data loaders from parameters passed. ECG and AI Data
Args:
datatype (string): datatype used, random or uniform
transform (torch transform compose): transformations to apply
batch_size (int): size of each batch
percentages (np array, optional): array defining the splits for AI data. Defaults to np.array([0.70,0.15,0.15]).
percentages_real (np array, optional): array defining the splits for ECG data. Defaults to np.array([0.6,0.4]).
reduceindex (int, optional): maximum number of samples kept per bin when reducing the data volume. Defaults to 50.
seed (int, optional): random state. Defaults to SEED.
realloc (str, optional): location of folder with ECG images. Defaults to "EGMcutouts/realimgs".
oversamp (bool, optional): flag, controlling whether to apply oversampling to train ECG data. Defaults to True.
augment (bool, optional): flag controlling augmentation. Defaults to False.
Returns:
5 x DataLoader: train AI, val AI, test AI, train ECG, test ECG
"""
transform = globals()[transform] # transforming string into function
train_loader, val_loader, test_loader = loadTrainLoaders(batch_size=batch_size, datatype=datatype, transform=transform, percentages=percentages, reduceindex=reduceindex, augment=augment, invert=False, seed=seed).values()
real_train_loader, real_val_loader = loadRealLoaders(batch_size=batch_size, transform=transform, imgloc=realloc , augment=False, invert=False, oversamp=oversamp, percentages = percentages_real, seed=seed).values()
return train_loader, val_loader, test_loader, real_train_loader, real_val_loader
def getparamcount(model):
"""Function for calculating the number of parameters in a model
Args:
model (Pytorch NN): neural network
Returns:
int: number of parameters
"""
return sum(p.numel() for p in model.parameters())
def displayPredictions(model, loader, nrows = 6, ncols = 4):
"""Function for displaying predicitons in a grid
Args:
model (Pytorch NN): model to use for predictions
loader (Dataloader): loader containing training data
nrows (int, optional): number of rows in grid. Defaults to 6.
ncols (int, optional): number of columns in grid. Defaults to 4.
Returns:
matplotlib figure: figure containing predictions in a grid
"""
j = 0
fig, ax = plt.subplots(nrows,ncols,figsize=(12,12))
data = next(iter(loader))
tmps, imgs, lbls = data[0], data[1], data[2]
for i, axi in enumerate(ax.flat):
if i % 2 == 0:
img1, img2, lbl = tmps[j], imgs[j], lbls[j]
else:
img2, img1, lbl = tmps[j], imgs[j], lbls[j]
if j < len(data[2]) - 1: j += 1
axi.imshow(transforms.ToPILImage()(img1))
with torch.no_grad():
model.eval()
output = model(img1.unsqueeze(0).to(device), img2.unsqueeze(0).to(device))
# write prediction and label as the axes' title for identification
axi.set_title(" Pred: " + str(round(output.item(),2)) + " Lbl: " + str(round(lbl.item(),2)))
plt.tight_layout()
plt.show()
return fig
def oversample(df):
"""Function for oversampling labels based on dataframe
Args:
df (pd dataframe): dataframe containing bins and labels
Returns:
pd dataframe: oversampled dataframe
"""
dflarge = df.copy()
df_grouped = df.groupby("bins").agg({"label" : "count"}).sort_values(by="label", ascending=True)
maxgroup = df_grouped.max().item()
for bin, group in df.groupby("bins"):
dflarge = dflarge.append(group.sample(maxgroup-len(group), replace=True, random_state = SEED ))
return dflarge
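# Illustrative sketch (hypothetical counts): with bins holding e.g. 50, 120 and 200 rows,
# every bin is resampled with replacement up to 200 rows, so the returned frame has
# 600 rows and a uniform bin distribution for training.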
def prepareRealData(directory ):
"""Function for aggregating all ECG data stored across different folders into one dataframe
Args:
directory (str): folder where ECG imgs are stored
Returns:
pd dataframe: dataframe storing labels and corresponding template and match img urls
"""
filename = "labels.csv"
df = pd.DataFrame()
for subdir in os.listdir(directory):
i = 0
tmps = []
imgs = []
for file in os.listdir(f"{directory}/{subdir}/images"):
tmps.append(f"{subdir}/images/{file}") if (i % 2 == 0) else imgs.append(f"{subdir}/images/{file}")
i += 1
tempdf = pd.read_csv(f"{directory}/{subdir}/{filename}", sep=",", dtype=int, header=None).transpose()
tempdf["label"] = tempdf / 100
tempdf["tmp"] = tmps
tempdf["img"] = imgs
df = df.append( tempdf[["label", "tmp", "img"]])
return df
def importimg(path, invert):
""" FFunction for loading img from path. Handling of transparency inverting and greyscaling of image
Args:
path (str): location of img to import
invert (bool): flag controlling inverting black to white
Returns:
np array: imported img
"""
img = Image.open(path).convert("RGBA")
if invert:
background = Image.new('RGBA', img.size, (255,255,255))
img = Image.alpha_composite(background, img).convert("L")
img = ImageOps.invert(img)
else:
background = Image.new('RGBA', img.size, (0,0,0))
img = Image.alpha_composite(background, img).convert("L")
return np.array(img)
def loadRealData(df, imgloc, invert):
"""Function for loading ECG data
Args:
df (pd dataframe): dataframe containing labels and img urls
imgloc (str): string to folder containing the imgs
invert (boolean): flag controlling inverting black to white
Returns:
list, np array, np array: imgs, array of index pairs, array of correlation labels
"""
imgs, pairs, lbls = [], [], []
i = 0
for row in df.iterrows():
img = importimg(f"{imgloc}/{row[1]['img']}", invert)
tmp = importimg(f"{imgloc}/{row[1]['tmp']}", invert)
imgs.append(img)
imgs.append(tmp)
pairs.append((i, i+1))
i = i + 2
lbls.append(row[1]["label"])
pairs, lbls = np.array(pairs), np.array(lbls)
return imgs, pairs, lbls
def loadRealLoaders( batch_size, transform, imgloc, invert, augment, oversamp, percentages, seed=SEED):
"""Function for loading ECG loaders
Args:
batch_size (int): batch size of loader
transform (pytorch transforms): transforms Compose object for transforming imgs
imgloc (str): folder containing imgs
invert (boolean): flag showing if image needs to be inverted since signal is black not white
augment (function): function applying augmentation
oversamp (bool): flag controlling oversampling of training data
percentages (list): list of percentages used for splitting data
seed (int): controlling random state
Returns:
dict: dict containing loaders
"""
df = prepareRealData(directory = imgloc)
df = discretizeData(df, start=-99, stop=109, step=10, seed=seed)
data_dict = {}
idxs = stratifiedtraintestsplit(df=df, percentages=percentages, reduceindex=None, seed=seed)
for k,v in idxs.items():
data_dict[k] = df.iloc[(v)]
# since we oversample based on dataframe we need to oversample first and build loaders afterwards
data_dict["train"] = oversample(data_dict["train"]) if oversamp else data_dict["train"]
loaders = buildRealLoadersFromDict (data_dict=data_dict, batch_size=batch_size, transform=transform, augment=augment, invert=invert, imgloc=imgloc)
return loaders
def buildRealLoadersFromDict ( data_dict, batch_size, transform, imgloc, invert, augment):
"""Helper Function for constructing loaders from dictionary
Args:
data_dict (dict): dictionary with dataframes containing img urls and labels
batch_size (int): batch size of loader
transform (torch transforms compose): transform function to apply
imgloc (str): location of folder with ECG imgs
invert (bool): flag controlling inverting pixels
augment (bool): flag controlling augmentation
Returns:
dict: dictionary with data loaders
"""
loaders = {}
for key, data in data_dict.items():
augment = augment if key == "train" else False
shuffle = True if key == "train" else False
drop_last = False if key == "test" else True
imgs, pairs, lbls = loadRealData(df=data, imgloc=imgloc, invert=invert)
dataset = CustomImageDataset(pairs=pairs, labels=lbls, images=imgs, transform=transform, augment=augment)
loader = DataLoader(dataset, batch_size=batch_size, shuffle=shuffle, pin_memory=True, drop_last = drop_last)
loaders[key] = loader
return loaders
def evalCompleteModel (model, loader, nbins=20):
"""Function for executing more detailed analysis of model on provided data (test/validation)
Args:
model (Pytorch NN): model containing trained NN
loader (Dataloader): Dataloader with test / validation data
nbins (int, optional): number of bins used for discretizing predictions and targets. Defaults to 20.
Returns:
dict: dictionary containing detailed evaluation metrics, a dataframe listing predictions and labels, and a numpy array with the confusion matrix
"""
torch.cuda.empty_cache()
total = {}
preds = []
lbls = []
model.eval()
r2score = torchmetrics.R2Score().cpu()
mape = torchmetrics.MeanAbsolutePercentageError().cpu()
mse = torchmetrics.MeanSquaredError().cpu()
mae = torchmetrics.MeanAbsoluteError().cpu()
metrics = {"loss": 0 ,"r2" : r2score, "mse" : mse, "mae" : mae, "mape" : mape}
with torch.no_grad():
for idx, data in enumerate(loader):
templates, images, targets = data
templates = templates.to(device, non_blocking=True)
images = images.to(device, non_blocking=True)
targets = targets.to(device,non_blocking=True)
outputs = model(templates,images)
preds.extend(outputs.cpu())
lbls.extend(targets.cpu())
preds = torch.tensor(preds)
lbls = torch.tensor(lbls)
for key, metric in metrics.items():
if key != "loss" :
total[key] = metric(lbls, preds).item()
df = pd.DataFrame({"pred" : preds, "target" : lbls})
est = KBinsDiscretizer(nbins, encode='ordinal', strategy='uniform')
default_bins = np.array([i / 10 for i in range(-10,11,1)]).reshape(-1, 1)
est.fit(default_bins)
df["pred_bins"] = est.transform(df["pred"].to_numpy().reshape(-1, 1)).astype(int)
df["target_bins"] = est.transform(df["target"].to_numpy().reshape(-1, 1)).astype(int)
intervals = est.bin_edges_[0]
df["pred_lbls"] = df["pred_bins"].map(lambda x : f"{round(intervals[int(x)],2)}-{round(intervals[int(x+1)],2)}" )
df["target_lbls"] = df["target_bins"].map(lambda x : f"{round(intervals[int(x)],2)}-{round(intervals[int(x+1)],2)}" )
target_bins= df["target_bins"].to_numpy()
pred_bins = df["pred_bins"].to_numpy()
total["micro_prec"], total["micro_recall"], total["micro_f1"],_ = precision_recall_fscore_support( target_bins, pred_bins , average='micro', zero_division=0)
total["macro_prec"], total["macro_recall"], total["macro_f1"], _ = precision_recall_fscore_support( target_bins, pred_bins , average='macro', zero_division=0)
total["bacc"]= balanced_accuracy_score( target_bins, pred_bins )
total["conf_matrix"] = confusion_matrix( target_bins, pred_bins )
total["pearsonr"] = pearsonr( target_bins, pred_bins )[0]
total["df"] = df
return total
def kfoldLoader (loader, n_splits, start, stop, step, seed):
"""Function for kfolding dataset from dataloader. Labels are extracted discretized and used with stratifiedkfoled
Args:
loader (dataloader): loader to use for kfolding
n_splits (int): number of splits to create
start (int): start value for binning
stop (int): end value for binning
step (int): step size between bins
seed (int): random state
Returns:
StratifiedKFold, pd dataframe: StratifiedKFold object for generating fold indexes, dataframe containing labels with bins
"""
df = pd.DataFrame({"label" : loader.dataset.labels })
df = discretizeData(df=df, start=start, stop=stop, step=step, seed=seed)
skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=seed)
return skf, df
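# Illustrative sketch (hypothetical call): iterating over the stratified folds and
# rebuilding loaders per fold with buildLoaderFromIdx defined below; the transform
# string and batch size shown here are placeholders.
# skf, fold_df = kfoldLoader(loader, n_splits=5, start=-99, stop=101, step=1, seed=SEED)
# for train_idx, val_idx in skf.split(fold_df["label"], fold_df["bins"]):
#     fold_loaders = buildLoaderFromIdx(loader, {"train": train_idx, "val": val_idx}, transform="transform_resize", augment=False, batch_size=32)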
def buildLoaderFromIdx (loader, idx_dict, transform, augment, batch_size):
"""Function for splitting loader based on index dictionary, creating a new dictionary of loaders
Args:
loader (dataloader): dataloader with data to split
idx_dict (dict): dictionary with indexes to use for split
transform (torch transforms compose): transform functions to apply
augment (boolean): flag used for augmentation
batch_size (int): batch size of new loaders
Returns:
dict: dict containing new loaders
"""
loader_dict = {}
transform = globals()[transform]
labels, imgs, pairs = loader.dataset.labels, loader.dataset.images, loader.dataset.pairs
for key, idx in idx_dict.items():
dataset = CustomImageDataset(pairs=pairs[idx], labels=labels[idx], images=imgs, transform=transform, augment=augment if key == "train" else False)
res_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True, pin_memory=True, drop_last = (False if key == "test" else True))
loader_dict[key] = res_loader
return loader_dict
# main source for use of optuna module: https://medium.com/pytorch/using-optuna-to-optimize-pytorch-hyperparameters-990607385e36
def objective(trial, datatype, transform, loaders, gridlogname, num_epochs, printing=True):
"""Function for executing an optuna study used mainly for grid search and hyperparamter optimization
Args:
trial (optuna trial): optuna trial object representing a single optimization trial
datatype (str): datatype to use uniform or random
transform (torch transforms compose): transformation applied to imgs
loaders (dict): dictionary containing loaded dataloaders
gridlogname (str): path and filename for saving results
num_epochs (int): number of epochs to train model for
printing (bool, optional): flag controlling printouts. Defaults to True.
Returns:
float: best loss determined in trial
"""
try:
gridlog = pd.read_csv(gridlogname) if os.path.exists(gridlogname) else pd.DataFrame()
trialparams = {}
trial.set_user_attr("datatype", datatype)
trial.set_user_attr("transform", transform)
trialparams["datatype"] = datatype
trialparams["transform"] = transform
trialparams["modelSeed"] = trial.suggest_int("modelSeed", 1,100,1)
trialparams["learning_rate"] = trial.suggest_loguniform('learning_rate', 1e-5, 1e-1)
trialparams["optimizer_name"] = trial.suggest_categorical("optimizer", ["Adam", "RMSprop", "SGD"])
trialparams["blocks"] = trial.suggest_categorical('blocks', [3,4,5])
trialparams["padding"] = trial.suggest_categorical('padding', [2,4,6])
trialparams["stride"] = trial.suggest_categorical('stride', [2,4,6])
trialparams["kernel_size"] = trial.suggest_categorical('kernel_size', [3,5,7])
criterion = torch.nn.L1Loss()
W = 256
print(f"Training Model with params: {trialparams}")
for i in range(trialparams["blocks"]):
W = ((W-trialparams["kernel_size"]+2*trialparams["padding"] )/trialparams["stride"]+1)
if W < 1:
print("Hyperparamaters not combinable")
return 10000
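# The loop and check above apply the standard convolution output-size formula
# W_out = (W_in - kernel_size + 2 * padding) / stride + 1 once per block; any
# suggested combination that would shrink the 256-pixel input below one pixel is
# rejected with a sentinel loss instead of building the model.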
torch.clear_autocast_cache(), torch.cuda.empty_cache(), torch.manual_seed(SEED), torch.cuda.manual_seed(SEED)
model = buildmodelFromParams(trialparams, seed=trialparams["modelSeed"] )
optimizer = getattr(optim, trialparams["optimizer_name"])(model.parameters(), lr=trialparams["learning_rate"] )
log, model = trainModel(model = model, optimizer=optimizer, loaders=loaders, training_params=trialparams, logging=False, saveModel=False, num_epochs=num_epochs, printing=printing, stop=False, criterion=criterion )
best_scores = {}
logs = {}
tbdir = log["tbdir"].head(1).item()
for dtype in loaders.keys():
best_score = list(log.sort_values(by=f"{dtype}_mae", ascending=True).head(1)[[f"{dtype}_mae"]].to_dict()[f"{dtype}_mae"].items())[0]
best_scores[f"{dtype}_best_mae_epoch"] = best_score[0]
best_scores[f"{dtype}_best_mae"] = best_score[1]
log_values = {}
for dtype, dlog in logs.items():
log_values = log_values | {f"{dtype}_{k}" : v for k,v in dlog.items()}
log = ( trialparams |
best_scores |
log_values |
{"tbdir" : tbdir} )
gridlog = gridlog.append(pd.DataFrame(log, index=[1]))
gridlog.to_csv(gridlogname, index=False)
loss = best_scores["real_best_mae"]
print(f"Finished training! Params: {trialparams}")
except (ValueError, TypeError, UnboundLocalError ) as err:
print("Error encountered:", err)
loss = 10000
return loss
def analyzemodel(netparams, loaders, num_epochs, stop=False, model=None, saveModel=False):
"""Function for executing a manual more granular model analysis made up of training and evaluation
Args:
netparams (dict): dict containing params to use for initializing model architecture
loaders (dict): dict containing loaders to use for training and evaluation
num_epochs (int): number of epochs to train model for
stop (bool, optional): flag controlling early stopping. Defaults to False.
model (Pytorch NN, optional): model instance. Defaults to None.
saveModel (bool, optional): flag controlling if model weights should be saved. Defaults to False.
Returns:
pd dataframe, model: dataframe containing logged information (used params, scores etc.), trained model
"""
torch.clear_autocast_cache(), torch.cuda.empty_cache(), torch.manual_seed(SEED), torch.cuda.manual_seed(SEED)
nbins = 20
netparams["modelname"] = netparams["modelname"] if "modelname" in netparams else "dummy"
model = buildmodelFromParams(netparams, seed=netparams["modelSeed"]) if not(model) else model
optimizer = getattr(optim, netparams["optimizer"])(model.parameters(), lr=netparams["learning_rate"] )
criterion = torch.nn.L1Loss()
log, model = trainModel(model = model, optimizer=optimizer, loaders=loaders, training_params=netparams, logging=True, saveModel=saveModel, num_epochs=num_epochs, printing=True, stop=stop, criterion=criterion )
best_scores = {}
logs = {}
tbdir = log["tbdir"].head(1).item() if "tbdir" in log else None
for dtype in loaders.keys():
bestlog = log.sort_values(by=f"{dtype}_mae", ascending=True).head(1)[[f"{dtype}_mae", f"{dtype}_r2", f"{dtype}_mse"]]
best_scores = best_scores | bestlog.to_dict('records')[0]
best_scores[f"{dtype}_mae_epoch"] = bestlog.index.item()
print(f"Exectuing loop with bestlog {bestlog}")
# Saving additional logs and conf matrix in tensor boar directory
if tbdir:
logs[dtype] = evalCompleteModel(model=model, loader=loaders[dtype],nbins=nbins )
np.save(f"{tbdir}/{dtype}_conf_matrix",logs[dtype].pop("conf_matrix"),allow_pickle=True )