Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FIX: Resolving/rebasing paths from/to results files #2971

Merged
merged 9 commits into from
Aug 7, 2019
3 changes: 1 addition & 2 deletions nipype/caching/tests/test_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ class SideEffectInterface(EngineTestInterface):
def _run_interface(self, runtime):
global nb_runs
nb_runs += 1
runtime.returncode = 0
return runtime
return super(SideEffectInterface, self)._run_interface(runtime)


def test_caching(tmpdir):
Expand Down
2 changes: 1 addition & 1 deletion nipype/interfaces/base/traits_extension.py
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ def _recurse_on_path_traits(func, thistrait, value, cwd):
elif thistrait.is_trait_type(traits.List):
innertrait, = thistrait.inner_traits
if not isinstance(value, (list, tuple)):
value = [value]
return _recurse_on_path_traits(func, innertrait, value, cwd)

value = [_recurse_on_path_traits(func, innertrait, v, cwd)
for v in value]
Expand Down
2 changes: 1 addition & 1 deletion nipype/pipeline/engine/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -1260,7 +1260,7 @@ def _run_interface(self, execute=True, updatehash=False):
stop_first=str2bool(
self.config['execution']['stop_on_first_crash'])))
# And store results
_save_resultfile(result, cwd, self.name)
_save_resultfile(result, cwd, self.name, rebase=False)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be self.config.getboolean('execution', 'use_relative_paths')?

Suggested change
_save_resultfile(result, cwd, self.name, rebase=False)
_save_resultfile(result, cwd, self.name,
rebase=self.config.getboolean('execution', 'use_relative_paths'))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the discussion point 2 I wanted to bring up atop. On the one hand, should we expose the inner workings of the engine - after all, adding that here does not change anything to the user (just that they won't be able to move the work directory anymore).

On the other hand, for this to be completely consistent across the node implementation, we need to also rebase/resolve the inputs pickle (actually I'll open an issue because this should be addressed).

I'm under the impression that use_relative_paths could be useful for the user at the interface level, forcing interfaces to return relative paths if desired.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@satra can you share your views as regards use_relative_paths

@effigies WDYT about not exposing the inner workings (paths) of the results file to the users?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My overall perspective is "Let's not change the API if we don't absolutely have to to fix the bug." Which includes config file options.

I may not be understanding your position though, so we might be talking past each other.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, from that standpoint:

use_relative_paths
Should the paths stored in results (and used to look for inputs) be relative or absolute. Relative paths allow moving the whole working directory around but may cause problems with symlinks. (possible values: true and false; default value: false)

The option is clearly defined specifically for the results file. Under that perspective, yes, any rebasing should be done only if use_relative_paths is true.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But actually not at the point you suggested, because that one happens at MapNode aggregation.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, okay. Sorry, my attention is split a lot of ways right now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, however, we may want to add parameterized tests with use_relative_paths on and off.

# remove any node directories no longer required
dirs2remove = []
for path in glob(op.join(cwd, 'mapflow', '*')):
Expand Down
8 changes: 2 additions & 6 deletions nipype/pipeline/engine/tests/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,15 @@ class OutputSpec(nib.TraitedSpec):
output1 = nib.traits.List(nib.traits.Int, desc='outputs')


class EngineTestInterface(nib.BaseInterface):
class EngineTestInterface(nib.SimpleInterface):
input_spec = InputSpec
output_spec = OutputSpec

def _run_interface(self, runtime):
runtime.returncode = 0
self._results['output1'] = [1, self.inputs.input1]
return runtime

def _list_outputs(self):
outputs = self._outputs().get()
outputs['output1'] = [1, self.inputs.input1]
return outputs


@pytest.mark.parametrize(
'name', ['valid1', 'valid_node', 'valid-node', 'ValidNode0'])
Expand Down
59 changes: 59 additions & 0 deletions nipype/pipeline/engine/tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,3 +224,62 @@ def test_mapnode_crash3(tmpdir):
wf.config["execution"]["crashdump_dir"] = os.getcwd()
with pytest.raises(RuntimeError):
wf.run(plugin='Linear')

class StrPathConfuserInputSpec(nib.TraitedSpec):
in_str = nib.traits.String()


class StrPathConfuserOutputSpec(nib.TraitedSpec):
out_tuple = nib.traits.Tuple(nib.File, nib.traits.String)
out_dict_path = nib.traits.Dict(nib.traits.String, nib.File(exists=True))
out_dict_str = nib.traits.DictStrStr()
out_list = nib.traits.List(nib.traits.String)
out_str = nib.traits.String()
out_path = nib.File(exists=True)


class StrPathConfuser(nib.SimpleInterface):
input_spec = StrPathConfuserInputSpec
output_spec = StrPathConfuserOutputSpec

def _run_interface(self, runtime):
out_path = os.path.abspath(os.path.basename(self.inputs.in_str) + '_path')
open(out_path, 'w').close()
self._results['out_str'] = self.inputs.in_str
self._results['out_path'] = out_path
self._results['out_tuple'] = (out_path, self.inputs.in_str)
self._results['out_dict_path'] = {self.inputs.in_str: out_path}
self._results['out_dict_str'] = {self.inputs.in_str: self.inputs.in_str}
self._results['out_list'] = [self.inputs.in_str] * 2
return runtime


def test_modify_paths_bug(tmpdir):
"""
There was a bug in which, if the current working directory contained a file with the name
of an output String, the string would get transformed into a path, and generally wreak havoc.
This attempts to replicate that condition, using an object with strings and paths in various
trait configurations, to ensure that the guards added resolve the issue.
Please see https://github.com/nipy/nipype/issues/2944 for more details.
"""
tmpdir.chdir()

spc = pe.Node(StrPathConfuser(in_str='2'), name='spc')

open('2', 'w').close()

outputs = spc.run().outputs

# Basic check that string was not manipulated
out_str = outputs.out_str
assert out_str == '2'

# Check path exists and is absolute
out_path = outputs.out_path
assert os.path.isabs(out_path)

# Assert data structures pass through correctly
assert outputs.out_tuple == (out_path, out_str)
assert outputs.out_dict_path == {out_str: out_path}
assert outputs.out_dict_str == {out_str: out_str}
assert outputs.out_list == [out_str] * 2
94 changes: 71 additions & 23 deletions nipype/pipeline/engine/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@
from ... import logging, config, LooseVersion
from ...utils.filemanip import (
Path,
indirectory,
relpath,
makedirs,
fname_presuffix,
to_str,
ensure_list,
get_related_files,
FileNotFoundError,
save_json,
savepkl,
loadpkl,
Expand All @@ -41,8 +41,10 @@
)
from ...utils.misc import str2bool
from ...utils.functions import create_function_from_source
from ...interfaces.base import (Bunch, CommandLine, isdefined, Undefined,
InterfaceResult, traits)
from ...interfaces.base.traits_extension import (
rebase_path_traits, resolve_path_traits, OutputMultiPath, isdefined, Undefined, traits)
from ...interfaces.base.support import Bunch, InterfaceResult
from ...interfaces.base import CommandLine
from ...interfaces.utility import IdentityInterface
from ...utils.provenance import ProvStore, pm, nipype_ns, get_id

Expand Down Expand Up @@ -227,52 +229,98 @@ def write_report(node, report_type=None, is_mapnode=False):
return


def save_resultfile(result, cwd, name):
"""Save a result pklz file to ``cwd``"""
def save_resultfile(result, cwd, name, rebase=True):
"""Save a result pklz file to ``cwd``."""
cwd = os.path.abspath(cwd)
resultsfile = os.path.join(cwd, 'result_%s.pklz' % name)
savepkl(resultsfile, result)
logger.debug('saved results in %s', resultsfile)
logger.debug("Saving results file: '%s'", resultsfile)

if result.outputs is None:
logger.warn('Storing result file without outputs')
savepkl(resultsfile, result)
return
try:
output_names = result.outputs.copyable_trait_names()
except AttributeError:
logger.debug('Storing non-traited results, skipping rebase of paths')
savepkl(resultsfile, result)
return

def load_resultfile(results_file):
backup_traits = {}
try:
with indirectory(cwd):
# All the magic to fix #2944 resides here:
for key in output_names:
old = getattr(result.outputs, key)
if isdefined(old):
if result.outputs.trait(key).is_trait_type(OutputMultiPath):
old = result.outputs.trait(key).handler.get_value(
result.outputs, key)
backup_traits[key] = old
val = rebase_path_traits(result.outputs.trait(key), old, cwd)
setattr(result.outputs, key, val)
savepkl(resultsfile, result)
finally:
# Restore resolved paths from the outputs dict no matter what
for key, val in list(backup_traits.items()):
setattr(result.outputs, key, val)


def load_resultfile(results_file, resolve=True):
"""
Load InterfaceResult file from path
Load InterfaceResult file from path.

Parameter
---------

path : base_dir of node
name : name of node

Returns
-------

result : InterfaceResult structure
aggregate : boolean indicating whether node should aggregate_outputs
attribute error : boolean indicating whether there was some mismatch in
versions of traits used to store result and hence node needs to
rerun

"""
aggregate = True
results_file = Path(results_file)
aggregate = True
result = None
attribute_error = False
if results_file.exists():

if not results_file.exists():
return result, aggregate, attribute_error

with indirectory(str(results_file.parent)):
try:
result = loadpkl(results_file)
except (traits.TraitError, AttributeError, ImportError,
EOFError) as err:
if isinstance(err, (AttributeError, ImportError)):
attribute_error = True
logger.debug('attribute error: %s probably using '
'different trait pickled file', str(err))
else:
logger.debug(
'some file does not exist. hence trait cannot be set')
except (traits.TraitError, EOFError):
logger.debug(
'some file does not exist. hence trait cannot be set')
except (AttributeError, ImportError) as err:
attribute_error = True
logger.debug('attribute error: %s probably using '
'different trait pickled file', str(err))
else:
aggregate = False

logger.debug('Aggregate: %s', aggregate)
if resolve and result.outputs:
try:
outputs = result.outputs.get()
except TypeError: # This is a Bunch
return result, aggregate, attribute_error

logger.debug('Resolving paths in outputs loaded from results file.')
for trait_name, old in list(outputs.items()):
if isdefined(old):
if result.outputs.trait(trait_name).is_trait_type(OutputMultiPath):
old = result.outputs.trait(trait_name).handler.get_value(
result.outputs, trait_name)
value = resolve_path_traits(result.outputs.trait(trait_name), old,
results_file.parent)
setattr(result.outputs, trait_name, value)

return result, aggregate, attribute_error


Expand Down