Skip to content

Commit

Permalink
Abstracting demultiplexing phase 1
Browse files Browse the repository at this point in the history
  • Loading branch information
guillermo-carrasco committed Mar 26, 2015
1 parent 565d982 commit b91d78b
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 158 deletions.
2 changes: 1 addition & 1 deletion taca/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
""" Main TACA module
"""

__version__ = '0.1.7'
__version__ = '0.1.8'
160 changes: 22 additions & 138 deletions taca/analysis/analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from datetime import datetime
import re

from taca.illumina import Run
from taca.log import LOG
from taca.utils.filesystem import chdir
from taca.utils.config import CONFIG
Expand All @@ -36,32 +37,6 @@ def check_config_options(config):
"refer to the README file."))


def is_finished(run):
""" Checks if a run is finished or not. Check corresponding status file
:param str run: Run directory
"""
return os.path.exists(os.path.join(run, 'RTAComplete.txt'))


def processing_status(run):
""" Returns the processing status of a sequencing run. Status are:
TO_START - The BCL conversion and demultiplexing process has not yet started
IN_PROGRESS - The BCL conversion and demultiplexing process is started but not completed
COMPLETED - The BCL conversion and demultiplexing process is completed
:param str run: Run directory
"""
demux_dir = os.path.join(run, 'Demultiplexing')
if not os.path.exists(demux_dir):
return 'TO_START'
elif os.path.exists(os.path.join(demux_dir, 'Stats', 'DemultiplexingStats.xml')):
return 'COMPLETED'
else:
return 'IN_PROGRESS'


def is_transferred(run, transfer_file):
""" Checks wether a run has been transferred to the analysis server or not.
Returns true in the case in which the tranfer is ongoing.
Expand Down Expand Up @@ -308,132 +283,41 @@ def samplesheet_to_dict(samplesheet):
pass




def run_bcl2fastq(run, config):
""" Runs bcl2fast with the parameters found in the configuration file. After
that, demultiplexed FASTQ files are sent to the analysis server.
:param str run: Run directory
:param dict config: Parset configuration file
"""
LOG.info('Building bcl2fastq command')
with chdir(run):
cl_options = config['bcl2fastq']
cl = [cl_options.get('path')]

# Main options
if cl_options.get('runfolder'):
cl.extend(['--runfolder', cl_options.get('runfolder')])
cl.extend(['--output-dir', cl_options.get('output-dir', 'Demultiplexing')])

# Advanced options
if cl_options.get('input-dir'):
cl.extend(['--input-dir', cl_options.get('input-dir')])
if cl_options.get('intensities-dir'):
cl.extend(['--intensities-dir', cl_options.get('intensities-dir')])
if cl_options.get('interop-dir'):
cl.extend(['--interop-dir', cl_options.get('interop-dir')])
if cl_options.get('stats-dir'):
cl.extend(['--stats-dir', cl_options.get('stats-dir')])
if cl_options.get('reports-dir'):
cl.extend(['--reports-dir', cl_options.get('reports-dir')])

# Processing cl_options
threads = cl_options.get('loading-threads')
if threads and type(threads) is int:
cl.extend(['--loading-threads', '{}'.format(threads)])
threads = cl_options.get('demultiplexing-threads')
if threads and type(threads) is int:
cl.extend(['--demultiplexing-threads', '{}'.format(threads)])
threads = cl_options.get('processing-threads')
if threads and type(threads) is int:
cl.extend(['--processing-threads', '{}'.format(threads)])
threads = cl_options.get('writing-threads')
if threads and type(threads) is int:
cl.extend(['--writing-threads', '{}'.format(threads)])

# Behavioral options
adapter_stringency = cl_options.get('adapter-stringency')
if adapter_stringency and type(adapter_stringency) is float:
cl.extend(['--adapter-stringency', adapter_stringency])
aggregated_tiles = cl_options.get('aggregated-tiles')
if aggregated_tiles and aggregated_tiles in ['AUTO', 'YES', 'NO']:
cl.etend(['--aggregated-tiles', aggregated_tiles])
barcode_missmatches = cl_options.get('barcode-missmatches')
if barcode_missmatches and type(barcode_missmatches) is int \
and barcode_missmatches in range(3):
cl.extend(['--barcode-missmatches', barcode_missmatches])
if cl_options.get('create-fastq-for-index-reads'):
cl.append('--create-fastq-for-index-reads')
if cl_options.get('ignore-missing-bcls'):
cl.append('--ignore-missing-bcls')
if cl_options.get('ignore-missing-filter'):
cl.append('--ignore-missing-filter')
if cl_options.get('ignore-missing-locs'):
cl.append('--ignore-missing-locs')
mask = cl_options.get('mask-short-adapter-reads')
if mask and type(mask) is int:
cl.extend(['--mask-short-adapter-reads', mask])
minimum = cl_options.get('minimum-trimmed-reads')
if minimum and type(minimum) is int:
cl.extend(['--minimum-trimmed-reads', minimum])
if cl_options.get('tiles'):
cl.extend(['--tiles', cl_options.get('tiles')])

if cl_options.get('use-bases-mask'):
cl.extend(['--use-bases-mask', cl_options.get('use-bases-mask')])

if cl_options.get('with-failed-reads'):
cl.append('--with-failed-reads')
if cl_options.get('write-fastq-reverse-complement'):
cl.append('--write-fastq-reverse-complement')

LOG.info(("BCL to FASTQ conversion and demultiplexing started for "
" run {} on {}".format(os.path.basename(run), datetime.now())))

misc.call_external_command_detached(cl, with_log_files=True)

LOG.info(("BCL to FASTQ conversion and demultiplexing finished for "
"run {} on {}".format(os.path.basename(run), datetime.now())))
# Transfer the processed data to the analysis server
transfer_run(run, config)


def run_demultiplexing(run):
def run_preprocessing(run):
""" Run demultiplexing in all data directories """

config = CONFIG['preprocessing']

hiseq_runs = glob.glob(os.path.join(config['hiseq_data'], '1*XX')) if not run else [run]
for run in hiseq_runs:
run_name = os.path.basename(run)
LOG.info('Checking run {}'.format(run_name))
if is_finished(run):
status = processing_status(run)
if status == 'TO_START':
for _run in hiseq_runs:
run = Run(_run)
LOG.info('Checking run {}'.format(run.id))
if run.is_finished():
if run.status == 'TO_START':
LOG.info(("Starting BCL to FASTQ conversion and demultiplexing for "
"run {}".format(run_name)))
if prepare_sample_sheet(run, config): # work around LIMS problem
run_bcl2fastq(run, config)
elif status == 'IN_PROGRESS':
"run {}".format(run.id)))
if prepare_sample_sheet(run.run_dir, config): # work around LIMS problem
run.demultiplex(run.run_dir)
transfer_run(run.run_dir, config)
elif run.status == 'IN_PROGRESS':
LOG.info(("BCL conversion and demultiplexing process in progress for "
"run {}, skipping it".format(run_name)))
elif status == 'COMPLETED':
"run {}, skipping it".format(run.id)))
elif run.status == 'COMPLETED':
LOG.info(("Preprocessing of run {} is finished, check if run has been "
"transferred and transfer it otherwise".format(run_name)))
"transferred and transfer it otherwise".format(run.id)))

t_file = os.path.join(config['status_dir'], 'transfer.tsv')
t_file = os.path.join(config['status_dir'], 'transferun.tsv')
transferred = is_transferred(run, t_file)
if not transferred:
LOG.info("Run {} hasn't been transferred yet.".format(run_name))
LOG.info('Transferring run {} to {} into {}'.format(run_name,
LOG.info("Run {} hasn't been transferred yet.".format(run.id))
LOG.info('Transferring run {} to {} into {}'.format(run.id,
config['sync']['host'],
config['sync']['data_archive']))
transfer_run(run, config)
else:
LOG.info('Run {} already transferred to analysis server, skipping it'.format(run_name))
LOG.info('Run {} already transferred to analysis server, skipping it'.format(run.id))

if not is_finished(run):
if not run.is_finished():
# Check status files and say i.e Run in second read, maybe something
# even more specific like cycle or something
LOG.info('Run {} is not finished yet'.format(run_name))
LOG.info('Run {} is not finished yet'.format(run.id))
2 changes: 1 addition & 1 deletion taca/analysis/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ def analysis():
def demultiplex(run):
""" Demultiplex all runs present in the data directories
"""
an.run_demultiplexing(run)
an.run_preprocessing(run)
134 changes: 116 additions & 18 deletions taca/illumina/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,107 @@

from xml.etree import ElementTree as ET

def demultiplex_HiSeq_X():
""" Demultiplexing for HiSeq X runs
"""
pass
from taca.log import LOG
from scilifelab.utils import misc
from taca.utils.config import CONFIG
from taca.utils.filesystem import chdir


def demultiplex_HiSeq():
def demultiplex_HiSeq_X(run):
""" Demultiplexing for HiSeq X runs
"""
LOG.info('Building bcl2fastq command')
config = CONFIG['preprocessing']
with chdir(run):
cl_options = config['bcl2fastq']
cl = [cl_options.get('XTen')]

# Main options
if cl_options.get('runfolder'):
cl.extend(['--runfolder', cl_options.get('runfolder')])
cl.extend(['--output-dir', cl_options.get('output-dir', 'Demultiplexing')])

# Advanced options
if cl_options.get('input-dir'):
cl.extend(['--input-dir', cl_options.get('input-dir')])
if cl_options.get('intensities-dir'):
cl.extend(['--intensities-dir', cl_options.get('intensities-dir')])
if cl_options.get('interop-dir'):
cl.extend(['--interop-dir', cl_options.get('interop-dir')])
if cl_options.get('stats-dir'):
cl.extend(['--stats-dir', cl_options.get('stats-dir')])
if cl_options.get('reports-dir'):
cl.extend(['--reports-dir', cl_options.get('reports-dir')])

# Processing cl_options
threads = cl_options.get('loading-threads')
if threads and type(threads) is int:
cl.extend(['--loading-threads', '{}'.format(threads)])
threads = cl_options.get('demultiplexing-threads')
if threads and type(threads) is int:
cl.extend(['--demultiplexing-threads', '{}'.format(threads)])
threads = cl_options.get('processing-threads')
if threads and type(threads) is int:
cl.extend(['--processing-threads', '{}'.format(threads)])
threads = cl_options.get('writing-threads')
if threads and type(threads) is int:
cl.extend(['--writing-threads', '{}'.format(threads)])

# Behavioral options
adapter_stringency = cl_options.get('adapter-stringency')
if adapter_stringency and type(adapter_stringency) is float:
cl.extend(['--adapter-stringency', adapter_stringency])
aggregated_tiles = cl_options.get('aggregated-tiles')
if aggregated_tiles and aggregated_tiles in ['AUTO', 'YES', 'NO']:
cl.etend(['--aggregated-tiles', aggregated_tiles])
barcode_missmatches = cl_options.get('barcode-missmatches')
if barcode_missmatches and type(barcode_missmatches) is int \
and barcode_missmatches in range(3):
cl.extend(['--barcode-missmatches', barcode_missmatches])
if cl_options.get('create-fastq-for-index-reads'):
cl.append('--create-fastq-for-index-reads')
if cl_options.get('ignore-missing-bcls'):
cl.append('--ignore-missing-bcls')
if cl_options.get('ignore-missing-filter'):
cl.append('--ignore-missing-filter')
if cl_options.get('ignore-missing-locs'):
cl.append('--ignore-missing-locs')
mask = cl_options.get('mask-short-adapter-reads')
if mask and type(mask) is int:
cl.extend(['--mask-short-adapter-reads', mask])
minimum = cl_options.get('minimum-trimmed-reads')
if minimum and type(minimum) is int:
cl.extend(['--minimum-trimmed-reads', minimum])
if cl_options.get('tiles'):
cl.extend(['--tiles', cl_options.get('tiles')])

if cl_options.get('use-bases-mask'):
cl.extend(['--use-bases-mask', cl_options.get('use-bases-mask')])

if cl_options.get('with-failed-reads'):
cl.append('--with-failed-reads')
if cl_options.get('write-fastq-reverse-complement'):
cl.append('--write-fastq-reverse-complement')

LOG.info(("BCL to FASTQ conversion and demultiplexing started for "
" run {} on {}".format(os.path.basename(run), datetime.now())))

misc.call_external_command_detached(cl, with_log_files=True)

LOG.info(("BCL to FASTQ conversion and demultiplexing finished for "
"run {} on {}".format(os.path.basename(run), datetime.now())))


def demultiplex_HiSeq(run):
""" Demultiplexing for HiSeq (V3/V4) runs
"""
pass
raise NotImplementedError('Meec! Demultiplexing for HiSeq (V3/V4) runs not implemented yet :-/')


def demultiplex_MiSeq():
def demultiplex_MiSeq(run):
""" Demultiplexing for MiSeq runs
"""
pass
raise NotImplementedError('Meec! Demultiplexing for HiSeq (V3/V4) runs not implemented yet :-/')


class Run(object):
Expand All @@ -34,18 +119,11 @@ def __init__(self, run_dir):
self._extract_run_info()

if self.run_type == 'HiSeq':
self.demultiplex = demultiplex_HiSeq()
self.demultiplex = demultiplex_HiSeq
elif self.run_type == 'HiSeq X':
self.demultiplex = demultiplex_HiSeq_X()
self.demultiplex = demultiplex_HiSeq_X
elif self.demultiplex == 'MiSeq':
self.demultiplex = demultiplex_MiSeq()


@property
def is_finished(self):
""" Returns true if the run is finished, false otherwise
"""
return os.path.exists(os.path.join(self.run_dir, 'RTAComplete.txt'))
self.demultiplex = demultiplex_MiSeq


def _extract_run_info(self):
Expand All @@ -72,3 +150,23 @@ def _extract_run_info(self):
self.run_type = 'MiSeq'
except AttributeError:
raise RuntimeError('Run type could not be determined for run {}'.format(self.id))


def is_finished(self):
""" Returns true if the run is finished, false otherwise
"""
return os.path.exists(os.path.join(self.run_dir, 'RTAComplete.txt'))

@property
def status(self):
if self.run_type == 'HiSeq X':
def processing_status(run):
demux_dir = os.path.join(self.run_dir, 'Demultiplexing')
if not os.path.exists(demux_dir):
return 'TO_START'
elif os.path.exists(os.path.join(demux_dir, 'Stats', 'DemultiplexingStats.xml')):
return 'COMPLETED'
else:
return 'IN_PROGRESS'
else:
raise NotImplementedError('Sorry... no status method defined for {} runs'.format(self.run_type))

0 comments on commit b91d78b

Please sign in to comment.