Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RF: improve support for queue args #328

Merged
merged 7 commits into from
Apr 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion heudiconv/bids.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from random import sample
from glob import glob

from heudiconv.external.pydicom import dcm
from .external.pydicom import dcm

from .parser import find_files
from .utils import (
Expand Down
40 changes: 11 additions & 29 deletions heudiconv/cli/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ def get_parser():
group.add_argument('--files', nargs='*',
help='Files (tarballs, dicoms) or directories '
'containing files to process. Cannot be provided if '
'using --dicom_dir_template or --subjects')
'using --dicom_dir_template.')
parser.add_argument('-s', '--subjects', dest='subjs', type=str, nargs='*',
help='list of subjects - required for dicom template. '
'If not provided, DICOMS would first be "sorted" and '
Expand Down Expand Up @@ -173,8 +173,6 @@ def get_parser():
'single argument and return a single anonymized ID. '
'Also see --conv-outdir')
parser.add_argument('-f', '--heuristic', dest='heuristic',
# some commands might not need heuristic
# required=True,
help='Name of a known heuristic or path to the Python'
'script containing heuristic')
parser.add_argument('-p', '--with-prov', action='store_true',
Expand Down Expand Up @@ -221,7 +219,9 @@ def get_parser():
default=None,
help='batch system to submit jobs in parallel')
submission.add_argument('--queue-args', dest='queue_args', default=None,
help='Additional queue arguments')
help='Additional queue arguments passed as '
'single string of Argument=Value pairs space '
'separated.')
return parser


Expand All @@ -246,6 +246,13 @@ def process_args(args):
if not args.heuristic:
raise RuntimeError("No heuristic specified - add to arguments and rerun")

if args.queue:
lgr.info("Queuing %s conversion", args.queue)
iterarg, iterables = ("files", len(args.files)) if args.files else \
("subjects", len(args.subjs))
queue_conversion(args.queue, iterarg, iterables, args.queue_args)
sys.exit(0)

heuristic = load_heuristic(args.heuristic)

study_sessions = get_study_sessions(args.dicom_dir_template, args.files,
Expand Down Expand Up @@ -281,31 +288,6 @@ def process_args(args):
lgr.warning("Skipping unknown locator dataset")
continue

if args.queue:
# if seqinfo and not dicoms:
# # flatten them all and provide into batching, which again
# # would group them... heh
# dicoms = sum(seqinfo.values(), [])
# raise NotImplementedError(
# "we already grouped them so need to add a switch to avoid "
# "any grouping, so no outdir prefix doubled etc")

pyscript = op.abspath(inspect.getfile(inspect.currentframe()))

studyid = sid
if session:
studyid += "-%s" % session
if locator:
studyid += "-%s" % locator
# remove any separators
studyid = studyid.replace(op.sep, '_')

queue_conversion(pyscript,
args.queue,
studyid,
args.queue_args)
continue

anon_sid = anonymize_sid(sid, args.anon_cmd) if args.anon_cmd else None
if args.anon_cmd:
lgr.info('Anonymized {} to {}'.format(sid, anon_sid))
Expand Down
8 changes: 4 additions & 4 deletions heudiconv/dicoms.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
import logging
from collections import OrderedDict
import tarfile
from heudiconv.external.pydicom import dcm

from .external.pydicom import dcm
from .utils import SeqInfo, load_json, set_readonly

lgr = logging.getLogger(__name__)
Expand Down Expand Up @@ -55,10 +55,10 @@ def group_dicoms_into_seqinfos(files, file_filter, dcmfilter, grouping):
lgr.info('Filtering out {0} dicoms based on their filename'.format(
nfl_before-nfl_after))
for fidx, filename in enumerate(files):
from heudiconv.external.dcmstack import ds
import nibabel.nicom.dicomwrappers as dw
# TODO after getting a regression test check if the same behavior
# with stop_before_pixels=True
mw = ds.wrapper_from_data(dcm.read_file(filename, force=True))
mw = dw.wrapper_from_data(dcm.read_file(filename, force=True))

for sig in ('iop', 'ICE_Dims', 'SequenceName'):
try:
Expand Down Expand Up @@ -385,7 +385,7 @@ def embed_nifti(dcmfiles, niftifile, infofile, bids_info, min_meta):
import re

if not min_meta:
import dcmstack as ds
from heudiconv.external.dcmstack import ds
stack = ds.parse_and_stack(dcmfiles, force=True).values()
if len(stack) > 1:
raise ValueError('Found multiple series')
Expand Down
2 changes: 1 addition & 1 deletion heudiconv/external/dcmstack.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
try:
import dcmstack as ds
except ImportError as e:
from heudiconv import lgr
from .. import lgr
# looks different between py2 and 3 so we go for very rudimentary matching
e_str = str(e)
# there were changes from how
Expand Down
144 changes: 97 additions & 47 deletions heudiconv/queue.py
Original file line number Diff line number Diff line change
@@ -1,62 +1,112 @@
import subprocess
import sys
import os

import logging

from .utils import which

lgr = logging.getLogger(__name__)

def queue_conversion(pyscript, queue, studyid, queue_args=None):
"""
Write out conversion arguments to file and submit to a job scheduler.
Parses `sys.argv` for heudiconv arguments.

Parameters
----------
pyscript: file
path to `heudiconv` script
queue: string
batch scheduler to use
studyid: string
identifier for conversion
queue_args: string (optional)
additional queue arguments for job submission

Returns
-------
proc: int
Queue submission exit code
"""

SUPPORTED_QUEUES = {'SLURM': 'sbatch'}
if queue not in SUPPORTED_QUEUES:
raise NotImplementedError("Queuing with %s is not supported", queue)

args = sys.argv[1:]
# search args for queue flag
for i, arg in enumerate(args):
if arg in ["-q", "--queue"]:
break
if i == len(args) - 1:
raise RuntimeError(
"Queue flag not found (must be provided as a command-line arg)"
)
# remove queue flag and value
del args[i:i+2]

# make arguments executable again
args.insert(0, pyscript)
pypath = sys.executable or "python"
args.insert(0, pypath)
def queue_conversion(queue, iterarg, iterables, queue_args=None):
"""
Write out conversion arguments to file and submit to a job scheduler.
Parses `sys.argv` for heudiconv arguments.

Parameters
----------
queue: string
Batch scheduler to use
iterarg: str
Multi-argument to index (`subjects` OR `files`)
iterables: int
Number of `iterarg` arguments
queue_args: string (optional)
Additional queue arguments for job submission

"""

SUPPORTED_QUEUES = {'SLURM': 'sbatch'}
if queue not in SUPPORTED_QUEUES:
raise NotImplementedError("Queuing with %s is not supported", queue)

for i in range(iterables):
args = clean_args(sys.argv[1:], iterarg, i)
# make arguments executable
heudiconv_exec = which("heudiconv") or "heudiconv"
args.insert(0, heudiconv_exec)
convertcmd = " ".join(args)

# will overwrite across subjects
queue_file = os.path.abspath('heudiconv-%s.sh' % queue)
with open(queue_file, 'wt') as fp:
fp.writelines(['#!/bin/bash\n', convertcmd, '\n'])
fp.write("#!/bin/bash\n")
if queue_args:
for qarg in queue_args.split():
fp.write("#SBATCH %s\n" % qarg)
fp.write(convertcmd + "\n")

cmd = [SUPPORTED_QUEUES[queue], queue_file]
if queue_args:
cmd.insert(1, queue_args)
proc = subprocess.call(cmd)
return proc
lgr.info("Submitted %d jobs", iterables)

def clean_args(hargs, iterarg, iteridx):
"""
Filters arguments for batch submission.

Parameters
----------
hargs: list
Command-line arguments
iterarg: str
Multi-argument to index (`subjects` OR `files`)
iteridx: int
`iterarg` index to submit

Returns
-------
cmdargs : list
Filtered arguments for batch submission

Example
--------
>>> from heudiconv.queue import clean_args
>>> cmd = ['heudiconv', '-d', '/some/{subject}/path',
... '-q', 'SLURM',
... '-s', 'sub-1', 'sub-2', 'sub-3', 'sub-4']
>>> clean_args(cmd, 'subjects', 0)
['heudiconv', '-d', '/some/{subject}/path', '-s', 'sub-1']
"""

if iterarg == "subjects":
iterarg = ['-s', '--subjects']
elif iterarg == "files":
iterarg = ['--files']
else:
raise ValueError("Cannot index %s" % iterarg)

# remove these or cause an infinite loop
queue_args = ['-q', '--queue', '--queue-args']

# control variables for multi-argument parsing
is_iterarg = False
itercount = 0

indicies = []
cmdargs = hargs[:]

for i, arg in enumerate(hargs):
if arg.startswith('-') and is_iterarg:
# moving on to another argument
is_iterarg = False
if is_iterarg:
if iteridx != itercount:
indicies.append(i)
itercount += 1
if arg in iterarg:
is_iterarg = True
if arg in queue_args:
indicies.extend([i, i+1])

for j in sorted(indicies, reverse=True):
del cmdargs[j]
return cmdargs
51 changes: 49 additions & 2 deletions heudiconv/tests/test_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
import subprocess

from heudiconv.cli.run import main as runner
from heudiconv.queue import clean_args, which
from .utils import TESTS_DATA_PATH
import pytest
from nipype.utils.filemanip import which

@pytest.mark.skipif(which("sbatch"), reason="skip a real slurm call")
@pytest.mark.parametrize(
Expand All @@ -23,7 +23,7 @@ def test_queue_no_slurm(tmpdir, invocation):
sys.argv = ['heudiconv'] + hargs

try:
with pytest.raises(OSError):
with pytest.raises(OSError): # SLURM should not be installed
runner(hargs)
# should have generated a slurm submission script
slurm_cmd_file = (tmpdir / 'heudiconv-SLURM.sh').strpath
Expand All @@ -44,3 +44,50 @@ def test_queue_no_slurm(tmpdir, invocation):
finally:
# revert before breaking something
sys.argv = _sys_args

def test_argument_filtering(tmpdir):
cmd_files = [
'heudiconv',
'--files',
'/fake/path/to/files',
'/another/fake/path',
'-f',
'convertall',
'-q',
'SLURM',
'--queue-args',
'--cpus-per-task=4 --contiguous --time=10'
]
filtered = [
'heudiconv',
'--files',
'/another/fake/path',
'-f',
'convertall',
]
assert clean_args(cmd_files, 'files', 1) == filtered

cmd_subjects = [
'heudiconv',
'-d',
'/some/{subject}/path',
'--queue',
'SLURM',
'--subjects',
'sub1',
'sub2',
'sub3',
'sub4',
'-f',
'convertall'
]
filtered = [
'heudiconv',
'-d',
'/some/{subject}/path',
'--subjects',
'sub3',
'-f',
'convertall'
]
assert clean_args(cmd_subjects, 'subjects', 2) == filtered
14 changes: 8 additions & 6 deletions heudiconv/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
from pathlib import Path
from collections import namedtuple
from glob import glob
from subprocess import check_output

from nipype.utils.filemanip import which

import logging
lgr = logging.getLogger(__name__)
Expand Down Expand Up @@ -103,18 +106,17 @@ def dec(obj):


def anonymize_sid(sid, anon_sid_cmd):
import sys
from subprocess import check_output


cmd = [anon_sid_cmd, sid]
shell_return = check_output(cmd)

### Handle subprocess returning a bytes literal string to a python3 interpreter
if all([sys.version_info[0] > 2, isinstance(shell_return, bytes), isinstance(sid, str)]):
if all([sys.version_info[0] > 2,
isinstance(shell_return, bytes),
isinstance(sid, str)]):
anon_sid = shell_return.decode()
else:
anon_sid = shell_return

return anon_sid.strip()


Expand Down