Skip to content

Commit 359f71b

Browse files
jaclewJacob Lewerentz
and
Jacob Lewerentz
authoredJan 17, 2024
Changed code regarding the multiprocessing Queue in the return from XMFA_obj.run in find_snps. Results are now "streamed" into the queue instead of being processed all at once after the child finishes. (#40)
Co-authored-by: Jacob Lewerentz <jaclew@# Managed by FOI IT-Enheten. To make changes contact Servicedesk>
1 parent e737121 commit 359f71b

File tree

1 file changed

+84
-27
lines changed

1 file changed

+84
-27
lines changed
 

‎CanSNPer2/modules/CanSNPer2.py

+84-27
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
## import standard python libraries for subprocess and multiprocess
1717
from subprocess import Popen,PIPE,STDOUT
1818
from multiprocessing import Process, Queue
19-
from time import sleep
19+
from time import sleep,time
2020

2121
class Error(Exception):
2222
"""docstring for Error"""
@@ -120,14 +120,14 @@ def run_mauve(self, commands,logs):
120120
'''
121121
retvalue=0
122122
processes = [] #process container
123-
log_f = {} #log container
123+
log_f = {} #log container
124124
error = 0 #Variable for errors
125125
logger.info("Starting progressiveMauve on {n} references".format(n=len(commands)))
126126
for i in range(len(commands)): #Loop through commands,
127127
command = commands[i]
128-
logger.debug(command) ## In verbose mode print the actual mauve command
128+
logger.debug(command) ## In verbose mode print the actual mauve command
129129
p = Popen(command.split(" "), stdout=PIPE, stderr=STDOUT) ##Split command to avoid shell=True pipe stdout and stderr to stdout
130-
log_f[p.stdout.fileno()] = logs[i] ## Store the reference to the correct log file
130+
log_f[p.stdout.fileno()] = logs[i] ## Store the reference to the correct log file
131131
processes.append(p)
132132
while processes:
133133
for p in processes: ## Loop through processes
@@ -164,11 +164,11 @@ def run_mauve(self, commands,logs):
164164

165165
def create_mauve_command(self,query,references=[]):
166166
'''Mauve commands'''
167-
commands =[] # store execute command
168-
logs = [] # store log filepath
169-
if len(references) == 0: ## If specific references are not given fetch references from the reference folder
167+
commands =[] # store execute command
168+
logs = [] # store log filepath
169+
if len(references) == 0: ## If specific references are not given fetch references from the reference folder
170170
references = self.get_references()
171-
for ref in references: ## For each reference in the reference folder align to query
171+
for ref in references: ## For each reference in the reference folder align to query
172172
ref_name = ref.rsplit(".",1)[0] ## remove file ending
173173
#self.query_name = os.path.basename(query).rsplit(".",1)[0] ## get name of file and remove ending
174174
xmfa_output = "{tmpdir}/{ref}_{target}.xmfa".format(tmpdir=self.tmpdir.rstrip("/"),ref=ref_name,target=self.query_name)
@@ -177,13 +177,13 @@ def create_mauve_command(self,query,references=[]):
177177

178178
'''Create run command for mauve'''
179179
command = "{mauve_path}progressiveMauve --output {xmfa} {ref_fasta} {target_fasta}".format(
180-
mauve_path = self.mauve_path,
181-
xmfa = xmfa_output,
182-
ref_fasta = ref_file,
183-
target_fasta = query
180+
mauve_path = self.mauve_path,
181+
xmfa = xmfa_output,
182+
ref_fasta = ref_file,
183+
target_fasta = query
184184
)
185-
commands.append(command) ## Mauve command
186-
logs.append(log_file) ## Store log files for each alignment
185+
commands.append(command) ## Mauve command
186+
logs.append(log_file) ## Store log files for each alignment
187187
self.xmfa_files.append(xmfa_output) ## Store the path to xmfa files as they will be used later
188188
return commands,logs
189189

@@ -242,14 +242,25 @@ def parse_xmfa(XMFA_obj, xmfa_file,results=[]):
242242
export_results.put(XMFA_obj.get_snp_info())
243243
called_snps.put(XMFA_obj.get_called_snps())
244244
return results
245-
245+
246246
def find_snps(self, XMFA_obj, xmfa_file, results=None, export_results=None, called_snps=None):
	'''Run SNP calling on one XMFA alignment and stream the results into queues.

	Parameters:
		XMFA_obj       -- object exposing run(), get_snps(), get_snp_info(),
		                  get_called_snps() and a `reference` attribute
		xmfa_file      -- path to the XMFA alignment file to process
		results        -- multiprocessing.Queue receiving (key, value) SNP pairs
		export_results -- multiprocessing.Queue receiving SNP-info rows
		called_snps    -- multiprocessing.Queue receiving called-SNP entries

	Each queue finally receives the sentinel string
	'XMFA_obj_finish_worker_<reference>' so the consumer knows this worker is
	done. (Defaults were changed from mutable `[]` to None: the empty-list
	defaults were a latent bug — a list has no .put() — and were never usable.)
	'''
	# execute snp calling
	XMFA_obj.run(xmfa_file)
	#/
	# Stream output into the queues item by item: putting the complete job
	# output at once can fill the queue for "large" outputs, whereas streaming
	# lets the consumer drain it simultaneously.
	for item in XMFA_obj.get_snps().items():  # get_snps() returns a dictionary
		results.put(item)
	for item in XMFA_obj.get_snp_info():  # returns an array
		export_results.put(item)
	for item in XMFA_obj.get_called_snps():  # returns an array
		called_snps.put(item)
	#/
	# When finished, put a "stop-signal" on every queue. NOTE: the consumer
	# parses the reference name back out of this string, so the prefix must
	# stay byte-identical.
	stop_signal = 'XMFA_obj_finish_worker_' + XMFA_obj.reference
	results.put(stop_signal)
	export_results.put(stop_signal)
	called_snps.put(stop_signal)
	#/
253264

254265
def find_snps_multiproc(self,xmfa_obj,xmfa_files,export=False):
255266
'''function to run genomes in paralell'''
@@ -260,19 +271,65 @@ def find_snps_multiproc(self,xmfa_obj,xmfa_files,export=False):
260271
result_queue = Queue()
261272
export_queue = Queue()
262273
called_queue = Queue()
263-
for xmfa_file in xmfa_files:
274+
for enum,xmfa_file in enumerate(xmfa_files):
264275
p = Process(target=self.find_snps, args=(xmfa_obj,xmfa_file ,result_queue,export_queue,called_queue))
265276
p.start()
266277
jobs.append(p)
267278
sleep(0.05) ## A short sleep to make sure all threads do not initiate access to the database file simultanously
279+
280+
## Parse output queues from jobs continuously
281+
finish_signals = {'SNPS':set(),'SNP_info':set(),'called_snps':set()} # will keep track when all workers are finished
282+
tic = time() # "tic-toc" clock-sound
283+
while True:
284+
# check SNP queue
285+
if not result_queue.empty():
286+
tmp_data = result_queue.get()
287+
if type(tmp_data) == str and tmp_data.find('XMFA_obj_finish_worker_') != -1:
288+
finish_signals['SNPS'].add(tmp_data.split('_')[-1])
289+
logger.info("Output queue for 'SNPS' finished for reference {reference}".format(reference=tmp_data.split('_')[-1]))
290+
else:
291+
SNPS[tmp_data[0]] = tmp_data[1]
292+
#/
293+
# check SNP info queue
294+
if not export_queue.empty():
295+
tmp_data = export_queue.get()
296+
if type(tmp_data) == str and tmp_data.find('XMFA_obj_finish_worker_') != -1:
297+
finish_signals['SNP_info'].add(tmp_data.split('_')[-1])
298+
logger.info("Output queue for 'SNP_info' finished for reference {reference}".format(reference=tmp_data.split('_')[-1]))
299+
else:
300+
SNP_info.append(tmp_data)
301+
#/
302+
# check called snps queue
303+
if not called_queue.empty():
304+
tmp_data = called_queue.get()
305+
if type(tmp_data) == str and tmp_data.find('XMFA_obj_finish_worker_') != -1:
306+
finish_signals['called_snps'].add(tmp_data.split('_')[-1])
307+
logger.info("Output queue for 'called_snps' finished for reference {reference}".format(reference=tmp_data.split('_')[-1]))
308+
else:
309+
called_snps.append(tmp_data)
310+
#/
311+
# check if finished (all xmfa files has returned the "finish worker" in all output queues)
312+
finish_signals_called = []
313+
for queue_name,signals in finish_signals.items():
314+
if len(signals) == len(xmfa_files):
315+
finish_signals_called.append(queue_name)
316+
if len(finish_signals) == len(finish_signals_called):
317+
logger.info("All output queues finished!")
318+
break
319+
#/
320+
# check if we have computed for X amount of time, then break and assume something is wrong
321+
max_time_seconds = 5*60 # 5*60 => 5 minutes
322+
toc = time()
323+
time_spent = toc - tic
324+
if time_spent > max_time_seconds:
325+
logger.warning("Maximum time spent reached to capture queue output. This is not expected to happen and might mean that your output data is corrupted.")
326+
break
327+
#/
328+
##/
329+
## Wait until processes terminated (their output should already have been processed in the queue's)
268330
for job in jobs:
269331
job.join()
270-
for j in jobs:
271-
## Merge SNP result dictionaries
272-
SNPS = dict(**SNPS, **result_queue.get())
273-
### join SNP info files
274-
SNP_info+= export_queue.get()
275-
called_snps+=called_queue.get()
332+
##/
276333
return SNPS,SNP_info,called_snps
277334

278335
def read_result_dir(self):
@@ -324,7 +381,7 @@ def run(self,database):
324381
verbose=self.verbose) ## Create XMFA object and connect to database
325382
'''Walk through the list of queries supplied'''
326383
if not self.skip_mauve: print("Run {n} alignments to references using progressiveMauve".format(n=len(self.query)))
327-
for q in self.query: ## For each query file_path
384+
for q in self.query: ## For each query file_path
328385
try:
329386
self.query_name = os.path.basename(q).rsplit(".",1)[0] ## get name of file and remove ending
330387

0 commit comments

Comments
 (0)