-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathslice_alignment.py
executable file
·92 lines (77 loc) · 3.83 KB
/
slice_alignment.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
#!/usr/bin/env python
#
# slice_alignment.py created 2017-11-26
'''slice_alignment.py last modified 2022-01-13
extract specific sites from a large alignment
this could be sites determined to be important from another program,
such as undergoing selection, low coverage, active sites, etc.
slice_alignment.py -a matrix.phy -i 135,289,455 -o subset_matrix.phy
for partitioned alignments, formats (-f) include:
clustal, fasta, nexus, phylip, phylip-relaxed, stockholm
'''
import sys
import os
import argparse
import time
import gzip
from collections import Counter
from Bio import AlignIO
def get_indices(indexfile):
'''read comma-delimited or line separated file and return a dict where key is position'''
indexdict = {} # key is int of index, value is True
for line in open(indexfile,'r'):
line = line.strip()
if line:
positions = line.split(",") # split "136,299,468,486"]
for pos in positions: # keep positions as native numbers, 1-100, not 0-99
indexdict[int(pos)] = True
print( "# read {} positions from {} {}".format(len(indexdict), indexfile, time.asctime() ), file=sys.stderr )
return indexdict
def string_to_indices(indexstring):
indexdict = {} # key is int of index, value is True
positions = indexstring.split(",") # split "136,299,468,486"]
for pos in positions: # keep positions as native numbers, 1-100, not 0-99
indexdict[int(pos)] = True
print( "# read {} positions from input {}".format( len(indexdict) , time.asctime() ), file=sys.stderr )
return indexdict
def slice_alignment(fullalignment, alignformat, indexdict):
'''read alignment, and return a new alignment where partitions are reordered from highest coverage to lowest'''
if fullalignment.rsplit('.',1)[1]=="gz": # autodetect gzip format
opentype = gzip.open
print( "# reading alignment {} as gzipped {}".format(fullalignment, time.asctime() ), file=sys.stderr )
else: # otherwise assume normal open
opentype = open
print( "# reading alignment {} {}".format(fullalignment, time.asctime() ), file=sys.stderr )
alignedseqs = AlignIO.read(opentype(fullalignment), alignformat)
num_species = len(alignedseqs)
print( "# alignment has {} taxa with {} positions {}".format(num_species, alignedseqs.get_alignment_length(), time.asctime() ), file=sys.stderr )
newalign = alignedseqs[:,0:0] # start with blank alignment
print( "# slicing alignment {}".format( time.asctime() ), file=sys.stderr )
for pos in range(alignedseqs.get_alignment_length()):
if pos+1 in indexdict:
alignpart = alignedseqs[:, pos:pos+1 ] # alignment of each partition only
#print( "keeping pos {}".format(pos+1)
newalign += alignpart
print( "# alignment has {} taxa with {} sites {}".format( len(newalign), newalign.get_alignment_length() , time.asctime() ), file=sys.stderr )
return newalign
def main(argv, wayout):
if not len(argv):
argv.append('-h')
parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter, description=__doc__)
parser.add_argument('-a','--alignment', help="supermatrix alignment")
parser.add_argument('-f','--format', default="fasta", help="alignment format [fasta]")
parser.add_argument('-i','--index', help="comma-separated list of positions (as numbers, no spaces) or file with numbers on each line")
parser.add_argument('-o','--output', help="output name for new alignment", required=True)
args = parser.parse_args(argv)
# check if -i is file or not, treat accordingly
if os.path.isfile(args.index):
indexdict = get_indices(args.index)
else: # assume string
indexdict = string_to_indices(args.index)
# build sliced alignment
subalignment = slice_alignment(args.alignment, args.format, indexdict)
# write to file
AlignIO.write(subalignment, args.output, args.format)
print( "# Subalignment written to {} {}".format(args.output, time.asctime() ), file=sys.stderr )
if __name__ == "__main__":
main(sys.argv[1:], sys.stdout)