diff --git a/src/quacc/settings.py b/src/quacc/settings.py index db4b778006..83f766ff3b 100644 --- a/src/quacc/settings.py +++ b/src/quacc/settings.py @@ -2,10 +2,13 @@ from __future__ import annotations +import inspect import os from contextlib import contextmanager +from datetime import datetime, timezone from functools import wraps from pathlib import Path +from random import randint from shutil import which from typing import TYPE_CHECKING, Literal, Optional, Union @@ -97,6 +100,14 @@ class QuaccSettings(BaseSettings): """ ), ) + NESTED_RESULTS_DIR: bool = Field( + True, + description=( + """ + Whether to automatically nest the results directories by the calling flow, subflow, etc. + """ + ), + ) GZIP_FILES: bool = Field( True, description="Whether generated files should be gzip'd." ) @@ -594,5 +605,40 @@ def wrapper(*args, **kwargs): return original_func(*args, **kwargs) wrapper._changed = True + wrapper._changes = changes wrapper._original_func = original_func return wrapper + + +def nest_results_dir_wrap(func: Callable) -> Callable: + """ + Wraps a function with the change_settings context manager using a nested RESULTS_DIR + + Parameters + ---------- + func + The function to wrap. + + Returns + ------- + Callable + The wrapped function. + """ + from quacc import get_settings + + changes = getattr(func, "_changes", {}) + + # Get the settings from the calling function's context + results_parent_dir = get_settings().RESULTS_DIR + time_now = datetime.now(timezone.utc).strftime("%Y-%m-%d-%H-%M-%S-%f") + nested_results_dir = results_parent_dir / Path( + f"{inspect.getmodule(func).__name__}.{func.__name__}-{time_now}-{randint(10000, 99999)}" + ) + + # If someone explcitly set RESULTS_DIR in change_settings already + # they probably know what they're doing and really want a specific + # folder! + if "RESULTS_DIR" not in changes: + changes["RESULTS_DIR"] = nested_results_dir + + return change_settings_wrap(func, changes) diff --git a/src/quacc/wflow_tools/decorators.py b/src/quacc/wflow_tools/decorators.py index 8fb8b4a844..5b0e6cfe8c 100644 --- a/src/quacc/wflow_tools/decorators.py +++ b/src/quacc/wflow_tools/decorators.py @@ -3,14 +3,25 @@ from __future__ import annotations from functools import partial, wraps -from typing import Any, Callable +from typing import TYPE_CHECKING, Any, Callable -from quacc.settings import change_settings_wrap +from quacc.settings import change_settings_wrap, nest_results_dir_wrap + +if TYPE_CHECKING: + from prefect import Flow as PrefectFlow Job = Callable[..., Any] Flow = Callable[..., Any] Subflow = Callable[..., Any] +if TYPE_CHECKING: + from importlib.util import find_spec + + from quacc.settings import QuaccSettings + + if bool(find_spec("prefect")): + from prefect import Task + def job(_func: Callable[..., Any] | None = None, **kwargs) -> Job: """ @@ -173,18 +184,7 @@ def wrapper(*f_args, **f_kwargs): return task(_func, namespace=_func.__module__, **kwargs) elif settings.WORKFLOW_ENGINE == "prefect": - from prefect import task - - if settings.PREFECT_AUTO_SUBMIT: - - @wraps(_func) - def wrapper(*f_args, **f_kwargs): - decorated = task(_func, **kwargs) - return decorated.submit(*f_args, **f_kwargs) - - return wrapper - else: - return task(_func, **kwargs) + return _decorate_prefect_job(_func, kwargs, settings) else: return _func @@ -347,9 +347,8 @@ def workflow(a, b, c): return task(_func, namespace=_func.__module__, **kwargs) elif settings.WORKFLOW_ENGINE == "prefect": - from prefect import flow as prefect_flow + return _decorate_prefect_flow_subflow(_func, kwargs, settings) - return prefect_flow(_func, validate_parameters=False, **kwargs) else: return _func @@ -578,9 +577,8 @@ def wrapper(*f_args, **f_kwargs): return join_app(wrapped_fn, **kwargs) elif settings.WORKFLOW_ENGINE == "prefect": - from prefect import flow as prefect_flow + return _decorate_prefect_flow_subflow(_func, kwargs, settings) - return prefect_flow(_func, validate_parameters=False, **kwargs) elif settings.WORKFLOW_ENGINE == "redun": from redun import task @@ -589,6 +587,48 @@ def wrapper(*f_args, **f_kwargs): return _func +def _decorate_prefect_job( + _func: Callable, kwargs: dict[str, Any], settings: QuaccSettings +) -> Task: + from prefect import task + + if settings.PREFECT_AUTO_SUBMIT or settings.NESTED_RESULTS_DIR: + + @wraps(_func) + def wrapper(*f_args, **f_kwargs): + if settings.NESTED_RESULTS_DIR: + decorated = task(nest_results_dir_wrap(_func), **kwargs) + else: + decorated = task(_func, **kwargs) + if settings.PREFECT_AUTO_SUBMIT: + return decorated.submit(*f_args, **f_kwargs) + else: + return decorated(*f_args, **f_kwargs) + + return wrapper + else: + return task(_func, **kwargs) + + +def _decorate_prefect_flow_subflow( + _func: Callable, kwargs: dict[str, Any], settings: QuaccSettings +) -> PrefectFlow: + from prefect import flow as prefect_flow + + if settings.NESTED_RESULTS_DIR: + + @wraps(_func) + def wrapper(*f_args, **f_kwargs): + decorated = prefect_flow( + nest_results_dir_wrap(_func), validate_parameters=False, **kwargs + ) + return decorated(*f_args, **f_kwargs) + + return wrapper + else: + return prefect_flow(_func, validate_parameters=False, **kwargs) + + def _get_parsl_wrapped_func( func: Callable, decorator_kwargs: dict[str, Any] ) -> Callable: diff --git a/tests/prefect/test_customizers.py b/tests/prefect/test_customizers.py index b80d34728f..b932c95035 100644 --- a/tests/prefect/test_customizers.py +++ b/tests/prefect/test_customizers.py @@ -98,3 +98,36 @@ def my_flow(): my_flow().result() assert not Path(tmp_dir1 / "job.txt").exists() assert Path(tmp_dir2 / "job.txt").exists() + + +def test_nested_output_directory(tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + + @job + def write_file_job(name="job.txt"): + results_file_path = Path(get_settings().RESULTS_DIR, name) + with open(results_file_path, "w") as f: + f.write("test job file") + + return results_file_path + + @flow + def write_file_flow(name="flow.txt"): + job_results_file_path = write_file_job() + + flow_results_file_path = Path(get_settings().RESULTS_DIR, name) + with open(flow_results_file_path, "w") as f: + f.write("test flow file") + + return job_results_file_path, flow_results_file_path + + # Test with redecorating a job in a flow + job_results_file_path, flow_results_file_path = write_file_flow() + job_results_file_path = job_results_file_path.result() + assert Path(job_results_file_path).exists() + assert Path(flow_results_file_path).exists() + + # Check the job output is in a subfolder of the flow folder + assert ( + Path(job_results_file_path).parent.parent == Path(flow_results_file_path).parent + )