Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Organize outputs directory as nested folders #2296

Draft
wants to merge 26 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
8afb69e
first commit
zulissimeta Jun 21, 2024
93d02a7
remove commented lines
zulissimeta Jun 21, 2024
58666be
pre-commit auto-fixes
pre-commit-ci[bot] Jun 21, 2024
1bed53e
clean up nested dir function
zulissimeta Jun 21, 2024
d54d31f
Merge branch 'organize_outputs' of github.com:zulissimeta/quacc into …
zulissimeta Jun 21, 2024
ae8c71a
clean up decorators
zulissimeta Jun 21, 2024
cbb07db
use datetime and random number instead of uuid
zulissimeta Jun 21, 2024
01ce6b5
datetime, dont overwrite changes, and add a test
zulissimeta Jun 22, 2024
c6aaa69
pre-commit auto-fixes
pre-commit-ci[bot] Jun 22, 2024
ea72655
typo in code and settings
zulissimeta Jun 22, 2024
8d02609
Merge branch 'organize_outputs' of github.com:zulissimeta/quacc into …
zulissimeta Jun 22, 2024
58ac899
refactor prefect decoration code
zulissimeta Jun 22, 2024
63e0224
pre-commit auto-fixes
pre-commit-ci[bot] Jun 22, 2024
d292112
Merge branch 'main' into organize_outputs
Andrew-S-Rosen Jun 24, 2024
28224da
Minor refactoring
Andrew-S-Rosen Jun 27, 2024
094ce69
pre-commit auto-fixes
pre-commit-ci[bot] Jun 27, 2024
4bfbc74
Fix type hitn
Andrew-S-Rosen Jun 27, 2024
7f7a690
Fix imports
Andrew-S-Rosen Jun 27, 2024
dad242f
Merge branch 'organize_outputs' of github.com:zulissimeta/quacc into …
Andrew-S-Rosen Jun 27, 2024
2ff299c
Merge branch 'main' into organize_outputs
Andrew-S-Rosen Jun 27, 2024
75d249c
Fix spacing
Andrew-S-Rosen Jun 27, 2024
9802d89
Merge branch 'main' into organize_outputs
Andrew-S-Rosen Jul 1, 2024
04977cc
Merge branch 'main' into organize_outputs
Andrew-S-Rosen Jul 27, 2024
b67ddb8
pre-commit auto-fixes
pre-commit-ci[bot] Jul 27, 2024
42f0c14
Merge branch 'main' into organize_outputs
zulissimeta Aug 7, 2024
2215b1c
Merge branch 'main' into organize_outputs
Andrew-S-Rosen Aug 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions src/quacc/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@

from __future__ import annotations

import inspect
import os
from contextlib import contextmanager
from datetime import datetime, timezone
from functools import wraps
from pathlib import Path
from random import randint
from shutil import which
from typing import TYPE_CHECKING, Literal, Optional, Union

Expand Down Expand Up @@ -97,6 +100,14 @@ class QuaccSettings(BaseSettings):
"""
),
)
NESTED_RESULTS_DIR: bool = Field(
True,
description=(
"""
Whether to automatically nest the results directories by the calling flow, subflow, etc.
"""
),
)
GZIP_FILES: bool = Field(
True, description="Whether generated files should be gzip'd."
)
Expand Down Expand Up @@ -594,5 +605,40 @@ def wrapper(*args, **kwargs):
return original_func(*args, **kwargs)

wrapper._changed = True
wrapper._changes = changes
wrapper._original_func = original_func
return wrapper


def nest_results_dir_wrap(func: Callable) -> Callable:
"""
Wraps a function with the change_settings context manager using a nested RESULTS_DIR

Parameters
----------
func
The function to wrap.

Returns
-------
Callable
The wrapped function.
"""
from quacc import get_settings

changes = getattr(func, "_changes", {})

# Get the settings from the calling function's context
results_parent_dir = get_settings().RESULTS_DIR
time_now = datetime.now(timezone.utc).strftime("%Y-%m-%d-%H-%M-%S-%f")
nested_results_dir = results_parent_dir / Path(
f"{inspect.getmodule(func).__name__}.{func.__name__}-{time_now}-{randint(10000, 99999)}"
)

# If someone explcitly set RESULTS_DIR in change_settings already
# they probably know what they're doing and really want a specific
# folder!
if "RESULTS_DIR" not in changes:
changes["RESULTS_DIR"] = nested_results_dir

return change_settings_wrap(func, changes)
76 changes: 58 additions & 18 deletions src/quacc/wflow_tools/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,25 @@
from __future__ import annotations

from functools import partial, wraps
from typing import Any, Callable
from typing import TYPE_CHECKING, Any, Callable

from quacc.settings import change_settings_wrap
from quacc.settings import change_settings_wrap, nest_results_dir_wrap

if TYPE_CHECKING:
from prefect import Flow as PrefectFlow

Job = Callable[..., Any]
Flow = Callable[..., Any]
Subflow = Callable[..., Any]

if TYPE_CHECKING:
from importlib.util import find_spec

from quacc.settings import QuaccSettings

if bool(find_spec("prefect")):
from prefect import Task


def job(_func: Callable[..., Any] | None = None, **kwargs) -> Job:
"""
Expand Down Expand Up @@ -173,18 +184,7 @@

return task(_func, namespace=_func.__module__, **kwargs)
elif settings.WORKFLOW_ENGINE == "prefect":
from prefect import task

if settings.PREFECT_AUTO_SUBMIT:

@wraps(_func)
def wrapper(*f_args, **f_kwargs):
decorated = task(_func, **kwargs)
return decorated.submit(*f_args, **f_kwargs)

return wrapper
else:
return task(_func, **kwargs)
return _decorate_prefect_job(_func, kwargs, settings)
else:
return _func

Expand Down Expand Up @@ -347,9 +347,8 @@

return task(_func, namespace=_func.__module__, **kwargs)
elif settings.WORKFLOW_ENGINE == "prefect":
from prefect import flow as prefect_flow
return _decorate_prefect_flow_subflow(_func, kwargs, settings)

return prefect_flow(_func, validate_parameters=False, **kwargs)
else:
return _func

Expand Down Expand Up @@ -578,9 +577,8 @@

return join_app(wrapped_fn, **kwargs)
elif settings.WORKFLOW_ENGINE == "prefect":
from prefect import flow as prefect_flow
return _decorate_prefect_flow_subflow(_func, kwargs, settings)

return prefect_flow(_func, validate_parameters=False, **kwargs)
elif settings.WORKFLOW_ENGINE == "redun":
from redun import task

Expand All @@ -589,6 +587,48 @@
return _func


def _decorate_prefect_job(
_func: Callable, kwargs: dict[str, Any], settings: QuaccSettings
) -> Task:
from prefect import task

if settings.PREFECT_AUTO_SUBMIT or settings.NESTED_RESULTS_DIR:

@wraps(_func)
def wrapper(*f_args, **f_kwargs):
if settings.NESTED_RESULTS_DIR:
decorated = task(nest_results_dir_wrap(_func), **kwargs)
else:
decorated = task(_func, **kwargs)

Check warning on line 602 in src/quacc/wflow_tools/decorators.py

View check run for this annotation

Codecov / codecov/patch

src/quacc/wflow_tools/decorators.py#L602

Added line #L602 was not covered by tests
if settings.PREFECT_AUTO_SUBMIT:
return decorated.submit(*f_args, **f_kwargs)
else:
return decorated(*f_args, **f_kwargs)

return wrapper
else:
return task(_func, **kwargs)

Check warning on line 610 in src/quacc/wflow_tools/decorators.py

View check run for this annotation

Codecov / codecov/patch

src/quacc/wflow_tools/decorators.py#L610

Added line #L610 was not covered by tests


def _decorate_prefect_flow_subflow(
_func: Callable, kwargs: dict[str, Any], settings: QuaccSettings
) -> PrefectFlow:
from prefect import flow as prefect_flow

if settings.NESTED_RESULTS_DIR:

@wraps(_func)
def wrapper(*f_args, **f_kwargs):
decorated = prefect_flow(
nest_results_dir_wrap(_func), validate_parameters=False, **kwargs
)
return decorated(*f_args, **f_kwargs)

return wrapper
else:
return prefect_flow(_func, validate_parameters=False, **kwargs)

Check warning on line 629 in src/quacc/wflow_tools/decorators.py

View check run for this annotation

Codecov / codecov/patch

src/quacc/wflow_tools/decorators.py#L629

Added line #L629 was not covered by tests


def _get_parsl_wrapped_func(
func: Callable, decorator_kwargs: dict[str, Any]
) -> Callable:
Expand Down
33 changes: 33 additions & 0 deletions tests/prefect/test_customizers.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,36 @@ def my_flow():
my_flow().result()
assert not Path(tmp_dir1 / "job.txt").exists()
assert Path(tmp_dir2 / "job.txt").exists()


def test_nested_output_directory(tmp_path, monkeypatch):
monkeypatch.chdir(tmp_path)

@job
def write_file_job(name="job.txt"):
results_file_path = Path(get_settings().RESULTS_DIR, name)
with open(results_file_path, "w") as f:
f.write("test job file")

return results_file_path

@flow
def write_file_flow(name="flow.txt"):
job_results_file_path = write_file_job()

flow_results_file_path = Path(get_settings().RESULTS_DIR, name)
with open(flow_results_file_path, "w") as f:
f.write("test flow file")

return job_results_file_path, flow_results_file_path

# Test with redecorating a job in a flow
job_results_file_path, flow_results_file_path = write_file_flow()
job_results_file_path = job_results_file_path.result()
assert Path(job_results_file_path).exists()
assert Path(flow_results_file_path).exists()

# Check the job output is in a subfolder of the flow folder
assert (
Path(job_results_file_path).parent.parent == Path(flow_results_file_path).parent
)
Loading