/* FLASH DTW
Fast Locally Aggregated Subalignment Heuristic for Dynamic Time Warping
Paul Gordon, 2019
Edited: Steven Hepburn, 2019
Edited: Matthew Wiens, 2019
The idea here is to find points in the middle of the local warp path that must be passed through, which
lets us reduce the cost matrix size substantially, since we can compute a separate, smaller cost matrix for each of the
between-anchor sections and then stitch the warp paths together (hence stitch_dtw function below).
If you had a semi-global (a.k.a. streaming) DTW implementation to find the
location of the query in a large subject based on collinearity (within a band), you'd already know the anchor points,
i.e. the collinear blocks. This saves us effort compared to sparseDTW or other global warp path efficiency methods.
In a way you could consider this a "seed-and-extend" approach to DTW.
We do this most efficiently by calculating tiny DTW blocks across the whole subject in parallel on the GPU, then only fully align the
bits that are non-randomly less distant than other, non-collinear blocks, using a Mann-Whitney test of the ranked tiny-DTW distances
for collinear vs. non-collinear blocks, with FDR correction if requested.
This header file contains a bunch of CUDA and host functions, but only those with a prototype at the top of this file are for calling externally.
TODO:
- somehow passing back matching blocks = 0 with command line './flash_test ../myFiles/k_segmentation_v1/ch347_read8418_segmented_values.raw.txt ../myFiles/k_segmentation_v1/ch347_read8418_noheader.raw.txt -n 3 -v -c -l 10 -g 2 -w 2', fix this, seems to only occur when using '-g 2'
- test soft_dtw and hard_dtw (may not be working, time is same for all sizes ???)
- enable reading of fast5 files
- review and enable half, short, and double data types
- review and enable MINIDTW_STRIDE != 1
- every kernel needs unit testing
- print out info only when verbose is enabled (even in flash_dtw.cuh side functions)
*/
#ifndef __FLASH_DTW_H
#define __FLASH_DTW_H
/* NVIDIA GPUs provide 64KB of constant cache, which is accessed as somewhere between 2 and 8KB of dedicated L1 cache on each multiprocessor (a.k.a. SM or SMX).
Let's devote almost all of it to the query to give us the most flexibility possible in terms of searching long queries. For shorts (default) that means ~32K is max query length. */
// Unlike many CPU-based lower-bound optimized implementations of streaming DTW, we need to store all the query in limited device memory, so we really care about how
// we will store the query (and subject) values as bigger datatypes like floats limit the size of DTW we can compute compared to char or short.
#ifdef QTYPE_short
#define QTYPE short
#endif
// Should be big enough to hold dynamic range of QTYPE values*MAX_DP_SAMPLES
#ifdef QTYPE_ACC_short
#define QTYPE_ACC short
#define DTW_MAX SHRT_MAX
#define dtwmin(a, b) (min(a, b))
#endif
#ifdef QTYPE_float
#define QTYPE float
#endif
#ifdef QTYPE_ACC_float
#define QTYPE_ACC float
#define DTW_MAX FLT_MAX
#define dtwmin(a, b) (min(a, b))
#endif
#ifdef QTYPE_double
#define QTYPE double
#endif
#ifdef QTYPE_ACC_double
#define QTYPE_ACC double
#define dtwmin(a, b) (min(a, b))
#define DTW_MAX DBL_MAX
#endif
#ifdef QTYPE_half
#include <cuda_fp16.h>
#define QTYPE __half
#endif
#ifdef QTYPE_ACC_half
#define QTYPE_ACC __half
#define DTW_MAX 65504.f
#define dtwmin(a, b) (__hgt(a, b) ? a : b)
#endif
#include "algo_datatypes.h"
// Here are prototypes for the methods that the caller cares about.
__host__ void load_subject(QTYPE *subject, QTYPE *subject_std, long subject_length, int use_std, cudaStream_t stream);
__host__ int flash_dtw(QTYPE *query, int query_length, char* query_name, float max_warp_proportion, float max_pvalue, float max_qvalue, int max_ranks, int normalization_mode, match_record **results, int *num_results, int minidtw_size, int minidtw_warp, bool record_anchors, int use_fast_anchor_calc, int use_std, int use_hard_dtw, cudaStream_t stream);
__inline__ __host__ void soft_dtw_wrap(const dim3 griddim_dtw, const int threadblock_size_dtw, cudaStream_t stream, const int num_query_indices, QTYPE * subject, QTYPE * subject_std, long long * query_adjacent_candidates, QTYPE_ACC * query_adjacent_distances, const int minidtw_size, const int minidtw_warp, const int use_std);
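// The prototypes above (load_subject, flash_dtw, and the soft_dtw_wrap launcher) are the externally
// callable surface. Below is a minimal host-side calling sketch, illustrative only and kept out of
// compilation with #if 0; the buffer names, threshold values, and mini-DTW parameters are assumptions,
// and match_record comes from algo_datatypes.h.
#if 0
  cudaStream_t stream;
  cudaStreamCreate(&stream);

  // 1) Stage the subject series on the GPU (no per-sample std devs in this sketch).
  load_subject(subject_vals, NULL, subject_len, /*use_std=*/0, stream);

  // 2) Search one query against the loaded subject.
  match_record *results = NULL;
  int num_results = 0;
  flash_dtw(query_vals, query_len, "query0",
            /*max_warp_proportion=*/0.25f, /*max_pvalue=*/0.01f, /*max_qvalue=*/1.0f,
            /*max_ranks=*/0, GLOBAL_ZNORM, &results, &num_results,
            /*minidtw_size=*/10, /*minidtw_warp=*/2, /*record_anchors=*/false,
            /*use_fast_anchor_calc=*/1, /*use_std=*/0, /*use_hard_dtw=*/0, stream);

  cudaStreamDestroy(stream);
#endif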
// Here are prototypes for the methods we want to unit test
template <class T>
__host__ void get_znorm_stats(T *data, long data_length, float *mean, float *stddev, T *min, T *max, cudaStream_t stream=0);
// __global__ void mean_min_max_float(float *data, int data_length, float *threadblock_means, QTYPE *threadblock_mins, QTYPE *threadblock_maxs);
// __global__ void mean_min_max(QTYPE *data, int data_length, float *threadblock_means, QTYPE *threadblock_mins, QTYPE *threadblock_maxs);
template <class T>
__global__ void variance(T *data, long data_length, long orig_data_length, float *data_mean, float *threadblock_variances);
__global__ void variance_float(float *data, long data_length, long orig_data_length, float *data_mean, float *threadblock_variances);
template <class T>
__global__ void query_znorm(int offset, size_t length, float *data_mean, float *data_stddev, T *min, T *max, int normalization_mode, cudaStream_t stream=0);
__global__ void welford_query_znorm(int offset, int length, float* mean, float* ssq, long total_values_znormalized, cudaStream_t stream=0);
__global__ void thorough_calc_anchor_candidates_colinear_pvals(const long long *query_adjacent_candidates, const int num_candidate_query_indices, const int num_candidate_subject_indices, QTYPE_ACC *global_non_colinear_distances, const int expected_num_pvals, const float max_warp_proportion, const float max_pval, const QTYPE_ACC *query_adjacent_distances, unsigned int *num_results_recorded, unsigned int *num_results_notrecorded, unsigned int max_num_results, float *output_pvals, int *output_left_anchors_query, int *output_right_anchors_query, long *output_left_anchors_subject, long *output_right_anchors_subject, int *output_num_members, const int minidtw_size, int* all_memberships, long long* all_subject_indices, int* lis_memberships, QTYPE_ACC* colinear_buff, QTYPE_ACC* non_colinear_buff, int* anch_mem_buff, float* pval_buff, int* left_query_buff, int* right_query_buff, long* left_subject_buff, long* right_subject_buff);
__global__ void fast_calc_anchor_candidates_colinear_pvals(const long long *query_adjacent_candidates, const int num_candidate_query_indices, const int num_candidate_subject_indices, QTYPE_ACC *global_non_colinear_distances, int *num_non_colinear_distances, const float max_warp_proportion, const float max_pval, const QTYPE_ACC *query_adjacent_distances, unsigned int *num_results_recorded, unsigned int *num_results_notrecorded, unsigned int max_num_results, float *output_pvals, int *output_left_anchors_query, int *output_right_anchors_query, long *output_left_anchors_subject, long *output_right_anchors_subject, int *output_num_members, const int minidtw_size, int* all_memberships, long long* all_subject_indices, int* lis_memberships, QTYPE_ACC* colinear_buff, QTYPE_ACC* non_colinear_buff, int* anch_mem_buff, float* pval_buff, int* left_query_buff, int* right_query_buff, long* left_subject_buff, long* right_subject_buff);
__global__ void load_non_colinear_distances(const QTYPE_ACC *query_adjacent_distances, const int num_candidate_query_indices, const int num_candidate_subject_indices, QTYPE_ACC *global_non_colinear_distances, int *num_non_colinear_distances);
__global__ void fdr(float *input_pvals, int *input_pval_ranks, unsigned int input_pvals_length, float *output_qvals);
__global__ void hard_dtw_std(const int num_query_indices, QTYPE * subject, QTYPE * subject_std, long long * query_adjacent_candidates, QTYPE_ACC * query_adjacent_distances);
__global__ void hard_dtw(const int num_query_indices, QTYPE * subject, QTYPE * subject_std, long long * query_adjacent_candidates, QTYPE_ACC * query_adjacent_distances, int minidtw_size);
__global__ void soft_dtw_std(const int num_query_indices, QTYPE * subject, QTYPE * subject_std, long long * query_adjacent_candidates, QTYPE_ACC * query_adjacent_distances);
__global__ void soft_dtw(const int num_query_indices, QTYPE * subject, QTYPE * subject_std, long long * query_adjacent_candidates, QTYPE_ACC * query_adjacent_distances);
// Here are prototypes for the inline methods we want to unit test
__inline__ __device__ float warpReduceSum(float val);
__inline__ __device__ int warpReduceMin(int val);
template <class T>
__inline__ __device__ void set_membership_results(T* membership, const int num_candidate_query_indices, long long* subject_indices, const int minidtw_size, const float max_warp_proportion, bool global_mem);
template <class T>
__inline__ __device__ void get_sorted_colinear_distances(QTYPE_ACC *sorted_colinear_distances, T* membership, QTYPE_ACC *distances, const int num_candidate_query_indices, int* partition_counter, int* local_partition, int num_members, int* num_sorted_colinear_distances, bool global_mem);
template <class T>
__inline__ __device__ void get_sorted_non_colinear_distances(QTYPE_ACC *sorted_non_colinear_distances, T* membership, int num_members, const QTYPE_ACC *query_adjacent_distances, int* num_sorted_non_colinear_distances, const int num_candidate_query_indices, const int num_candidate_subject_indices, bool thorough_calc, QTYPE_ACC *global_non_colinear_distances, int num_sorted_colinear_distances, bool global_mem);
__inline__ __device__ void calculate_pval(const float max_pval, QTYPE_ACC *sorted_colinear_distances, QTYPE_ACC *sorted_non_colinear_distances, int num_sorted_non_colinear_distances, unsigned int *num_results_recorded, unsigned int *num_results_notrecorded, unsigned int max_num_results, float *output_pvals, int leftmost_anchor_query, int rightmost_anchor_query, long leftmost_anchor_subject, long rightmost_anchor_subject, int num_members, int *output_left_anchors_query, int *output_right_anchors_query, long *output_left_anchors_subject, long *output_right_anchors_subject, int *output_num_members, int* anch_mem_buff, float* pval_buff, int* left_query_buff, int* right_query_buff, long* left_subject_buff, long* right_subject_buff);
__host__ __inline__ int flash_dtw_setup(QTYPE* query, int query_length, int normalization_mode, cudaStream_t stream=0);
__inline__ __device__ void block_findMin(QTYPE *dtw_results, long long *idata, const int tid, const int block_dim, const long long strided_jobid, QTYPE_ACC &min, long long &index);
// Functions that are no longer in use
__host__ QTYPE* segment_signal(QTYPE *series, unsigned int *series_length, short expected_segment_length, float max_attenuation, int normalization_mode, cudaStream_t stream);
__global__ void merge_ranks(float *anchor_pvals, int num_pvals, int max_rank, int *anchor_pval_ranks, unsigned int *anchor_num_ranks);
__global__ void rank_results_blocks(float *input_pvals, int input_pvals_length, int *sorted_pval_ordinals, int *sorted_pval_ranks);
__global__ void get_min_distance_sorted_subject_sample(QTYPE_ACC *dtw_distances, int *min_idxs, int num_min_idxs, QTYPE_ACC *sorted_non_colinear_distances, int num_samples);
__host__ double stitch_dtw(std::vector< std::pair<int,long> > input_warp_anchors, float *query, float *subject, std::vector< std::pair<int,long> > &output_warp_path);
__host__ void traceback(double *matrix, int nx, int ny, int xoffset, long yoffset, std::vector< std::pair<int,long> > &path);
__host__ double euclidean_dtw(float *x, int nx, float *y, int ny, int xoffset, long yoffset, std::vector< std::pair<int,long> > &path);
#include <thrust/sort.h>
#include "stdio.h"
#include <algorithm>
#include <iostream>
#include <limits>
#include <vector>
// For INT_MAX
#include <climits>
// For FLT_MAX etc.
#include <cfloat>
// For erf function
#include <cmath>
#include <fstream>
// For warp level CUDA ops like shuffle down
// #define FULL_MASK 0xffffffff
// #define CUDA_WARP_WIDTH 32
// #if defined(_DEBUG_REG)
// #define CUDA_THREADBLOCK_MAX_THREADS 256
// #else
// #define CUDA_THREADBLOCK_MAX_THREADS 1024
// #endif
// #define CUDA_THREADBLOCK_MAX_L1CACHE 48000
#include "cuda_utils.h"
// MAX_DP_SAMPLES should not exceed 256. Otherwise, sums and squares accumulators used in the segmentation kernel could overflow in edge cases of extremely noisy, high dynamic range signal.
#define MAX_DP_SAMPLES 256
// How should queries be Z-normalized?
#define PERCENTILE_NORM 4 // Scale so that the 5th percentile and 95th percentile value are the same in the query and subject
#define GLOBAL_ZNORM 3 // Across all queries that are loaded together
#define ONLINE_ZNORM 2 // Welford's method on each individual query
#define LOCAL_ZNORM 1 // Each query separately
#define NO_ZNORM 0 // Use as-is (i.e. it came in z-normalized)
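// Every mode except NO_ZNORM ultimately rescales the query onto the subject's scale (rather than to a
// unit normal), matching what query_znorm() below does per value. A scalar sketch of that transform,
// with assumed/illustrative names:
#if 0
static inline float rescale_to_subject(float q, float query_mean, float query_stddev,
                                        float subject_mean, float subject_stddev){
  // Z-normalize against the query's own stats, then map onto the subject's mean/std dev,
  // so the subject never has to be rewritten in a wider data type.
  return (q - query_mean) / query_stddev * subject_stddev + subject_mean;
}
#endif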
// #define CUDA_CONSTANT_MEMORY_SIZE 66068
#define MAX_NUM_QUERIES 512
__device__ volatile int Cnum_nonzero_series = 0;
__device__ volatile short Cnonzero_series_lengths[MAX_NUM_QUERIES];
#define MAX_QUERY_SIZE CUDA_CONSTANT_MEMORY_SIZE/sizeof(QTYPE)-sizeof(short)*MAX_NUM_QUERIES-sizeof(int)-sizeof(long)-sizeof(float)*2
__device__ QTYPE Gquery[MAX_QUERY_SIZE];
__device__ QTYPE Gquery_std[MAX_QUERY_SIZE];
__device__ volatile int Gquery_length;
// How many positions to skip between anchors
#ifndef MINIDTW_STRIDE
#define MINIDTW_STRIDE 4 // can only be one of {1, 2, 4, 8}
#endif
// It turns out that it's crazy expensive computationally to just keep the best N match results (e.g. something reasonable like CUDA_THREADBLOCK_MAX_THREADS), because
// it requires tonnes of super-inefficient atomic operations and threadfences to coordinate the parallel threads across
// the device in updating the results array. Instead, allocate memory for and record up to MAX_PVAL_MATCHES_KEPT matches and only count the number beyond that.
// CUDA_THREADBLOCK_MAX_THREADS*CUDA_THREADBLOCK_MAX_THREADS is ~one million, which means that the FDR correction for something as common as the Alu repeat in a
// human genome will be a reasonable estimate, and we only need to do one reduction step to determine match ranks (necessary for FDR calculation).
#ifndef MAX_PVAL_MATCHES_KEPT
#define MAX_PVAL_MATCHES_KEPT (CUDA_THREADBLOCK_MAX_THREADS*CUDA_THREADBLOCK_MAX_THREADS)
#endif
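// The fdr() kernel prototyped above turns p-value ranks into q-values. For reference, a host-side
// sketch of the standard Benjamini-Hochberg formula that this kind of correction is based on (an
// illustration, not the kernel's actual implementation): q_i = min over j >= i of p_(j)*N/j, with
// p-values sorted ascending.
#if 0
static void bh_qvalues(const float *sorted_pvals, int n, float *qvals){
  float running_min = 1.0f;
  for(int i = n - 1; i >= 0; --i){              // walk from the largest p-value down
    float q = sorted_pvals[i] * n / (i + 1);    // rank is i+1 in 1-based terms
    if(q < running_min) running_min = q;
    qvals[i] = running_min;                     // enforce a monotone non-increasing q from the top
  }
}
#endif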
__device__ __constant__ long Tsubject_length = 0;
// And here is the underlying device global memory that actually stores the search subject
QTYPE *Dsubject = 0;
QTYPE *Dsubject_std = 0;
// We need to keep track of the stddev because we don't really Z-normalize (since this would require at least an fp16 for each subject value, which is silly if QTYPE is 8 bits)
// but rather adjust the query to have the same mean and std dev as the subject.
__device__ __constant__ float Dsubject_mean = 0;
__device__ __constant__ float Dsubject_stddev = 0;
// Also record the dynamic range of the subject, in case someone is interested in comparing it to the dynamic range of the normalized queries later, "for training and quality assurance purposes".
__device__ float Dsubject_min = 0;
__device__ float Dsubject_max = 0;
template<class T>
__host__
void load_and_normalize_queries(T *query_values, size_t query_length, int normalization_mode, float* welford_mean, float* welford_ssq, long total_values_znormalized, cudaStream_t stream=0){
cudaMemcpyToSymbol(Gquery, query_values, sizeof(T)*query_length); CUERR("Copying Gquery from CPU to GPU memory")
cudaMemcpyToSymbolAsync(::Gquery_length, &query_length, sizeof(int), 0, cudaMemcpyHostToDevice, 0); CUERR("Copying query's length from CPU to GPU constant memory")
float *mean, *stddev;
T *min, *max;
cudaMalloc(&mean, sizeof(float)); CUERR("Allocating GPU memory for query mean");
cudaMalloc(&stddev, sizeof(float)); CUERR("Allocating GPU memory for query std dev");
cudaMalloc(&min, sizeof(T)); CUERR("Allocating GPU memory for query min");
cudaMalloc(&max, sizeof(T)); CUERR("Allocating GPU memory for query max");
cudaStreamSynchronize(stream);
// The next two lines are used so the query location can be passed around like any other memory pointer to the stats function.
T *query_addr = 0;
cudaGetSymbolAddress((void **) &query_addr, Gquery); CUERR("Getting memory address of device constant for query");
if(normalization_mode == GLOBAL_ZNORM){
dim3 norm_grid(DIV_ROUNDUP(query_length,CUDA_THREADBLOCK_MAX_THREADS), 1, 1);
get_znorm_stats<T>(query_addr, query_length, mean, stddev, min, max, stream); CUERR("Setting global query stats");
cudaStreamSynchronize(stream); CUERR("Synchronizing stream after query znorm stats");
// float *host_mean, *host_stddev;
// T *host_min, *host_max;
// host_mean = (float*)malloc(sizeof(float));
// host_stddev = (float*)malloc(sizeof(float));
// host_min = (T*)malloc(sizeof(T));
// host_max = (T*)malloc(sizeof(T));
// cudaMemcpy(host_mean, mean, sizeof(float), cudaMemcpyDeviceToHost);
// cudaMemcpy(host_stddev, stddev, sizeof(float), cudaMemcpyDeviceToHost);
// cudaMemcpy(host_min, min, sizeof(T), cudaMemcpyDeviceToHost);
// cudaMemcpy(host_max, max, sizeof(T), cudaMemcpyDeviceToHost);
// std::cerr << "mean: " << *host_mean << ", stddev: " << *host_stddev << ", min: " << *host_min << ", max: " << *host_max << std::endl;
query_znorm<T><<<norm_grid, CUDA_THREADBLOCK_MAX_THREADS, 0, stream>>>(0, query_length, mean, stddev, min, max, normalization_mode); CUERR("Applying global query normalization in segment_and_load_queries GLOBAL_ZNORM");
cudaStreamSynchronize(stream); CUERR("Synchronizing stream after query znorm");
}
else if(normalization_mode == LOCAL_ZNORM){
// Not really worth parallelizing this outer loop as the size of the query is very constrained for now
dim3 grid(DIV_ROUNDUP(query_length,CUDA_THREADBLOCK_MAX_THREADS), 1, 1);
get_znorm_stats<T>(query_addr, query_length, mean, stddev, min, max, stream); CUERR("Setting global query stats");
query_znorm<T><<<grid,CUDA_THREADBLOCK_MAX_THREADS,0,stream>>>(0, query_length, mean, stddev, min, max, normalization_mode); CUERR("Applying local query normalization");
}
else if(normalization_mode == PERCENTILE_NORM){
// TODO: percentile normalization (match the query's 5th/95th percentiles to the subject's) is not yet implemented.
}
else if(normalization_mode == ONLINE_ZNORM) {
float* tmp_mean, *tmp_ssq;
cudaMalloc(&tmp_mean, sizeof(float)); CUERR("Allocating GPU memory for temp welford mean");
cudaMalloc(&tmp_ssq, sizeof(float)); CUERR("Allocating GPU memory for temp welford ssq");
cudaMemcpy(tmp_mean, welford_mean, sizeof(float), cudaMemcpyHostToDevice); CUERR("Copying welford mean from host to device");
cudaMemcpy(tmp_ssq, welford_ssq, sizeof(float), cudaMemcpyHostToDevice); CUERR("Copying welford ssq from host to device");
welford_query_znorm<<<1, CUDA_THREADBLOCK_MAX_THREADS, 0, stream>>>(0, query_length, tmp_mean, tmp_ssq, total_values_znormalized);
cudaMemcpy(welford_mean, tmp_mean, sizeof(float), cudaMemcpyDeviceToHost); CUERR("Copying welford mean from device to host");
cudaMemcpy(welford_ssq, tmp_ssq, sizeof(float), cudaMemcpyDeviceToHost); CUERR("Copying welford ssq from device to host");
cudaFree(tmp_mean); CUERR("Freeing temp welford mean");
cudaFree(tmp_ssq); CUERR("Freeing temp welford ssq");
cudaStreamSynchronize(stream); CUERR("Synchronizing stream after query znorm");
}
else{
dim3 norm_grid(DIV_ROUNDUP(query_length,CUDA_THREADBLOCK_MAX_THREADS), 1, 1);
// With NO_ZNORM (or any unrecognized mode) query_znorm() leaves the query values untouched, so the stats buffers do not need to be populated here.
query_znorm<T><<<norm_grid, CUDA_THREADBLOCK_MAX_THREADS, 0, stream>>>(0, query_length, mean, stddev, min, max, normalization_mode); CUERR("Applying global query normalization in segment_and_load_queries no GLOBAL_ZNORM");
cudaStreamSynchronize(stream); CUERR("Synchronizing stream after query znorm");
}
cudaFree(mean); CUERR("Freeing query mean");
cudaFree(stddev); CUERR("Freeing query stddev");
cudaFree(min); CUERR("Freeing query min");
cudaFree(max); CUERR("Freeing query max");
}
// The traceback and full dtw methods use float arguments and double accumulators/results regardless of the QTYPE, because we are Z-normalizing the query.
__host__
void traceback(double *matrix, int nx, int ny, int xoffset, long yoffset, std::vector< std::pair<int,long> > &path){
int i = nx-1;
int j = ny-1;
path.push_back(std::make_pair(xoffset+i,yoffset+j)); // end anchor
while ((i > 0) && (j > 0)){
// 3 conditions below are equivalent to min((matrix[i-1, j-1], matrix[i, j-1], matrix[i-1, j]))
if (matrix[i-1+(j-1)*nx] <= matrix[i+(j-1)*nx] && matrix[i-1+(j-1)*nx] <= matrix[(i-1)+j*nx]){ // diagonal move is preferred (i.e. White-Neely step scoring)
--i;
--j;
}
else if (matrix[i+(j-1)*nx] <= matrix[i-1+(j-1)*nx] && matrix[i+(j-1)*nx] <= matrix[(i-1)+j*nx])
--j;
else
--i;
path.push_back(std::make_pair(i+xoffset,j+yoffset));
//std::cout << matrix[i+j*nx] << "/";
}
while(i > 0){
path.push_back(std::make_pair((--i)+xoffset,yoffset));
}
while(j > 0){
path.push_back(std::make_pair(xoffset,(--j)+yoffset));
}
// flip the path back to ascending order
std::reverse(path.begin(), path.end());
//std::cout << std::endl;
}
__host__
double euclidean_dtw(float *x, int nx, float *y, int ny, int xoffset, long yoffset, std::vector< std::pair<int,long> > &path){
QTYPE max = std::numeric_limits<QTYPE>::max();
double *accumulated_cost_matrix = (double *) malloc(sizeof(double)*nx*ny);
if(accumulated_cost_matrix == 0){
std::cerr << "Could not allocate cost matrix for DTW of size (" << nx << "," << ny << "), aborting" << std::endl;
exit(1);
}
for(int i = 1; i < nx; ++i){
accumulated_cost_matrix[i] = (double) max;
}
for(int i = 1; i < ny; ++i){
accumulated_cost_matrix[i*nx] = (double) max;
}
accumulated_cost_matrix[0] = std::abs(x[0]-y[0]); // std::abs keeps the float overload; plain abs() can resolve to the int version
for(int i = 1; i < nx; ++i){
for(int j = 1; j < ny; ++j){
// 3 conditions below are equivalent to min((matrix[i-1, j-1], matrix[i, j-1], matrix[i-1, j]))
if(accumulated_cost_matrix[i-1+(j-1)*nx] <= accumulated_cost_matrix[i+(j-1)*nx] && accumulated_cost_matrix[i-1+(j-1)*nx] <= accumulated_cost_matrix[i-1+j*nx]){
accumulated_cost_matrix[i+j*nx] = std::abs(x[i]-y[j]) + accumulated_cost_matrix[i-1+(j-1)*nx];
}
else if(accumulated_cost_matrix[i+(j-1)*nx] <= accumulated_cost_matrix[i-1+(j-1)*nx] && accumulated_cost_matrix[i+(j-1)*nx] <= accumulated_cost_matrix[i-1+j*nx]){
accumulated_cost_matrix[i+j*nx] = std::abs(x[i]-y[j]) + accumulated_cost_matrix[i+(j-1)*nx];
}
else{
accumulated_cost_matrix[i+j*nx] = std::abs(x[i]-y[j]) + accumulated_cost_matrix[i-1+j*nx];
}
}
}
traceback(accumulated_cost_matrix, nx, ny, xoffset, yoffset, path);
double cost = accumulated_cost_matrix[nx*ny-1];
free(accumulated_cost_matrix);
return cost;
}
/* returned path vector is pairs <query_position,subject_position> */
__host__
double stitch_dtw(std::vector< std::pair<int,long> > input_warp_anchors, float *query, float *subject, std::vector< std::pair<int,long> > &output_warp_path){
//TODO: align head and tail outside the anchors?
double total_cost = 0;
for(size_t i = 0; i + 1 < input_warp_anchors.size(); ++i){ // i+1 < size() avoids unsigned underflow when no anchors were provided
std::vector< std::pair<int,long> > local_path;
local_path.reserve((input_warp_anchors[i+1].first-input_warp_anchors[i].first)*1.5);
//std::cout << "Stitching query (" << input_warp_anchors[i].first << "," << input_warp_anchors[i+1].first << ") with subject (" << input_warp_anchors[i].second << "," << input_warp_anchors[i+1].second-1 << "), cost ";
double cost = euclidean_dtw(&query[input_warp_anchors[i].first], input_warp_anchors[i+1].first-input_warp_anchors[i].first,
&subject[input_warp_anchors[i].second], input_warp_anchors[i+1].second-input_warp_anchors[i].second,
input_warp_anchors[i].first, input_warp_anchors[i].second, local_path);
//std::cout << cost << std::endl;
total_cost += cost;
output_warp_path.insert(output_warp_path.end(), local_path.begin(), local_path.end());
}
return total_cost;
}
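// stitch_dtw() is listed above among the functions no longer in use; the sketch below shows how it
// would be invoked to stitch the between-anchor sub-alignments into one warp path. Illustrative only:
// the anchor list and the z-normalized float copies of the query/subject are assumed to exist.
#if 0
  std::vector< std::pair<int,long> > anchors;     // ascending <query_index, subject_index> pairs
  std::vector< std::pair<int,long> > warp_path;
  double total_cost = stitch_dtw(anchors, query_f, subject_f, warp_path);
  // warp_path now holds the concatenated <query_position, subject_position> pairs from each
  // between-anchor euclidean_dtw() sub-alignment, and total_cost their summed DTW cost.
#endif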
// Adds all values in a warp and returns the result
// val - the value in the current block of the warp to be added with the rest
// returns the final added value
__inline__ __device__
float warpReduceSum(float val) {
for (int offset = CUDA_WARP_WIDTH/2; offset > 0; offset /= 2)
val += __shfl_down_sync(FULL_MASK, val, offset);
//val += __shfl_down(val, offset);
return val;
}
// Min/max reduction code edited from: https://ernie55ernie.github.io/parallel%20programming/2018/03/17/cuda-maximum-value-with-parallel-reduction.html
template <class T>
__global__
void mean_min_max_float(float *data, int data_length, float *threadblock_means, T *threadblock_mins, T *threadblock_maxs){
__shared__ float warp_sums[CUDA_WARP_WIDTH];
extern __shared__ T cache[];
// extern __shared__ T min_cache[];
T tmp_max = threadblock_maxs[0];
T tmp_min = threadblock_mins[0];
int pos = blockIdx.x*blockDim.x+threadIdx.x;
float warp_sum = 0;
if(pos < data_length){ // Coalesced global mem reads
warp_sum = data[pos];
}
__syncwarp();
// Reduce the warp
warp_sum = warpReduceSum(warp_sum);
if(threadIdx.x%CUDA_WARP_WIDTH == 0){
warp_sums[threadIdx.x/CUDA_WARP_WIDTH] = warp_sum;
}
__syncwarp();
__syncthreads();
int warp_limit = DIV_ROUNDUP(blockDim.x,CUDA_WARP_WIDTH);
// Reduce the whole threadblock
if(threadIdx.x < CUDA_WARP_WIDTH){
warp_sum = threadIdx.x < warp_limit ? warp_sums[threadIdx.x] : 0;
__syncwarp();
warp_sum = warpReduceSum(warp_sum);
}
int cacheIndex = threadIdx.x;
while(pos < blockDim.x){
if(threadblock_maxs[pos] > tmp_max){
tmp_max = threadblock_maxs[pos];
}
if(threadblock_mins[pos] < tmp_min){
tmp_min = threadblock_mins[pos];
}
pos += blockDim.x * gridDim.x;
}
cache[cacheIndex] = tmp_max;
cache[cacheIndex + blockDim.x] = tmp_min;
__syncthreads();
// printf("cache[%i]: %f, cache[%i]: %f\n", cacheIndex, cache[cacheIndex], cacheIndex + blockDim.x, cache[cacheIndex + blockDim.x]);
int ib = blockDim.x / 2;
// printf("ib: %i\n", ib);
while (ib != 0) {
// printf("cache[%i]: %f, cache[%i + %i]: %f\n", cacheIndex, cache[cacheIndex], cacheIndex, ib, cache[cacheIndex + ib]);
if(cacheIndex < ib && cache[cacheIndex + ib] > cache[cacheIndex]){
cache[cacheIndex] = cache[cacheIndex + ib];
}
// printf("cache[%i + %i]: %f, cache[%i+ %i + %i]: %f\n", blockDim.x, cacheIndex, cache[blockDim.x + cacheIndex], blockDim.x, cacheIndex, ib, cache[blockDim.x + cacheIndex + ib]);
if(cacheIndex < ib && cache[blockDim.x + cacheIndex + ib] < cache[blockDim.x + cacheIndex]){
cache[blockDim.x + cacheIndex] = cache[blockDim.x + cacheIndex + ib];
}
__syncthreads();
ib /= 2;
}
if(cacheIndex == 0){
threadblock_maxs[0] = cache[0];
threadblock_mins[0] = cache[blockDim.x];
}
// Assign min/ max value to the front
// threadblock_mins[0] = threadblock_mins[pos] < threadblock_mins[0] ? threadblock_mins[pos] : threadblock_mins[0];
// threadblock_maxs[0] = threadblock_maxs[pos] > threadblock_maxs[0] ? threadblock_maxs[pos] : threadblock_maxs[0];
// if(threadIdx.x == 0){
// for(int i = 0; i < blockDim.x; i++){
// threadblock_mins[0] = threadblock_mins[i] < threadblock_mins[0] ? threadblock_mins[i] : threadblock_mins[0];
// threadblock_maxs[0] = threadblock_maxs[i] > threadblock_maxs[0] ? threadblock_maxs[i] : threadblock_maxs[0];
// }
// }
// Assign to global memory for later reduction across the whole data array
if(! threadIdx.x){
// Special condition in denominator for the last, potentially incomplete block
threadblock_means[blockIdx.x] = warp_sum/((blockIdx.x+1)*blockDim.x > data_length ? data_length-blockIdx.x*blockDim.x : blockDim.x);
}
}
template <class T>
__global__
void
mean_min_max(T *data, int data_length, float *threadblock_means, T *threadblock_mins, T *threadblock_maxs){
__shared__ T threadblock_data[CUDA_THREADBLOCK_MAX_THREADS];
__shared__ float warp_sums[CUDA_WARP_WIDTH];
__shared__ T warp_mins[CUDA_WARP_WIDTH];
__shared__ T warp_maxs[CUDA_WARP_WIDTH];
if(threadIdx.x < CUDA_WARP_WIDTH){
warp_mins[threadIdx.x] = sizeof(T) == 1 ? UCHAR_MAX : (sizeof(T) == 2 ? SHRT_MAX : FLT_MAX);
warp_maxs[threadIdx.x] = sizeof(T) == 1 ? 0 : (sizeof(T) == 2 ? SHRT_MIN : -FLT_MAX); // -FLT_MAX (not FLT_MIN, the smallest positive float) is the correct lowest starting point for a float max
}
__syncwarp();
__syncthreads();
// This same method is used to calculate stats on both data in global memory (e.g. subject), and device constants (e.g. query).
// To allow both of these to be processed correctly, a null data pointer indicates that the query in Gquery should be used.
if(data == 0){
data = (T*)&Gquery[0];
}
int pos = blockIdx.x*blockDim.x+threadIdx.x;
float warp_sum = 0;
if(pos < data_length){ // Coalesced global mem reads
// printf("data[%i]: %lf\n", pos, data[threadIdx.x]);
threadblock_data[threadIdx.x] = data[pos];
warp_sum = (double) threadblock_data[threadIdx.x]; // __shfl*() only works with int or float, the latter is the safest bet to retain true data values regardless of QTYPE
}
__syncwarp();
// Reduce the warp
warp_sum = warpReduceSum(warp_sum);
if(threadIdx.x%CUDA_WARP_WIDTH == 0){
T warp_max = sizeof(T) == 1 ? 0 : (sizeof(T) == 2 ? SHRT_MIN : -FLT_MAX);
T warp_min = sizeof(T) == 1 ? UCHAR_MAX : (sizeof(T) == 2 ? SHRT_MAX : FLT_MAX);
for(int i = 0; i < CUDA_WARP_WIDTH && threadIdx.x+i < (data_length-blockIdx.x*blockDim.x); i++){
if(threadblock_data[threadIdx.x+i] < warp_min) warp_min = threadblock_data[threadIdx.x+i];
if(threadblock_data[threadIdx.x+i] > warp_max) warp_max = threadblock_data[threadIdx.x+i];
}
warp_mins[threadIdx.x/CUDA_WARP_WIDTH] = warp_min;
warp_maxs[threadIdx.x/CUDA_WARP_WIDTH] = warp_max;
warp_sums[threadIdx.x/CUDA_WARP_WIDTH] = warp_sum;
//printf("Block %d, Warp (%d) sum: %f\n", blockIdx.x, warp_sums[threadIdx.x/CUDA_WARP_WIDTH]);
}
__syncwarp();
__syncthreads();
int warp_limit = DIV_ROUNDUP(blockDim.x,CUDA_WARP_WIDTH);
// Reduce the whole threadblock
if(threadIdx.x < CUDA_WARP_WIDTH){
//printf("threadIdx %i, Warp warp_limit (%i) warp_sums[threadIdx.x]: %f\n", threadIdx.x, threadIdx.x, warp_sums[threadIdx.x]);
warp_sum = threadIdx.x < warp_limit ? warp_sums[threadIdx.x] : 0;
for(int swath = 1; threadIdx.x+swath < warp_limit; swath *= 2){
if(threadIdx.x%(swath*2) == 0){
if(warp_mins[threadIdx.x] > warp_mins[threadIdx.x+swath]) warp_mins[threadIdx.x] = warp_mins[threadIdx.x+swath];
if(warp_maxs[threadIdx.x] < warp_maxs[threadIdx.x+swath]) warp_maxs[threadIdx.x] = warp_maxs[threadIdx.x+swath];
}
}
__syncwarp();
// printf("Block %d, Warp reduce (%d) sum: %f\n", blockIdx.x, threadIdx.x, warp_sum);
warp_sum = warpReduceSum(warp_sum);
}
// Assign to global memory for later reduction across the whole data array
if(! threadIdx.x){
// Special condition in denominator for the last, potentially incomplete block
// printf("%f/((%i+1)*%i, %i, %i-%i*%i, %i\n", warp_sum, blockIdx.x, blockDim.x, data_length, data_length, blockIdx.x, blockDim.x, blockDim.x);
threadblock_means[blockIdx.x] = warp_sum/((blockIdx.x+1)*blockDim.x > data_length ? data_length-blockIdx.x*blockDim.x : blockDim.x);
threadblock_mins[blockIdx.x] = warp_mins[0];
threadblock_maxs[blockIdx.x] = warp_maxs[0];
}
}
template <class T>
__global__
void
variance(T *data, long data_length, long orig_data_length, float *data_mean, float *threadblock_variances){
int pos = blockIdx.x*blockDim.x+threadIdx.x;
// This same method is used to calculate stats on both data in global memory (e.g. subject), and device constants (e.g. query).
// To allow both of these to be processed correctly, a null data pointer indicates that the query in Gquery should be used.
if(data == 0){
data = (T*)&Gquery[0];
}
float warp_var = 0;
if(pos < data_length){ // Coalesced global mem reads
warp_var = data[pos]-data_mean[0];
warp_var *= warp_var;
}
// Reduce the warp
__syncwarp();
__shared__ float warp_variances[CUDA_WARP_WIDTH]; // accumulate in float regardless of QTYPE so the squared deviations are not truncated
warp_var = warpReduceSum(warp_var);
if(threadIdx.x%CUDA_WARP_WIDTH == 0){
warp_variances[threadIdx.x/CUDA_WARP_WIDTH] = warp_var;
}
__syncwarp();
__syncthreads();
// Reduce the whole threadblock and assign result to global memory for later reduction across the whole data array
int warp_limit = DIV_ROUNDUP(blockDim.x,CUDA_WARP_WIDTH);
if(threadIdx.x < CUDA_WARP_WIDTH){
warp_var = threadIdx.x < warp_limit ? warp_variances[threadIdx.x] : 0;
// *NOTA BENE*: if CUDA_THREADBLOCK_MAX_THREADS/CUDA_WARP_WIDTH > CUDA_WARP_WIDTH this reduce will be incomplete!!
// As of 2019 this is not the case for any CUDA device (typically 1024/32, but can be 512/32 on older GPU cards).
__syncwarp();
warp_var = warpReduceSum(warp_var);
}
if(! threadIdx.x){
// If this is the final reduction calculate the std dev from the variance
if(gridDim.x == 1)
threadblock_variances[0] = (float) sqrt(warp_var/orig_data_length); // Whole population not the sample
else
threadblock_variances[blockIdx.x] = warp_var;
}
}
// For the reducing step, variance accumulation is done in double precision regardless of QTYPE
__global__
void
variance_float(float *data, long data_length, long orig_data_length, float *data_mean, float *threadblock_variances){
int pos = blockIdx.x*blockDim.x+threadIdx.x;
__shared__ double dwarp_variances[CUDA_WARP_WIDTH];
float warp_var = 0;
if(pos < data_length){ // Coalesced global mem reads
warp_var = data[pos];
}
// Reduce the warp
__syncwarp();
warp_var = warpReduceSum(warp_var);
if(threadIdx.x%CUDA_WARP_WIDTH == 0){
dwarp_variances[threadIdx.x/CUDA_WARP_WIDTH] = warp_var;
}
__syncwarp();
__syncthreads();
// Reduce the whole threadblock and assign result to global memory for later reduction across the whole data array
int warp_limit = DIV_ROUNDUP(blockDim.x,CUDA_WARP_WIDTH);
if(threadIdx.x < CUDA_WARP_WIDTH){
warp_var = (float) (threadIdx.x < warp_limit ? dwarp_variances[threadIdx.x] : 0.0);
// *NOTA BENE*: if CUDA_THREADBLOCK_MAX_THREADS/CUDA_WARP_WIDTH > CUDA_WARP_WIDTH this reduce will be incomplete!!
// As of 2019 this is not the case for any CUDA device (typically 1024/32, but can be 512/32 on older GPU cards).
__syncwarp();
warp_var = warpReduceSum(warp_var);
if(! threadIdx.x){
threadblock_variances[blockIdx.x] = warp_var;
//printf("Block final %d variance (within %d): %f\n", blockIdx.x, (int) data_length, warp_var);
// If this is the last reduction, calculate the std dev from the variance so we don't have to copy the variance back to the CPU to do so
if(gridDim.x == 1)
threadblock_variances[0] = (float) sqrt(threadblock_variances[0]/orig_data_length);
}
}
}
// Host function that gets the znormalized stats of a given set of data
// data - the data we are getting the stats from
// data_length - the length of the input data
// mean - the mean of the data to be calculated
// stddev - the standard deviation of the data to be calculated
// min - the minimum value of the data to be obtained
// max - the max value of the data to be obtained
// stream - CUDA kernel stream
template <class T>
__host__
void
get_znorm_stats(T *data, long data_length, float *mean, float *stddev, T *min, T *max, cudaStream_t stream){
if(data_length < 2){
std::cerr << "Provided data length was " << data_length << ", from which Z-normalization stats cannot be calculated, aborting." << std::endl;
exit(23);
}
// TODO: merge to a single cudaMalloc()
int num_threadblocks = DIV_ROUNDUP(data_length, CUDA_THREADBLOCK_MAX_THREADS);
float *threadblock_means;
T *threadblock_mins;
T *threadblock_maxs;
cudaMalloc(&threadblock_means, sizeof(float)*num_threadblocks*CUDA_THREADBLOCK_MAX_THREADS); CUERR("Allocating device memory for query Z-norm threadblock means");
cudaMalloc(&threadblock_mins, sizeof(T)*num_threadblocks*CUDA_THREADBLOCK_MAX_THREADS); CUERR("Allocating device memory for query Z-norm threadblock mins");
cudaMalloc(&threadblock_maxs, sizeof(T)*num_threadblocks*CUDA_THREADBLOCK_MAX_THREADS); CUERR("Allocating device memory for query Z-norm threadblock maxs");
float *threadblock_results;
// There is a chance that these variances sums could get really big for very long datasets, TODO we may want to store into a double rather than float if QTYPE = float
cudaMalloc(&threadblock_results, sizeof(float)*num_threadblocks); CUERR("Allocating device memory for query Z-norm threadblock variances");
dim3 grid(num_threadblocks, 1, 1);
int req_threadblock_shared_memory = CUDA_THREADBLOCK_MAX_THREADS*sizeof(T)+(sizeof(T)*2+sizeof(float)+2)*CUDA_THREADBLOCK_MAX_THREADS/CUDA_WARP_WIDTH;
mean_min_max<T><<<grid,CUDA_THREADBLOCK_MAX_THREADS,req_threadblock_shared_memory,stream>>>(data, data_length, threadblock_means, threadblock_mins, threadblock_maxs); CUERR("Calculating data mean/min/max");
// Reduce across threadblock results
while(num_threadblocks > 1){
grid.x = DIV_ROUNDUP(num_threadblocks,CUDA_THREADBLOCK_MAX_THREADS);
int threads = num_threadblocks > CUDA_THREADBLOCK_MAX_THREADS ? CUDA_THREADBLOCK_MAX_THREADS : num_threadblocks;
req_threadblock_shared_memory = sizeof(float)*CUDA_WARP_WIDTH + 2*num_threadblocks*sizeof(T);
mean_min_max_float<T><<<grid,threads,req_threadblock_shared_memory,stream>>>(threadblock_means, num_threadblocks, threadblock_means, threadblock_mins, threadblock_maxs); CUERR("Reducing calculated data mean/min/max");
num_threadblocks = grid.x;
}
cudaMemcpyAsync(min, threadblock_mins, sizeof(T), cudaMemcpyDeviceToDevice, stream); CUERR("Launching copy of calculated min within GPU global memory");
cudaMemcpyAsync(max, threadblock_maxs, sizeof(T), cudaMemcpyDeviceToDevice, stream); CUERR("Launching copy of calculated max within GPU global memory");
num_threadblocks = DIV_ROUNDUP(data_length, CUDA_THREADBLOCK_MAX_THREADS);
req_threadblock_shared_memory = sizeof(float)*CUDA_THREADBLOCK_MAX_THREADS/CUDA_WARP_WIDTH;
grid.x = num_threadblocks;
variance<T><<<grid,CUDA_THREADBLOCK_MAX_THREADS,req_threadblock_shared_memory,stream>>>(data, data_length, data_length, threadblock_means, threadblock_results); CUERR("Calculating data variance");
// Reduce across threadblock results
req_threadblock_shared_memory = sizeof(float)*CUDA_THREADBLOCK_MAX_THREADS/CUDA_WARP_WIDTH;
while(num_threadblocks > 1){
grid.x = DIV_ROUNDUP(num_threadblocks,CUDA_THREADBLOCK_MAX_THREADS);
int threads = num_threadblocks > CUDA_THREADBLOCK_MAX_THREADS ? CUDA_THREADBLOCK_MAX_THREADS : num_threadblocks;
variance_float<<<grid,threads,req_threadblock_shared_memory,stream>>>(threadblock_results, num_threadblocks, data_length, threadblock_means,
threadblock_results); CUERR("Reducing calculated data variance");
num_threadblocks = grid.x;
}
cudaMemcpyAsync(mean, threadblock_means, sizeof(float), cudaMemcpyDeviceToDevice, stream); CUERR("Launching copy of calculated mean within GPU global memory");
cudaMemcpyAsync(stddev, threadblock_results, sizeof(float), cudaMemcpyDeviceToDevice, stream); CUERR("Launching copy of calculated std dev within GPU global memory");
// TODO: Synchronizes on the first cudaFree()... move to a thread?
cudaFree(threadblock_results); CUERR("Freeing device memory for query Z-norm threadblock variances");
cudaFree(threadblock_means); CUERR("Freeing device memory for query Z-norm threadblock means");
cudaFree(threadblock_mins); CUERR("Freeing device memory for query Z-norm threadblock mins");
cudaFree(threadblock_maxs); CUERR("Freeing device memory for query Z-norm threadblock maxs");
}
// Hides the kernel launch details and inherent asynchronicity from the CUDA-agnostic caller.
// Note that it helps to speed up the transfer if the memory for *subject is page-locked on the CPU side.
// Host function that loads the subject onto the GPU
// subject - the subject to be loaded
// subject_std - subject standard deviation values
// subject_length - the length of the given subject
// use_std - flag for using subject standard deviation values
// stream - CUDA kernel stream
__host__
void
load_subject(QTYPE *subject, QTYPE *subject_std, long subject_length, int use_std, cudaStream_t stream=0){
if((subject && subject_length > 0) && ((use_std != 0) == (subject_std != 0))){
// It turns out that cudaMalloc()s are pretty expensive (~1ms) so just allocate once (hence the extra padding beyond subject_length) and stuff all the extra stats variables in that space at the end
cudaMalloc(&Dsubject, sizeof(QTYPE)*(subject_length+10)); CUERR("Allocating GPU memory for subject and its stats")
cudaMemcpyToSymbolAsync(::Tsubject_length, &subject_length, sizeof(long), 0, cudaMemcpyHostToDevice, stream); CUERR("Copying subject length from CPU to GPU memory")
// The expensive step of copying a potentially large amount of data across the PCI bus...at least it's all in one chunk for efficiency.
cudaMemcpyAsync(Dsubject, subject, sizeof(QTYPE)*subject_length, cudaMemcpyHostToDevice, stream); CUERR("Copying subject from CPU to GPU memory")
if(use_std) {
cudaMalloc(&Dsubject_std, sizeof(QTYPE)*(subject_length+10));
cudaMemcpyAsync(Dsubject_std, subject_std, sizeof(QTYPE)*subject_length, cudaMemcpyHostToDevice, stream);
}
register int offset_mult = DIV_ROUNDUP(sizeof(float), sizeof(QTYPE));
register float *mean_ptr = (float *) &Dsubject[subject_length];
register float *stddev_ptr = (float *) &Dsubject[subject_length+offset_mult];
register QTYPE *min_ptr = (QTYPE *) &Dsubject[subject_length+2*offset_mult];
register QTYPE *max_ptr = (QTYPE *) &Dsubject[subject_length+2*offset_mult+1];
get_znorm_stats(Dsubject, subject_length, mean_ptr, stddev_ptr, min_ptr, max_ptr, stream);
cudaMemcpyToSymbolAsync(::Dsubject_mean, mean_ptr, sizeof(float), 0, cudaMemcpyDeviceToDevice, stream); CUERR("Copying subject mean from GPU global to constant memory")
cudaMemcpyToSymbolAsync(::Dsubject_stddev, stddev_ptr, sizeof(float), 0, cudaMemcpyDeviceToDevice, stream); CUERR("Copying subject std dev from GPU global to constant memory")
cudaMemcpyToSymbolAsync(::Dsubject_min, min_ptr, sizeof(QTYPE), 0, cudaMemcpyDeviceToDevice, stream); CUERR("Copying subject min from GPU global to constant memory")
cudaMemcpyToSymbolAsync(::Dsubject_max, max_ptr, sizeof(QTYPE), 0, cudaMemcpyDeviceToDevice, stream); CUERR("Copying subject max from GPU global to constant memory")
}
else{
subject_length = 0;
cudaMemcpyToSymbolAsync(::Tsubject_length, &subject_length, sizeof(long), 0, cudaMemcpyHostToDevice, stream); CUERR("Zeroing subject length in GPU constant memory")
}
cudaStreamSynchronize(stream); CUERR("Synchronizing stream after loading the subject to GPU")
}
// Function that gets the Dsubject pointer for testing
// returns the Dsubject pointer
QTYPE* get_subject_pointer(){
return Dsubject;
}
// Function that gets the Dsubject standard deviation pointer for testing
// returns the Dsubject_std pointer
QTYPE* get_subject_std_pointer(){
return Dsubject_std;
}
// Z-normalize the query segmentation results (now in global constant memory) *all together*.
template <class T>
__global__
void
query_znorm(int offset, size_t length, float *data_mean, float *data_stddev, T *min, T *max, int normalization_mode, cudaStream_t stream){
// TODO: There is a small chance that our quasi-z-normalization of a low dynamic range data stream (typically query) against a high dynamic
// range target (subject) will cause the data stream's extreme values to shift or scale outside the bounds of QTYPE. If this happens,
// we will cap those transformed data stream values to QTYPE's min or max and output a warning.
int pos = offset+blockIdx.x*blockDim.x+threadIdx.x;
float mean = *data_mean;
// if(threadIdx.x == 0 && normalization_mode != NO_ZNORM){
// printf("Data mean in znorm: %f and Sub mean: %f\n", mean, Dsubject_mean);
// printf("Data std in znorm: %f and Sub std: %f\n", *data_stddev, Dsubject_stddev);
// printf("Min: %f and Max: %f\n", *min, *max);
// }
if(pos < offset+length){ // Coalesced global mem reads and writes
// printf("pos: %i\n", pos);
// printf("Gquery[pos]: %f\n", Gquery[pos]);
if(normalization_mode != NO_ZNORM){
Gquery[pos] = (QTYPE) ((Gquery[pos]-mean)/(*data_stddev)*Dsubject_stddev+Dsubject_mean);
Gquery_std[pos] = (QTYPE) (Gquery_std[pos]/(*data_stddev)*Dsubject_stddev);
}
}
}
__global__
void
welford_query_znorm(int offset, int length, float* mean, float* ssq, long total_values_znormalized, cudaStream_t stream){
// TODO: There is a small chance that our quasi-z-normalization of a low dynamic range data stream (typically query) against a high dynamic
// range target (subject) will cause the data stream's extreme values to shift or scale outside the bounds of QTYPE. If this happens,
// we will cap those transformed data stream values to QTYPE's min or max and output a warning.
// TODO: With only one thread active, this will run very slowly. See if there is any way to speed it up
int tid = threadIdx.x;
int pos = tid;
int num_it = DIV_ROUNDUP(length, CUDA_THREADBLOCK_MAX_THREADS);
__shared__ QTYPE shared[CUDA_THREADBLOCK_MAX_THREADS];
// float mean = 0;
float oldmean = 0;
// float ssq = 0;
float stddev;
float sample;
for(int it = 0; it < num_it; it++) {
if(pos < length)
shared[tid] = Gquery[pos];
if(tid == 0){ // Coalesced global mem reads and writes
long p = total_values_znormalized;
for(int i = 0; i < CUDA_THREADBLOCK_MAX_THREADS && p < total_values_znormalized + length; i++, p++) {
sample = float(shared[i]);
oldmean = (*mean);
(*mean) += (sample-(*mean))/(p+1);
(*ssq) += (sample-(*mean))*(sample-oldmean);
stddev = (p > 0) ? sqrt((*ssq)/(p)) : 1; // TODO unroll this to avoid the if statement
Gquery[p] = (QTYPE) ((sample-(*mean))/(stddev)*Dsubject_stddev+Dsubject_mean);
Gquery_std[p] = (QTYPE) (Gquery_std[p]/(stddev)*Dsubject_stddev);
}
}
pos += CUDA_THREADBLOCK_MAX_THREADS; // advance by one full threadblock-sized chunk per iteration, matching num_it above
}
}
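// For reference, the scalar Welford update that the single active thread above applies to each sample
// (illustrative helper; the names are assumptions, not part of this header):
#if 0
static inline void welford_update(float sample, long n_seen, float *mean, float *ssq){
  float old_mean = *mean;
  *mean += (sample - *mean) / (float)(n_seen + 1);   // n_seen values were consumed before this one
  *ssq  += (sample - *mean) * (sample - old_mean);   // running sum of squared deviations
  // running stddev = sqrt(*ssq / n_seen) once n_seen > 0
}
#endif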
/* Hard coded full 10 point query and subject Euclidean distance White-Neely step DTW with Sakoe-Chiba band of 5.
It shares subject values across threadblock via L1 cache (shared memory) and query values via L1 cache too (constant memory).
Size of subject is limited to max(signed int) since CUDA supplied variables in blockIdx.x*blockDim.x+threadIdx.x
job index calculation is signed integer range bound. Query offset as multiple of ten is inferred by job index modulus subject length.
There are 24 4-byte registers per kernel thread.
*/
/* Note that this GPU-based DTW method assumes the Subject and Query are already z-normalized. Set up to run multiple DTWs in a thread block on a single CUDA multi-processor. */
// Let's make a function dist() for the Euclidean distance (L2 norm, not to be confused with L2 cache), just do the square w/o sqrt() as relative rank matters, not final value
#define std_dist(query_i,subject_j,qstd_i,std_i) ((query_i-subject_j)*(query_i-subject_j) + (qstd_i-std_i)*(qstd_i-std_i))
#define dist(query_i,subject_j) ((query_i-subject_j)*(query_i-subject_j))
/// MINIDTW_STRIDE must be a power of 2
template<int soft_dtw_size, int soft_dtw_warp>
__global__
void soft_dtw_std(const int num_query_indices, QTYPE * subject, QTYPE * subject_std, long long * query_adjacent_candidates, QTYPE_ACC * query_adjacent_distances){
const int soft_dtw_block_dim = CUDA_THREADBLOCK_MAX_THREADS/MINIDTW_STRIDE;
const int soft_dtw_shared_it_float = soft_dtw_size;
const int soft_dtw_stride = 1+soft_dtw_warp+soft_dtw_warp;
// calculate jobid and subject/query sample locations for the thread
// work with padding so that the effective subject length is always a multiple of CUDA_THREADBLOCK_MAX_THREADS, i.e. always fills the warp
long long jobid = (long long) blockIdx.x * (long long) blockDim.x + (long long) threadIdx.x;
long long eff_subject_length = CUDA_THREADBLOCK_MAX_THREADS*(DIV_ROUNDUP(((long long)Tsubject_length),CUDA_THREADBLOCK_MAX_THREADS));
long long strided_jobid = jobid*MINIDTW_STRIDE;
long long subject_offset = strided_jobid%eff_subject_length;
int query_offset = int(strided_jobid/eff_subject_length*soft_dtw_size);
// shared memory in 3 parts, one for storing subject, one for results of minidtws, one for indices for each thread when calculating min
__shared__ char shared[sizeof(QTYPE)*(CUDA_THREADBLOCK_MAX_THREADS + soft_dtw_size)*2 + sizeof(QTYPE_ACC)*soft_dtw_block_dim + sizeof(long long)*soft_dtw_block_dim/2 + 32];
QTYPE *subjects = (QTYPE *) &shared[0];
size_t temp_unpadded = sizeof(QTYPE)*(CUDA_THREADBLOCK_MAX_THREADS + soft_dtw_size);
size_t temp_padded = temp_unpadded + 8 - temp_unpadded%8;
QTYPE *subject_stds = (QTYPE *) &shared[temp_padded];
temp_unpadded = temp_padded + sizeof(QTYPE)*(CUDA_THREADBLOCK_MAX_THREADS + soft_dtw_size);
temp_padded = temp_unpadded + 8 - temp_unpadded%8;
QTYPE_ACC *dtw_results = (QTYPE_ACC *) &shared[temp_padded];
temp_unpadded = temp_padded + sizeof(QTYPE_ACC)*soft_dtw_block_dim;
temp_padded = temp_unpadded + 16 - temp_unpadded%16;
long long *idata = (long long *) &shared[temp_padded];
if(subject_offset < (long long) Tsubject_length - soft_dtw_size){ // if valid position (block is padded to CUDA_THREADBLOCK_MAX_THREADS)
// Load the bits of the subject (from texture memory) that this thread will use into shared memory (L1 cache)
//if(sizeof(QTYPE) == 4){
#pragma unroll
for(int i = 0; i < soft_dtw_shared_it_float; i++) {
subjects[threadIdx.x + i] = subject[subject_offset + i];
}
#pragma unroll
for(int i = 0; i < soft_dtw_shared_it_float; i++) {
subject_stds[threadIdx.x + i] = subject_std[subject_offset + i];
}
// registers that will carry intermediate results
// ideally these are in registers, but not always, may be sent out to local mem
QTYPE q;
QTYPE qstd;
QTYPE s[soft_dtw_size];
QTYPE std[soft_dtw_size];
QTYPE_ACC p[soft_dtw_stride];
// load each subject value into register just-in-time to avoid memory bottleneck on warp start
#pragma unroll
for(int i = 0; i < soft_dtw_stride; i++) {
s[i] = subjects[threadIdx.x + i];
}
#pragma unroll
for(int i = 0; i < soft_dtw_stride; i++) {
std[i] = subject_stds[threadIdx.x + i];
}
#pragma unroll
for(int query_pos = 0; query_pos < soft_dtw_size; query_pos++) {
int load_s = query_pos + soft_dtw_stride;
q = Gquery[query_offset + query_pos];
qstd = Gquery_std[query_offset + query_pos];
QTYPE_ACC previous_p = 0;
if(load_s < soft_dtw_size) {
s[load_s] = subjects[threadIdx.x + load_s];
std[load_s] = subject_stds[threadIdx.x + load_s];
}
int stride_pos = query_pos - soft_dtw_warp;
#pragma unroll
for(int i = 0; i < soft_dtw_size; i++) {
if(i < soft_dtw_stride) {
if(stride_pos >= 0 && stride_pos < soft_dtw_size) {
bool at_left = stride_pos == 0;
bool at_bottom = query_pos == 0;
if(stride_pos != 0 && query_pos != 0) {
if(i == 0) {
previous_p = dtwmin(p[i], p[i + 1]);
}
else if(i == soft_dtw_stride-1) {
previous_p = dtwmin(p[i], previous_p);
}
else {
previous_p = dtwmin(p[i + 1], previous_p);
previous_p = dtwmin(p[i], previous_p);
}
}
else if(at_left && !at_bottom) {
previous_p = p[i + 1];
}
p[i] = std_dist(q,s[stride_pos],qstd,std[stride_pos]) + previous_p;
previous_p = p[i];
}
stride_pos++;
}
}
}
dtw_results[threadIdx.x] = p[soft_dtw_warp];
} else {
dtw_results[threadIdx.x] = DTW_MAX;
}
__syncthreads();
QTYPE_ACC min = DTW_MAX;
long long index = -1;
// call find min function, only use of shared idata (for inter-thread communication), returns min and index
block_findMin(dtw_results, idata, threadIdx.x, blockDim.x, strided_jobid, min, index);
// store result, indexed by query first
if(threadIdx.x == 0) {
query_adjacent_candidates[(subject_offset/CUDA_THREADBLOCK_MAX_THREADS)*num_query_indices + query_offset/soft_dtw_size] = index%eff_subject_length;
query_adjacent_distances[(subject_offset/CUDA_THREADBLOCK_MAX_THREADS)*num_query_indices + query_offset/soft_dtw_size] = min;
}
}
template<int soft_dtw_size, int soft_dtw_warp>
__global__
void soft_dtw(const int num_query_indices, QTYPE * subject, QTYPE * subject_std, long long * query_adjacent_candidates, QTYPE_ACC * query_adjacent_distances){
const int soft_dtw_block_dim = CUDA_THREADBLOCK_MAX_THREADS/MINIDTW_STRIDE;
const int soft_dtw_shared_it_float = soft_dtw_size;
const int soft_dtw_stride = 1+soft_dtw_warp+soft_dtw_warp;
// calculate jobid and subject/query sample locations for the thread
// work with padding so that the effective subject length is always a multiple of CUDA_THREADBLOCK_MAX_THREADS, i.e. always fills the warp
long long jobid = (long long) blockIdx.x * (long long) blockDim.x + (long long) threadIdx.x;
long long eff_subject_length = CUDA_THREADBLOCK_MAX_THREADS*(DIV_ROUNDUP(((long long)Tsubject_length),CUDA_THREADBLOCK_MAX_THREADS));
long long strided_jobid = jobid*MINIDTW_STRIDE;
long long subject_offset = strided_jobid%eff_subject_length;
int query_offset = int(strided_jobid/eff_subject_length*soft_dtw_size);
// shared memory in 3 parts, one for storing subject, one for results of minidtws, one for indices for each thread when calculating min
__shared__ char shared[sizeof(QTYPE)*(CUDA_THREADBLOCK_MAX_THREADS + soft_dtw_size)*2 + sizeof(QTYPE_ACC)*soft_dtw_block_dim + sizeof(long long)*soft_dtw_block_dim/2 + 32];
QTYPE *subjects = (QTYPE *) &shared[0];
size_t temp_unpadded = sizeof(QTYPE)*(CUDA_THREADBLOCK_MAX_THREADS + soft_dtw_size);
size_t temp_padded = temp_unpadded + 8 - temp_unpadded%8;
QTYPE_ACC *dtw_results = (QTYPE_ACC *) &shared[temp_padded];
temp_unpadded = temp_padded + sizeof(QTYPE_ACC)*soft_dtw_block_dim;
temp_padded = temp_unpadded + 16 - temp_unpadded%16;
long long *idata = (long long *) &shared[temp_padded];
if(subject_offset <= (long long) Tsubject_length - soft_dtw_size){ // if valid position (block is padded to CUDA_THREADBLOCK_MAX_THREADS)
// Load the bits of the subject (from texture memory) that this thread will use into shared memory (L1 cache)
//if(sizeof(QTYPE) == 4){
#pragma unroll
for(int i = 0; i < soft_dtw_shared_it_float; i++) {
subjects[threadIdx.x + i] = subject[subject_offset + i];
}
// registers that will carry intermediate results
// ideally these are in registers, but not always, may be sent out to local mem
QTYPE q;
QTYPE s[soft_dtw_size];
QTYPE_ACC p[soft_dtw_stride];
// load each subject value into register just-in-time to avoid memory bottleneck on warp start