Skip to content

Commit

Permalink
FIX: Use load_resultfile when loading a results pickle
Browse files Browse the repository at this point in the history
Some sections of the code were using ``loadpkl`` which does not resolve
paths.

This PR also improves ``loadpkl`` for readability and reliability.
  • Loading branch information
oesteban committed Aug 2, 2019
1 parent 7262b24 commit fe5fa56
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 50 deletions.
8 changes: 5 additions & 3 deletions nipype/pipeline/engine/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,8 @@ def interface(self):
@property
def result(self):
"""Get result from result file (do not hold it in memory)"""
return _load_resultfile(self.output_dir(), self.name)[0]
return _load_resultfile(
op.join(self.output_dir(), 'result_%s.pklz' % self.name))[0]

@property
def inputs(self):
Expand Down Expand Up @@ -517,7 +518,7 @@ def _get_inputs(self):
logger.debug('input: %s', key)
results_file = info[0]
logger.debug('results file: %s', results_file)
outputs = loadpkl(results_file).outputs
outputs = _load_resultfile(results_file)[0].outputs
if outputs is None:
raise RuntimeError("""\
Error populating the input "%s" of node "%s": the results file of the source node \
Expand Down Expand Up @@ -564,7 +565,8 @@ def _run_interface(self, execute=True, updatehash=False):

def _load_results(self):
cwd = self.output_dir()
result, aggregate, attribute_error = _load_resultfile(cwd, self.name)
result, aggregate, attribute_error = _load_resultfile(
op.join(cwd, 'result_%s.pklz' % self.name))
# try aggregating first
if aggregate:
logger.debug('aggregating results')
Expand Down
19 changes: 8 additions & 11 deletions nipype/pipeline/engine/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

from traceback import format_exception
from hashlib import sha1
import gzip

from functools import reduce

Expand All @@ -25,6 +24,7 @@

from ... import logging, config, LooseVersion
from ...utils.filemanip import (
Path,
relpath,
makedirs,
fname_presuffix,
Expand All @@ -34,6 +34,7 @@
FileNotFoundError,
save_json,
savepkl,
loadpkl,
write_rst_header,
write_rst_dict,
write_rst_list,
Expand Down Expand Up @@ -308,7 +309,7 @@ def save_resultfile(result, cwd, name):
setattr(result.outputs, k, v)


def load_resultfile(path, name):
def load_resultfile(results_file):
"""
Load InterfaceResult file from path
Expand All @@ -328,17 +329,14 @@ def load_resultfile(path, name):
rerun
"""
aggregate = True
resultsoutputfile = os.path.join(path, 'result_%s.pklz' % name)
results_file = Path(results_file)
path = '%s' % results_file.parents

result = None
attribute_error = False
if os.path.exists(resultsoutputfile):
pkl_file = gzip.open(resultsoutputfile, 'rb')
if results_file.exists():
try:
result = pickle.load(pkl_file)
except UnicodeDecodeError:
# Was this pickle created with Python 2.x?
pickle.load(pkl_file, fix_imports=True, encoding='utf-8')
logger.warning('Successfully loaded pkl in compatibility mode')
result = loadpkl(results_file)
except (traits.TraitError, AttributeError, ImportError,
EOFError) as err:
if isinstance(err, (AttributeError, ImportError)):
Expand All @@ -362,7 +360,6 @@ def load_resultfile(path, name):
logger.debug('conversion to full path results in '
'non existent file')
aggregate = False
pkl_file.close()
logger.debug('Aggregate: %s', aggregate)
return result, aggregate, attribute_error

Expand Down
7 changes: 3 additions & 4 deletions nipype/pipeline/plugins/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@
import numpy as np

from ... import logging
from ...utils.filemanip import loadpkl
from ...utils.misc import str2bool
from ..engine.utils import topological_sort
from ..engine.utils import topological_sort, load_resultfile
from ..engine import MapNode
from .tools import report_crash, report_nodes_not_run, create_pyscript

Expand Down Expand Up @@ -504,7 +503,7 @@ def _get_result(self, taskid):
result_data['traceback'] = '\n'.join(format_exception(*sys.exc_info()))
else:
results_file = glob(os.path.join(node_dir, 'result_*.pklz'))[0]
result_data = loadpkl(results_file)
result_data = load_resultfile(results_file)
result_out = dict(result=None, traceback=None)
if isinstance(result_data, dict):
result_out['result'] = result_data['result']
Expand Down Expand Up @@ -602,7 +601,7 @@ def _get_result(self, taskid):
glob(os.path.join(node_dir, 'result_*.pklz')).pop()

results_file = glob(os.path.join(node_dir, 'result_*.pklz'))[0]
result_data = loadpkl(results_file)
result_data = load_resultfile(results_file)
result_out = dict(result=None, traceback=None)

if isinstance(result_data, dict):
Expand Down
62 changes: 30 additions & 32 deletions nipype/utils/filemanip.py
Original file line number Diff line number Diff line change
Expand Up @@ -676,52 +676,50 @@ def loadcrash(infile, *args):


def loadpkl(infile, versioning=False):
"""Load a zipped or plain cPickled file
"""
"""Load a zipped or plain cPickled file."""
infile = Path(infile)
fmlogger.debug('Loading pkl: %s', infile)
if infile.endswith('pklz'):
pkl_file = gzip.open(infile, 'rb')
else:
pkl_file = open(infile, 'rb')
pklopen = gzip.open if infile.suffix == '.pklz' else open
pkl_metadata = None

if versioning:
pkl_metadata = {}
with indirectory(str(infile.parent)):
pkl_file = pklopen(infile.name, 'rb')

# Look if pkl file contains version file
try:
try: # Look if pkl file contains version file
pkl_metadata_line = pkl_file.readline()
pkl_metadata = json.loads(pkl_metadata_line)
except:
except UnicodeDecodeError:
pass
finally:
# Could not get version info
pkl_file.seek(0)

try:
try:
unpkl = pickle.load(pkl_file)
except UnicodeDecodeError:
# Was this pickle created with Python 2.x?
unpkl = pickle.load(pkl_file, fix_imports=True, encoding='utf-8')

return unpkl

# Unpickling problems
except Exception as e:
if not versioning:
fmlogger.info('Successfully loaded pkl in compatibility mode.')
# Unpickling problems
except Exception as e:
if not versioning:
raise e

if pkl_metadata and 'version' in pkl_metadata:
from nipype import __version__ as version
if pkl_metadata['version'] != version:
fmlogger.error("""\
Attempted to open a results file generated by Nipype version %s, \
with an incompatible Nipype version (%s)""", pkl_metadata['version'], version)
raise e
fmlogger.error("""\
No metadata was found in the pkl file. Make sure you are currently using \
the same Nipype version from the generated pkl.""")
raise e

from nipype import __version__ as version

if 'version' in pkl_metadata:
if pkl_metadata['version'] != version:
fmlogger.error('Your Nipype version is: %s',
version)
fmlogger.error('Nipype version of the pkl is: %s',
pkl_metadata['version'])
else:
fmlogger.error('No metadata was found in the pkl file.')
fmlogger.error('Make sure that you are using the same Nipype'
'version from the generated pkl.')

raise e
return unpkl
finally:
pkl_file.close()


def crash2txt(filename, record):
Expand Down

0 comments on commit fe5fa56

Please sign in to comment.