Proposal: NONE to NO_CACHE #16650

Merged
merged 8 commits into from
Jan 9, 2025
4 changes: 2 additions & 2 deletions docs/v3/develop/task-caching.mdx
@@ -71,7 +71,7 @@ to compute the task's cache key.
 - `TASK_SOURCE`: this cache policy uses _only_ the task's code definition to compute the cache key.
 - `FLOW_PARAMETERS`: this cache policy uses _only_ the parameter values provided to the parent flow run
 to compute the cache key.
-- `NONE`: this cache policy always returns `None` and therefore avoids caching and result persistence altogether.
+- `NO_CACHE`: this cache policy always returns `None` and therefore avoids caching and result persistence altogether.

These policies can be set using the `cache_policy` keyword on the [task decorator](https://prefect-python-sdk-docs.netlify.app/prefect/tasks/#prefect.tasks.task):

@@ -124,7 +124,7 @@ This expiration is then applied to _all_ other tasks that may share this cache k
### Cache policies

Cache policies can be composed and altered using basic Python syntax to form more complex policies.
-For example, all task policies except for `NONE` can be _added_ together to form new policies that combine
+For example, all task policies except for `NO_CACHE` can be _added_ together to form new policies that combine
the individual policies' logic into a larger cache key computation.
Combining policies in this way results in caches that are _easier_ to invalidate.
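
To illustrate why a summed policy is easier to invalidate, here is a minimal sketch built on hypothetical stand-in classes (`Policy`, `InputsOnly`, `SourceOnly`, `Compound`). These are not Prefect's classes, and the hashing scheme is an assumption chosen for illustration only. The point is that a compound key changes whenever any one of its component keys changes.

```python
import hashlib
from typing import Any, Dict, Tuple


def _digest(text: str) -> str:
    """Hash a string into a hex key (illustrative choice of SHA-256)."""
    return hashlib.sha256(text.encode()).hexdigest()


class Policy:
    """Hypothetical stand-in for a cache policy; not Prefect's CachePolicy."""

    def compute_key(self, source: str, inputs: Dict[str, Any]) -> str:
        raise NotImplementedError

    def __add__(self, other: "Policy") -> "Policy":
        # Adding two policies yields a policy that uses both of them.
        return Compound((self, other))


class InputsOnly(Policy):
    def compute_key(self, source: str, inputs: Dict[str, Any]) -> str:
        # Key depends only on the call arguments.
        return _digest(repr(sorted(inputs.items())))


class SourceOnly(Policy):
    def compute_key(self, source: str, inputs: Dict[str, Any]) -> str:
        # Key depends only on the task's source text.
        return _digest(source)


class Compound(Policy):
    def __init__(self, policies: Tuple[Policy, ...]) -> None:
        self.policies = policies

    def compute_key(self, source: str, inputs: Dict[str, Any]) -> str:
        # Fold every component key into one key.
        parts = [p.compute_key(source, inputs) for p in self.policies]
        return _digest("-".join(parts))


old_source = "def f(x): return x"
new_source = "def f(x): return x + 1"
args = {"x": 1}

combined = InputsOnly() + SourceOnly()
combined_changed = (
    combined.compute_key(old_source, args) != combined.compute_key(new_source, args)
)
inputs_only_changed = (
    InputsOnly().compute_key(old_source, args)
    != InputsOnly().compute_key(new_source, args)
)

print(combined_changed)     # True: editing the source invalidates the combined key
print(inputs_only_changed)  # False: the inputs-only key ignores the source edit
```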

2 changes: 1 addition & 1 deletion docs/v3/develop/write-tasks.mdx
@@ -350,7 +350,7 @@ Tasks allow for customization through optional arguments that can be provided to
| `tags` | An optional set of tags associated with runs of this task. These tags are combined with any tags defined by a `prefect.tags` context at task runtime. |
| `timeout_seconds` | An optional number of seconds indicating a maximum runtime for the task. If the task exceeds this runtime, it will be marked as failed. |
| `cache_key_fn` | An optional callable that, given the task run context and call parameters, generates a string key. If the key matches a previous completed state, that state result is restored instead of running the task again. |
-| `cache_policy` | An optional policy that determines what information is used to generate cache keys. Available policies include `INPUTS`, `TASK_SOURCE`, `RUN_ID`, `FLOW_PARAMETERS`, and `NONE`. Can be combined using the + operator. |
+| `cache_policy` | An optional policy that determines what information is used to generate cache keys. Available policies include `INPUTS`, `TASK_SOURCE`, `RUN_ID`, `FLOW_PARAMETERS`, and `NO_CACHE`. Can be combined using the + operator. |
| `cache_expiration` | An optional amount of time indicating how long cached states for this task are restorable; if not provided, cached states will never expire. |
| `retries` | An optional number of times to retry on task run failure. |
| `retry_delay_seconds` | An optional number of seconds to wait before retrying the task after failure. This is only applicable if `retries` is nonzero. |
5 changes: 3 additions & 2 deletions src/prefect/cache_policies.py
@@ -183,7 +183,7 @@ def compute_key(
class _None(CachePolicy):
"""
Policy that always returns `None` for the computed cache key.
-This policy prevents persistence.
+This policy prevents persistence and avoids caching entirely.
"""

def compute_key(
@@ -302,7 +302,7 @@ def compute_key(
"like locks, file handles, or other system resources.\n\n"
"To resolve this, you can:\n"
" 1. Exclude these arguments by defining a custom `cache_key_fn`\n"
-" 2. Disable caching by passing `cache_policy=NONE`\n"
+" 2. Disable caching by passing `cache_policy=NO_CACHE`\n"
)
raise ValueError(msg) from exc

@@ -314,6 +314,7 @@ def __sub__(self, other: str) -> "CachePolicy":

INPUTS = Inputs()
NONE = _None()
+NO_CACHE = _None()
TASK_SOURCE = TaskSource()
FLOW_PARAMETERS = FlowParameters()
RUN_ID = RunId()
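
The hunk above keeps `NONE` and adds `NO_CACHE` as a second `_None()` instance rather than binding both names to one object. A minimal sketch of what that implies for comparisons, assuming the policy class compares by value the way a field-less dataclass does (the class decorator is not visible in this diff, so that is an assumption). `_NoCachePolicy` is a hypothetical stand-in, not the library class.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class _NoCachePolicy:
    """Hypothetical stand-in for `_None`; assumes dataclass-style equality."""

    def compute_key(self) -> Optional[str]:
        # Always returning None means "never cache".
        return None


# Two names, two separate instances, mirroring the hunk above.
NONE = _NoCachePolicy()
NO_CACHE = _NoCachePolicy()

equal_by_value = NONE == NO_CACHE         # field-less dataclass instances compare equal
same_object = NONE is NO_CACHE            # but they are distinct objects
legacy_matches = NONE in [None, NO_CACHE]  # `in` falls back to ==, so the old name still matches
```

Under that assumption, membership and `==`/`!=` checks written against `NO_CACHE` would still treat a task declared with the old `NONE` name the same way, while an identity check (`is`) would not.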
4 changes: 2 additions & 2 deletions src/prefect/task_worker.py
@@ -21,7 +21,7 @@

from prefect import Task
from prefect._internal.concurrency.api import create_call, from_sync
-from prefect.cache_policies import DEFAULT, NONE
+from prefect.cache_policies import DEFAULT, NO_CACHE
from prefect.client.orchestration import get_client
from prefect.client.schemas.objects import TaskRun
from prefect.client.subscriptions import Subscription
@@ -93,7 +93,7 @@ def __init__(
if not isinstance(t, Task):
continue

-if t.cache_policy in [None, NONE, NotSet]:
+if t.cache_policy in [None, NO_CACHE, NotSet]:
self.tasks.append(
t.with_options(persist_result=True, cache_policy=DEFAULT)
)
10 changes: 6 additions & 4 deletions src/prefect/tasks.py
@@ -30,7 +30,7 @@
from typing_extensions import Literal, ParamSpec, Self, TypeAlias, TypeIs

import prefect.states
-from prefect.cache_policies import DEFAULT, NONE, CachePolicy
+from prefect.cache_policies import DEFAULT, NO_CACHE, CachePolicy
from prefect.client.orchestration import get_client
from prefect.client.schemas import TaskRun
from prefect.client.schemas.objects import (
@@ -441,7 +441,9 @@ def __init__(
if persist_result is None:
if any(
[
-cache_policy and cache_policy != NONE and cache_policy != NotSet,
+cache_policy
+and cache_policy != NO_CACHE
+and cache_policy != NotSet,
cache_key_fn is not None,
result_storage_key is not None,
result_storage is not None,
@@ -451,8 +453,8 @@ def __init__(
persist_result = True

if persist_result is False:
-self.cache_policy = None if cache_policy is None else NONE
-if cache_policy and cache_policy is not NotSet and cache_policy != NONE:
+self.cache_policy = None if cache_policy is None else NO_CACHE
+if cache_policy and cache_policy is not NotSet and cache_policy != NO_CACHE:
logger.warning(
"Ignoring `cache_policy` because `persist_result` is False"
)
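
The two `tasks.py` hunks above decide how `persist_result` and `cache_policy` interact. The sketch below restates only the branches visible in this diff as a standalone function. `resolve`, `_Sentinel`, and the sentinel objects are hypothetical stand-ins, and the conditions elided from the diff (and whatever the constructor does afterwards) are not modeled.

```python
from typing import Any, Callable, Optional, Tuple


class _Sentinel:
    """Hypothetical stand-in for policy objects and the NotSet marker."""

    def __init__(self, name: str) -> None:
        self.name = name

    def __repr__(self) -> str:
        return self.name


NotSet = _Sentinel("NotSet")
NO_CACHE = _Sentinel("NO_CACHE")
INPUTS = _Sentinel("INPUTS")


def resolve(
    cache_policy: Any = NotSet,
    persist_result: Optional[bool] = None,
    cache_key_fn: Optional[Callable[..., str]] = None,
) -> Tuple[Optional[bool], Any, bool]:
    """Return (persist_result, cache_policy, warned) for the visible branches."""
    warned = False

    # An explicit, real cache policy (or a key function) implies persistence.
    if persist_result is None:
        if any(
            [
                cache_policy and cache_policy != NO_CACHE and cache_policy != NotSet,
                cache_key_fn is not None,
            ]
        ):
            persist_result = True

    # Persistence switched off: caching is disabled, with a warning if the
    # caller had asked for a real policy. The check runs before the local
    # variable is overwritten so it sees the caller's original value.
    if persist_result is False:
        if cache_policy and cache_policy is not NotSet and cache_policy != NO_CACHE:
            warned = True
        cache_policy = None if cache_policy is None else NO_CACHE

    return persist_result, cache_policy, warned


print(resolve(cache_policy=INPUTS))                        # (True, INPUTS, False)
print(resolve(cache_policy=INPUTS, persist_result=False))  # (False, NO_CACHE, True)
print(resolve(persist_result=False))                       # (False, NO_CACHE, False)
```

The last two calls correspond to the behaviors the updated tests assert: the policy collapses to `NO_CACHE` when persistence is off, and the warning fires only when a real policy was requested.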
18 changes: 9 additions & 9 deletions tests/test_tasks.py
@@ -22,7 +22,7 @@
from prefect.cache_policies import (
DEFAULT,
INPUTS,
-NONE,
+NO_CACHE,
TASK_SOURCE,
CachePolicy,
Inputs,
@@ -1328,7 +1328,7 @@ def base():

@pytest.mark.parametrize(
"cache_policy",
-[policy for policy in CachePolicy.__subclasses__() if policy != NONE],
+[policy for policy in CachePolicy.__subclasses__() if policy != NO_CACHE],
)
def test_setting_cache_policy_sets_persist_result_to_true(self, cache_policy):
@task(cache_policy=cache_policy)
@@ -1861,7 +1861,7 @@ async def test_false_persist_results_sets_cache_policy_to_none(self, caplog):
def foo(x):
return x

-assert foo.cache_policy == NONE
+assert foo.cache_policy == NO_CACHE
assert (
"Ignoring `cache_policy` because `persist_result` is False"
not in caplog.text
@@ -1872,13 +1872,13 @@ async def test_warns_went_false_persist_result_and_cache_policy(self, caplog):
def foo(x):
return x

-assert foo.cache_policy == NONE
+assert foo.cache_policy == NO_CACHE

assert (
"Ignoring `cache_policy` because `persist_result` is False" in caplog.text
)

-@pytest.mark.parametrize("cache_policy", [NONE, None])
+@pytest.mark.parametrize("cache_policy", [NO_CACHE, None])
async def test_does_not_warn_went_false_persist_result_and_none_cache_policy(
self, caplog, cache_policy
):
@@ -1993,7 +1993,7 @@ def foo(x, lock_obj):
"1. Exclude these arguments by defining a custom `cache_key_fn`"
in error_msg
)
-assert "2. Disable caching by passing `cache_policy=NONE`" in error_msg
+assert "2. Disable caching by passing `cache_policy=NO_CACHE`" in error_msg

# Then we see the original HashError details
assert "Unable to create hash - objects could not be serialized." in error_msg
@@ -2016,7 +2016,7 @@ def foo_with_key_fn(x, lock_obj):
return x

# Solution 2: Disable caching entirely
-@task(cache_policy=NONE, persist_result=True)
+@task(cache_policy=NO_CACHE, persist_result=True)
def foo_with_none_policy(x, lock_obj):
return x

@@ -2038,7 +2038,7 @@ def test_flow():
assert await s1.result() == 42
assert await s2.result() == 42

-# NONE policy approach should never cache
+# NO_CACHE policy approach should never cache
assert s3.name == "Completed"
assert s4.name == "Completed"
assert await s3.result() == 42
@@ -5245,7 +5245,7 @@ def test_cache_policy_init_to_none_when_not_persisting_results(self):
def my_task():
pass

-assert my_task.cache_policy is NONE
+assert my_task.cache_policy is NO_CACHE

def test_cache_policy_init_to_default_when_persisting_results(self):
@task(persist_result=True)