'''
Created on Feb 25, 2015
@author: cmelton
'''
# This package runs recurrent mutation analysis from mutation and coverage file inputs.
# The inputs for mutations should be in BED format and the coverage files in WIG format.
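# Illustrative examples of these formats (not taken from this repository; real inputs
# may carry additional columns):
#   BED (tab-delimited):      chr1    12345   12346
#   WIG (fixedStep track):    fixedStep chrom=chr1 start=12345 step=1
#                             30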
# Imports
from optparse import OptionParser
import os, threading, subprocess, Queue
# from GridEngine.Job_v2 import pipelineJob as GridEngineJob
# from GridEngine.GridEngine import GridEngine
import time
# GLOBAL VARIABLES
CONVERT_TO_WIG_PROGRAM="./Preprocessing/MergedMutToWig.py"
MUTATION_COVARIATE_PROGRAM="./CovariateStats/CovariatesForEveryPositionInWig.py"
WHOLEGENOME_COVARIATE_PROGRAM="./CovariateStats/WigStats_forSingleOverlapWig.py"
FORMAT_PATIENT_MODEL_DATA_PROGRAM="./CovariateStats/CombineMutAndGenomeCovariates.R"
COLLAPSE_MUT_COVARIATE_DATA_PROGRAM="./CovariateStats/MutCovariatesSummary.R"
LOGISTICREGRESSION_MODEL_PROGRAM="./CovariateStats/GenerateModel.R"
POISSONBINOMIAL_RECURRENT_PROB_PROGRAM="./PoissonBinomial/PoiBinProbsNew.R"
MERGE_MUTATIONS_PROGRAM="./MergeMutations.py"
COMPUTE_MERGED_MUTATION_PROBS_PROGRAM="./PoissonBinomial/GetRegionProbs.R"
DEFAULTGRIDSCRIPTPATH="/home/cmelton/"
DEFAULTGRIDOUTPUTPATH="/home/cmelton/"
DEFAULTGRIDERRORPATH="/home/cmelton/"
DEFAULTGRIDEMAILADDRESS="cmelton@stanford.edu"
# this function reads a tab-delimited input file (first line = header) and returns a
# dictionary keyed by the first column; each value is a dict mapping header names to
# that row's entries
def GetFiles(MutationFileListFile):
    f=open(MutationFileListFile, 'r')
    lines=f.read().split("\n")
    f.close()
    header=lines[0].strip().split("\t")
    lines=lines[1:]
    result={}
    for line in map(lambda x: x.strip().split("\t"), lines):
        if line==[""]: continue # skip blank lines (e.g. a trailing newline)
        result[line[0]]=dict(map(lambda i: (header[i], line[i]), range(len(line))))
    return result
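# An illustrative MutationFiles.tsv layout (column names inferred from the sample keys
# used below; the exact header and file paths in a real run may differ):
#   pid   MutationFile   MutationWigFile   MutationCovariateFile   MutationCovariateSummaryFile   CoverageWigFile   WGCovariateFile   ModelData
#   P1    P1.bed         P1.wig            P1.cov.tsv              P1.cov.summary.tsv             P1.coverage.wig   P1.wg.cov.tsv     P1.model.tsv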
class GridTools():
    def __init__(self):
        pass
    # determine if any jobs remain
    @staticmethod
    def JobsRemain(JobsDict):
        for jobName in JobsDict:
            job=JobsDict[jobName]
            if not job.finished: return True
        return False
    # update the status of every job in the dict
    @staticmethod
    def UpdateJobsDict(JobsDict):
        for jobName in JobsDict:
            job=JobsDict[jobName]
            job.updateStatus()
    # run all jobs until none remain
    @staticmethod
    def RunJobs(JobsDict, grid):
        # while jobs remain, check if jobs are ready and start them
        while (GridTools.JobsRemain(JobsDict)):
            # update JobsDict with newly completed jobs
            grid.updateJobDict(JobsDict)
            # start jobs that are ready
            for jobName in JobsDict:
                job=JobsDict[jobName]
                if job.readyToRun(JobsDict) and not job.started:
                    job.start()
                    print job.name, job.started
            # wait before checking on jobs again
            time.sleep(60)
    @staticmethod
    def CreateJobInfoDict(name, command):
        return {"dependencies": "",
                "scriptPath": DEFAULTGRIDSCRIPTPATH,
                "scriptName": name,
                "scriptTime": "24:00:00",
                "scriptErrorFileDirectory": DEFAULTGRIDERRORPATH,
                "scriptOutputFileDirectory": DEFAULTGRIDOUTPUTPATH,
                "scriptCustomizations": "",
                "scriptCommand": command,
                "scriptMemory": 6,
                "scriptEmailAddress": DEFAULTGRIDEMAILADDRESS,
                "inputs": ""}
# a class to run a command through subprocess in a thread
class ThreadCommand(threading.Thread):
    """Simple class to run multiple commands in parallel"""
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue
    def run(self):
        # worker loop: pull shell commands off the queue until the program exits
        while True:
            command = self.queue.get()
            try: result=subprocess.check_output(command, shell=True, stderr=subprocess.STDOUT)
            except subprocess.CalledProcessError as e:
                result = "\n".join(map(str, [e.cmd, e.returncode, e.output]))
                print "failed", result
            self.queue.task_done()
# runs multiple commands in parallel via the thread and subprocess modules
def RunCommands(commands, n, grid):
    if grid!=None:
        # grid engine execution is not implemented here yet
        pass
        # JobsDict = dict(map(lambda i: (str(i), GridEngineJob(grid, GridTools.CreateJobInfoDict(str(i), commands[i]))), range(len(commands))))
        # GridTools.RunJobs(JobsDict, grid)
    else:
        queue = Queue.Queue()
        # spawn a pool of threads and pass them the queue instance
        for i in range(min(len(commands), n)):
            t = ThreadCommand(queue)
            t.setDaemon(True)
            t.start()
        # populate the queue with commands
        for command in commands:
            queue.put(command)
        # wait on the queue until everything has been processed
        queue.join()
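# Example usage (illustrative): run two shell commands with at most two worker threads
# and without the grid engine:
#   RunCommands(["echo hello", "echo world"], 2, None)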
# generates a combined covariate file to speed up processing
def CombineCovariates(CovariateFiles, CombinedCovariatePath):
    # placeholder: combining covariates is not implemented here yet (see the Preprocessing folder)
    if not os.path.exists(CombinedCovariatePath):
        pass
# for each mutation file, get covariates for each mutation
def GetMutationCovariates(MutationFiles, CombinedCovariateFile, parallel, grid):
    # convert to wig files
    commands=[]
    for sample in MutationFiles.values():
        commands.append("python "+CONVERT_TO_WIG_PROGRAM+" --I "+sample["MutationFile"]+" --O "+sample["MutationWigFile"])
    RunCommands(commands, parallel, grid)
    # get covariates
    commands=[]
    for sample in MutationFiles.values():
        commands.append("python "+MUTATION_COVARIATE_PROGRAM+" --I "+sample["MutationWigFile"]+" --O "+sample["MutationCovariateFile"]+" --V "+CombinedCovariateFile)
    RunCommands(commands, parallel, grid)
    # combine into summary file
    commands=[]
    for sample in MutationFiles.values():
        commands.append("/usr/local/bin/Rscript "+COLLAPSE_MUT_COVARIATE_DATA_PROGRAM+" "+sample["MutationCovariateFile"]+" "+sample["MutationCovariateSummaryFile"])
    RunCommands(commands, parallel, grid)
# get covariate totals for the whole genome
def GetWholeGenomeCovariates(MutationFiles, CombinedCovariateFile, parallel, grid):
    commands=[]
    for sample in MutationFiles.values():
        commands.append("python "+WHOLEGENOME_COVARIATE_PROGRAM+" --I "+sample["CoverageWigFile"]+" --O "+sample["WGCovariateFile"]+" --V "+CombinedCovariateFile)
    RunCommands(commands, parallel, grid)
    # combine whole-genome and mutation covariate summaries into per-sample model data
    commands=[]
    for sample in MutationFiles.values():
        commands.append("/usr/local/bin/Rscript "+FORMAT_PATIENT_MODEL_DATA_PROGRAM+" "+sample["WGCovariateFile"]+" "+sample["MutationCovariateSummaryFile"]+" "+sample["ModelData"]+" "+sample["pid"])
    RunCommands(commands, parallel, grid)
# fit the logistic regression sample-specific probability model
def GenerateLRModel(MutationFiles, LRModelName):
    modeldata="|".join(map(lambda sample: sample["ModelData"], MutationFiles.values()))
    print modeldata
    subprocess.call("/usr/local/bin/Rscript "+LOGISTICREGRESSION_MODEL_PROGRAM+" '"+modeldata+"' '"+LRModelName+"'", shell=True)
# combine mutations into a single file with chrom, pos, and count info
def MergeMutations(MutationFiles, MergedMutationFilename, splitByChrom=True):
    # the --I argument is a "||"-separated list of pid|MutationCovariateFile pairs
    mutationFiles="||".join(map(lambda sample: sample["pid"]+"|"+sample["MutationCovariateFile"], MutationFiles.values()))
    print mutationFiles
    if splitByChrom:
        subprocess.call("python "+MERGE_MUTATIONS_PROGRAM+" --I '"+mutationFiles+"' --O "+MergedMutationFilename+" --S T", shell=True)
    else:
        subprocess.call("python "+MERGE_MUTATIONS_PROGRAM+" --I '"+mutationFiles+"' --O "+MergedMutationFilename+" --S F", shell=True)
# get sample-specific probabilities for each mutation
def GetSampleSpecificMutationProbs(MergedMutationFilename, LRModelName, parallel, grid):
    # collect the merged mutation files (e.g. per-chromosome splits) that share the
    # basename of MergedMutationFilename
    MergedMutationFilenames=["/".join(MergedMutationFilename.split("/")[0:-1])+"/"+x for x in os.listdir("/".join(MergedMutationFilename.split("/")[0:-1])) if MergedMutationFilename.split("/")[-1] in x]
    commands=[]
    for filename in MergedMutationFilenames:
        commands.append("/usr/local/bin/Rscript "+COMPUTE_MERGED_MUTATION_PROBS_PROGRAM+" "+filename+" "+LRModelName+" "+filename+".prob.tsv")
    RunCommands(commands, parallel, grid)
# combine sample-specific probabilities with the Poisson binomial to get recurrence probabilities
def ComputePoissonBinomialProbs(MergedMutationFilename, parallel, grid, regionSize="'1'"):
    MergedMutationProbFilenames=["/".join(MergedMutationFilename.split("/")[0:-1])+"/"+x for x in os.listdir("/".join(MergedMutationFilename.split("/")[0:-1])) if MergedMutationFilename.split("/")[-1] in x and ".prob.tsv" in x]
    commands=[]
    for filename in MergedMutationProbFilenames:
        numberOfMutationsPerSiteData=filename[0:-9] # strip the ".prob.tsv" suffix
        outputCSVFile="'"+filename+".poibin.csv"+"'"
        commands.append("/usr/local/bin/Rscript "+POISSONBINOMIAL_RECURRENT_PROB_PROGRAM+" "+filename+" "+numberOfMutationsPerSiteData+" "+outputCSVFile+" "+regionSize)
    RunCommands(commands, parallel, grid)
# this function gets the command line options for running the program
def getOptions():
    parser = OptionParser()
    parser.add_option("--M", dest = "MutationFileListFile", help = "this should be a tab delimited file with patient id and mutation file location",
                      metavar = "STRING", type = "string", default = "MutationFiles.tsv")
    parser.add_option("--C", dest = "CovariateFileListFile", help = "this should be a tab delimited file with covariate file name and file location",
                      metavar = "STRING", type = "string", default = "")
    parser.add_option("--CC", dest = "CombinedCovariateFile", help = "this should be a filename for the combined covariates",
                      metavar = "STRING", type = "string", default = "./Preprocessing/overlap.22.out") # alternative defaults: "./CovariateStats/TestData/mergedBPRepTimingTranscript.wig.gz", "./CovariateStats/TestData/mergedBPRepTimingTranscript.22.wig.gz"
    parser.add_option("--LR", dest = "LRModelName", help = "this should be a filename for the fitted logistic regression model",
                      metavar = "STRING", type = "string", default = "./Temp/LRModel")
    parser.add_option("--P", dest = "parallel", help = "the number of jobs to run in parallel",
                      metavar = "STRING", type = "string", default = "2")
    parser.add_option("--MF", dest = "MergedMutationFilename", help = "the path to the merged mutation file",
                      metavar = "STRING", type = "string", default = "./Temp/MergedMutations.tsv")
    parser.add_option("--G", dest = "grid", help = "T or F to specify use of the grid engine",
                      metavar = "STRING", type = "string", default = "F")
    parser.add_option("--L", dest = "logFilePath", help = "the path to a log file for the grid engine",
                      metavar = "STRING", type = "string", default = "")
    parser.add_option("--RS", dest = "regionSize", help = "the region size used when computing Poisson binomial recurrence probabilities",
                      metavar = "STRING", type = "string", default = "1")
    (options, args) = parser.parse_args()
    return options
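# Example invocation (illustrative; every option has a default, so all arguments are optional):
#   python Main.py --M MutationFiles.tsv --CC ./Preprocessing/overlap.22.out --P 4 --MF ./Temp/MergedMutations.tsv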
if __name__ == '__main__':
    # get options
    options = getOptions()
    # initialize grid engine if needed; not implemented here yet
    grid=None
    if options.grid=="T":
        pass
        # grid = GridEngine(options.logFilePath)
    try:
        # get a dictionary of patient ids and mutation files
        MutationFiles=GetFiles(options.MutationFileListFile)
        print MutationFiles
        # not implemented here yet, see preprocessing folder
        # Preprocess to get combined covariate file
        # CovariateFiles=GetFiles(options.CovariateFileListFile)
        # CombineCovariates(CovariateFiles, options.CombinedCovariateFile)
        # get covariates for mutations
        GetMutationCovariates(MutationFiles, options.CombinedCovariateFile, int(options.parallel), grid)
        # get covariates for the whole genome
        GetWholeGenomeCovariates(MutationFiles, options.CombinedCovariateFile, int(options.parallel), grid)
        # generate the logistic regression sample-specific probability model
        GenerateLRModel(MutationFiles, options.LRModelName)
        # get mutation counts for each mutation across samples
        MergeMutations(MutationFiles, options.MergedMutationFilename, splitByChrom=True)
        # get sample-specific probabilities
        GetSampleSpecificMutationProbs(options.MergedMutationFilename, options.LRModelName, int(options.parallel), grid)
        # compute Poisson binomial recurrence probabilities
        ComputePoissonBinomialProbs(options.MergedMutationFilename, int(options.parallel), grid, regionSize="'"+options.regionSize+"'")
    finally:
        # exit the drmaa session if a grid engine was used
        if grid!=None:
            grid.exit()