Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FIX: Use load_resultfile when loading a results pickle #2985

Merged
merged 2 commits into from
Aug 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions nipype/pipeline/engine/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,8 @@ def interface(self):
@property
def result(self):
"""Get result from result file (do not hold it in memory)"""
return _load_resultfile(self.output_dir(), self.name)[0]
return _load_resultfile(
op.join(self.output_dir(), 'result_%s.pklz' % self.name))[0]

@property
def inputs(self):
Expand Down Expand Up @@ -517,7 +518,7 @@ def _get_inputs(self):
logger.debug('input: %s', key)
results_file = info[0]
logger.debug('results file: %s', results_file)
outputs = loadpkl(results_file).outputs
outputs = _load_resultfile(results_file)[0].outputs
if outputs is None:
raise RuntimeError("""\
Error populating the input "%s" of node "%s": the results file of the source node \
Expand Down Expand Up @@ -564,7 +565,8 @@ def _run_interface(self, execute=True, updatehash=False):

def _load_results(self):
cwd = self.output_dir()
result, aggregate, attribute_error = _load_resultfile(cwd, self.name)
result, aggregate, attribute_error = _load_resultfile(
op.join(cwd, 'result_%s.pklz' % self.name))
# try aggregating first
if aggregate:
logger.debug('aggregating results')
Expand Down
18 changes: 6 additions & 12 deletions nipype/pipeline/engine/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

from traceback import format_exception
from hashlib import sha1
import gzip

from functools import reduce

Expand All @@ -25,6 +24,7 @@

from ... import logging, config, LooseVersion
from ...utils.filemanip import (
Path,
relpath,
makedirs,
fname_presuffix,
Expand All @@ -34,6 +34,7 @@
FileNotFoundError,
save_json,
savepkl,
loadpkl,
write_rst_header,
write_rst_dict,
write_rst_list,
Expand Down Expand Up @@ -233,7 +234,7 @@ def save_resultfile(result, cwd, name):
logger.debug('saved results in %s', resultsfile)


def load_resultfile(path, name):
def load_resultfile(results_file):
"""
Load InterfaceResult file from path

Expand All @@ -253,17 +254,12 @@ def load_resultfile(path, name):
rerun
"""
aggregate = True
resultsoutputfile = os.path.join(path, 'result_%s.pklz' % name)
results_file = Path(results_file)
result = None
attribute_error = False
if os.path.exists(resultsoutputfile):
pkl_file = gzip.open(resultsoutputfile, 'rb')
if results_file.exists():
try:
result = pickle.load(pkl_file)
except UnicodeDecodeError:
# Was this pickle created with Python 2.x?
pickle.load(pkl_file, fix_imports=True, encoding='utf-8')
logger.warning('Successfully loaded pkl in compatibility mode')
result = loadpkl(results_file)
except (traits.TraitError, AttributeError, ImportError,
EOFError) as err:
if isinstance(err, (AttributeError, ImportError)):
Expand All @@ -275,8 +271,6 @@ def load_resultfile(path, name):
'some file does not exist. hence trait cannot be set')
else:
aggregate = False
finally:
pkl_file.close()

logger.debug('Aggregate: %s', aggregate)
return result, aggregate, attribute_error
Expand Down
7 changes: 3 additions & 4 deletions nipype/pipeline/plugins/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@
import numpy as np

from ... import logging
from ...utils.filemanip import loadpkl
from ...utils.misc import str2bool
from ..engine.utils import topological_sort
from ..engine.utils import topological_sort, load_resultfile
from ..engine import MapNode
from .tools import report_crash, report_nodes_not_run, create_pyscript

Expand Down Expand Up @@ -504,7 +503,7 @@ def _get_result(self, taskid):
result_data['traceback'] = '\n'.join(format_exception(*sys.exc_info()))
else:
results_file = glob(os.path.join(node_dir, 'result_*.pklz'))[0]
result_data = loadpkl(results_file)
result_data = load_resultfile(results_file)
result_out = dict(result=None, traceback=None)
if isinstance(result_data, dict):
result_out['result'] = result_data['result']
Expand Down Expand Up @@ -602,7 +601,7 @@ def _get_result(self, taskid):
glob(os.path.join(node_dir, 'result_*.pklz')).pop()

results_file = glob(os.path.join(node_dir, 'result_*.pklz'))[0]
result_data = loadpkl(results_file)
result_data = load_resultfile(results_file)
result_out = dict(result=None, traceback=None)

if isinstance(result_data, dict):
Expand Down
62 changes: 30 additions & 32 deletions nipype/utils/filemanip.py
Original file line number Diff line number Diff line change
Expand Up @@ -676,52 +676,50 @@ def loadcrash(infile, *args):


def loadpkl(infile, versioning=False):
"""Load a zipped or plain cPickled file
"""
"""Load a zipped or plain cPickled file."""
infile = Path(infile)
fmlogger.debug('Loading pkl: %s', infile)
if infile.endswith('pklz'):
pkl_file = gzip.open(infile, 'rb')
else:
pkl_file = open(infile, 'rb')
pklopen = gzip.open if infile.suffix == '.pklz' else open
pkl_metadata = None

if versioning:
pkl_metadata = {}
with indirectory(str(infile.parent)):
pkl_file = pklopen(infile.name, 'rb')

# Look if pkl file contains version file
try:
try: # Look if pkl file contains version file
pkl_metadata_line = pkl_file.readline()
pkl_metadata = json.loads(pkl_metadata_line)
except:
except (UnicodeDecodeError, json.JSONDecodeError):
pass
finally:
# Could not get version info
pkl_file.seek(0)

try:
try:
unpkl = pickle.load(pkl_file)
except UnicodeDecodeError:
# Was this pickle created with Python 2.x?
unpkl = pickle.load(pkl_file, fix_imports=True, encoding='utf-8')

return unpkl

# Unpickling problems
except Exception as e:
if not versioning:
fmlogger.info('Successfully loaded pkl in compatibility mode.')
# Unpickling problems
except Exception as e:
if not versioning:
raise e

if pkl_metadata and 'version' in pkl_metadata:
from nipype import __version__ as version
if pkl_metadata['version'] != version:
fmlogger.error("""\
Attempted to open a results file generated by Nipype version %s, \
with an incompatible Nipype version (%s)""", pkl_metadata['version'], version)
raise e
fmlogger.error("""\
No metadata was found in the pkl file. Make sure you are currently using \
the same Nipype version from the generated pkl.""")
raise e

from nipype import __version__ as version

if 'version' in pkl_metadata:
if pkl_metadata['version'] != version:
fmlogger.error('Your Nipype version is: %s',
version)
fmlogger.error('Nipype version of the pkl is: %s',
pkl_metadata['version'])
else:
fmlogger.error('No metadata was found in the pkl file.')
fmlogger.error('Make sure that you are using the same Nipype'
'version from the generated pkl.')

raise e
return unpkl
finally:
pkl_file.close()


def crash2txt(filename, record):
Expand Down