From c9f7392d05a7ef11dac27b89763a52dd43df5e49 Mon Sep 17 00:00:00 2001 From: Jonas Meisner Date: Sun, 24 Sep 2023 18:09:01 +0200 Subject: [PATCH] Complete refactor of EMU. emu v1.0 --- README.md | 19 +- emu/emu.py | 206 --------------------- emu/halko.pyx | 341 ---------------------------------- emu/shared.py | 383 --------------------------------------- emu/shared_cy.pyx | 258 -------------------------- environment.yml | 3 +- pyproject.toml | 3 + requirements.txt | 1 - setup.py | 43 +++-- {emu => src}/__init__.py | 0 src/algorithm.py | 129 +++++++++++++ src/main.py | 197 ++++++++++++++++++++ src/memory.py | 120 ++++++++++++ src/memory_cy.pyx | 118 ++++++++++++ src/shared.py | 93 ++++++++++ src/shared_cy.pyx | 224 +++++++++++++++++++++++ 16 files changed, 915 insertions(+), 1223 deletions(-) delete mode 100644 emu/emu.py delete mode 100644 emu/halko.pyx delete mode 100644 emu/shared.py delete mode 100644 emu/shared_cy.pyx create mode 100644 pyproject.toml rename {emu => src}/__init__.py (100%) create mode 100644 src/algorithm.py create mode 100644 src/main.py create mode 100644 src/memory.py create mode 100644 src/memory_cy.pyx create mode 100644 src/shared.py create mode 100644 src/shared_cy.pyx diff --git a/README.md b/README.md index 25101a6..a3e6828 100644 --- a/README.md +++ b/README.md @@ -1,18 +1,17 @@ # EMU -EMU is a software for performing principal component analysis (PCA) in the presence of missingness for genetic datasets. EMU can handle both random and non-random missingness by modelling it directly through a truncated SVD approach. EMU uses PLINK files as input. +EMU is a software for performing principal component analysis (PCA) in the presence of missingness for genetic datasets. EMU can handle both random and non-random missingness by modelling it directly through a truncated SVD approach. EMU uses binary PLINK files as input. ### Citation Please cite our paper in *Bioinformatics*: https://doi.org/10.1093/bioinformatics/btab027 ### Dependencies -The EMU software relies on the following Python packages that you can install through conda (recommended) or pip: +The EMU software relies on the following two Python packages that you can install through conda (recommended) or pip: - numpy - cython -- scipy You can create an environment through conda easily as follows: -``` +```bash conda env create -f environment.yml ``` @@ -20,7 +19,6 @@ conda env create -f environment.yml ```bash git clone https://github.com/Rosemeis/emu.git cd emu -python setup.py build_ext --inplace pip3 install -e . ``` @@ -34,19 +32,12 @@ EMU works directly on PLINK files. emu -h # Using PLINK files directly (test.bed, test.bim, test.fam) - Give prefix -emu -p test -e 2 -t 64 -o test.emu -``` - -### EM Acceleration (Default - Recommended) -An acceleration scheme is being used with both SVD options (*halko* or *arpack*). Each iteration will be longer as 2 extra steps are performed but the overall number of iterations for convergence is decreased significantly. However, the EM acceleration can be turned off. -```bash -# No acceleration -emu -p test -e 2 -t 64 -o test.emu.no_accel --no_accel +emu --bfile test --n_eig 2 --threads 64 --out test.emu ``` ### Memory efficient implementation A more memory efficient implementation has been added. It is based of the randomized SVD algorithm ([Halko et al.](https://arxiv.org/abs/0909.4061)) but using custom matrix multiplications that can handle decomposed matrices. Only factor matrices as well as the 2-bit data matrix is kept in memory. ```bash # Example run using '-m' argument -emu -m -p test -e 2 -t 64 -o test.emu.mem +emu --mem --bfile test -e 2 -t 64 -o test.emu.mem ``` diff --git a/emu/emu.py b/emu/emu.py deleted file mode 100644 index 4968fbe..0000000 --- a/emu/emu.py +++ /dev/null @@ -1,206 +0,0 @@ -""" -EMU. -Main caller. Performs iterative SVD of allele count matrix (EM-PCA) based on either ARPACK, or Halko method. - -Jonas Meisner, Siyang Liu, Mingxi Huang and Anders Albrechtsen -""" - -__author__ = "Jonas Meisner" - -# Libraries -import argparse -import os -import subprocess -import sys -from datetime import datetime - -# Reader help function -def extract_length(filename): - process = subprocess.Popen(['wc', '-l', filename], stdout=subprocess.PIPE) - result, err = process.communicate() - return int(result.split()[0]) - -# Argparse -parser = argparse.ArgumentParser(prog="emu") -parser.add_argument("--version", action="version", version="%(prog)s alpha 0.9") -parser.add_argument("-m", "--mem", action="store_true", - help="EMU-mem variant") -parser.add_argument("-p", "--plink", metavar="FILE-PREFIX", - help="Prefix for PLINK files (.bed, .bim, .fam)") -parser.add_argument("-e", "--n_eig", metavar="INT", type=int, - help="Number of eigenvectors to use in iterative estimation") -parser.add_argument("-k", "--n_out", metavar="INT", type=int, - help="Number of eigenvectors to output in final SVD") -parser.add_argument("-t", "--threads", metavar="INT", type=int, default=1, - help="Number of threads") -parser.add_argument("-f", "--maf", metavar="FLOAT", type=float, default=0.00, - help="Threshold for minor allele frequencies (0.00)") -parser.add_argument("-s", "--selection", action="store_true", - help="Perform PC-based selection scan (Galinsky et al. 2016)") -parser.add_argument("-o", "--out", metavar="OUTPUT", default="emu", - help="Prefix output name",) -parser.add_argument("--no_accel", action="store_true", - help="Turn off acceleration for EM (not recommended)") -parser.add_argument("--iter", metavar="INT", type=int, default=100, - help="Maximum iterations in estimation of individual allele frequencies (100)") -parser.add_argument("--tole", metavar="FLOAT", type=float, default=5e-7, - help="Tolerance in update for individual allele frequencies (5e-7)") -parser.add_argument("--svd", metavar="STRING", default="halko", - help="Method for performing truncated SVD (arpack/halko)") -parser.add_argument("--svd_power", metavar="INT", type=int, default=5, - help="Number of power iterations in randomized SVD (Halko)") -parser.add_argument("--loadings", action="store_true", - help="Save SNP loadings") -parser.add_argument("--maf_save", action="store_true", - help="Save estimated population allele frequencies") -parser.add_argument("--sites_save", action="store_true", - help="Save vector of sites after MAF filtering (Binary)") -parser.add_argument("--cost", action="store_true", - help="Output min-cost each iteration (ONLY EMU)") -parser.add_argument("--cost_step", action="store_true", - help="Use acceleration based on cost (ONLY EMU)") -parser.add_argument("--seed", metavar="INT", type=int, default=0, - help="Set random seed for scikit-learn randomized SVD") - - -##### EMU ##### -def main(): - args = parser.parse_args() - if len(sys.argv) < 2: - parser.print_help() - sys.exit() - print("EMU v.0.9\n") - assert args.plink is not None, "No input data (-plink)" - - # Create log-file of arguments - full = vars(parser.parse_args()) - deaf = vars(parser.parse_args([])) - with open(args.out + ".args", "w") as f: - f.write("EMU v.0.9\n") - f.write("Time: " + datetime.now().strftime("%d/%m/%Y %H:%M:%S") + "\n") - f.write("Directory: " + str(os.getcwd()) + "\n") - f.write("Options:\n") - for key in full: - if full[key] != deaf[key]: - if type(full[key]) is bool: - f.write("\t--" + str(key) + "\n") - else: - f.write("\t--" + str(key) + " " + str(full[key]) + "\n") - del full, deaf - - # Control threads - os.environ["OMP_NUM_THREADS"] = str(args.threads) - os.environ["OPENBLAS_NUM_THREADS"] = str(args.threads) - os.environ["MKL_NUM_THREADS"] = str(args.threads) - - # Import numerical libraries - import numpy as np - from math import ceil - - # Import own scripts - from emu import shared - from emu import shared_cy - - # Set K - assert args.n_eig is not None, "Must specify number of eigenvectors to use!" - if args.n_out is None: - K = args.n_eig - else: - K = args.n_out - - # Read data - print("Reading in data matrix from PLINK files.") - # Finding length of .fam and .bim file and read .bed file into NumPy array - n = extract_length(args.plink + ".fam") - m = extract_length(args.plink + ".bim") - with open(args.plink + ".bed", "rb") as bed: - B = np.fromfile(bed, dtype=np.uint8, offset=3) - Bi = ceil(n/4) # Length of bytes to describe n individuals - D = B.reshape((m, Bi)) - m_old = D.shape[0] # For future reference - print("Loaded " + str(n) + " samples and " + str(m) + " sites.") - - # Population allele frequencies - print("Estimating population allele frequencies.") - f = np.zeros(m, dtype=np.float32) - c = np.zeros(m, dtype=np.int32) - shared_cy.estimateF(D, f, c, Bi, n, m, args.threads) - del c - - # Removing rare variants - if args.maf > 0.0: - mask = (f >= args.maf) & (f <= (1 - args.maf)) - - # Filter and update arrays without copying - m = np.sum(mask) - tmpMask = mask.astype(np.uint8) - shared_cy.filterArrays(D, f, tmpMask) - D = D[:m,:] - f = f[:m] - del tmpMask - print("Number of sites after MAF filtering (" + str(args.maf) + "): " \ - + str(m)) - m = D.shape[0] - assert (not np.allclose(np.max(f), 1.0)) or (not np.allclose(np.min(f), 0.0)), \ - "Fixed sites in dataset. Must perform MAF filtering (-f / --maf)!" - - # Additional parsing options - if args.cost_step: - assert args.cost, "Must also estimate cost at every iteration (-cost)!" - if args.no_accel: - print("Turned off EM acceleration.") - accel = False - else: - accel = True - - ##### EMU ##### - if args.mem: - print("\nPerforming EMU-mem using " + str(args.n_eig) + " eigenvector(s).") - U, s, V = shared.emuMemory(D, f, args.n_eig, K, args.iter, args.tole, \ - Bi, n, m, args.svd_power, args.out, accel, \ - args.seed, args.threads) - else: - print("\nPerforming EMU using " + str(args.n_eig) + " eigenvector(s).") - U, s, V = shared.emuAlgorithm(D, f, args.n_eig, K, args.iter, args.tole, \ - Bi, n, m, args.svd, args.svd_power, \ - args.out, accel, args.cost, \ - args.cost_step, args.seed, args.threads) - - # Save matrices - np.savetxt(args.out + ".eigenvecs", V.T, fmt="%.7f") - print("Saved eigenvector(s) as " + args.out + ".eigenvecs (Text).") - np.savetxt(args.out + ".eigenvals", s**2/float(m), fmt="%.7f") - print("Saved eigenvalue(s) as " + args.out + ".eigenvals (Text).") - del V, s - - # Save loadings - if args.loadings: - np.save(args.out + ".loadings", U) - print("Saved SNP loadings as " + args.out + ".loadings.npy (Binary).") - - # Perform genome-wide selection scan - if args.selection: - print("\nPerforming genome-wide selection scan along each PC.") - Dsquared = np.zeros((m, K), dtype=np.float32) - shared_cy.galinskyScan(U, Dsquared) - np.save(args.out + ".selection", Dsquared.astype(float, copy=False)) - print("Saved test statistics as " + args.out + ".selection.npy (Binary).") - del Dsquared - del U - - # Optional saves - if args.maf_save: - np.save(args.out + ".maf", f.astype(float, copy=False)) - print("\nSaved minor allele frequencies as " + args.out + ".maf.npy (Binary).") - del f - if (args.sites_save) and (args.maf > 0.0): - siteVec = np.zeros(m_old, dtype=np.uint8) - siteVec[mask] = 1 - np.savetxt(args.out + ".sites", siteVec, fmt="%i") - print("\nSaved boolean vector of sites kept after filtering as " + \ - str(args.out) + ".sites (Text)") - - -##### Define main ##### -if __name__ == "__main__": - main() diff --git a/emu/halko.pyx b/emu/halko.pyx deleted file mode 100644 index c9adf06..0000000 --- a/emu/halko.pyx +++ /dev/null @@ -1,341 +0,0 @@ -# cython: language_level=3, boundscheck=False, wraparound=False, initializedcheck=False, cdivision=True -import numpy as np -cimport numpy as np -from cython.parallel import prange -from libc.math cimport sqrt - -##### EMU-mem ##### -### 2-bit functions only ### -### Frequency functions -# Matrix Multiplication from byte matrix - dot(E, X) -cpdef matMul_Freq(unsigned char[:,::1] D, float[::1] f, float[:,::1] X, \ - float[:,::1] Y, int Bi, int n, int m, int t): - cdef int K = X.shape[1] - cdef signed char[4] recode = [1, 9, 2, 0] # EMU format - cdef unsigned char mask = 3 - cdef unsigned char byte, code - cdef int i, j, k, b, bytepart - cdef float e - with nogil: - for j in prange(m, num_threads=t): - i = 0 - for b in range(Bi): - byte = D[j,b] - for bytepart in range(4): - code = recode[byte & mask] - if code != 9: - if code == 2: - e = 0.5 - f[j] - else: - e = code - f[j] - for k in range(K): - Y[j,k] = Y[j,k] + e*X[i,k] - byte = byte >> 2 # Right shift 2 bits - i = i + 1 - if i == n: - break - -# Trans Matrix Multiplication from byte matrix - dot(E.T, Y) -cpdef matMulTrans_Freq(unsigned char[:,::1] D, float[::1] f, float[:,:] Y, \ - float[:,::1] X, int Bi, int n, int m, int t): - cdef int K = X.shape[1] - cdef signed char[4] recode = [1, 9, 2, 0] # EMU format - cdef unsigned char mask = 3 - cdef unsigned char byte, code - cdef int i, j, k, b, bytepart - cdef float e - with nogil: - for b in prange(Bi, num_threads=t): - for j in range(m): - byte = D[j,b] - i = b*4 - for bytepart in range(4): - code = recode[byte & mask] - if code != 9: - if code == 2: - e = 0.5 - f[j] - else: - e = code - f[j] - for k in range(K): - X[i,k] = X[i,k] + e*Y[j,k] - byte = byte >> 2 # Right shift 2 bits - i = i + 1 - if i == n: - break - -## Final iteration -# Matrix Multiplication from byte matrix - dot(E, X) -cpdef matMulFinal_Freq(unsigned char[:,::1] D, float[::1] f, float[:,:] X, \ - float[:,:] Y, int Bi, int n, int m, int t): - cdef int K = X.shape[1] - cdef signed char[4] recode = [1, 9, 2, 0] # EMU format - cdef unsigned char mask = 3 - cdef unsigned char byte, code - cdef int i, j, k, b, bytepart - cdef float e - with nogil: - for j in prange(m, num_threads=t): - i = 0 - for b in range(Bi): - byte = D[j,b] - for bytepart in range(4): - code = recode[byte & mask] - if code != 9: - if code == 2: - e = (0.5 - f[j])/sqrt(f[j]*(1 - f[j])) - else: - e = (code - f[j])/sqrt(f[j]*(1 - f[j])) - for k in range(K): - Y[j,k] = Y[j,k] + e*X[i,k] - byte = byte >> 2 # Right shift 2 bits - i = i + 1 - if i == n: - break - -# Trans Matrix Multiplication from byte matrix - dot(E.T, Y) -cpdef matMulTransFinal_Freq(unsigned char[:,::1] D, float[::1] f, float[:,:] Y,\ - float[:,::1] X, int Bi, int n, int m, int t): - cdef int K = X.shape[1] - cdef signed char[4] recode = [1, 9, 2, 0] # EMU format - cdef unsigned char mask = 3 - cdef unsigned char byte, code - cdef int i, j, k, b, bytepart - cdef float e - with nogil: - for b in prange(Bi, num_threads=t): - for j in range(m): - byte = D[j,b] - i = b*4 - for bytepart in range(4): - code = recode[byte & mask] - if code != 9: - if code == 2: - e = (0.5 - f[j])/sqrt(f[j]*(1 - f[j])) - else: - e = (code - f[j])/sqrt(f[j]*(1 - f[j])) - for k in range(K): - X[i,k] = X[i,k] + e*Y[j,k] - byte = byte >> 2 # Right shift 2 bits - i = i + 1 - if i == n: - break - -### SVD update functions -## Domain mapping -# Matrix Multiplication from byte matrix - dot(E, X) -cpdef matMul_SVD_domain(unsigned char[:,::1] D, float[::1] f, float[:,:] U, \ - float[:] s, float[:,:] W, float[:,::1] X, \ - float[:,::1] Y, int Bi, int n, int m, int t): - cdef int V = U.shape[1] - cdef int K = X.shape[1] - cdef signed char[4] recode = [1, 9, 2, 0] # EMU format - cdef unsigned char mask = 3 - cdef unsigned char byte, code - cdef int i, j, k, v, b, bytepart - cdef float e - with nogil: - for j in prange(m, num_threads=t): - i = 0 - for b in range(Bi): - byte = D[j,b] - for bytepart in range(4): - code = recode[byte & mask] - if code != 9: - if code == 2: - e = 0.5 - f[j] - else: - e = code - f[j] - else: - e = 0.0 - for v in range(V): - e = e + U[j,v]*s[v]*W[v,i] - e = min(max(e + f[j], 1e-4), 1-(1e-4)) - e = e - f[j] - for k in range(K): - Y[j,k] = Y[j,k] + e*X[i,k] - byte = byte >> 2 # Right shift 2 bits - i = i + 1 - if i == n: - break - -# Trans Matrix Multiplication from byte matrix - dot(E.T, Y) -cpdef matMulTrans_SVD_domain(unsigned char[:,::1] D, float[::1] f, \ - float[:,:] U, float[:] s, float[:,:] W, \ - float[:,:] Y, float[:,::1] X, int Bi, int n, \ - int m, int t): - cdef int V = U.shape[1] - cdef int K = X.shape[1] - cdef signed char[4] recode = [1, 9, 2, 0] # EMU format - cdef unsigned char mask = 3 - cdef unsigned char byte, code - cdef int i, j, k, v, b, bytepart - cdef float e - with nogil: - for b in prange(Bi, num_threads=t): - for j in range(m): - byte = D[j,b] - i = b*4 - for bytepart in range(4): - code = recode[byte & mask] - if code != 9: - if code == 2: - e = 0.5 - f[j] - else: - e = code - f[j] - else: - e = 0.0 - for v in range(V): - e = e + U[j,v]*s[v]*W[v,i] - e = min(max(e + f[j], 1e-4), 1-(1e-4)) - e = e - f[j] - for k in range(K): - X[i,k] = X[i,k] + e*Y[j,k] - byte = byte >> 2 # Right shift 2 bits - i = i + 1 - if i == n: - break - -## Final iteration -# Matrix Multiplication from byte matrix - dot(E, X) -cpdef matMulFinal_SVD(unsigned char[:,::1] D, float[::1] f, float[:,:] U, \ - float[:] s, float[:,:] W, float[:,:] X, float[:,:] Y, \ - int Bi, int n, int m, int t): - cdef int V = U.shape[1] - cdef int K = X.shape[1] - cdef signed char[4] recode = [1, 9, 2, 0] # EMU format - cdef unsigned char mask = 3 - cdef unsigned char byte, code - cdef int i, j, k, v, b, bytepart - cdef float e - with nogil: - for j in prange(m, num_threads=t): - i = 0 - for b in range(Bi): - byte = D[j,b] - for bytepart in range(4): - code = recode[byte & mask] - if code != 9: - if code == 2: - e = (0.5 - f[j])/sqrt(f[j]*(1 - f[j])) - else: - e = (code - f[j])/sqrt(f[j]*(1 - f[j])) - else: - e = 0.0 - for v in range(V): - e = e + U[j,v]*s[v]*W[v,i] - e = e/sqrt(f[j]*(1 - f[j])) - for k in range(K): - Y[j,k] = Y[j,k] + e*X[i,k] - byte = byte >> 2 # Right shift 2 bits - i = i + 1 - if i == n: - break - -# Trans Matrix Multiplication from byte matrix - dot(E.T, Y) -cpdef matMulTransFinal_SVD(unsigned char[:,::1] D, float[::1] f, float[:,:] U, \ - float[:] s, float[:,:] W, float[:,:] Y, \ - float[:,::1] X, int Bi, int n, int m, int t): - cdef int V = U.shape[1] - cdef int K = X.shape[1] - cdef signed char[4] recode = [1, 9, 2, 0] # EMU format - cdef unsigned char mask = 3 - cdef unsigned char byte, code - cdef int i, j, k, v, b, bytepart - cdef float e - with nogil: - for b in prange(Bi, num_threads=t): - for j in range(m): - byte = D[j,b] - i = b*4 - for bytepart in range(4): - code = recode[byte & mask] - if code != 9: - if code == 2: - e = (0.5 - f[j])/sqrt(f[j]*(1 - f[j])) - else: - e = (code - f[j])/sqrt(f[j]*(1 - f[j])) - else: - e = 0.0 - for v in range(V): - e = e + U[j,v]*s[v]*W[v,i] - e = e/sqrt(f[j]*(1 - f[j])) - for k in range(K): - X[i,k] = X[i,k] + e*Y[j,k] - byte = byte >> 2 # Right shift 2 bits - i = i + 1 - if i == n: - break - -### Acceleration functions -## Map2Domain -# Matrix Multiplication from byte matrix - dot(E, X) -cpdef matMul_SVD_domain_accel(unsigned char[:,::1] D, float[::1] f, \ - float[:,:] U, float[:,:] W, float[:,::1] X, \ - float[:,::1] Y, int Bi, int n, int m, int t): - cdef int V = U.shape[1] - cdef int K = X.shape[1] - cdef signed char[4] recode = [1, 9, 2, 0] # EMU format - cdef unsigned char mask = 3 - cdef unsigned char byte, code - cdef int i, j, k, v, b, bytepart - cdef float e - with nogil: - for j in prange(m, num_threads=t): - i = 0 - for b in range(Bi): - byte = D[j,b] - for bytepart in range(4): - code = recode[byte & mask] - if code != 9: - if code == 2: - e = 0.5 - f[j] - else: - e = code - f[j] - else: - e = 0.0 - for v in range(V): - e = e + U[j,v]*W[v,i] - e = min(max(e + f[j], 1e-4), 1-(1e-4)) - e = e - f[j] - for k in range(K): - Y[j,k] = Y[j,k] + e*X[i,k] - byte = byte >> 2 # Right shift 2 bits - i = i + 1 - if i == n: - break - -# Trans Matrix Multiplication from byte matrix - dot(E.T, Y) -cpdef matMulTrans_SVD_domain_accel(unsigned char[:,::1] D, float[::1] f, \ - float[:,:] U, float[:,:] W, float[:,:] Y, \ - float[:,::1] X, int Bi, int n, int m, int t): - cdef int V = U.shape[1] - cdef int K = X.shape[1] - cdef signed char[4] recode = [1, 9, 2, 0] # EMU format - cdef unsigned char mask = 3 - cdef unsigned char byte, code - cdef int i, j, k, v, b, bytepart - cdef float e - with nogil: - for b in prange(Bi, num_threads=t): - for j in range(m): - byte = D[j,b] - i = b*4 - for bytepart in range(4): - code = recode[byte & mask] - if code != 9: - if code == 2: - e = 0.5 - f[j] - else: - e = code - f[j] - else: - e = 0.0 - for v in range(V): - e = e + U[j,v]*W[v,i] - e = min(max(e + f[j], 1e-4), 1-(1e-4)) - e = e - f[j] - for k in range(K): - X[i,k] = X[i,k] + e*Y[j,k] - byte = byte >> 2 # Right shift 2 bits - i = i + 1 - if i == n: - break diff --git a/emu/shared.py b/emu/shared.py deleted file mode 100644 index 65de7bd..0000000 --- a/emu/shared.py +++ /dev/null @@ -1,383 +0,0 @@ -""" -EMU. -Iterative SVD algorithms. - -Jonas Meisner, Siyang Liu, Mingxi Huang and Anders Albrechtsen -""" - -__author__ = "Jonas Meisner" - -# Libraries -import numpy as np -from scipy.sparse.linalg import svds - -# Import own scripts -from emu import halko -from emu import shared_cy - -##### EMU ##### -# Flip signs of SVD output - Based on scikit-learn (svd_flip) -def signFlip(U, Vt): - maxCols = np.argmax(np.abs(U), axis=0) - signs = np.sign(U[maxCols, range(U.shape[1])]) - U *= signs - Vt *= signs[:, np.newaxis] - return U, Vt - -### PCAone Halko ### -def halkoPCAone(E, k, n_iter, random_state): - m, n = E.shape - Omg = np.random.standard_normal(size=(n, k+10)).astype(np.float32) - for p in range(n_iter): - if p > 0: - Omg, _ = np.linalg.qr(H, mode="reduced") - G = np.dot(E, Omg) - H = np.dot(E.T, G) - Q, R = np.linalg.qr(G, mode="reduced") - B = np.linalg.solve(R.T, H.T) - Uhat, s, V = np.linalg.svd(B, full_matrices=False) - del B - U = np.dot(Q, Uhat) - return U[:,:k], s[:k], V[:k,:] - - -### Main EMU function ### -def emuAlgorithm(D, f, e, K, M, M_tole, Bi, n, m, svd_method, svd_power, \ - output, accel, cost, cost_step, seed, t): - E = np.zeros((m, n), dtype=np.float32) - - # Set random seed - np.random.seed(seed) - - # Setup acceleration - if accel: - print("Using accelerated EM scheme (SqS3)") - diffU_1 = np.zeros((m, e), dtype=np.float32) - diffU_2 = np.zeros((m, e), dtype=np.float32) - diffU_3 = np.zeros((m, e), dtype=np.float32) - diffW_1 = np.zeros((e, n), dtype=np.float32) - diffW_2 = np.zeros((e, n), dtype=np.float32) - diffW_3 = np.zeros((e, n), dtype=np.float32) - - # Initiate E matrix - shared_cy.updateE_init(D, f, E, Bi, t) - - # Exit without performing EMU - if M < 1: - print("Warning, no EM-PCA iterations are performed!") - # Estimating SVD - shared_cy.standardizeMatrix(E, f, t) - print("Inferring set of eigenvector(s).") - if svd_method == "arpack": - U, s, V = svds(E, k=K) - U, s, V = U[:, ::-1], s[::-1], V[::-1, :] - U, V = signFlip(U, V) - elif svd_method == "halko": - U, s, V = halkoPCAone(E, K, svd_power, seed) - U, V = signFlip(U, V) - del E - return U, s, V - else: - if accel: - print("Initiating accelerated EM scheme (1).") - # Estimate initial individual allele frequencies - if svd_method == "arpack": - U, s, W = svds(E, k=e) - U, W = signFlip(U, W) - elif svd_method == "halko": - U, s, W = halkoPCAone(E, e, svd_power, seed) - U, W = signFlip(U, W) - - # Estimate cost - if cost: - sumVec = np.zeros(m, dtype=np.float32) - shared_cy.frobenius(D, f, U, s, W, sumVec, Bi, t) - oldCost = np.sum(sumVec) - print("Frobenius: " + str(oldCost)) - - # Update E matrix based on setting - if not accel: - shared_cy.updateE_SVD(D, E, f, U, s, W, Bi, t) - print("Individual allele frequencies estimated (1).") - else: - W = W*s.reshape((e, 1)) - shared_cy.updateE_SVD_accel(D, E, f, U, W, Bi, t) - - # Iterative estimation of individual allele frequencies - for i in range(M): - prevU = np.copy(U) - if accel: - if svd_method == "arpack": - U1, s1, W1 = svds(E, k=e) - U1, W1 = signFlip(U1, W1) - elif svd_method == "halko": - U1, s1, W1 = halkoPCAone(E, e, svd_power, seed) - U1, W1 = signFlip(U1, W1) - W1 = W1*s1.reshape((e, 1)) - shared_cy.matMinus(U1, U, diffU_1) - shared_cy.matMinus(W1, W, diffW_1) - sr2_U = shared_cy.matSumSquare(diffU_1) - sr2_W = shared_cy.matSumSquare(diffW_1) - shared_cy.updateE_SVD_accel(D, E, f, U1, W1, Bi, t) - if svd_method == "arpack": - U2, s2, W2 = svds(E, k=e) - U2, W2 = signFlip(U2, W2) - elif svd_method == "halko": - U2, s2, W2 = halkoPCAone(E, e, svd_power, seed) - U2, W2 = signFlip(U2, W2) - W2 = W2*s2.reshape((e, 1)) - shared_cy.matMinus(U2, U1, diffU_2) - shared_cy.matMinus(W2, W1, diffW_2) - - # SQUAREM update of W and U SqS3 - shared_cy.matMinus(diffU_2, diffU_1, diffU_3) - shared_cy.matMinus(diffW_2, diffW_1, diffW_3) - sv2_U = shared_cy.matSumSquare(diffU_3) - sv2_W = shared_cy.matSumSquare(diffW_3) - if (i == 0) and (np.isclose(sv2_U, 0.0)): - print("No missingness in data. Skipping iterative approach!") - break - alpha_U = max(1.0, np.sqrt(sr2_U/sv2_U)) - alpha_W = max(1.0, np.sqrt(sr2_W/sv2_W)) - - # New accelerated update - shared_cy.matUpdate(U, diffU_1, diffU_3, alpha_U) - shared_cy.matUpdate(W, diffW_1, diffW_3, alpha_W) - shared_cy.updateE_SVD_accel(D, E, f, U, W, Bi, t) - if cost: - shared_cy.frobenius_accel(D, f, U, W, sumVec, Bi, t) - newCost = np.sum(sumVec) - print("Frobenius: " + str(newCost)) - if oldCost >= newCost: - print("Bad step, using un-accelerated update!") - shared_cy.updateE_SVD_accel(D, E, f, U2, W2, Bi, t) - else: - oldCost = newCost - else: - if svd_method == "arpack": - U, s, W = svds(E, k=e) - U, W = signFlip(U, W) - elif svd_method == "halko": - U, s, W = halkoPCAone(E, e, svd_power, seed) - U, W = signFlip(U, W) - shared_cy.updateE_SVD(D, E, f, U, s, W, Bi, t) - if cost: - shared_cy.frobenius(D, f, U, s, W, sumVec, Bi, t) - print("Frobenius: " + str(np.sum(sumVec))) - - # Break iterative update if converged - diff = np.sqrt(np.sum(shared_cy.rmse(U, prevU))/(m*e)) - print("Individual allele frequencies estimated (" + str(i+2) + "). RMSE=" + str(diff)) - if diff < M_tole: - print("Estimation of individual allele frequencies has converged.") - break - del prevU - if cost: - del sumVec - - # Run non-accelerated update to ensure properties of W, s, U - if accel: - if svd_method == "arpack": - U, s, W = svds(E, k=e) - U, W = signFlip(U, W) - elif svd_method == "halko": - U, s, W = halkoPCAone(E, e, svd_power, seed) - U, W = signFlip(U, W) - shared_cy.updateE_SVD(D, E, f, U, s, W, Bi, t) - del U1, U2, W1, W2, s1, s2, diffU_1, diffU_2, diffU_3, diffW_1, diffW_2, diffW_3 - - # Estimating SVD - shared_cy.standardizeMatrix(E, f, t) - print("Inferring set of eigenvector(s).") - if svd_method == "arpack": - U, s, V = svds(E, k=K) - U, s, V = U[:, ::-1], s[::-1], V[::-1, :] - elif svd_method == "halko": - U, s, V = halkoPCAone(E, K, svd_power, seed) - U, W = signFlip(U, W) - del E - return U, s, V - -##### EMU-mem ###### -### 2-bit PCAone Halko -def customSVD(D, f, k, Bi, n, m, n_iter, t): - K = k + 10 - G = np.zeros((m, K), dtype=np.float32) - H = np.zeros((n, K), dtype=np.float32) - - # Sample Gaussian vectors - Omg = np.random.standard_normal(size=(n, K)).astype(np.float32) - - # Power iterations - for p in range(n_iter): - if p > 0: - Omg, _ = np.linalg.qr(H, mode="reduced") - G.fill(0) - halko.matMul_Freq(D, f, Omg, G, Bi, n, m, t) - H.fill(0) - halko.matMulTrans_Freq(D, f, G, H, Bi, n, m, t) - Q, R = np.linalg.qr(G, mode="reduced") - B = np.linalg.solve(R.T, H.T) - Uhat, s, V = np.linalg.svd(B, full_matrices=False) - del B - U = np.dot(Q, Uhat) - U, V = signFlip(U, V) - return U[:,:k], s[:k], V[:k,:] - -# CustomSVD when mapping back to domain for E=WSU.T -def customDomainSVD(D, f, k, U, s, W, Bi, n, m, n_iter, t): - K = k + 10 - G = np.zeros((m, K), dtype=np.float32) - H = np.zeros((n, K), dtype=np.float32) - - # Sample Gaussian vectors - Omg = np.random.standard_normal(size=(n, K)).astype(np.float32) - - # Power iterations - for p in range(n_iter): - if p > 0: - Omg, _ = np.linalg.qr(H, mode="reduced") - G.fill(0) - halko.matMul_SVD_domain(D, f, U, s, W, Omg, G, Bi, n, m, t) - H.fill(0) - halko.matMulTrans_SVD_domain(D, f, U, s, W, G, H, Bi, n, m, t) - Q, R = np.linalg.qr(G, mode="reduced") - B = np.linalg.solve(R.T, H.T) - Uhat, s, V = np.linalg.svd(B, full_matrices=False) - del B - U = np.dot(Q, Uhat) - U, V = signFlip(U, V) - return U[:,:k], s[:k], V[:k,:] - -# CustomSVD for final iteration -def customFinalSVD(D, f, k, U, s, W, Bi, n, m, n_iter, t): - K = k + 10 - G = np.zeros((m, K), dtype=np.float32) - H = np.zeros((n, K), dtype=np.float32) - - # Sample Gaussian vectors - Omg = np.random.standard_normal(size=(n, K)).astype(np.float32) - - # Power iterations - for p in range(n_iter): - if p > 0: - Omg, _ = np.linalg.qr(H, mode="reduced") - G.fill(0) - if W is None: - halko.matMulFinal_Freq(D, f, Omg, G, Bi, n, m, t) - H.fill(0) - halko.matMulTransFinal_Freq(D, f, G, H, Bi, n, m, t) - else: - halko.matMulFinal_SVD(D, f, U, s, W, Omg, G, Bi, n, m, t) - H.fill(0) - halko.matMulTransFinal_SVD(D, f, U, s, W, G, H, Bi, n, m, t) - Q, R = np.linalg.qr(G, mode="reduced") - B = np.linalg.solve(R.T, H.T) - Uhat, s, V = np.linalg.svd(B, full_matrices=False) - del B - U = np.dot(Q, Uhat) - U, V = signFlip(U, V) - return U[:,:k], s[:k], V[:k,:] - -# CustomSVD for acceleration in domain -def customDomainSVD_accel(D, f, k, U, W, Bi, n, m, n_iter, t): - K = k + 10 - G = np.zeros((m, K), dtype=np.float32) - H = np.zeros((n, K), dtype=np.float32) - - # Sample Gaussian vectors - Omg = np.random.standard_normal(size=(n, K)).astype(np.float32) - - # Power iterations - for p in range(n_iter): - if p > 0: - Omg, _ = np.linalg.qr(H, mode="reduced") - G.fill(0) - halko.matMul_SVD_domain_accel(D, f, U, W, Omg, G, Bi, n, m, t) - H.fill(0) - halko.matMulTrans_SVD_domain_accel(D, f, U, W, G, H, Bi, n, m, t) - Q, R = np.linalg.qr(G, mode="reduced") - B = np.linalg.solve(R.T, H.T) - Uhat, s, V = np.linalg.svd(B, full_matrices=False) - del B - U = np.dot(Q, Uhat) - U, V = signFlip(U, V) - return U[:,:k], s[:k], V[:k,:] - - -### Main EMU-mem function ### -def emuMemory(D, f, e, K, M, M_tole, Bi, n, m, svd_power, \ - output, accel, seed, t): - # Set random seed - np.random.seed(seed) - - # Setup acceleration - if accel: - print("Using accelerated EM scheme (SqS3).") - diffU_1 = np.zeros((m, e), dtype=np.float32) - diffU_2 = np.zeros((m, e), dtype=np.float32) - diffU_3 = np.zeros((m, e), dtype=np.float32) - diffW_1 = np.zeros((e, n), dtype=np.float32) - diffW_2 = np.zeros((e, n), dtype=np.float32) - diffW_3 = np.zeros((e, n), dtype=np.float32) - if M < 1: - print("Warning, no EM-PCA iterations are performed!") - print("Inferring set of eigenvector(s).") - U, s, V = customFinalSVD(D, f, K, None, None, None, Bi, n, m, svd_power, t) - return U, s, V - else: - # Estimate initial individual allele frequencies - if accel: - print("Initiating accelerated EM scheme (1)") - U, s, W = customSVD(D, f, e, Bi, n, m, svd_power, t) - if not accel: - print("Individual allele frequencies estimated (1).") - else: - W = W*s.reshape((e, 1)) - - # Iterative estimation of individual allele frequencies - for i in range(M): - prevU = np.copy(U) - if accel: - U1, s1, W1 = customDomainSVD_accel(D, f, e, U, W, Bi, n, m, svd_power, t) - W1 = W1*s1.reshape((e, 1)) - shared_cy.matMinus(U1, U, diffU_1) - shared_cy.matMinus(W1, W, diffW_1) - sr2_U = shared_cy.matSumSquare(diffU_1) - sr2_W = shared_cy.matSumSquare(diffW_1) - U2, s2, W2 = customDomainSVD_accel(D, f, e, U1, W1, Bi, n, m, svd_power, t) - W2 = W2*s2.reshape((e, 1)) - shared_cy.matMinus(U2, U1, diffU_2) - shared_cy.matMinus(W2, W1, diffW_2) - - # SQUAREM update of W and U SqS3 - shared_cy.matMinus(diffU_2, diffU_1, diffU_3) - shared_cy.matMinus(diffW_2, diffW_1, diffW_3) - sv2_U = shared_cy.matSumSquare(diffU_3) - sv2_W = shared_cy.matSumSquare(diffW_3) - alpha_U = max(1.0, np.sqrt(sr2_U/sv2_U)) - alpha_W = max(1.0, np.sqrt(sr2_W/sv2_W)) - - # New accelerated update - shared_cy.matUpdate(U, diffU_1, diffU_3, alpha_U) - shared_cy.matUpdate(W, diffW_1, diffW_3, alpha_W) - else: - U, s, W = customDomainSVD(D, f, e, U, s, W, Bi, n, m, svd_power, t) - - # Break iterative update if converged - diff = np.sqrt(np.sum(shared_cy.rmse(U, prevU))/(m*e)) - print("Individual allele frequencies estimated (" + str(i+2) + "). RMSE=" + str(diff)) - if diff < M_tole: - print("Estimation of individual allele frequencies has converged.") - break - del prevU - - # Run non-accelerated update to ensure properties of U, s, W - if accel: - U, s, W = customDomainSVD_accel(D, f, e, U, W, Bi, n, m, svd_power, t) - del U1, U2, s1, s2, W1, W2, diffU_1, diffU_2, diffU_3, diffW_1, diffW_2, diffW_3 - - # Estimating SVD - print("Inferring set of eigenvector(s).") - U, s, V = customFinalSVD(D, f, K, U, s, W, Bi, n, m, svd_power, t) - return U, s, V diff --git a/emu/shared_cy.pyx b/emu/shared_cy.pyx deleted file mode 100644 index a3a2827..0000000 --- a/emu/shared_cy.pyx +++ /dev/null @@ -1,258 +0,0 @@ -# cython: language_level=3, boundscheck=False, wraparound=False, initializedcheck=False, cdivision=True -import numpy as np -cimport numpy as np -from cython.parallel import prange -from libc.math cimport sqrt - -##### EMU ##### -# Estimate population allele frequencies -cpdef estimateF(unsigned char[:,::1] D, float[::1] f, int[::1] c, int Bi, \ - int n, int m, int t): - cdef signed char[4] recode = [1, 9, 2, 0] # EMU format - cdef unsigned char mask = 3 - cdef unsigned char byte, code - cdef int i, j, b, bytepart - with nogil: - for j in prange(m, num_threads=t): - i = 0 - for b in range(Bi): - byte = D[j,b] - for bytepart in range(4): - code = recode[byte & mask] - if code != 9: - c[j] = c[j] + 1 - if code == 2: - f[j] = f[j] + 0.5 - else: - f[j] = f[j] + code - byte = byte >> 2 # Right shift 2 bits - i = i + 1 - if i == n: - if c[j] > 0: - f[j] = f[j]/float(c[j]) - else: - f[j] = 0.0 - break - -# Array filtering -cpdef filterArrays(unsigned char[:,::1] D, float[::1] f, \ - unsigned char[::1] mask): - cdef int m = D.shape[0] - cdef int n = D.shape[1] - cdef int c = 0 - cdef int i, j - for j in range(m): - if mask[j] == 1: - for i in range(n): - D[c,i] = D[j,i] # Data matrix - f[c] = f[j] # Allele frequency - c += 1 - -# Initial update of dosage matrix -cpdef updateE_init(unsigned char[:,::1] D, float[::1] f, float[:,::1] E, \ - int Bi, int t): - cdef int m = E.shape[0] - cdef int n = E.shape[1] - cdef signed char[4] recode = [1, 9, 2, 0] # EMU format - cdef unsigned char mask = 3 - cdef unsigned char byte, code - cdef int i, j, b, bytepart - with nogil: - for j in prange(m, num_threads=t): - i = 0 - for b in range(Bi): - byte = D[j,b] - for bytepart in range(4): - code = recode[byte & mask] - if code != 9: - if code == 2: - E[j,i] = 0.5 - f[j] - else: - E[j,i] = code - f[j] - byte = byte >> 2 # Right shift 2 bits - i = i + 1 - if i == n: - break - -# Update E directly from SVD -cpdef updateE_SVD(unsigned char[:,::1] D, float[:,::1] E, float[::1] f, \ - float[:,:] U, float[:] s, float[:,:] W, int Bi, int t): - cdef int m = E.shape[0] - cdef int n = E.shape[1] - cdef int K = s.shape[0] - cdef signed char[4] recode = [1, 9, 2, 0] # EMU format - cdef unsigned char mask = 3 - cdef unsigned char byte, code - cdef int i, j, k, b, bytepart - with nogil: - for j in prange(m, num_threads=t): - i = 0 - for b in range(Bi): - byte = D[j,b] - for bytepart in range(4): - code = recode[byte & mask] - if code == 9: - E[j,i] = 0.0 - for k in range(K): - E[j,i] += U[j,k]*s[k]*W[k,i] - E[j,i] = min(max(E[j,i], -f[j]), 1-f[j]) - byte = byte >> 2 # Right shift 2 bits - i = i + 1 - if i == n: - break - -# Accelerated update of E directly from SVD -cpdef updateE_SVD_accel(unsigned char[:,::1] D, float[:,::1] E, float[::1] f, \ - float[:,:] U, float[:,:] SW, int Bi, int t): - cdef int m = E.shape[0] - cdef int n = E.shape[1] - cdef int K = SW.shape[0] - cdef signed char[4] recode = [1, 9, 2, 0] # EMU format - cdef unsigned char mask = 3 - cdef unsigned char byte, code - cdef int i, j, k, b, bytepart - with nogil: - for j in prange(m, num_threads=t): - i = 0 - for b in range(Bi): - byte = D[j,b] - for bytepart in range(4): - code = recode[byte & mask] - if code == 9: - E[j,i] = 0.0 - for k in range(K): - E[j,i] += U[j,k]*SW[k,i] - E[j,i] = min(max(E[j,i], -f[j]), 1-f[j]) - byte = byte >> 2 # Right shift 2 bits - i = i + 1 - if i == n: - break - -# Standardize dosage matrix -cpdef standardizeMatrix(float[:,::1] E, float[::1] f, int t): - cdef int m = E.shape[0] - cdef int n = E.shape[1] - cdef int i, j - with nogil: - for j in prange(m, num_threads=t): - for i in range(n): - E[j,i] /= sqrt(f[j]*(1-f[j])) - -# Root-mean squared error -cpdef rmse(float[:,:] A, float[:,:] B): - cdef int n = A.shape[0] - cdef int m = A.shape[1] - cdef int i, j - cdef float res = 0.0 - for i in range(n): - for j in range(m): - res += (A[i,j] - B[i,j])*(A[i,j] - B[i,j]) - return res - -# Selection scan -cpdef galinskyScan(float[:,:] U, float[:,::1] Dsquared): - cdef int m = U.shape[0] - cdef int K = U.shape[1] - cdef int j, k - for j in range(m): - for k in range(K): - Dsquared[j,k] = (U[j,k]**2)*float(m) - -### Accelerated EM -# Matrix subtraction -cpdef matMinus(float[:,:] M1, float[:,:] M2, float[:,::1] diffM): - cdef int n = M1.shape[0] - cdef int m = M1.shape[1] - cdef int i, j - for i in range(n): - for j in range(m): - diffM[i,j] = M1[i,j]-M2[i,j] - -# Matrix sum of squares -cpdef matSumSquare(float[:,:] M): - cdef int n = M.shape[0] - cdef int m = M.shape[1] - cdef int i, j - cdef float res = 0.0 - for i in range(n): - for j in range(m): - res += M[i,j]*M[i,j] - return res - -# Update factor matrices -cpdef matUpdate(float[:,:] M, float[:,::1] diffM_1, float[:,::1] diffM_3, \ - float alpha): - cdef int n = M.shape[0] - cdef int m = M.shape[1] - cdef int i, j - for i in range(n): - for j in range(m): - M[i,j] = M[i,j] + 2*alpha*diffM_1[i,j] + alpha*alpha*diffM_3[i,j] - -### Likelihood measures for debugging -cpdef frobenius(unsigned char[:,::1] D, float[::1] f, float[:,:] U, float[:] s,\ - float[:,:] W, float[::1] sumVec, int Bi, int t): - cdef int m = U.shape[0] - cdef int n = W.shape[1] - cdef int K = s.shape[0] - cdef signed char[4] recode = [1, 9, 2, 0] # EMU format - cdef unsigned char mask = 3 - cdef unsigned char byte, code - cdef int i, j, k, b, bytepart - cdef float e - with nogil: - for j in prange(m, num_threads=t): - sumVec[j] = 0.0 - i = 0 - for b in range(Bi): - byte = D[j,b] - for bytepart in range(4): - code = recode[byte & mask] - if code != 9: - e = 0.0 - for k in range(K): - e = e + U[j,k]*s[k]*W[k,i] - e = e + f[j] - e = min(max(e, 0), 1) - if code == 2: - sumVec[j] = sumVec[j] + (0.5 - e)**2 - else: - sumVec[j] = sumVec[j] + (code - e)**2 - byte = byte >> 2 # Right shift 2 bits - i = i + 1 - if i == n: - break - -# Accelerated -cpdef frobenius_accel(unsigned char[:,::1] D, float[::1] f, float[:,:] U, \ - float[:,:] SW, float[::1] sumVec, int Bi, int t): - cdef int m = U.shape[0] - cdef int n = SW.shape[1] - cdef int K = SW.shape[0] - cdef signed char[4] recode = [1, 9, 2, 0] # EMU format - cdef unsigned char mask = 3 - cdef unsigned char byte, code - cdef int i, j, k, b, bytepart - cdef float e - with nogil: - for j in prange(m, num_threads=t): - sumVec[j] = 0.0 - i = 0 - for b in range(Bi): - byte = D[j,b] - for bytepart in range(4): - code = recode[byte & mask] - if code != 9: - e = 0.0 - for k in range(K): - e = e + U[j,k]*SW[k,i] - e = e + f[j] - e = min(max(e, 0), 1) - if code == 2: - sumVec[j] = sumVec[j] + (0.5 - e)**2 - else: - sumVec[j] = sumVec[j] + (code - e)**2 - byte = byte >> 2 # Right shift 2 bits - i = i + 1 - if i == n: - break diff --git a/environment.yml b/environment.yml index f449b7d..32f3f2c 100644 --- a/environment.yml +++ b/environment.yml @@ -2,7 +2,6 @@ name: emu channels: - defaults dependencies: - - python>=3.6 + - python - numpy - - scipy - cython diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..33c8b4a --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,3 @@ +[build-system] +requires = ["setuptools", "cython", "numpy", "scipy"] +build-backend = "setuptools.build_meta" diff --git a/requirements.txt b/requirements.txt index 6dbfa8d..8dbecfc 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,2 @@ numpy -scipy cython diff --git a/setup.py b/setup.py index f7bc0dc..673e6ea 100644 --- a/setup.py +++ b/setup.py @@ -2,31 +2,38 @@ from Cython.Build import cythonize import numpy -extensions = [Extension( - "emu.shared_cy", - ["emu/shared_cy.pyx"], - extra_compile_args=['-fopenmp', '-g0'], - extra_link_args=['-fopenmp'], - include_dirs=[numpy.get_include()], - ), - Extension( - "emu.halko", - ["emu/halko.pyx"], - extra_compile_args=['-fopenmp', '-g0'], - extra_link_args=['-fopenmp'], - include_dirs=[numpy.get_include()], - )] +extensions = [ + Extension( + "src.shared_cy", + ["src/shared_cy.pyx"], + extra_compile_args=['-fopenmp', '-O3', '-g0', '-Wno-unreachable-code'], + extra_link_args=['-fopenmp'], + include_dirs=[numpy.get_include()], + define_macros=[('NPY_NO_DEPRECATED_API', 'NPY_1_7_API_VERSION')] + ), Extension( + "src.memory_cy", + ["src/memory_cy.pyx"], + extra_compile_args=['-fopenmp', '-O3', '-g0', '-Wno-unreachable-code'], + extra_link_args=['-fopenmp'], + include_dirs=[numpy.get_include()], + define_macros=[('NPY_NO_DEPRECATED_API', 'NPY_1_7_API_VERSION')] + ) +] setup( - name="EMU", - version="0.9", + name="emu", + version="1.0", description="EM-PCA for performing PCA in the presence of missingness", author="Jonas Meisner", - packages=["emu"], + packages=["src"], entry_points={ - "console_scripts": ["emu=emu.emu:main"] + "console_scripts": ["emu=src.main:main"] }, python_requires=">=3.6", + install_requires=[ + "cython", + "numpy" + ], ext_modules=cythonize(extensions), include_dirs=[numpy.get_include()] ) diff --git a/emu/__init__.py b/src/__init__.py similarity index 100% rename from emu/__init__.py rename to src/__init__.py diff --git a/src/algorithm.py b/src/algorithm.py new file mode 100644 index 0000000..2019153 --- /dev/null +++ b/src/algorithm.py @@ -0,0 +1,129 @@ +""" +EMU. +Iterative SVD algorithms. + +Jonas Meisner, Siyang Liu, Mingxi Huang and Anders Albrechtsen +""" + +__author__ = "Jonas Meisner" + +# Libraries +import numpy as np +from math import sqrt +from src import shared +from src import shared_cy + +##### EMU ##### +def emuAlgorithm(D, f, e, K, N, e_iter, e_tole, power, cost, seed, threads): + M, B = D.shape + E = np.zeros((M, N), dtype=np.float32) + + # Setup acceleration + print("Using accelerated EM scheme (SqS3).") + DU1 = np.zeros((M, e), dtype=np.float32) + DU2 = np.zeros((M, e), dtype=np.float32) + DU3 = np.zeros((M, e), dtype=np.float32) + DV1 = np.zeros((N, e), dtype=np.float32) + DV2 = np.zeros((N, e), dtype=np.float32) + DV3 = np.zeros((N, e), dtype=np.float32) + + # Initiate E matrix + shared_cy.updateInit(D, f, E, threads) + + # Exit without performing EMU + if e_iter < 1: + print("Warning, no EM-PCA iterations are performed!") + print("Inferring eigenvector(s).") + shared_cy.standardizeMatrix(E, f, threads) + U, S, V = shared.halko(E, K, power, seed) + U, V = shared.signFlip(U, V) + del E + return U, S, V, 0, False + else: + # Estimate initial individual allele frequencies + print("Initiating accelerated EM scheme (1)") + U, S, V = shared.halko(E, e, power, seed) + U, V = shared.signFlip(U, V) + V = V*S + seed += 1 + + # Estimate cost + if cost: + sumV = np.zeros(M, dtype=np.float32) + shared_cy.frobenius(D, f, U, V, sumV, threads) + print(f"Frobenius: {np.round(np.sum(sumV, dtype=float),1)}") + + # Update E matrix + shared_cy.updateAccel(D, E, f, U, V, threads) + + # Iterative estimation of individual allele frequencies + for i in range(1, e_iter+1): + U0 = np.copy(U) + + # 1st SVD step + U1, S1, V1 = shared.halko(E, e, power, seed) + U1, V1 = shared.signFlip(U1, V1) + V1 = V1*S1 + seed += 1 + shared_cy.matMinus(U1, U, DU1) + shared_cy.matMinus(V1, V, DV1) + sr2_U = shared_cy.matSumSquare(DU1) + sr2_V = shared_cy.matSumSquare(DV1) + shared_cy.updateAccel(D, E, f, U1, V1, threads) + + # 2nd SVD step + U2, S2, V2 = shared.halko(E, e, power, seed) + U2, V2 = shared.signFlip(U2, V2) + V2 = V2*S2 + seed += 1 + shared_cy.matMinus(U2, U1, DU2) + shared_cy.matMinus(V2, V1, DV2) + + # SQUAREM update of U and V SqS3 + shared_cy.matMinus(DU2, DU1, DU3) + shared_cy.matMinus(DV2, DV1, DV3) + sv2_U = shared_cy.matSumSquare(DU3) + sv2_V = shared_cy.matSumSquare(DV3) + if i == 1: + if np.isclose(sv2_U, 0.0): + print("No missingness in data. Skipping iterative approach!") + converged = False + break + aU = max(1.0, sqrt(sr2_U/sv2_U)) + aV = max(1.0, sqrt(sr2_V/sv2_V)) + + # Accelerated update + shared_cy.matUpdate(U, DU1, DU3, aU) + shared_cy.matUpdate(V, DV1, DV3, aV) + shared_cy.updateAccel(D, E, f, U, V, threads) + + # Check optional cost function + if cost: + shared_cy.frobenius(D, f, U, V, sumV, threads) + print(f"Frobenius: {np.round(np.sum(sumV, dtype=float),1)}") + + # Break iterative update if converged + rmseU = shared_cy.rmse(U, U0) + print(f"Iteration {i},\tRMSE={round(rmseU, 9)}") + if rmseU < e_tole: + print("EM-PCA has converged.") + converged = True + break + if i == e_iter: + print("EM-PCA did not converged!") + converged = False + del U0, U1, U2, V1, V2, S1, S2, DU1, DU2, DU3, DV1, DV2, DV3 + + # Stabilization step + U, S, V = shared.halko(E, e, power, seed) + U, V = shared.signFlip(U, V) + seed += 1 + shared_cy.updateSVD(D, E, f, U, S, V, threads) + + # Estimating SVD + shared_cy.standardizeMatrix(E, f, threads) + print(f"Inferring {K} eigenvector(s).") + U, S, V = shared.halko(E, K, power, seed) + U, V = shared.signFlip(U, V) + del E + return U, S, V, i, converged diff --git a/src/main.py b/src/main.py new file mode 100644 index 0000000..8c55b76 --- /dev/null +++ b/src/main.py @@ -0,0 +1,197 @@ +""" +EMU. +Main caller. Performs iterative SVD of allele count matrix (EM-PCA) based on either ARPACK, or Halko method. + +Jonas Meisner, Siyang Liu, Mingxi Huang and Anders Albrechtsen +""" + +__author__ = "Jonas Meisner" + +# Libraries +import argparse +import os +import sys +from datetime import datetime +from time import time + +# Argparse +parser = argparse.ArgumentParser(prog="emu") +parser.add_argument("--version", action="version", version="%(prog)s 1.0") +parser.add_argument("-b", "--bfile", metavar="FILE-PREFIX", + help="Prefix for PLINK files (.bed, .bim, .fam)") +parser.add_argument("-e", "--n_eig", metavar="INT", type=int, + help="Number of eigenvectors to use in iterative estimation") +parser.add_argument("-k", "--n_out", metavar="INT", type=int, + help="Number of eigenvectors to output in final SVD") +parser.add_argument("-t", "--threads", metavar="INT", type=int, default=1, + help="Number of threads") +parser.add_argument("-f", "--maf", metavar="FLOAT", type=float, + help="Threshold for minor allele frequencies") +parser.add_argument("-s", "--selection", action="store_true", + help="Perform PC-based selection scan (Galinsky et al. 2016)") +parser.add_argument("-o", "--out", metavar="OUTPUT", default="emu", + help="Prefix output name",) +parser.add_argument("-m", "--mem", action="store_true", + help="EMU-mem variant") +parser.add_argument("--iter", metavar="INT", type=int, default=100, + help="Maximum iterations in estimation of individual allele frequencies (100)") +parser.add_argument("--tole", metavar="FLOAT", type=float, default=5e-7, + help="Tolerance in update for individual allele frequencies (5e-7)") +parser.add_argument("--power", metavar="INT", type=int, default=11, + help="Number of power iterations in randomized SVD (11)") +parser.add_argument("--batch", metavar="INT", type=int, default=4096, + help="Number of SNPs to use in batches of memory variant (4096)") +parser.add_argument("--loadings", action="store_true", + help="Save SNP loadings") +parser.add_argument("--maf_save", action="store_true", + help="Save estimated population allele frequencies") +parser.add_argument("--cost", action="store_true", + help="Output min-cost each iteration (DEBUG function)") +parser.add_argument("--seed", metavar="INT", type=int, default=0, + help="Set random seed") + + +##### EMU main caller ##### +def main(): + args = parser.parse_args() + if len(sys.argv) < 2: + parser.print_help() + sys.exit() + print("EMU v1.0\n") + assert args.bfile is not None, "No input data (--bfile)" + assert args.n_eig is not None, "Must specify number of eigenvectors to use!" + start = time() + + # Create log-file of arguments + full = vars(parser.parse_args()) + deaf = vars(parser.parse_args([])) + with open(f"{args.out}.log", "w") as log: + log.write("EMU v1.0\n") + log.write(f"Time: {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}\n") + log.write(f"Directory: {os.getcwd()}\n") + log.write("Options:\n") + for key in full: + if full[key] != deaf[key]: + if type(full[key]) is bool: + log.write(f"\t--{key}\n") + else: + log.write(f"\t--{key} {full[key]}\n") + del full, deaf + + # Control threads of external numerical libraries + os.environ["MKL_NUM_THREADS"] = str(args.threads) + os.environ["OMP_NUM_THREADS"] = str(args.threads) + os.environ["NUMEXPR_NUM_THREADS"] = str(args.threads) + os.environ["OPENBLAS_NUM_THREADS"] = str(args.threads) + + # Load numerical libraries + import numpy as np + from math import ceil + from src import algorithm + from src import memory + from src import shared + from src import shared_cy + + # Set K + if args.n_out is None: + K = args.n_eig + else: + K = args.n_out + + # Read data + print("Reading in data matrix from PLINK files.") + N = shared.extract_length(f"{args.bfile}.fam") + M = shared.extract_length(f"{args.bfile}.bim") + with open(f"{args.bfile}.bed", "rb") as bed: + D = np.fromfile(bed, dtype=np.uint8, offset=3) + B = ceil(N/4) # Length of bytes to describe n individuals + D.shape = (M, B) + print(f"Loaded {N} samples and {M} SNPs.", flush=True) + + # Population allele frequencies + print("Estimating population allele frequencies.") + f = np.zeros(M, dtype=np.float32) + shared_cy.estimateF(D, f, N, args.threads) + + # Removing rare variants + if args.maf is not None: + assert (args.maf > 0.0) and (args.maf < 1.0), "Please provide a valid MAF!" + mask = (f >= args.maf) & (f <= (1.0 - args.maf)) + + # Filter and update arrays without copying + M = np.sum(mask) + tmpMask = mask.astype(np.uint8) + shared_cy.filterArrays(D, f, tmpMask, N) + D = D[:M,:] + f = f[:M] + del tmpMask + print(f"Number of sites after MAF filtering ({args.maf}): {M}") + M = D.shape[0] + assert (not np.allclose(np.max(f), 1.0)) or (not np.allclose(np.min(f), 0.0)), \ + "Fixed sites in dataset. Must perform MAF filtering (-f / --maf)!" + + ##### EMU ##### + if args.mem: + print(f"\nPerforming EMU-mem using {args.n_eig} eigenvector(s).") + U, S, V, it, converged = memory.emuMemory(D, f, args.n_eig, K, N, args.iter, \ + args.tole, args.power, args.cost, args.batch, args.seed, args.threads) + else: + print(f"\nPerforming EMU using {args.n_eig} eigenvector(s).") + U, S, V, it, converged = algorithm.emuAlgorithm(D, f, args.n_eig, K, N, \ + args.iter, args.tole, args.power, args.cost, args.seed, args.threads) + del D + + # Print elapsed time for estimation + t_tot = time()-start + t_min = int(t_tot//60) + t_sec = int(t_tot - t_min*60) + print(f"Total elapsed time: {t_min}m{t_sec}s") + + # Save matrices + np.savetxt(f"{args.out}.eigenvecs", V, fmt="%.7f") + print(f"Saved eigenvector(s) as {args.out}.eigenvecs") + np.savetxt(f"{args.out}.eigenvals", (S**2)/float(M), fmt="%.7f") + print(f"Saved eigenvalue(s) as {args.out}.eigenvals") + del V, S + + # Save loadings + if args.loadings: + np.savetxt(f"{args.out}.loadings", U, fmt="%.7f") + print(f"Saved SNP loadings as {args.out}.loadings") + + # Perform genome-wide selection scan + if args.selection: + Dsquared = np.zeros((M, K), dtype=np.float32) + shared_cy.galinskyScan(U, Dsquared) + np.savetxt(f"{args.out}.selection", Dsquared, fmt="%.7f") + print(f"Saved test statistics as {args.out}.selection") + del Dsquared + del U + + # Optional saves + if args.maf_save: + np.savetxt(f"{args.out}.freq", f, fmt="%.7f") + print(f"Saved minor allele frequencies as {args.out}.freq") + del f + + # Write output info to log-file + with open(f"{args.out}.log", "a") as log: + if converged: + log.write(f"\nEM-PCA converged in {it} iterations.\n") + else: + log.write("\nEM-PCA did not converge!\n") + log.write(f"Total elapsed time: {t_min}m{t_sec}s\n") + log.write(f"Saved eigenvector(s) as {args.out}.eigenvecs\n") + log.write(f"Saved eigenvalue(s) as {args.out}.eigenvals\n") + if args.loadings: + log.write(f"Saved SNP loadings as {args.out}.loadings\n") + if args.selection: + log.write(f"Saved test statistics as {args.out}.selection\n") + if args.maf_save: + log.write(f"Saved minor allele frequencies as {args.out}.freq\n") + + + +##### Define main ##### +if __name__ == "__main__": + main() diff --git a/src/memory.py b/src/memory.py new file mode 100644 index 0000000..b98a4ac --- /dev/null +++ b/src/memory.py @@ -0,0 +1,120 @@ +""" +EMU. +Iterative SVD algorithms. + +Jonas Meisner, Siyang Liu, Mingxi Huang and Anders Albrechtsen +""" + +__author__ = "Jonas Meisner" + +# Libraries +import numpy as np +from math import sqrt +from src import shared +from src import shared_cy + +##### EMU-mem ##### +def emuMemory(D, f, e, K, N, e_iter, e_tole, power, cost, batch, seed, threads): + M, B = D.shape + + # Setup acceleration + print("Using accelerated EM scheme (SqS3).") + DU1 = np.zeros((M, e), dtype=np.float32) + DU2 = np.zeros((M, e), dtype=np.float32) + DU3 = np.zeros((M, e), dtype=np.float32) + DV1 = np.zeros((N, e), dtype=np.float32) + DV2 = np.zeros((N, e), dtype=np.float32) + DV3 = np.zeros((N, e), dtype=np.float32) + + # Exit without performing EMU + if e_iter < 1: + print("Warning, no EM-PCA iterations are performed!") + print("Inferring set of eigenvector(s).") + U, S, V = shared.halkoBatch(D, f, K, N, None, None, None, power, batch, \ + seed, True, threads) + return U, S, V, 0, False + else: + # Estimate initial individual allele frequencies + print("Initiating accelerated EM scheme (1)") + U, S, V = shared.halkoBatch(D, f, e, N, None, None, None, power, batch, \ + seed, False, threads) + U, V = shared.signFlip(U, V) + V = V*S + seed += 1 + + # Estimate cost + if cost: + sumV = np.zeros(M, dtype=np.float32) + shared_cy.frobenius(D, f, U, V, sumV, threads) + print(f"Frobenius: {np.round(np.sum(sumV, dtype=float),1)}") + + # Iterative estimation of individual allele frequencies + for i in range(1, e_iter+1): + U0 = np.copy(U) + + # 1st SVD step + U1, S1, V1 = shared.halkoBatch(D, f, e, N, U, None, V, power, batch, \ + seed, False, threads) + U1, V1 = shared.signFlip(U1, V1) + V1 = V1*S1 + seed += 1 + shared_cy.matMinus(U1, U, DU1) + shared_cy.matMinus(V1, V, DV1) + sr2_U = shared_cy.matSumSquare(DU1) + sr2_V = shared_cy.matSumSquare(DV1) + + # 2nd SVD step + U2, S2, V2 = shared.halkoBatch(D, f, e, N, U1, None, V1, power, batch, \ + seed, False, threads) + U2, V2 = shared.signFlip(U2, V2) + V2 = V2*S2 + seed += 1 + shared_cy.matMinus(U2, U1, DU2) + shared_cy.matMinus(V2, V1, DV2) + + # SQUAREM update of V and U SqS3 + shared_cy.matMinus(DU2, DU1, DU3) + shared_cy.matMinus(DV2, DV1, DV3) + sv2_U = shared_cy.matSumSquare(DU3) + sv2_V = shared_cy.matSumSquare(DV3) + if i == 1: + if np.isclose(sv2_U, 0.0): + print("No missingness in data. Skipping iterative approach!") + converged = False + break + aU = max(1.0, sqrt(sr2_U/sv2_U)) + aV = max(1.0, sqrt(sr2_V/sv2_V)) + + # New accelerated update + shared_cy.matUpdate(U, DU1, DU3, aU) + shared_cy.matUpdate(V, DV1, DV3, aV) + + # Check optional cost function + if cost: + shared_cy.frobenius(D, f, U, V, sumV, threads) + print(f"Frobenius: {np.round(np.sum(sumV, dtype=float),1)}") + + # Break iterative update if converged + rmseU = shared_cy.rmse(U, U0) + print(f"Iteration {i},\tRMSE={round(rmseU, 9)}") + if rmseU < e_tole: + print("EM-PCA has converged.") + converged = True + break + if i == e_iter: + print("EM-PCA did not converged!") + converged = False + del U0, U1, U2, S1, S2, V1, V2, DU1, DU2, DU3, DV1, DV2, DV3 + + # Stabilization step + U, S, V = shared.halkoBatch(D, f, e, N, U, None, V, power, batch, seed, \ + False, threads) + U, V = shared.signFlip(U, V) + seed += 1 + + # Estimating SVD + print(f"Inferring {K} eigenvector(s).") + U, S, V = shared.halkoBatch(D, f, K, N, U, S, V, power, batch, seed, \ + True, threads) + U, V = shared.signFlip(U, V) + return U, S, V, i, converged diff --git a/src/memory_cy.pyx b/src/memory_cy.pyx new file mode 100644 index 0000000..2cfc781 --- /dev/null +++ b/src/memory_cy.pyx @@ -0,0 +1,118 @@ +# cython: language_level=3, boundscheck=False, wraparound=False, initializedcheck=False, cdivision=True +import numpy as np +cimport numpy as np +from cython.parallel import prange +from libc.math cimport sqrt + +##### EMU-mem ##### +# Load centered chunk of PLINK file for SVD using frequencies +cpdef void plinkFreq(unsigned char[:,::1] D, float[:,::1] E, float[::1] f, \ + int M_w, int t) nogil: + cdef: + int M = E.shape[0] + int N = E.shape[1] + int B = D.shape[1] + int i, j, b, bytepart + unsigned char[4] recode = [0, 9, 1, 2] + unsigned char mask = 3 + unsigned char byte + for j in prange(M, num_threads=t): + i = 0 + for b in range(B): + byte = D[M_w+j,b] + for bytepart in range(4): + if recode[byte & mask] != 9: + E[j,i] = recode[byte & mask] - 2.0*f[M_w+j] + else: + E[j,i] = 0.0 + byte = byte >> 2 + i = i + 1 + if i == N: + break + +# Load standardized chunk of PLINK file for SVD using frequencies +cpdef void plinkFinalFreq(unsigned char[:,::1] D, float[:,::1] E, float[::1] f, \ + int M_w, int t) nogil: + cdef: + int M = E.shape[0] + int N = E.shape[1] + int B = D.shape[1] + int i, j, b, bytepart + unsigned char[4] recode = [0, 9, 1, 2] + unsigned char mask = 3 + unsigned char byte + for j in prange(M, num_threads=t): + i = 0 + for b in range(B): + byte = D[M_w+j,b] + for bytepart in range(4): + if recode[byte & mask] != 9: + E[j,i] = recode[byte & mask] - 2.0*f[M_w+j] + E[j,i] /= sqrt(2.0*f[M_w+j]*(1.0 - f[M_w+j])) + else: + E[j,i] = 0.0 + byte = byte >> 2 + i = i + 1 + if i == N: + break + +# Load centered chunk of PLINK file for SVD using factor matrices +cpdef void plinkSVD(unsigned char[:,::1] D, float[:,::1] E, float[:,::1] U, \ + float[:,::1] V, float[::1] f, int M_w, int t) nogil: + cdef: + int M = E.shape[0] + int N = E.shape[1] + int B = D.shape[1] + int K = U.shape[1] + int i, j, k, b, bytepart + unsigned char[4] recode = [0, 9, 1, 2] + unsigned char mask = 3 + unsigned char byte + for j in prange(M, num_threads=t): + i = 0 + for b in range(B): + byte = D[M_w+j,b] + for bytepart in range(4): + if recode[byte & mask] != 9: + E[j,i] = recode[byte & mask] - 2.0*f[M_w+j] + else: + E[j,i] = 0.0 + for k in range(K): + E[j,i] += U[M_w+j,k]*V[i,k] + E[j,i] = min(max(E[j,i] + 2.0*f[M_w+j], 1e-4), 2-(1e-4)) + E[j,i] -= 2.0*f[M_w+j] + byte = byte >> 2 + i = i + 1 + if i == N: + break + +# Load standardized chunk of PLINK file for SVD using factor matrices +cpdef void plinkFinalSVD(unsigned char[:,::1] D, float[:,::1] E, float[:,::1] U, \ + float[::1] S, float[:,::1] V, float[::1] f, int M_w, int t) nogil: + cdef: + int M = E.shape[0] + int N = E.shape[1] + int B = D.shape[1] + int K = U.shape[1] + int i, j, k, b, bytepart + unsigned char[4] recode = [0, 9, 1, 2] + unsigned char mask = 3 + unsigned char byte + for j in prange(M, num_threads=t): + i = 0 + for b in range(B): + byte = D[M_w+j,b] + for bytepart in range(4): + if recode[byte & mask] != 9: + E[j,i] = recode[byte & mask] - 2.0*f[M_w+j] + else: + E[j,i] = 0.0 + for k in range(K): + E[j,i] += U[M_w+j,k]*S[k]*V[i,k] + E[j,i] = min(max(E[j,i] + 2.0*f[M_w+j], 1e-4), 2-(1e-4)) + E[j,i] -= 2.0*f[M_w+j] + E[j,i] /= sqrt(2.0*f[M_w+j]*(1.0 - f[M_w+j])) + byte = byte >> 2 + i = i + 1 + if i == N: + break diff --git a/src/shared.py b/src/shared.py new file mode 100644 index 0000000..d6bc54a --- /dev/null +++ b/src/shared.py @@ -0,0 +1,93 @@ +""" +EMU. +Iterative SVD algorithms. + +Jonas Meisner, Siyang Liu, Mingxi Huang and Anders Albrechtsen +""" + +__author__ = "Jonas Meisner" + +# Libraries +import subprocess +import numpy as np +from math import ceil +from src import memory_cy + +##### Shared functions ##### +### Helper functions +# Reader help function +def extract_length(filename): + process = subprocess.Popen(['wc', '-l', filename], stdout=subprocess.PIPE) + result, err = process.communicate() + return int(result.split()[0]) + +# Flip signs of SVD output - Based on scikit-learn (svd_flip) +def signFlip(U, V): + maxCols = np.argmax(np.abs(U), axis=0) + signs = np.sign(U[maxCols, range(U.shape[1])]) + U *= signs + V *= signs + return U, V + +### Randomized SVD functions +# PCAone Halko full +def halko(E, K, power, seed): + M, N = E.shape + L = K + 16 + rng = np.random.default_rng(seed) + O = rng.standard_normal(size=(N, L)).astype(np.float32) + A = np.zeros((M, L), dtype=np.float32) + H = np.zeros((N, L), dtype=np.float32) + for p in range(power): + if p > 0: + O, _ = np.linalg.qr(H, mode="reduced") + np.dot(E, O, out=A) + np.dot(E.T, A, out=H) + Q, R = np.linalg.qr(A, mode="reduced") + B = np.linalg.solve(R.T, H.T) + Uhat, S, V = np.linalg.svd(B, full_matrices=False) + U = np.dot(Q, Uhat) + del A, B, H, O, Q, R, Uhat + U = np.ascontiguousarray(U[:,:K]) + V = np.ascontiguousarray(V[:K,:].T) + return U, S[:K], V + +# PCAone Halko acceleration +def halkoBatch(D, f, K, N, U0, S0, V0, power, batch, seed, final, threads): + M, B = D.shape + W = ceil(M/batch) + L = K + 16 + rng = np.random.default_rng(seed) + O = rng.standard_normal(size=(N, L)).astype(np.float32) + A = np.zeros((M, L), dtype=np.float32) + H = np.zeros((N, L), dtype=np.float32) + for p in range(power): + E = np.zeros((batch, N), dtype=np.float32) + if p > 0: + O, _ = np.linalg.qr(H, mode="reduced") + H.fill(0.0) + for w in range(W): + M_w = w*batch + if w == (W-1): # Last batch + del E # Ensure no extra copy + E = np.zeros((M - M_w, N), dtype=np.float32) + if U0 is None: + if final: + memory_cy.plinkFinalFreq(D, E, f, M_w, threads) + else: + memory_cy.plinkFreq(D, E, f, M_w, threads) + else: + if final: + memory_cy.plinkFinalSVD(D, E, U0, S0, V0, f, M_w, threads) + else: + memory_cy.plinkSVD(D, E, U0, V0, f, M_w, threads) + A[M_w:(M_w + E.shape[0])] = np.dot(E, O) + H += np.dot(E.T, A[M_w:(M_w + E.shape[0])]) + Q, R = np.linalg.qr(A, mode="reduced") + B = np.linalg.solve(R.T, H.T) + Uhat, S, V = np.linalg.svd(B, full_matrices=False) + U = np.dot(Q, Uhat) + del A, B, H, O, Q, R, Uhat, E + U = np.ascontiguousarray(U[:,:K]) + V = np.ascontiguousarray(V[:K,:].T) + return U, S[:K], V diff --git a/src/shared_cy.pyx b/src/shared_cy.pyx new file mode 100644 index 0000000..1eb49c5 --- /dev/null +++ b/src/shared_cy.pyx @@ -0,0 +1,224 @@ +# cython: language_level=3, boundscheck=False, wraparound=False, initializedcheck=False, cdivision=True +import numpy as np +cimport numpy as np +from cython.parallel import prange +from libc.math cimport sqrt + +##### EMU ##### +# Estimate population allele frequencies +cpdef void estimateF(unsigned char[:,::1] D, float[::1] f, int N, int t) nogil: + cdef: + int M = D.shape[0] + int B = D.shape[1] + int i, j, b, bytepart + unsigned char[4] recode = [0, 9, 1, 2] + unsigned char mask = 3 + unsigned char byte + float n + for j in prange(M, num_threads=t): + i = 0 + n = 0.0 + for b in range(B): + byte = D[j,b] + for bytepart in range(4): + if recode[byte & mask] != 9: + f[j] += recode[byte & mask] + n = n + 1.0 + byte = byte >> 2 # Right shift 2 bits + i = i + 1 + if i == N: + break + if n > 0.0: + f[j] /= (2.0*n) + else: + f[j] = 0.0 + +# Array filtering +cpdef void filterArrays(unsigned char[:,::1] D, float[::1] f, \ + unsigned char[::1] mask) nogil: + cdef: + int M = D.shape[0] + int B = D.shape[1] + int c = 0 + int j, b + for j in range(M): + if mask[j] == 1: + for b in range(B): + D[c,b] = D[j,b] + f[c] = f[j] + c = c + 1 + +# Initial update of dosage matrix +cpdef void updateInit(unsigned char[:,::1] D, float[::1] f, float[:,::1] E, \ + int t) nogil: + cdef: + int M = E.shape[0] + int N = E.shape[1] + int B = D.shape[1] + int i, j, b, bytepart + unsigned char[4] recode = [0, 9, 1, 2] + unsigned char mask = 3 + unsigned char byte + for j in prange(M, num_threads=t): + i = 0 + for b in range(B): + byte = D[j,b] + for bytepart in range(4): + if recode[byte & mask] != 9: + E[j,i] = recode[byte & mask] - 2.0*f[j] + byte = byte >> 2 # Right shift 2 bits + i = i + 1 + if i == N: + break + +# Update E directly from SVD +cpdef void updateSVD(unsigned char[:,::1] D, float[:,::1] E, float[::1] f, \ + float[:,::1] U, float[::1] S, float[:,::1] V, int t) nogil: + cdef: + int M = E.shape[0] + int N = E.shape[1] + int B = D.shape[1] + int K = U.shape[1] + int i, j, k, b, bytepart + unsigned char[4] recode = [0, 9, 1, 2] + unsigned char mask = 3 + unsigned char byte + for j in prange(M, num_threads=t): + i = 0 + for b in range(B): + byte = D[j,b] + for bytepart in range(4): + if recode[byte & mask] == 9: + E[j,i] = 0.0 + for k in range(K): + E[j,i] += U[j,k]*S[k]*V[i,k] + E[j,i] = min(max(E[j,i] + 2.0*f[j], 1e-4), 2-(1e-4)) + E[j,i] -= 2.0*f[j] + byte = byte >> 2 # Right shift 2 bits + i = i + 1 + if i == N: + break + +# Accelerated update of E directly from SVD +cpdef void updateAccel(unsigned char[:,::1] D, float[:,::1] E, float[::1] f, \ + float[:,:] U, float[:,:] V, int t) nogil: + cdef: + int M = E.shape[0] + int N = E.shape[1] + int B = D.shape[1] + int K = U.shape[1] + int i, j, k, b, bytepart + unsigned char[4] recode = [0, 9, 1, 2] + unsigned char mask = 3 + unsigned char byte + for j in prange(M, num_threads=t): + i = 0 + for b in range(B): + byte = D[j,b] + for bytepart in range(4): + if recode[byte & mask] == 9: + E[j,i] = 0.0 + for k in range(K): + E[j,i] += U[j,k]*V[i,k] + E[j,i] = min(max(E[j,i] + 2.0*f[j], 1e-4), 2-(1e-4)) + E[j,i] -= 2.0*f[j] + byte = byte >> 2 # Right shift 2 bits + i = i + 1 + if i == N: + break + +# Standardize dosage matrix +cpdef void standardizeMatrix(float[:,::1] E, float[::1] f, int t) nogil: + cdef: + int M = E.shape[0] + int N = E.shape[1] + int i, j + for j in prange(M, num_threads=t): + for i in range(N): + E[j,i] /= sqrt(2.0*f[j]*(1.0 - f[j])) + +# Root-mean squared error +cpdef float rmse(float[:,::1] A, float[:,::1] B) nogil: + cdef: + int M = A.shape[0] + int N = A.shape[1] + int i, j + float res = 0.0 + for j in range(M): + for i in range(N): + res += (A[j,i] - B[j,i])*(A[j,i] - B[j,i]) + return sqrt(res/((M*N))) + +# Selection scan +cpdef void galinskyScan(float[:,::1] U, float[:,::1] Dsquared) nogil: + cdef: + int M = U.shape[0] + int K = U.shape[1] + int j, k + for j in range(M): + for k in range(K): + Dsquared[j,k] = (U[j,k]**2)*float(M) + +### Accelerated EM +# Matrix subtraction +cpdef void matMinus(float[:,::1] A, float[:,::1] B, float[:,::1] C) nogil: + cdef: + int M = A.shape[0] + int N = A.shape[1] + int i, j + for j in range(M): + for i in range(N): + C[j,i] = A[j,i] - B[j,i] + +# Matrix sum of squares +cpdef float matSumSquare(float[:,::1] A) nogil: + cdef: + int M = A.shape[0] + int N = A.shape[1] + int i, j + float res = 0.0 + for j in range(M): + for i in range(N): + res += A[j,i]*A[j,i] + return res + +# Update factor matrices +cpdef void matUpdate(float[:,::1] A, float[:,::1] D1, float[:,::1] D3, \ + float alpha) nogil: + cdef: + int M = A.shape[0] + int N = A.shape[1] + int i, j + for j in range(M): + for i in range(N): + A[j,i] = A[j,i] + 2*alpha*D1[j,i] + alpha*alpha*D3[j,i] + +### Likelihood measures for debugging +cpdef void frobenius(unsigned char[:,::1] D, float[::1] f, float[:,::1] U, \ + float[:,::1] V, float[::1] sumV, int t) nogil: + cdef: + int M = U.shape[0] + int K = U.shape[1] + int N = V.shape[0] + int B = D.shape[1] + int i, j, k, b, bytepart + unsigned char[4] recode = [0, 9, 1, 2] + unsigned char mask = 3 + unsigned char byte + float e + for j in prange(M, num_threads=t): + i = 0 + sumV[j] = 0.0 + for b in range(B): + byte = D[j,b] + for bytepart in range(4): + if recode[byte & mask] != 9: + e = 0.0 + for k in range(K): + e = e + U[j,k]*V[i,k] + e = min(max(e + 2.0*f[j], 1e-4), 2-(1e-4)) + sumV[j] += (recode[byte & mask] - e)**2 + byte = byte >> 2 # Right shift 2 bits + i = i + 1 + if i == N: + break