diff --git a/docs/changelog.md b/docs/changelog.md index feba103..35fec73 100644 --- a/docs/changelog.md +++ b/docs/changelog.md @@ -1,5 +1,9 @@ # Changelog +## [0.14.2] -- 2024-05-07 +### Changed +- Addresses [#218](https://github.com/databio/pypiper/issues/218) + ## [0.14.1] -- 2024-04-19 ### Changed - remove pipestat_project_name from PipelineManager parameters diff --git a/pypiper/_version.py b/pypiper/_version.py index f075dd3..745162e 100644 --- a/pypiper/_version.py +++ b/pypiper/_version.py @@ -1 +1 @@ -__version__ = "0.14.1" +__version__ = "0.14.2" diff --git a/pypiper/manager.py b/pypiper/manager.py index 965945b..657dd39 100644 --- a/pypiper/manager.py +++ b/pypiper/manager.py @@ -1290,8 +1290,6 @@ def proc_wrapup(i): sleeptime = min((sleeptime + 0.25) * 3, 60 / len(processes)) # All jobs are done, print a final closing and job info - stop_time = time.time() - proc_message = "Command completed. {info}" info = ( "Elapsed time: " + str(datetime.timedelta(seconds=self.time_elapsed(start_time))) @@ -1308,7 +1306,7 @@ def proc_wrapup(i): info += "\n" # finish out the self.info("") - self.info(proc_message.format(info=info)) + self.info("Command completed. {info}".format(info=info)) for rc in returncodes: if rc != 0: diff --git a/pypiper/pipeline.py b/pypiper/pipeline.py index 95d31f2..a02c9ad 100644 --- a/pypiper/pipeline.py +++ b/pypiper/pipeline.py @@ -332,9 +332,13 @@ def run(self, start_point=None, stop_before=None, stop_after=None): print(f"Running stage: {getattr(stage, 'name', str(stage))}") - stage.run() + try: + stage.run() + except Exception as e: + self.manager._triage_error(e, nofail=stage.nofail) + else: + self.checkpoint(stage) self.executed.append(stage) - self.checkpoint(stage) # Add any unused stages to the collection of skips. self.skipped.extend(self._stages[stop_index:]) diff --git a/pypiper/stage.py b/pypiper/stage.py index 6f1d551..929becf 100644 --- a/pypiper/stage.py +++ b/pypiper/stage.py @@ -17,7 +17,16 @@ class Stage(object): collection of commands that is checkpointed. """ - def __init__(self, func, f_args=None, f_kwargs=None, name=None, checkpoint=True): + def __init__( + self, + func, + f_args=None, + f_kwargs=None, + name=None, + checkpoint=True, + *, + nofail=False + ): """ A function, perhaps with arguments, defines the stage. @@ -26,6 +35,8 @@ def __init__(self, func, f_args=None, f_kwargs=None, name=None, checkpoint=True) :param dict f_kwargs: Keyword arguments for func :param str name: name for the phase/stage :param callable func: Object that defines how the stage will execute. + :param bool nofail: Allow a failure of this stage to not fail the pipeline + in which it's running """ if isinstance(func, Stage): raise TypeError("Cannot create Stage from Stage") @@ -35,6 +46,7 @@ def __init__(self, func, f_args=None, f_kwargs=None, name=None, checkpoint=True) self.f_kwargs = f_kwargs or dict() self.name = name or func.__name__ self.checkpoint = checkpoint + self.nofail = nofail @property def checkpoint_name(self): diff --git a/requirements/requirements-dev-extra.txt b/requirements/requirements-dev-extra.txt new file mode 100644 index 0000000..7e66a17 --- /dev/null +++ b/requirements/requirements-dev-extra.txt @@ -0,0 +1 @@ +black