Skip to content

Commit

Permalink
update prefect agent
Browse files Browse the repository at this point in the history
  • Loading branch information
goFrendiAsgard committed Dec 18, 2023
1 parent 82deb1d commit dbfe6aa
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 7 deletions.
16 changes: 9 additions & 7 deletions src/zrb/task/base_task/component/common_task_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def __init__(
self._retry = retry
self._retry_interval = retry_interval
self._upstreams = upstreams
self._checkers = checkers
self._checkers = [checker.copy() for checker in checkers]
self._checking_interval = checking_interval
self._run_function: Optional[Callable[..., Any]] = run
self._on_triggered = on_triggered
Expand All @@ -77,10 +77,10 @@ def __init__(
self.__allow_add_env_files = True
self.__allow_add_inputs = True
self.__allow_add_upstreams: bool = True
self.__allow_add_checkers: bool = True
self.__has_already_inject_env_files: bool = False
self.__has_already_inject_envs: bool = False
self.__has_already_inject_inputs: bool = False
self.__has_already_inject_checkers: bool = False
self.__has_already_inject_upstreams: bool = False
self.__all_inputs: Optional[List[AnyInput]] = None

Expand Down Expand Up @@ -275,18 +275,20 @@ def _get_env_files(self) -> List[EnvFile]:
def insert_checker(self, *checkers: AnyTask):
if not self.__allow_add_checkers:
raise Exception(f'Cannot insert checkers to `{self._name}`')
self._checkers = list(checkers) + list(self._checkers)
additional_checkers = [checker.copy() for checker in checkers]
self._checkers = additional_checkers + self._checkers

def add_checker(self, *checkers: AnyTask):
if not self.__allow_add_checkers:
raise Exception(f'Cannot add checkers to `{self._name}`')
self._checkers = list(self._checkers) + list(checkers)
additional_checkers = [checker.copy() for checker in checkers]
self._checkers = self._checkers + additional_checkers

def inject_checkers(self):
pass

def _get_checkers(self) -> List[AnyTask]:
if not self.__has_already_inject_checkers:
if not self.__allow_add_checkers:
self.inject_checkers()
self.__has_already_inject_checkers = True
return [checker.copy() for checker in list(self._checkers)]
self.__allow_add_checkers = True
return self._checkers
36 changes: 36 additions & 0 deletions test/task/test_task_checker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from zrb.task.checker import Checker
from zrb.task.task import Task
from zrb.task_input.str_input import StrInput
from zrb.task_env.env import Env
import asyncio


class DelayedChecker(Checker):
async def inspect(self, *args, **kwargs) -> bool:
asyncio.sleep(1)
return True


def test_use_the_same_task_for_checker_and_upstream():
delayed_checker = DelayedChecker()
# use delayed_chacker as task_1 checker
task_1 = Task(
name='task-1',
inputs=[StrInput(name='input-1')],
envs=[Env(name='ENV1')],
checkers=[delayed_checker]
)
# use delayed_chacker as task_2 checker
task_2 = Task(
name='task-2',
inputs=[StrInput(name='input-2')],
envs=[Env(name='ENV2')],
)
task_2.add_checker(delayed_checker)
# use delayed_chacker as task_3 upstream
task = Task(
name='task',
upstreams=[task_1, task_2, delayed_checker]
)
fn = task.to_function()
fn()

0 comments on commit dbfe6aa

Please sign in to comment.