diff --git a/.github/workflows/testing-workflow.yml b/.github/workflows/testing-workflow.yml index 5e3ba9d4..57974fbd 100644 --- a/.github/workflows/testing-workflow.yml +++ b/.github/workflows/testing-workflow.yml @@ -43,7 +43,7 @@ jobs: with: python-version: ${{ matrix.python_version }} cache: 'poetry' - # use the whole repo's hash (i.e. recalculate the deps everytime) + # use the whole repo's hash (i.e. recalculate the deps every time) cache-dependency-path: pyproject.toml - name: Install dependencies diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index d1b86f54..e0cf484b 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -2,7 +2,10 @@ repos: - repo: https://github.com/pre-commit/pre-commit-hooks rev: v4.4.0 hooks: + - id: check-added-large-files - id: check-ast + - id: check-docstring-first + - id: check-symlinks - id: check-builtin-literals exclude: tests - id: check-merge-conflict @@ -10,30 +13,25 @@ repos: - id: check-toml - id: debug-statements - id: end-of-file-fixer + - id: mixed-line-ending - id: trailing-whitespace - - repo: https://github.com/pycqa/isort - rev: 5.12.0 - hooks: - - id: isort - # only useful for pre-commit hooked to git - args: ["--profile", "black"] - repo: https://github.com/psf/black rev: 23.1.0 hooks: - id: black - - repo: https://github.com/pycqa/flake8 - rev: "6.0.0" + - repo: https://github.com/charliermarsh/ruff-pre-commit + rev: "v0.0.253" hooks: - - id: flake8 - exclude: tests + - id: ruff + args: ["--fix"] - repo: https://github.com/pre-commit/mirrors-mypy rev: v1.0.1 hooks: - - id: mypy - # files: tawazi + - id: mypy + files: tawazi args: [] additional_dependencies: - pydantic @@ -44,11 +42,5 @@ repos: - repo: https://github.com/PyCQA/bandit rev: 1.7.4 hooks: - - id: bandit + - id: bandit args: ["--ini", ".bandit", "--recursive"] - - - repo: https://github.com/terrencepreilly/darglint - rev: v1.8.1 - hooks: - - id: darglint - exclude: ^(tests|scripts)/ diff --git a/README.md b/README.md 
index 34e0564b..9ff39801 100644 --- a/README.md +++ b/README.md @@ -124,7 +124,7 @@ assert pipeline() == (2, 3) Currently you can only return a single value from an `ExecNode`, in the future multiple return values will be allowed. -* You can have setup `ExecNode`s; Theses are `ExecNode`s that will run once per DAG instance +* You can have setup `ExecNode`s; These are `ExecNode`s that will run once per DAG instance ```Python from copy import deepcopy @@ -291,7 +291,7 @@ assert res_c == "A + B = C" +2. All code inside a dag descriptor function must be either an @op decorated functions calls and arguments passed arguments. Otherwise the behavior of the DAG might be unpredictable --> diff --git a/documentation/future_developments.md b/documentation/future_developments.md index d67b492b..205dfa80 100644 --- a/documentation/future_developments.md +++ b/documentation/future_developments.md @@ -4,7 +4,7 @@ A couple of features will be released soon: * handle problems when calling `ExecNodes` wrongly. * (for example when using *args as parameters but only **kwargs are provided). - * Calling `ExecNodes` must be similar to calling the original function (must imitate the same signature otherwise raise the correct exeception). + * Calling `ExecNodes` must be similar to calling the original function (must imitate the same signature otherwise raise the correct exception). * support mixing ExecNodes and non `ExecNodes` functions. * test the case where `ExecNodes` are stored in a list and then passed via * operator. * add feature to turn off a set of nodes in the graph. 
diff --git a/pyproject.toml b/pyproject.toml index eaa4d55d..6d778029 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -55,10 +55,39 @@ strict_equality = true module = "matplotlib.*,networkx.*,yaml.*" ignore_missing_imports = true -[tool.isort] -profile = "black" -line_length = 100 - [tool.black] skip-magic-trailing-comma = true line-length = 100 + + +[tool.ruff] +# Use of assert detected +ignore = ["E501", "S101"] +src = ["src"] +line-length = 100 +target-version = "py38" +select = [ + "B", + "D", + "E", + "F", + "I", + "N", + "PGH", + "UP", + "S", + "T20", + "TID", + "W", + "RET" +] + +[tool.ruff.per-file-ignores] +"tests/*.py" = ["D"] +"scripts/*.py" = ["D", "PGH"] + +[tool.ruff.flake8-tidy-imports] +ban-relative-imports = "parents" + +[tool.ruff.pydocstyle] +convention = "google" diff --git a/tawazi/__init__.py b/tawazi/__init__.py index cb4d6891..9b3cc895 100644 --- a/tawazi/__init__.py +++ b/tawazi/__init__.py @@ -1,13 +1,11 @@ +"""tawazi is a package that allows parallel execution of a set of functions written in Python.""" + # exposing useful objects / Classes +from .config import Cfg from .dag import DAG, DAGExecution -from .decorators import xn, dag +from .decorators import dag, xn from .errors import ErrorStrategy -from .config import Cfg -""" -tawazi is a package that allows parallel execution of a set of functions written in Python -isort:skip_file -""" __version__ = "0.2.0" __all__ = ["DAG", "DAGExecution", "xn", "dag", "ErrorStrategy", "Cfg"] diff --git a/tawazi/config.py b/tawazi/config.py index 9fa2e00e..37476faa 100644 --- a/tawazi/config.py +++ b/tawazi/config.py @@ -1,7 +1,11 @@ +"""configuration parameters for Tawazi.""" + from pydantic import BaseSettings, Field class Config(BaseSettings): + """Class to set configuration parameters for Tawazi.""" + # whether the default in tawazi is sequentiality or not. 
# This is helpful to reduce your code size and avoid repeating @xn(is_sequentiality=True/False) TAWAZI_IS_SEQUENTIAL: bool = False diff --git a/tawazi/consts.py b/tawazi/consts.py index fe7cacbf..c3d79c4f 100644 --- a/tawazi/consts.py +++ b/tawazi/consts.py @@ -1,3 +1,4 @@ +"""Module containing constants used by Tawazi.""" from typing import Any, Dict, List, Optional, Tuple, Type, TypeVar, Union from typing_extensions import ParamSpec @@ -11,8 +12,8 @@ class NoValType: - """ - Tawazi's special None. + """Tawazi's special None. + This class is a singleton similar to None to determine that no value is assigned >>> NoVal1 = NoValType() >>> NoVal2 = NoValType() @@ -26,23 +27,59 @@ class NoValType: _instance = None def __new__(cls: Type["NoValType"]) -> "NoValType": + """Constructor for NoValType. + + Returns: + NoValType: new instance of NoValType. + """ if cls._instance is None: cls._instance = super().__new__(cls) return cls._instance def __bool__(self) -> bool: + """Whether NoVal is Truthy or Falsy. + + Returns: + bool: always False + """ return False def __repr__(self) -> str: + """Representation of NoValType. + + Returns: + str: "NoVal" + """ return "NoVal" def __eq__(self, __o: object) -> bool: + """Check for equality. + + Args: + __o (object): the other object + + Returns: + bool: always returns False + """ return False def __copy__(self) -> "NoValType": + """Copy of NoVal. + + Returns: + NoValType: Returns the original because NoVal is a singleton. + """ return self def __deepcopy__(self, _prev: Dict[Any, Any]) -> "NoValType": + """Deep copy NoVal. + + Args: + _prev (Dict[Any, Any]): the previous state of the object + + Returns: + NoValType: the original NoVal because NoVal is a singleton. + """ return self @@ -55,7 +92,7 @@ def __deepcopy__(self, _prev: Dict[Any, Any]) -> "NoValType": # NOTE: maybe support other key types? for example int... or even tuple... 
ReturnIDsType = Optional[ - Union[Dict[str, Identifier], List[Identifier], Tuple[Identifier], Identifier] + Union[Dict[str, Identifier], List[Identifier], Tuple[Identifier, ...], Identifier] ] RVTypes = Union[Any, Tuple[Any], List[Any], Dict[str, Any]] P = ParamSpec("P") diff --git a/tawazi/dag.py b/tawazi/dag.py index 87f4bc31..235a5ebd 100644 --- a/tawazi/dag.py +++ b/tawazi/dag.py @@ -1,3 +1,4 @@ +"""module containing DAG and DAGExecution which are the containers that run ExecNodes in Tawazi.""" import json import pickle import time @@ -16,7 +17,7 @@ from tawazi.config import Cfg from tawazi.consts import ReturnIDsType -from tawazi.helpers import UniqueKeyLoader, filter_NoVal +from tawazi.helpers import _filter_noval, _UniqueKeyLoader from .consts import RVDAG, Identifier, P, RVTypes from .digraph import DiGraphEx @@ -25,8 +26,8 @@ class DAG(Generic[P, RVDAG]): - """ - Data Structure containing ExecNodes with interdependencies. + """Data Structure containing ExecNodes with interdependencies. + The ExecNodes can be executed in parallel with the following restrictions: * Limited number of threads. * Parallelization constraint of each ExecNode (is_sequential attribute) @@ -39,7 +40,8 @@ def __init__( max_concurrency: int = 1, behavior: ErrorStrategy = ErrorStrategy.strict, ): - """ + """Constructor of the DAG. Should not be called directly. Instead use the `dag` decorator. + Args: exec_nodes: all the ExecNodes max_concurrency: the maximal number of threads running in parallel @@ -94,10 +96,23 @@ def __init__( @property def max_concurrency(self) -> int: + """Maximal number of threads running in parallel. + + Returns: + int: maximum number of threads running in parallel + """ return self._max_concurrency @max_concurrency.setter def max_concurrency(self, value: int) -> None: + """Set the maximal number of threads running in parallel. 
+ + Args: + value (int): maximum number of threads running in parallel + + Raises: + ValueError: if value is not a positive integer + """ if not isinstance(value, int): raise ValueError("max_concurrency must be an int") if value < 1: @@ -106,10 +121,25 @@ def max_concurrency(self, value: int) -> None: # getters def get_nodes_by_tag(self, tag: Any) -> List[ExecNode]: - nodes = [ex_n for ex_n in self.exec_nodes if ex_n.tag == tag] - return nodes + """Get the ExecNodes with the given tag. + + Args: + tag (Any): tag of the ExecNodes + + Returns: + List[ExecNode]: corresponding ExecNodes + """ + return [ex_n for ex_n in self.exec_nodes if ex_n.tag == tag] def get_node_by_id(self, id_: Identifier) -> ExecNode: + """Get the ExecNode with the given id. + + Args: + id_ (Identifier): id of the ExecNode + + Returns: + ExecNode: corresponding ExecNode + """ # TODO: ? catch the keyError and # help the user know the id of the ExecNode by pointing to documentation!? return self.node_dict[id_] @@ -118,11 +148,9 @@ def get_node_by_id(self, id_: Identifier) -> ExecNode: # TODO: validate using Pydantic def _find_cycle(self) -> Optional[List[Tuple[str, str]]]: - """ - A DAG doesn't have any dependency cycle. - This method returns the cycles if found. + """Finds the cycles in the DAG. A DAG shouldn't have any dependency cycle. - returns: + Returns: A list of the edges responsible for the cycles in case there are some (in forward and backward), otherwise nothing. (e.g. [('taxes', 'amount_reconciliation'),('amount_reconciliation', 'taxes')]) """ @@ -133,8 +161,7 @@ def _find_cycle(self) -> Optional[List[Tuple[str, str]]]: return None def _build(self) -> None: - """ - Builds the graph and the sequence order for the computation. + """Builds the graph and the sequence order for the computation. Raises: NetworkXUnfeasible: if the graph has cycles @@ -166,8 +193,8 @@ def _validate(self) -> None: # future validations... 
def _assign_compound_priority(self) -> None: - """ - Assigns a compound priority to all nodes in the graph. + """Assigns a compound priority to all nodes in the graph. + The compound priority is the sum of the priorities of all children recursively. """ # 1. deepcopy graph_ids because it will be modified (pruned) @@ -217,8 +244,7 @@ def draw(self, k: float = 0.8, display: bool = True, t: int = 3) -> None: @classmethod def _copy_non_setup_xns(cls, x_nodes: Dict[str, ExecNode]) -> Dict[str, ExecNode]: - """ - Deep copy all ExecNodes except setup ExecNodes because they are shared throughout the DAG instance + """Deep copy all ExecNodes except setup ExecNodes because they are shared throughout the DAG instance. Args: x_nodes: Dict[str, ExecNode] x_nodes to be deep copied @@ -244,9 +270,9 @@ def _execute( modified_node_dict: Optional[Dict[str, ExecNode]] = None, call_id: str = "", ) -> Dict[Identifier, Any]: - """ - Thread safe execution of the DAG... - (Except for the setup nodes! Please run DAG.setup() in a single thread because its results will be cached). + """Thread safe execution of the DAG. + + (Except for the setup nodes! Please run DAG.setup() in a single thread because its results will be cached). Args: graph: the graph ids to be executed @@ -375,7 +401,7 @@ def get_highest_priority_nodes(nodes: List[ExecNode]) -> List[ExecNode]: return xns_dict def _alias_to_ids(self, alias: Alias) -> List[Identifier]: - """Extract an ExecNode ID from an Alias (Tag, ExecNode ID or ExecNode) + """Extract an ExecNode ID from an Alias (Tag, ExecNode ID or ExecNode). Args: alias (Alias): an Alias (Tag, ExecNode ID or ExecNode) @@ -390,36 +416,28 @@ def _alias_to_ids(self, alias: Alias) -> List[Identifier]: if isinstance(alias, ExecNode): return [alias.id] # todo: do further validation for the case of the tag!! - elif isinstance(alias, (Identifier, tuple)): + if isinstance(alias, (Identifier, tuple)): # if leaves_identification is not ExecNode, it can be either # 1. 
a Tag (Highest priority in case an id with the same value exists) if nodes := self.tagged_nodes.get(alias): return [node.id for node in nodes] # 2. or a node id! - elif isinstance(alias, Identifier) and alias in self.node_dict: + if isinstance(alias, Identifier) and alias in self.node_dict: node = self.get_node_by_id(alias) return [node.id] - else: - raise ValueError( - f"node or tag {alias} not found in DAG.\n" - f" Available nodes are {self.node_dict}.\n" - f" Available tags are {list(self.tagged_nodes.keys())}" - ) - else: - raise TawaziTypeError( - "target_nodes must be of type ExecNode, " - f"str or tuple identifying the node but provided {alias}" + raise ValueError( + f"node or tag {alias} not found in DAG.\n" + f" Available nodes are {self.node_dict}.\n" + f" Available tags are {list(self.tagged_nodes.keys())}" ) + raise TawaziTypeError( + "target_nodes must be of type ExecNode, " + f"str or tuple identifying the node but provided {alias}" + ) # NOTE: this function is named wrongly! def _get_target_ids(self, target_nodes: List[Alias]) -> List[Identifier]: - """ - get the ids of ExecNodes corresponding to target_nodes. - The identification can be carried out using the tag, the Id, or the ExecNode itself. - Keep in Mind that depending on the way ExecNode is provided inside target_nodes, - the returned id - if nothing is provided it will return all leaves_ids - Handles the debug nodes + """Get the ids of ExecNodes corresponding to target_nodes. 
Args: target_nodes (Optional[List[Alias]]): list of a ExecNode Aliases that the user might provide to run a subgraph @@ -427,12 +445,7 @@ def _get_target_ids(self, target_nodes: List[Alias]) -> List[Identifier]: Returns: List[Identifier]: Leaf ExecNodes' Identities """ - # Raises: - # TawaziBaseException: if the returned List[Identifier] has the wrong length, this indicates a bug in the code - - target_ids = list(chain(*(self._alias_to_ids(alias) for alias in target_nodes))) - - return target_ids + return list(chain(*(self._alias_to_ids(alias) for alias in target_nodes))) def _extend_leaves_ids_debug_xns(self, leaves_ids: List[Identifier]) -> List[Identifier]: new_debug_xn_discovered = True @@ -468,7 +481,6 @@ def setup( exclude_nodes (Optional[List[XNId]], optional): The ExecNodes that the user aims to exclude from the DAG. The user is responsible for ensuring that the overlapping between the target_nodes and exclude_nodes is logical. """ - # 1. select all setup ExecNodes # do not copy the setup nodes because we want them to be modified per DAG instance! all_setup_nodes = { @@ -480,7 +492,7 @@ def setup( # 2. 
if target_nodes is not provided run all setup ExecNodes if target_nodes is None: target_ids = list(all_setup_nodes.keys()) - graph = self._make_subgraph(target_ids, exclude_nodes) # type: ignore + graph = self._make_subgraph(target_ids, exclude_nodes) # type: ignore[arg-type] else: # 2.1 the leaves_ids that the user wants to execute @@ -490,7 +502,7 @@ def setup( target_ids = self._get_target_ids(target_nodes) # 2.2 filter non setup ExecNodes - graph = self._make_subgraph(target_ids, exclude_nodes) # type: ignore + graph = self._make_subgraph(target_ids, exclude_nodes) # type: ignore[arg-type] ids_to_remove = [id_ for id_ in graph if id_ not in all_setup_nodes] for id_ in ids_to_remove: @@ -560,8 +572,7 @@ def _make_subgraph( return graph def __call__(self, *args: P.args, **kwargs: P.kwargs) -> RVDAG: - """ - Execute the DAG scheduler via a similar interface to the function that describes the dependencies. + """Execute the DAG scheduler via a similar interface to the function that describes the dependencies. Args: *args (Any): arguments to be passed to the call of the DAG @@ -577,8 +588,8 @@ def __call__(self, *args: P.args, **kwargs: P.kwargs) -> RVDAG: # Raises: # TawaziTypeError: if target_nodes contains a wrong typed identifier or if the return value contain a non LazyExecNode - target_nodes: Optional[List[Alias]] = kwargs.get("target_nodes") # type: ignore - exclude_nodes: Optional[List[Alias]] = kwargs.get("exclude_nodes") # type: ignore + target_nodes: Optional[List[Alias]] = kwargs.get("target_nodes") # type: ignore[assignment] + exclude_nodes: Optional[List[Alias]] = kwargs.get("exclude_nodes") # type: ignore[assignment] # 1. generate the subgraph to be executed graph = self._make_subgraph(target_nodes=target_nodes, exclude_nodes=exclude_nodes) @@ -590,13 +601,11 @@ def __call__(self, *args: P.args, **kwargs: P.kwargs) -> RVDAG: all_nodes_dict = self._execute(graph, call_xn_dict) # 4. 
extract the returned value/values - returned_values = self._get_return_values(all_nodes_dict) - - return returned_values # type: ignore + return self._get_return_values(all_nodes_dict) # type: ignore[return-value] def _make_call_xn_dict(self, *args: Any) -> Dict[Identifier, ExecNode]: - """ - Generate the calling ExecNode dict. + """Generate the calling ExecNode dict. + This is a dict containing ExecNodes that will be executed (hence modified) by the DAG scheduler. This takes into consideration: 1. deep copying the ExecNodes @@ -635,8 +644,7 @@ def _make_call_xn_dict(self, *args: Any) -> Dict[Identifier, ExecNode]: return call_xn_dict def _get_return_values(self, xn_dict: Dict[Identifier, ExecNode]) -> RVTypes: - """ - Extract the return value/values from the output of the DAG's scheduler! + """Extract the return value/values from the output of the DAG's scheduler! Args: xn_dict (Dict[Identifier, ExecNode]): Modified ExecNodes returned by the DAG's scheduler @@ -650,16 +658,17 @@ def _get_return_values(self, xn_dict: Dict[Identifier, ExecNode]) -> RVTypes: if self.return_ids is None: return None if isinstance(self.return_ids, Identifier): - return filter_NoVal(xn_dict[self.return_ids].result) + return _filter_noval(xn_dict[self.return_ids].result) if isinstance(self.return_ids, (tuple, list)): - gen = (filter_NoVal(xn_dict[ren_id].result) for ren_id in self.return_ids) + gen = (_filter_noval(xn_dict[ren_id].result) for ren_id in self.return_ids) if isinstance(self.return_ids, tuple): return tuple(gen) if isinstance(self.return_ids, list): return list(gen) if isinstance(self.return_ids, dict): return { - key: filter_NoVal(xn_dict[ren_id].result) for key, ren_id in self.return_ids.items() + key: _filter_noval(xn_dict[ren_id].result) + for key, ren_id in self.return_ids.items() } raise TawaziTypeError("Return type for the DAG can only be a single value, Tuple or List") @@ -672,8 +681,7 @@ def _safe_execute( target_nodes: Optional[List[Alias]] = None, exclude_nodes: 
Optional[List[Alias]] = None, ) -> Any: - """ - Execute the ExecNodes in topological order without priority in for loop manner for debugging purposes + """Execute the ExecNodes in topological order without priority in for loop manner for debugging purposes. Args: *args (Any): Positional arguments passed to the DAG @@ -696,14 +704,10 @@ def _safe_execute( call_xn_dict[xn_id].execute(call_xn_dict) # 4. make returned values - return_values = self._get_return_values(call_xn_dict) - - return return_values + return self._get_return_values(call_xn_dict) def _handle_exception(self, graph: DiGraphEx, fut: "Future[Any]", id_: Identifier) -> None: - """ - checks if futures have produced exceptions, and handles them - according to the specified behavior + """Checks if futures have produced exceptions, and handles them according to the specified behavior. Args: graph: the graph @@ -713,7 +717,6 @@ def _handle_exception(self, graph: DiGraphEx, fut: "Future[Any]", id_: Identifie Raises: NotImplementedError: if self.behavior is not known """ - if self.behavior == ErrorStrategy.strict: # will raise the first encountered exception if there's one # no simpler way to check for exception, and not supported by flake8 @@ -723,7 +726,7 @@ def _handle_exception(self, graph: DiGraphEx, fut: "Future[Any]", id_: Identifie try: _res = fut.result() # noqa: F841 - except Exception: + except Exception as e: logger.exception(f"The feature {id_} encountered the following error:") if self.behavior == ErrorStrategy.permissive: @@ -738,30 +741,28 @@ def _handle_exception(self, graph: DiGraphEx, fut: "Future[Any]", id_: Identifie graph.remove_recursively(children_ids) else: - raise NotImplementedError(f"Unknown behavior name: {self.behavior}") + raise NotImplementedError(f"Unknown behavior name: {self.behavior}") from e def config_from_dict(self, config: Dict[str, Any]) -> None: - """ - Allows reconfiguring the parameters of the nodes from a dictionary + """Allows reconfiguring the parameters of the 
nodes from a dictionary. Args: config (Dict[str, Any]): the dictionary containing the config - example: {"nodes": {"a": {"priority": 3, "is_sequential": True}}, "max_concurrency": 3} + example: {"nodes": {"a": {"priority": 3, "is_sequential": True}}, "max_concurrency": 3} Raises: ValueError: if two nodes are configured by the provided config (which is ambiguous) """ def _override_node_config(n: ExecNode, cfg: Dict[str, Any]) -> bool: - flag = False if "is_sequential" in cfg: n.is_sequential = cfg["is_sequential"] if "priority" in cfg: n.priority = cfg["priority"] - flag = True + return True - return flag + return False prio_flag = False visited: Dict[str, Any] = {} @@ -789,25 +790,23 @@ def _override_node_config(n: ExecNode, cfg: Dict[str, Any]) -> bool: self._assign_compound_priority() def config_from_yaml(self, config_path: str) -> None: - """ - Allows reconfiguring the parameters of the nodes from a yaml file + """Allows reconfiguring the parameters of the nodes from a yaml file. Args: config_path: the path to the yaml file """ - with open(config_path, "r") as f: - yaml_config = yaml.load(f, Loader=UniqueKeyLoader) + with open(config_path) as f: + yaml_config = yaml.load(f, Loader=_UniqueKeyLoader) # noqa: S506 self.config_from_dict(yaml_config) def config_from_json(self, config_path: str) -> None: - """ - Allows reconfiguring the parameters of the nodes from a yaml file + """Allows reconfiguring the parameters of the nodes from a json file. Args: config_path: the path to the json file """ - with open(config_path, "r") as f: + with open(config_path) as f: json_config = json.load(f) self.config_from_dict(json_config) @@ -816,6 +815,12 @@ def config_from_json(self, config_path: str) -> None: # TODO: check if the arguments are the same, then run the DAG using the from_cache. # If the arguments are not the same, then rerun the DAG! class DAGExecution(Generic[P, RVDAG]): + """A disposable callable instance of a DAG. + + It holds information about the last execution. 
Hence it is not threadsafe. + It might be reusable, however it is not recommended to reuse an instance of DAGExecutor!. + """ + def __init__( self, dag: DAG[P, RVDAG], @@ -827,10 +832,7 @@ def __init__( from_cache: str = "", call_id: Optional[str] = None, ): - """ - This is an instance of DAGExecution which is a disposable callable instance of a DAG. - It holds information about the DAG's last execution. Hence it is not threadsafe. - It is reusable, however it is not recommended to reuse an instance of DAGExecutor!. + """Constructor. Args: dag (DAG): The attached DAG. @@ -886,6 +888,11 @@ def __init__( @property def cache_in(self) -> str: + """The path to the file where the execution should be cached. + + Returns: + str: The path to the file where the execution should be cached. + """ return self._cache_in @cache_in.setter @@ -896,6 +903,11 @@ def cache_in(self, cache_in: str) -> None: @property def from_cache(self) -> str: + """Get the file path from which the cached execution should be loaded. + + Returns: + str: the file path of the cached execution + """ return self._from_cache @from_cache.setter @@ -906,6 +918,11 @@ def from_cache(self, from_cache: str) -> None: @property def cache_deps_of(self) -> Optional[List[Alias]]: + """Cache all the dependencies of these nodes. + + Returns: + Optional[List[Alias]]: List of Aliases passed to cache_deps_of while instantiating DAGExecution + """ return self._cache_deps_of @cache_deps_of.setter @@ -920,21 +937,41 @@ def cache_deps_of(self, cache_deps_of: Optional[List[Alias]]) -> None: @property def executed(self) -> bool: + """Whether DAGExecution has been executed. 
+ + Returns: + bool: whether DAGExecution has been executed or not + """ return len(self.graph) == 0 # we need to reimplement the public methods of DAG here in order to have a constant public interface # getters def get_nodes_by_tag(self, tag: Any) -> List[ExecNode]: - nodes = [ex_n for ex_n in self.xn_dict.values() if ex_n.tag == tag] - return nodes + """Get all the nodes with the given tag. + + Args: + tag (Any): tag of ExecNodes in question + + Returns: + List[ExecNode]: corresponding ExecNodes + """ + return [ex_n for ex_n in self.xn_dict.values() if ex_n.tag == tag] def get_node_by_id(self, id_: Identifier) -> ExecNode: + """Get node with the given id. + + Args: + id_ (Identifier): id of the ExecNode + + Returns: + ExecNode: Corresponding ExecNode + """ # TODO: ? catch the keyError and # help the user know the id of the ExecNode by pointing to documentation!? return self.xn_dict[id_] def setup(self, twz_nodes: Optional[List[Alias]] = None) -> None: - """Does the same thing as DAG.setup + """Does the same thing as DAG.setup. Args: twz_nodes (Optional[List[Alias]], optional): c.f. `DAG.setup`. @@ -942,6 +979,18 @@ def setup(self, twz_nodes: Optional[List[Alias]] = None) -> None: self.dag.setup(twz_nodes) def __call__(self, *args: P.args, **kwargs: P.kwargs) -> RVDAG: + """Call the DAG. + + Args: + *args: positional arguments to pass in to the DAG + **kwargs: keyword arguments to pass in to the DAG + + Raises: + TawaziUsageError: if the DAGExecution has already been executed. + + Returns: + RVDAG: the return value of the DAG's Execution + """ if self.executed: raise TawaziUsageError( "DAGExecution object should not be reused. Instantiate a new one" @@ -995,8 +1044,6 @@ def __call__(self, *args: P.args, **kwargs: P.kwargs) -> RVDAG: ) # 3. 
extract the returned value/values - returned_values = dag._get_return_values(self.xn_dict) - - return returned_values # type: ignore + return dag._get_return_values(self.xn_dict) # type: ignore[return-value] # TODO: add execution order (the order in which the nodes were executed) diff --git a/tawazi/decorators.py b/tawazi/decorators.py index 8fe94cde..b973c2d6 100644 --- a/tawazi/decorators.py +++ b/tawazi/decorators.py @@ -1,3 +1,4 @@ +"""Module for decorators used in Tawazi.""" import functools from typing import Any, Callable, List, Optional, Union, overload @@ -37,8 +38,8 @@ def xn( tag: Optional[Any] = None, setup: bool = False, ) -> Union[Callable[[Callable[P, RVXN]], LazyExecNode[P, RVXN]], LazyExecNode[P, RVXN]]: - """ - Decorate a function to make it an ExecNode. + """Decorate a function to make it an ExecNode. + When the decorated function is called, you are actually calling an ExecNode. This way we can record the dependencies in order to build the actual DAG. Please check the example in the README for a guide to the usage. @@ -78,10 +79,9 @@ def intermediate_wrapper(_func: Callable[P, RVXN]) -> LazyExecNode[P, RVXN]: if func is None: return intermediate_wrapper # case #2: no argument is provided to the decorator - else: - if not callable(func): - raise TypeError(f"{func} is not a callable. Did you use a non-keyword argument?") - return intermediate_wrapper(func) + if not callable(func): + raise TypeError(f"{func} is not a callable. Did you use a non-keyword argument?") + return intermediate_wrapper(func) @overload @@ -102,8 +102,8 @@ def dag( max_concurrency: int = 1, behavior: ErrorStrategy = ErrorStrategy.strict, ) -> Union[Callable[[Callable[P, RVDAG]], DAG[P, RVDAG]], DAG[P, RVDAG]]: - """ - Transform the declared ops into a DAG that can be executed by tawazi's scheduler. + """Transform the declared ops into a DAG that can be executed by tawazi's scheduler. + The same DAG can be executed multiple times. 
Note: dag is thread safe because it uses an internal lock. If you need to construct lots of DAGs in multiple threads, @@ -153,7 +153,7 @@ def intermediate_wrapper(_func: Callable[P, RVDAG]) -> DAG[P, RVDAG]: # 3. Execute the dependency describer function # NOTE: Only ordered parameters are supported at the moment! # No **kwargs!! Only positional Arguments - returned_exec_nodes = _func(*args) # type: ignore + returned_exec_nodes = _func(*args) # type: ignore[arg-type] # 4. Construct the DAG instance d: DAG[P, RVDAG] = DAG( @@ -181,9 +181,8 @@ def intermediate_wrapper(_func: Callable[P, RVDAG]) -> DAG[P, RVDAG]: # return a decorator return intermediate_wrapper # case 2: arguments aren't provided to the decorator - else: - if not callable(declare_dag_function): - raise TypeError( - f"{declare_dag_function} is not a callable. Did you use a non-keyword argument?" - ) - return intermediate_wrapper(declare_dag_function) + if not callable(declare_dag_function): + raise TypeError( + f"{declare_dag_function} is not a callable. Did you use a non-keyword argument?" + ) + return intermediate_wrapper(declare_dag_function) diff --git a/tawazi/digraph.py b/tawazi/digraph.py index 9f5285f7..a0b940a2 100644 --- a/tawazi/digraph.py +++ b/tawazi/digraph.py @@ -1,3 +1,4 @@ +"""Module containing the definition of a Directed Graph Extension of networkx.DiGraph.""" from copy import deepcopy from typing import List, Optional, Set @@ -8,13 +9,10 @@ class DiGraphEx(nx.DiGraph): - """ - Extends the DiGraph with some methods - """ + """Extends the DiGraph with some methods.""" def root_nodes(self) -> List[Identifier]: - """ - Safely gets the root nodes + """Safely gets the root nodes. Returns: List of root nodes @@ -23,8 +21,7 @@ def root_nodes(self) -> List[Identifier]: @property def leaf_nodes(self) -> List[Identifier]: - """ - Safely gets the leaf nodes + """Safely get the leaf nodes. 
Returns: List of leaf nodes @@ -32,8 +29,7 @@ def leaf_nodes(self) -> List[Identifier]: return [node for node, degree in self.out_degree if degree == 0] def remove_recursively(self, root_node: Identifier) -> None: - """ - Recursively removes all the nodes that depend on the provided one including itself + """Recursively removes all the nodes that depend on the provided one including itself. Args: root_node (Identifier): the root node @@ -43,19 +39,19 @@ def remove_recursively(self, root_node: Identifier) -> None: def dfs(n: Identifier, graph: DiGraphEx, visited: Set[Identifier]) -> None: if n in visited: return - else: - visited.add(n) - for child in graph[n].keys(): - dfs(child, graph, visited) + + visited.add(n) + for child in graph[n].keys(): + dfs(child, graph, visited) dfs(root_node, self, nodes_to_remove) for node in nodes_to_remove: self.remove_node(node) def subgraph_leaves(self, nodes: List[Identifier]) -> Set[Identifier]: - """ - modifies the graph to become a subgraph - that contains the provided nodes as leaf nodes. + """Modifies the graph to become a subgraph. + + The generated subgraph contains the provided nodes as leaf nodes. For example: TODO: use the future print to test this function! graph = @@ -89,7 +85,6 @@ def subgraph_leaves(self, nodes: List[Identifier]) -> Set[Identifier]: Raises: ValueError: if the provided nodes are not in the graph """ - # works by pruning the graph until all leaf nodes # are contained inside the provided "nodes" # in the arguments of this method @@ -121,8 +116,7 @@ def subgraph_leaves(self, nodes: List[Identifier]) -> Set[Identifier]: @property def topologically_sorted(self) -> List[Identifier]: - """ - Makes the simple topological sort of the graph nodes + """Makes the simple topological sort of the graph nodes. 
Returns: List of nodes of the graph listed in topological order @@ -131,9 +125,7 @@ def topologically_sorted(self) -> List[Identifier]: def subgraph(graph: DiGraphEx, leaves_ids: Optional[List[Identifier]]) -> DiGraphEx: - """ - returns a deep copy of the same graph if leaves_ids is None, - otherwise returns a new graph by applying `graph.subgraph_leaves` + """Deep copy the same graph if leaves_ids is None, otherwise returns a new graph by applying `graph.subgraph_leaves`. Args: graph (DiGraphEx): graph describing the DAG @@ -142,7 +134,6 @@ def subgraph(graph: DiGraphEx, leaves_ids: Optional[List[Identifier]]) -> DiGrap Returns: DiGraphEx: The subgraph of the provided graph """ - # TODO: avoid mutable state, hence avoid doing deep copies ? # 0. deep copy the graph ids because it will be pruned during calculation graph = deepcopy(graph) diff --git a/tawazi/errors.py b/tawazi/errors.py index 181d52a1..3157e9dc 100644 --- a/tawazi/errors.py +++ b/tawazi/errors.py @@ -1,26 +1,41 @@ +"""Module for custom errors raised by Tawazi.""" from enum import Enum, unique from typing import Any, Callable, Union class TawaziBaseException(BaseException): + """BaseException of Tawazi from which all other exceptions inherit.""" + pass class TawaziArgumentException(TawaziBaseException): + """Raised when using Tawazi (Passing the wrong number/type of arguments etc.).""" + def __init__(self, func_name: str, arg_name: str) -> None: + """Initialize the ArgumentException. + + Args: + func_name (str): The corresponding function name. + arg_name (str): The corresponding argument name. 
+ """ msg = f"Argument {arg_name} wasn't passed for the DAG" f" created from function {func_name}" super().__init__(msg) class TawaziTypeError(TawaziBaseException): + """Raised when using Tawazi (Passing the wrong type of arguments etc.).""" + pass class TawaziUsageError(TawaziBaseException): + """Raised when User miss uses Tawazi.""" + pass -def raise_arg_exc(func_or_func_name: Union[str, Callable[[Any], Any]], arg_name: str) -> None: +def _raise_arg_exc(func_or_func_name: Union[str, Callable[[Any], Any]], arg_name: str) -> None: if isinstance(func_or_func_name, str): raise TawaziArgumentException(func_or_func_name, arg_name) @@ -28,12 +43,16 @@ def raise_arg_exc(func_or_func_name: Union[str, Callable[[Any], Any]], arg_name: class InvalidExecNodeCall(TawaziBaseException): + """Raised when a ExecNode is called outside DAG definition (this will change in the future).""" + pass @unique class ErrorStrategy(str, Enum): + """The strategy to use when an error is raised inside a function in a DAG.""" + # supported behavior following a raised error - strict: str = "strict" - all_children: str = "all-children" - permissive: str = "permissive" + strict: str = "strict" # stop the execution of the whole DAG + all_children: str = "all-children" # stop the execution of the all successors + permissive: str = "permissive" # continue the execution of the whole DAG diff --git a/tawazi/helpers.py b/tawazi/helpers.py index 23dfe9d6..e4990dec 100644 --- a/tawazi/helpers.py +++ b/tawazi/helpers.py @@ -1,14 +1,15 @@ +"""Module for helper functions.""" import inspect from typing import Any, Callable, Dict, List, Tuple, Union import yaml from tawazi.consts import USE_SEP_END, USE_SEP_START, Identifier, NoVal, NoValType -from tawazi.errors import raise_arg_exc +from tawazi.errors import _raise_arg_exc def ordinal(numb: int) -> str: - """Construct the string corresponding to the ordinal of a number + """Construct the string corresponding to the ordinal of a number. 
Args: numb (int): order @@ -45,8 +46,7 @@ def ordinal(numb: int) -> str: def get_args_and_default_args(func: Callable[..., Any]) -> Tuple[List[str], Dict[str, Any]]: - """ - Retrieves the arguments names and the default arguments of a function. + """Retrieves the arguments names and the default arguments of a function. Args: func: the target function @@ -71,30 +71,30 @@ def get_args_and_default_args(func: Callable[..., Any]) -> Tuple[List[str], Dict return args, default_args -def make_raise_arg_error(func_name: str, arg_name: str) -> Callable[[], None]: +def _make_raise_arg_error(func_name: str, arg_name: str) -> Callable[[], None]: # declare a local function that will raise an error in the scheduler if # the user doesn't pass in This ArgExecNode as argument to the Attached LazyExecNode - return lambda: raise_arg_exc(func_name, arg_name) + return lambda: _raise_arg_exc(func_name, arg_name) -def lazy_xn_id(base_id: Identifier, count_usages: int) -> Identifier: +def _lazy_xn_id(base_id: Identifier, count_usages: int) -> Identifier: if count_usages > 0: return f"{base_id}{USE_SEP_START}{count_usages}{USE_SEP_END}" return base_id -def filter_NoVal(v: Union[NoValType, Any]) -> Any: +def _filter_noval(v: Union[NoValType, Any]) -> Any: if v is NoVal: return None return v # courtesy of https://gist.github.com/pypt/94d747fe5180851196eb?permalink_comment_id=3401011#gistcomment-3401011 -class UniqueKeyLoader(yaml.SafeLoader): +class _UniqueKeyLoader(yaml.SafeLoader): def construct_mapping(self, node: Any, deep: bool = False) -> Any: mapping = [] - for key_node, value_node in node.value: + for key_node, _value_node in node.value: key = self.construct_object(key_node, deep=deep) if key in mapping: raise KeyError(f"key {key} already in yaml file") diff --git a/tawazi/node.py b/tawazi/node.py index 2a42c231..7d2b2a76 100644 --- a/tawazi/node.py +++ b/tawazi/node.py @@ -1,3 +1,4 @@ +"""Module describing ExecNode Class and subclasses (The basic building Block of a DAG).""" from copy 
import copy from threading import Lock from types import MethodType @@ -22,7 +23,7 @@ from .config import Cfg from .errors import InvalidExecNodeCall, TawaziBaseException, TawaziTypeError -from .helpers import lazy_xn_id, make_raise_arg_error, ordinal +from .helpers import _lazy_xn_id, _make_raise_arg_error, ordinal # TODO: replace exec_nodes with dict # a temporary variable used to pass in exec_nodes to the DAG during building @@ -33,12 +34,11 @@ class ExecNode: - """ - This class is the base executable node of the Directed Acyclic Execution Graph. + """This class is the base executable node of the Directed Acyclic Execution Graph. + An ExecNode is an Object that can be executed inside a DAG scheduler. It basically consists of a function (exec_function) that takes *args and **kwargs and return a Value. - When the ExecNode is executed in the DAG, the resulting value will be stored in the ExecNode.result instance attribute - + When the ExecNode is executed in the DAG, the resulting value will be stored in the ExecNode.result instance attribute. """ def __init__( @@ -53,8 +53,7 @@ def __init__( tag: Tag = None, setup: bool = False, ): - """ - Constructor of ExecNode + """Constructor of ExecNode. Args: id_ (Identifier): identifier of ExecNode. @@ -108,14 +107,29 @@ def __init__( @property def executed(self) -> bool: + """Whether this ExecNode has been executed. + + Returns: + bool: whether this ExecNode has been executed. + """ return self.result is not NoVal def __repr__(self) -> str: + """Human representation of the ExecNode. + + Returns: + str: human representation of the ExecNode. + """ return f"{self.__class__.__name__} {self.id} ~ | <{hex(id(self))}>" # TODO: make cached_property ? @property def dependencies(self) -> List["ExecNode"]: + """The List of ExecNode dependencies of This ExecNode. + + Returns: + List[ExecNode]: the List of ExecNode dependencies of This ExecNode. + """ # Making the dependencies # 1. 
from args deps = self.args.copy() @@ -125,8 +139,7 @@ def dependencies(self) -> List["ExecNode"]: return deps def execute(self, node_dict: Dict[Identifier, "ExecNode"]) -> Optional[Any]: - """ - Execute the ExecNode inside of a DAG. + """Execute the ExecNode inside of a DAG. Args: node_dict (Dict[Identifier, ExecNode]): A shared dictionary containing the other ExecNodes in the DAG; @@ -169,6 +182,11 @@ def _assign_reserved_args(self, arg_name: str, value: Any) -> bool: @property def tag(self) -> Tag: + """The Tag of this ExecNode. + + Returns: + Tag: the Tag of this ExecNode. + """ return self._tag @tag.setter @@ -179,6 +197,11 @@ def tag(self, value: Tag) -> None: @property def priority(self) -> int: + """The priority of this ExecNode. + + Returns: + int: the priority of this ExecNode. + """ return self._priority @priority.setter @@ -189,11 +212,11 @@ def priority(self, value: int) -> None: class ArgExecNode(ExecNode): - """ - ExecNode corresponding to an Argument. + """ExecNode corresponding to an Argument. + Every Argument is Attached to a Function or an ExecNode (especially a LazyExecNode) If a value is not passed to the function call / ExecNode, - it will raise an error similar to Python's Error + it will raise an error similar to Python's Error. """ def __init__( @@ -202,8 +225,7 @@ def __init__( name_or_order: Union[str, int], value: Any = NoVal, ): - """ - Constructor of ArgExecNode + """Constructor of ArgExecNode. Args: xn_or_func_or_id (Union[ExecNode, Callable[..., Any], Identifier]): The ExecNode or function that this Argument is rattached to @@ -221,8 +243,8 @@ def __init__( # TODO: use pydantic! 
if isinstance(xn_or_func_or_id, ExecNode): base_id = xn_or_func_or_id.id - elif isinstance(xn_or_func_or_id, Callable): # type: ignore - base_id = xn_or_func_or_id.__qualname__ # type: ignore + elif callable(xn_or_func_or_id): + base_id = xn_or_func_or_id.__qualname__ elif isinstance(xn_or_func_or_id, Identifier): base_id = xn_or_func_or_id else: @@ -240,7 +262,7 @@ def __init__( id_ = f"{base_id}{ARG_NAME_SEP}{suffix}" - raise_err = make_raise_arg_error(base_id, suffix) + raise_err = _make_raise_arg_error(base_id, suffix) super().__init__(id_=id_, exec_function=raise_err, is_sequential=False) @@ -259,8 +281,8 @@ def __init__( # Hence ExecNode can return multiple values! # TODO: create a twz_deps reserved variable to support Nothing dependency class LazyExecNode(ExecNode, Generic[P, RVXN]): - """ - A lazy function simulator. + """A lazy function simulator. + The __call__ behavior of the original function is overridden to record the dependencies to build the DAG. The original function is kept to be called during the scheduling phase when calling the DAG. """ @@ -274,7 +296,7 @@ def __init__( tag: Any, setup: bool, ): - """Constructor of LazyExecNode + """Constructor of LazyExecNode. Args: func (Callable[..., Any]): Look at ExecNode's Documentation @@ -284,7 +306,6 @@ def __init__( tag (Any): Look at ExecNode's Documentation setup (bool): Look at ExecNode's Documentation """ - super().__init__( id_=func.__qualname__, exec_function=func, @@ -298,8 +319,7 @@ def __init__( def __call__( self, *args: P.args, **kwargs: P.kwargs ) -> RVXN: # in reality it returns "LazyExecNode": - """ - Record the dependencies in a global variable to be called later in DAG. + """Record the dependencies in a global variable to be called later in DAG. 
Args: *args (Any): positional arguments passed to the function during dependency recording @@ -331,7 +351,7 @@ def __call__( # 1.2 Assign the id count_usages = sum(ex_n.id.split(USE_SEP_START)[0] == self.id for ex_n in exec_nodes) # if ExecNode is used multiple times, <> is appended to its ID - self_copy.id = lazy_xn_id(self.id, count_usages) + self_copy.id = _lazy_xn_id(self.id, count_usages) # 2. Make the corresponding ExecNodes that corresponds to the Arguments # Make new objects because these should be different between different XN_calls @@ -379,11 +399,10 @@ def __call__( raise TawaziBaseException(f"setup node {self_copy} depends on non setup node {dep}") exec_nodes.append(self_copy) - return self_copy # type: ignore + return self_copy # type: ignore[return-value] def __get__(self, instance: "LazyExecNode[P, RVXN]", owner_cls: Optional[Any] = None) -> Any: - """ - Simulate func_descr_get() in Objects/funcobject.c + """Simulate func_descr_get() in Objects/funcobject.c. Args: instance (LazyExecNode): the instance that this attribute should be attached to @@ -405,48 +424,60 @@ def __get__(self, instance: "LazyExecNode[P, RVXN]", owner_cls: Optional[Any] = def get_return_ids(returned_exec_nodes: ReturnXNsType) -> ReturnIDsType: + """Get the IDs of the returned ExecNodes. + + Args: + returned_exec_nodes (ReturnXNsType): Aliases of the returned ExecNodes + + Raises: + TawaziTypeError: if a tuple or list of returned values contains a non-ExecNode element + TawaziTypeError: if a dict of returned values contains a value that is not an ExecNode + TawaziTypeError: if the type of the returned values is not supported + + Returns: + ReturnIDsType: Corresponding IDs of the returned ExecNodes + """ # TODO: support iterators etc. 
err_string = ( "Return type of the pipeline must be either a Single Xnode," " Tuple of Xnodes, List of Xnodes, dict of Xnodes or None" ) - # 1 returned values can be of multiple nature - return_ids: ReturnIDsType = [] - # 2 No value returned by the execution + # 1 No value returned by the execution if returned_exec_nodes is None: - return_ids = None - # 3 a single value is returned - elif isinstance(returned_exec_nodes, ExecNode): - return_ids = returned_exec_nodes.id - # 4 multiple values returned - elif isinstance(returned_exec_nodes, (tuple, list)): - # 4.1 Collect all the return ids + return None + # 2 a single value is returned + if isinstance(returned_exec_nodes, ExecNode): + return returned_exec_nodes.id + # 3 multiple values returned + if isinstance(returned_exec_nodes, (tuple, list)): + return_ids: List[Identifier] = [] + # 3.1 Collect all the return ids for ren in returned_exec_nodes: if isinstance(ren, ExecNode): - return_ids.append(ren.id) # type: ignore + return_ids.append(ren.id) else: # NOTE: this error shouldn't ever raise during usage. # Please report in https://github.com/mindee/tawazi/issues raise TawaziTypeError(err_string) - # 4.2 Cast to the corresponding type + # 3.2 Cast to the corresponding type if isinstance(returned_exec_nodes, tuple): - return_ids = tuple(return_ids) # type: ignore - # 4.3 No Cast is necessary for the List because this is the default + return tuple(return_ids) + # otherwise return a list of the return ids + return return_ids + + # 3.3 No Cast is necessary for the List because this is the default # NOTE: this cast must be done when adding other types! - # 5 support dict - elif isinstance(returned_exec_nodes, dict): - return_ids = {} + # 4 support dict + if isinstance(returned_exec_nodes, dict): + return_ids_dict = {} for key, ren in returned_exec_nodes.items(): - # 5.1 key should be str and value should be an ExecNode generated by running an xnode... 
+ # 4.1 key should be str and value should be an ExecNode generated by running an xnode... if isinstance(ren, ExecNode): - return_ids[key] = ren.id + return_ids_dict[key] = ren.id else: raise TawaziTypeError( f"return dict should only contain ExecNodes, but {ren} is of type {type(ren)}" ) - else: - raise TawaziTypeError( - f"{err_string}. Type of the provided return: {type(returned_exec_nodes)}" - ) - return return_ids + return return_ids_dict + raise TawaziTypeError(f"{err_string}. Type of the provided return: {type(returned_exec_nodes)}") diff --git a/tawazi/profile.py b/tawazi/profile.py index 33c8592f..4dd03577 100644 --- a/tawazi/profile.py +++ b/tawazi/profile.py @@ -1,3 +1,4 @@ +"""Module helper to profile execution time of Tawazi ExecNodes.""" from time import perf_counter, process_time, thread_time from typing import Any @@ -5,9 +6,10 @@ class Profile: + """Profile records execution time of Tawazi ExecNodes.""" + def __init__(self, active: bool = True) -> None: - """ - Profile an execution + """Profile an execution. Args: active (bool): Whether to activate the profile. Defaults to True. @@ -36,6 +38,11 @@ def __init__(self, active: bool = True) -> None: self.thread_exec_time = 0.0 def __enter__(self) -> "Profile": + """Context manager entry point. + + Returns: + Profile: self + """ if self.active: self.abs_exec_time = perf_counter() self.process_exec_time = process_time() @@ -44,6 +51,13 @@ def __enter__(self) -> "Profile": return self def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: + """Context manager exit point. + + Args: + exc_type (Any): ... + exc_val (Any): ... + exc_tb (Any): ... + """ if self.active: self.abs_exec_time = perf_counter() - self.abs_exec_time self.process_exec_time = process_time() - self.process_exec_time @@ -51,21 +65,46 @@ def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: return def __repr__(self) -> str: + """Representation of the profile. 
+ + Returns: + str: profiled execution times in human-readable format + """ return f"{self.abs_exec_time=}\n" f"{self.process_exec_time=}\n" f"{self.thread_exec_time=}" def __eq__(self, __o: object) -> bool: + """Equality operator. + + Args: + __o (object): The other Profile object. + + Raises: + TawaziTypeError: If the other object is not an instance of Profile. + + Returns: + bool: Whether both are equal. + """ if not isinstance(__o, Profile): raise TawaziTypeError(f"{__o} is not an instance of Profile") - eq = ( + return ( self.abs_exec_time == __o.abs_exec_time and self.process_exec_time == __o.process_exec_time and self.thread_exec_time == __o.thread_exec_time ) - return eq - def __lt__(self, __o: object) -> bool: + """Less than operator. + + Args: + __o (object): The other Profile object. + + Raises: + TawaziTypeError: if the other object is not an instance of Profile. + + Returns: + bool: Whether this Profile is less than the other. + """ if not isinstance(__o, Profile): raise TawaziTypeError(f"{__o} is not an instance of Profile") diff --git a/tests/test_behavior.py b/tests/test_behavior.py index 63eb04f0..9763bbe6 100644 --- a/tests/test_behavior.py +++ b/tests/test_behavior.py @@ -1,15 +1,11 @@ -# type: ignore +# type: ignore # noqa: PGH003 from time import sleep from typing import Any import pytest - from tawazi import DAG, ErrorStrategy from tawazi.node import ExecNode -"""Internal Unit tests""" - - T = 0.001 # global behavior_comp_str pytest.behavior_comp_str: str = "" diff --git a/tests/test_build.py b/tests/test_build.py index 7f482dcf..e4abfd8c 100644 --- a/tests/test_build.py +++ b/tests/test_build.py @@ -1,12 +1,9 @@ -# type: ignore -from time import sleep, time +from time import sleep from typing import Any from tawazi import DAG, ErrorStrategy from tawazi.node import ExecNode -"""Internal Unit Test""" - T = 0.1 @@ -46,8 +43,7 @@ def g(e: str) -> str: def fail(g: Any) -> int: - toto = 10 / 0 - return 1 + return 10 / 0 # ExecNodes can be 
identified using the actual function or an identification string @@ -67,11 +63,7 @@ def fail(g: Any) -> int: def test_dag_build() -> None: g: DAG[Any, Any] = DAG(list_execnodes, 2, behavior=ErrorStrategy.strict) - t0 = time() g._execute(g._make_subgraph()) # must never fail! - print(time() - t0) - for k, v in g.node_dict.items(): - print(g, v, v.result) def test_draw() -> None: diff --git a/tests/test_cache_feature.py b/tests/test_cache_feature.py index 0bb9846c..8ca57eaa 100644 --- a/tests/test_cache_feature.py +++ b/tests/test_cache_feature.py @@ -1,4 +1,4 @@ -# type: ignore +# type: ignore # noqa: PGH003 import os import pickle from pathlib import Path @@ -6,7 +6,6 @@ import numpy as np import pytest - from tawazi import DAGExecution, dag, xn from tawazi.consts import NoVal @@ -209,8 +208,8 @@ def test_cache_in_dpes() -> None: def validate(_cache_path, _cache_deps_of): cached_results = load_cached_results(_cache_path) - for xn in _cache_deps_of: - assert cached_results.get(xn.id) is None + for xn_ in _cache_deps_of: + assert cached_results.get(xn_.id) is None validate(cache_path, cache_deps_of) @@ -224,8 +223,8 @@ def validate(_cache_path, _cache_deps_of): def validate(_cache_path, _cache_deps_of): cached_results = load_cached_results(_cache_path) - for xn in _cache_deps_of: - assert cached_results.get(xn.id) is None + for xn_ in _cache_deps_of: + assert cached_results.get(xn_.id) is None validate(cache_path, cache_deps_of) @@ -240,8 +239,8 @@ def validate(_cache_path, _cache_deps_of): def validate(_cache_path: str, _cache_deps_of: List[Any]) -> None: cached_results = load_cached_results(_cache_path) - for xn in _cache_deps_of: - assert cached_results.get(xn.id) is None + for xn_ in _cache_deps_of: + assert cached_results.get(xn_.id) is None validate(cache_path, cache_deps_of) @@ -257,7 +256,7 @@ def validate(_cache_path: str, _cache_deps_of: List[Any]) -> None: def validate(_cache_path, _cache_deps_of): cached_results = load_cached_results(_cache_path) - for xn 
in _cache_deps_of: - assert cached_results.get(xn.id) is None + for xn_ in _cache_deps_of: + assert cached_results.get(xn_.id) is None validate(cache_path, cache_deps_of) diff --git a/tests/test_compound_priority.py b/tests/test_compound_priority.py index c7032574..f0043ff7 100644 --- a/tests/test_compound_priority.py +++ b/tests/test_compound_priority.py @@ -1,13 +1,10 @@ -# type: ignore +# type: ignore # noqa: PGH003 from time import sleep from typing import Any import pytest - from tawazi import dag, xn -"""Internal Unit Test""" - pytest.compound_priority_str: str = "" T = 1e-3 @@ -61,8 +58,8 @@ def test_compound_priority() -> None: assert dag.node_dict_by_name["e"].compound_priority == 1 -def test_compound_priority_call() -> None: - pytest.compound_priority_str == "" +def test_compound_priority_execution() -> None: + pytest.compound_priority_str = "" dependency_describer() assert pytest.compound_priority_str.startswith("ab") diff --git a/tests/test_config.py b/tests/test_config.py index c951a0cd..6d1d340f 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -1,7 +1,5 @@ from subprocess import run -"""Integration Test""" - def test_correct_env_var() -> None: # Real test diff --git a/tests/test_dag_config.py b/tests/test_dag_config.py index d1b787ef..769361d4 100644 --- a/tests/test_dag_config.py +++ b/tests/test_dag_config.py @@ -2,7 +2,6 @@ import pytest import yaml - from tawazi import dag, xn cfg = {"nodes": {"a": {"priority": 42, "is_sequential": False}}, "max_concurrency": 3} @@ -10,21 +9,18 @@ @xn(tag="toto") def a(cst: int) -> int: - print(cst) return cst @xn def b(a: int, cst: str) -> str: - print(a, cst) return str(a) + cst @dag def my_dag() -> str: var_a = a(1234) - var_b = b(var_a, "poulpe") - return var_b + return b(var_a, "poulpe") def test_config_from_dict() -> None: diff --git a/tests/test_dagexecutor.py b/tests/test_dagexecutor.py index bef5ee83..61af647a 100644 --- a/tests/test_dagexecutor.py +++ b/tests/test_dagexecutor.py @@ -1,9 
+1,8 @@ -# type: ignore +# type: ignore # noqa: PGH003 import threading from typing import Tuple import pytest - from tawazi import dag, xn from tawazi.errors import TawaziUsageError @@ -99,7 +98,7 @@ def setop(bla=123): @xn(debug=True) def my_debug_node(in1, in2, in3): - print(in1, in2, in3) + print(in1, in2, in3) # noqa: T201 @dag def pipe(in1, in2): diff --git a/tests/test_debug_nodes.py b/tests/test_debug_nodes.py index acd27c99..f714f73e 100644 --- a/tests/test_debug_nodes.py +++ b/tests/test_debug_nodes.py @@ -1,15 +1,13 @@ -# type: ignore +# type: ignore # noqa: PGH003 from typing import Any, List import pytest - from tawazi import dag, xn from tawazi.errors import TawaziBaseException @xn def stub(img: List[Any]) -> List[Any]: - print(f"did operation on {img}") return img @@ -24,9 +22,9 @@ def my_len(img: List[Any]) -> int: def is_positive_len(len_img: int) -> None: # this node depends of the my_len! if len_img > 0: - print("positive") + print("positive") # noqa: T201 else: - print("negative") + print("negative") # noqa: T201 pytest.is_positive_len_has_ran = True @@ -40,11 +38,11 @@ def test_pipeline_with_debug_node() -> None: @dag def pipeline(img: List[Any]) -> List[Any]: img = stub(img) - len_ = my_len(img) + my_len(img) return img assert [1, 2, 3] == pipeline([1, 2, 3]) - assert pytest.my_len_has_ran == True + assert pytest.my_len_has_ran is True def test_pipeline_without_debug_node() -> None: @@ -56,11 +54,11 @@ def test_pipeline_without_debug_node() -> None: @dag def pipeline(img: List[Any]) -> List[Any]: img = stub(img) - len_ = my_len(img) + my_len(img) return img assert [1, 2, 3] == pipeline([1, 2, 3]) - assert pytest.my_len_has_ran == False + assert pytest.my_len_has_ran is False def test_interdependant_debug_nodes() -> None: @@ -78,8 +76,8 @@ def pipeline(img): return img assert [1, 2, 3] == pipeline([1, 2, 3]) - assert pytest.my_len_has_ran == True - assert pytest.is_positive_len_has_ran == True + assert pytest.my_len_has_ran is True + assert 
pytest.is_positive_len_has_ran is True def test_wrongly_defined_pipeline() -> None: @@ -123,7 +121,7 @@ def test_triple_incr_no_debug() -> None: tawazi.Cfg.RUN_DEBUG_NODES = False - assert triple_incr_debug(1) == None + assert triple_incr_debug(1) is None def test_triple_incr_debug_subgraph() -> None: @@ -150,8 +148,6 @@ def pipe(in1): assert pipe(2, target_nodes=["stub"]) == 2 assert pytest.prin_share_var == 3 - pytest.prin_share_var == None - assert pipe(0, target_nodes=["stub", "incr"]) == 0 assert pytest.prin_share_var == 1 diff --git a/tests/test_decorator.py b/tests/test_decorator.py index ba122e23..5622c3b6 100644 --- a/tests/test_decorator.py +++ b/tests/test_decorator.py @@ -1,11 +1,9 @@ from functools import wraps from typing import Callable, TypeVar -from typing_extensions import ParamSpec - from tawazi import dag, xn +from typing_extensions import ParamSpec -"""Integration test""" P = ParamSpec("P") RV = TypeVar("RV") @@ -13,9 +11,9 @@ def my_little_logger(func: Callable[P, RV]) -> Callable[P, RV]: @wraps(func) def log(*args: P.args, **kwargs: P.kwargs) -> RV: - print("this should print before execution") + print("this should print before execution") # noqa: T201 res = func(*args, **kwargs) - print("this should print after execution") + print("this should print after execution") # noqa: T201 return res return log @@ -36,7 +34,7 @@ def b(a: str) -> str: @dag def pipe() -> None: a_ = a() - t = b(a_) + b(a_) def test_decorator() -> None: diff --git a/tests/test_edge_cases_dag.py b/tests/test_edge_cases_dag.py index f6c811ef..6f6182f7 100644 --- a/tests/test_edge_cases_dag.py +++ b/tests/test_edge_cases_dag.py @@ -1,23 +1,19 @@ from tawazi import dag, xn -"""integration test""" - def test_same_constant_name_in_two_exec_nodes() -> None: @xn def a(cst: int) -> int: - print(cst) return cst @xn def b(a: int, cst: str) -> str: - print(a, cst) return str(a) + cst @dag def my_dag() -> None: var_a = a(1234) - var_b = b(var_a, "poulpe") + b(var_a, "poulpe") 
exec_nodes = my_dag._execute(my_dag._make_subgraph()) assert len(exec_nodes) == 4 diff --git a/tests/test_error_during_dag_build.py b/tests/test_error_during_dag_build.py index da1e37e1..fc126718 100644 --- a/tests/test_error_during_dag_build.py +++ b/tests/test_error_during_dag_build.py @@ -1,5 +1,4 @@ import pytest - from tawazi import dag, node, xn @@ -13,7 +12,7 @@ def a() -> None: @dag def pipe() -> None: a() - # an undefined ExecNode - b() # type: ignore[name-defined] + # purposefully an undefined ExecNode + b() # type: ignore[name-defined] # noqa: F821 assert node.exec_nodes == [] diff --git a/tests/test_exclude_nodes.py b/tests/test_exclude_nodes.py index a098f9b9..546f356f 100644 --- a/tests/test_exclude_nodes.py +++ b/tests/test_exclude_nodes.py @@ -1,9 +1,8 @@ -# type: ignore +# type: ignore # noqa: PGH003 from copy import deepcopy from typing import Tuple import pytest - from tawazi import dag, xn from tawazi.config import Cfg from tawazi.errors import TawaziUsageError diff --git a/tests/test_execution_time.py b/tests/test_execution_time.py index 5420b5e9..ca308476 100644 --- a/tests/test_execution_time.py +++ b/tests/test_execution_time.py @@ -3,8 +3,6 @@ from tawazi import dag, xn -"""integration test""" - T = 0.1 @@ -27,7 +25,7 @@ def c(a: Any, b: Any) -> None: def deps() -> None: a_ = a() b_ = b() - c_ = c(a_, b_) + c(a_, b_) def test_timing() -> None: diff --git a/tests/test_graph.py b/tests/test_graph.py index 49b75c73..decfc33d 100644 --- a/tests/test_graph.py +++ b/tests/test_graph.py @@ -1,13 +1,9 @@ from time import sleep -from typing import Any from networkx import NetworkXUnfeasible - from tawazi import DAG, ErrorStrategy from tawazi.node import ExecNode -"""unit test""" - T = 0.1 @@ -36,41 +32,38 @@ def c(b: str) -> str: def test_circular_deps() -> None: try: - g: DAG[Any, Any] = DAG(list_exec_nodes, 2, behavior=ErrorStrategy.strict) + DAG(list_exec_nodes, 2, behavior=ErrorStrategy.strict) except NetworkXUnfeasible: pass -""" -@op -def 
n1(img): - print(len(img)) - return len(img) - -@op -def n2(n: int): - print("the length is ", n) - return n +# @op +# def n1(img): +# print(len(img)) +# return len(img) -@to_dag -def pipeline(img): +# @op +# def n2(n: int): +# print("the length is ", n) +# return n - # img ? - _len = n1(img) - n = n2(_len) +# @to_dag +# def pipeline(img): - # ? - return n +# # img ? +# _len = n1(img) +# n = n2(_len) -# this is the best option! -pipeline(img) +# # ? +# return n -# second option if 1st isn't possible -my_dag = pipeline.make_dag() +# # this is the best option! +# pipeline(img) +# # second option if 1st isn't possible +# my_dag = pipeline.make_dag() -def autre_fonction(img: List[int]): - # called via Product.run() - returned_value = my_dag(img) -""" +# def autre_fonction(img: List[int]): +# # called via Product.run() +# returned_value = my_dag(img) diff --git a/tests/test_imbricated_dags.py b/tests/test_imbricated_dags.py index 3a5046c4..751dd944 100644 --- a/tests/test_imbricated_dags.py +++ b/tests/test_imbricated_dags.py @@ -15,8 +15,7 @@ def op2(op1: int) -> int: @xn @dag def op3(img: List[int]) -> int: - toto = op2(op1(img)) - return toto + return op2(op1(img)) @xn def op4(img: List[int]) -> int: @@ -33,4 +32,4 @@ def op6(img: List[int]) -> float: return op5(titi, toto) - pipe = op6([1, 2, 3, 4]) + op6([1, 2, 3, 4]) diff --git a/tests/test_methods.py b/tests/test_methods.py index e6bb8ce6..cdce57f6 100644 --- a/tests/test_methods.py +++ b/tests/test_methods.py @@ -2,10 +2,7 @@ from typing import Union import pytest - -from tawazi import dag, xn - -"""integration test""" +from tawazi import xn logger = Logger(name="mylogger", level="ERROR") @@ -62,15 +59,13 @@ def d( # d1.execute() -""" -The use case for this feature is the following: +# The use case for this feature is the following: -The user has a Class with an instance that contains a lof of methods and attributes. -This class contains a very complicated function that can benefit from parallelization. 
-The user wants to run the dag multiple times and get the same results... - he is responsible for the modifications that any function might do on the instance... - He should be warned about the parallelization dangers when dealing with shared data (in this case self!) -The user (in the best case scenario) will only exchange mutable data between the methods via the parameters -This will ensure the dependency of execution will be respected! -""" +# The user has a Class with an instance that contains a lof of methods and attributes. +# This class contains a very complicated function that can benefit from parallelization. +# The user wants to run the dag multiple times and get the same results... +# he is responsible for the modifications that any function might do on the instance... +# He should be warned about the parallelization dangers when dealing with shared data (in this case self!) +# The user (in the best case scenario) will only exchange mutable data between the methods via the parameters +# This will ensure the dependency of execution will be respected! # TODO: do some advanced tests on this case! 
diff --git a/tests/test_multiple_return_value_for_graph.py b/tests/test_multiple_return_value_for_graph.py index f43ce3a2..c3701c07 100644 --- a/tests/test_multiple_return_value_for_graph.py +++ b/tests/test_multiple_return_value_for_graph.py @@ -1,7 +1,6 @@ from typing import Dict, List, Tuple, TypeVar, Union import pytest - from tawazi import dag, xn from tawazi.errors import TawaziTypeError @@ -18,7 +17,7 @@ def test_no_return() -> None: def pipe() -> None: return - assert pipe() == None + assert pipe() is None def test_return_single() -> None: diff --git a/tests/test_non_picklable_arguments.py b/tests/test_non_picklable_arguments.py index 16cb8dc3..15010cb4 100644 --- a/tests/test_non_picklable_arguments.py +++ b/tests/test_non_picklable_arguments.py @@ -1,5 +1,5 @@ import os -from typing import Any, Tuple, Union +from typing import Any, Tuple from tawazi import dag, xn diff --git a/tests/test_op.py b/tests/test_op.py index bc5aa1f6..9fa41132 100644 --- a/tests/test_op.py +++ b/tests/test_op.py @@ -1,12 +1,9 @@ from typing import Any import pytest - from tawazi import dag, xn from tawazi.errors import InvalidExecNodeCall -"""integration test""" - # tests different cases of @op decoration for Python functions # 1. 
different signatures diff --git a/tests/test_ops_interface.py b/tests/test_ops_interface.py index c0190a22..82b7afbb 100644 --- a/tests/test_ops_interface.py +++ b/tests/test_ops_interface.py @@ -2,11 +2,8 @@ from typing import Union import pytest - from tawazi import dag, xn -"""integration test""" - logger = Logger(name="mylogger", level="ERROR") diff --git a/tests/test_pipeline_input_output.py b/tests/test_pipeline_input_output.py index e84a26b3..d9b7f0be 100644 --- a/tests/test_pipeline_input_output.py +++ b/tests/test_pipeline_input_output.py @@ -1,8 +1,6 @@ -# type: ignore from typing import Any, List, Tuple import pytest - from tawazi import dag, xn from tawazi.errors import TawaziArgumentException, TawaziBaseException @@ -14,7 +12,7 @@ def a(input_img: List[int], cst: int) -> int: @xn def lazy_print(*args: Any) -> None: - print(*args) + print(*args) # noqa: T201 @dag diff --git a/tests/test_priority_sequential.py b/tests/test_priority_sequential.py index f6f39e15..67d01e84 100644 --- a/tests/test_priority_sequential.py +++ b/tests/test_priority_sequential.py @@ -1,14 +1,11 @@ -# type: ignore +# type: ignore # noqa: PGH003 from time import sleep from typing import Any import pytest - from tawazi import DAG, ErrorStrategy from tawazi.node import ExecNode -"""internal Unit test""" - T = 0.01 # global priority_sequential_comp_str pytest.priority_sequential_comp_str = "" diff --git a/tests/test_profiling.py b/tests/test_profiling.py index 61fa50bd..093824c6 100644 --- a/tests/test_profiling.py +++ b/tests/test_profiling.py @@ -1,16 +1,10 @@ -import math -import os from copy import deepcopy -from numbers import Real from time import sleep, time from typing import Any, Tuple import numpy as np -import pytest - import tawazi from tawazi import dag, xn -from tawazi.errors import TawaziBaseException # T = TypeVar("T", bound=Real) diff --git a/tests/test_safe_execution.py b/tests/test_safe_execution.py index 63d26ac9..b6e024f4 100644 --- a/tests/test_safe_execution.py 
+++ b/tests/test_safe_execution.py @@ -1,14 +1,11 @@ -# type: ignore +# type: ignore # noqa: PGH003 from copy import deepcopy from time import sleep from typing import Tuple import pytest - from tawazi import dag, xn -"""integration tests""" - T = 0.01 @@ -33,7 +30,7 @@ def c(a: str, b: str) -> str: def run_without_dag() -> None: a_ = a() b_ = b() - c_ = c(a_, b_) + c(a_, b_) # apply the operation decorator @@ -48,7 +45,7 @@ def run_without_dag() -> None: def dagger() -> None: a_ = a_op() b_ = b_op() - c_ = c_op(a_, b_) + c_op(a_, b_) def test_normal_execution_without_dag() -> None: @@ -119,8 +116,8 @@ def test_subgraph_with_safe_execution_with_setup() -> None: pytest.safe_execution_op_cst_has_run = False assert pipe_._safe_execute(target_nodes=["op1"]) == (5, None) assert pytest.safe_execution_c == 1 - assert pytest.safe_execution_op_cst_has_run == False + assert pytest.safe_execution_op_cst_has_run is False assert pipe_._safe_execute(target_nodes=["op1"]) == (5, None) assert pytest.safe_execution_c == 1 - assert pytest.safe_execution_op_cst_has_run == False + assert pytest.safe_execution_op_cst_has_run is False diff --git a/tests/test_setup_nodes.py b/tests/test_setup_nodes.py index 4e4510b7..31cf6bc8 100644 --- a/tests/test_setup_nodes.py +++ b/tests/test_setup_nodes.py @@ -1,9 +1,8 @@ -# type: ignore +# type: ignore # noqa: PGH003 from copy import deepcopy from functools import reduce import pytest - from tawazi import dag, xn from tawazi.errors import TawaziBaseException, TawaziUsageError @@ -18,7 +17,6 @@ def setup_op(in1): @xn def op1(a_str: str): - print("op1", a_str) pytest.op1_counter += 1 return len(a_str) @@ -115,7 +113,7 @@ def pipe_setup_deps(): sop2_r = setup_op2(sop1_r) op1_r = op1(sop1_r) op2_r = op2(sop2_r) - op12_r = op12(op1_r, op2_r) + op12(op1_r, op2_r) pytest.setup_op1 = 0 pytest.setup_op2 = 0 @@ -170,8 +168,7 @@ def pipe_setup_deps(): sop2_r = setup_op2(sop1_r) op1_r = op1(sop1_r, twz_tag="twinkle toes") op2_r = op2(sop2_r) - op12_r = 
op12(op1_r, op2_r) - return op12_r + return op12(op1_r, op2_r) pytest.setup_op1 = 0 pytest.setup_op2 = 0 @@ -235,8 +232,7 @@ def pipe_setup_deps(): sop2_r = setup_op2(sop1_r, twz_tag="setup2") op1_r = op1(sop1_r, twz_tag="twinkle toes") op2_r = op2(sop2_r) - op12_r = op12(op1_r, op2_r) - return op12_r + return op12(op1_r, op2_r) # test runninig setup without arguments pipe = deepcopy(pipe_setup_deps) @@ -255,7 +251,7 @@ def pipe_setup_deps(): assert pytest.op2 == 1 assert pytest.op12 == 1 - # test running setup targetting a setup node + # test running setup targeting a setup node pipe = deepcopy(pipe_setup_deps) clean() pipe.setup(target_nodes=["setup1"]) @@ -287,7 +283,7 @@ def pipe_setup_deps(): assert pytest.op2 == 1 assert pytest.op12 == 1 - # test running setup targetting a non setup node + # test running setup targeting a non setup node pipe = deepcopy(pipe_setup_deps) clean() pipe.setup(target_nodes=["twinkle toes"]) diff --git a/tests/test_subgraph.py b/tests/test_subgraph.py index 686297d5..d4921480 100644 --- a/tests/test_subgraph.py +++ b/tests/test_subgraph.py @@ -1,14 +1,10 @@ -# type: ignore +# type: ignore # noqa: PGH003 from typing import Any, List import pytest - from tawazi import dag, xn -from tawazi.errors import TawaziBaseException from tawazi.node import ExecNode -"""integration test""" - pytest.subgraph_comp_str = "" T = 1e-3 @@ -61,16 +57,16 @@ def i(h: Any) -> None: @dag def dag_describer() -> None: var_a = a() - var_b = b(var_a) + b(var_a) var_c = c(var_a) - var_d = d(var_c) + d(var_c) var_e = e(var_c) - var_f = f(var_e) + f(var_e) - var_g = g() + g() var_h = h() - var_i = i(var_h) + i(var_h) def test_scheduled_nodes() -> None: @@ -94,7 +90,7 @@ def test_dag_subgraph_all_nodes() -> None: nodes_ids = [n.id for n in nodes] graph = dag._make_subgraph(nodes_ids) - results = dag._execute(graph) + dag._execute(graph) assert set("abcdefghi") == set(pytest.subgraph_comp_str) @@ -105,7 +101,7 @@ def test_dag_subgraph_leaf_nodes() -> None: 
nodes_ids: List[ExecNode] = [n.id for n in nodes] graph = dag._make_subgraph(nodes_ids) - results = dag._execute(graph) + dag._execute(graph) assert set("abcdefghi") == set(pytest.subgraph_comp_str) @@ -116,7 +112,7 @@ def test_dag_subgraph_leaf_nodes_with_extra_nodes() -> None: nodes_ids = [n.id for n in nodes] graph = dag._make_subgraph(nodes_ids) - results = dag._execute(graph) + dag._execute(graph) assert set("abcegh") == set(pytest.subgraph_comp_str) @@ -124,7 +120,7 @@ def test_dag_subgraph_nodes_ids() -> None: pytest.subgraph_comp_str = "" dag = dag_describer graph = dag._make_subgraph([b.id, c.id, e.id, h.id, g.id]) - results = dag._execute(graph) + dag._execute(graph) assert set("abcegh") == set(pytest.subgraph_comp_str) @@ -132,7 +128,7 @@ def test_dag_subgraph_non_existing_nodes_ids() -> None: with pytest.raises(ValueError, match="(node or tag gibirish not found)(.|\n)*"): dag = dag_describer graph = dag._make_subgraph(["gibirish"]) - results = dag._execute(graph) + dag._execute(graph) @xn diff --git a/tests/test_tags.py b/tests/test_tags.py index b703bb43..1d7e3121 100644 --- a/tests/test_tags.py +++ b/tests/test_tags.py @@ -1,4 +1,4 @@ -# type: ignore +# type: ignore # noqa: PGH003 from tawazi import dag, xn @@ -27,8 +27,7 @@ def test_call_tag() -> None: @dag def pipe() -> int: a_ = a(10, twz_tag="another_a_tag") - b_ = b(a_, twz_tag="another_b_tag") - return b_ + return b(a_, twz_tag="another_b_tag") assert pipe() == 12 assert pipe.get_nodes_by_tag("another_a_tag") == [pipe.get_node_by_id("a")] diff --git a/tests/test_typing.py b/tests/test_typing.py index 0d82f277..55826164 100644 --- a/tests/test_typing.py +++ b/tests/test_typing.py @@ -4,7 +4,6 @@ from typing import Tuple import pytest - from tawazi import dag, xn from tawazi.errors import ErrorStrategy diff --git a/tests/test_wrapped_function.py b/tests/test_wrapped_function.py index 05519703..c5f49cc7 100644 --- a/tests/test_wrapped_function.py +++ b/tests/test_wrapped_function.py @@ -6,7 +6,6 @@ 
@xn def abcd(i: int, b: List[str], cst: float = 0.1) -> int: """doc of a""" - print(i, b, cst) return i @@ -16,8 +15,7 @@ def pipe(entry: int) -> int: # TODO: this should not work but it actually works even though the arguments are not complete!! # b = abcd(entry, entry) - b = abcd(entry, ["entry"]) - return b + return abcd(entry, ["entry"]) def test_doc_pipeline() -> None: