From 7715c3f1aad46798bc8b07556acfd7395e473f01 Mon Sep 17 00:00:00 2001
From: Tatiana Al-Chueyr
Date: Wed, 1 May 2024 15:51:33 +0100
Subject: [PATCH] Improve performance by 22-35% or more by caching partial parse artefact (#904)

Improve the performance of running the benchmark DAG with 100 tasks by 34%, and the benchmark DAG with 10 tasks by 22%, by persisting the dbt partial parse artifact in Airflow nodes. The gain can be even higher for dbt projects that take longer to parse.

With the introduction of #800, Cosmos supports using dbt partial parsing files. This feature has led to a substantial performance improvement, particularly for large dbt projects, both during Airflow DAG parsing (when using `LoadMode.DBT_LS`) and during Airflow task execution (when using `ExecutionMode.LOCAL` and `ExecutionMode.VIRTUALENV`).

There were two limitations with the initial support for partial parsing, which the current PR aims to address:

1. DAGs using Cosmos `ProfileMapping` classes could not leverage this feature. This is because partial parsing relies on profile files not changing, and by default, Cosmos would mock the dbt profile in several parts of the code. The consequence is that users trying Cosmos 1.4.0a1 will see the following messages:

```
13:33:16 Unable to do partial parsing because profile has changed
13:33:16 Unable to do partial parsing because env vars used in profiles.yml have changed
```

2. The user had to explicitly provide a `partial_parse.msgpack` file in the original project folder for their Airflow deployment - and if, for any reason, this became outdated, the user would not benefit from the partial parsing feature. Since Cosmos runs dbt tasks from within a temporary directory, the partial parse file would become stale for some users: it would be updated in the temporary directory, but the next time the task ran, Cosmos/dbt would not pick up the recently updated `partial_parse.msgpack` file.

The current PR addresses these two issues, respectively, by:

1. Allowing users who want to leverage Cosmos `ProfileMapping` and partial parsing to use `RenderConfig(enable_mock_profile=False)`

2. Introducing a Cosmos cache directory where partial parse files are persisted. This feature is enabled by default, but users can opt out by setting the Airflow configuration `[cosmos][enable_cache] = False` (or exporting the environment variable `AIRFLOW__COSMOS__ENABLE_CACHE=0`). Users can also define the directory used to store these files using the `[cosmos][cache_dir]` Airflow configuration. By default, Cosmos will create and use a folder `cosmos` inside the system's temporary directory: https://docs.python.org/3/library/tempfile.html#tempfile.gettempdir .
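To illustrate the two fixes above, here is a minimal sketch (not part of this patch - the project path, connection id and profile details are placeholders) of a DAG that combines a `ProfileMapping` with partial parsing:

```
# Hypothetical example - names and paths are illustrative only.
# Caching is enabled by default; it can be disabled globally with
# AIRFLOW__COSMOS__ENABLE_CACHE=0.
from datetime import datetime

from cosmos import DbtDag, ProfileConfig, ProjectConfig, RenderConfig
from cosmos.profiles import PostgresUserPasswordProfileMapping

jaffle_shop_dag = DbtDag(
    dag_id="jaffle_shop",
    start_date=datetime(2024, 5, 1),
    schedule_interval="@daily",
    project_config=ProjectConfig("/usr/local/airflow/dbt/jaffle_shop"),
    profile_config=ProfileConfig(
        profile_name="jaffle_shop",
        target_name="dev",
        profile_mapping=PostgresUserPasswordProfileMapping(
            conn_id="postgres_default",
            profile_args={"schema": "public"},
        ),
    ),
    # Without this, Cosmos mocks the profile and dbt cannot reuse partial_parse.msgpack:
    render_config=RenderConfig(enable_mock_profile=False),
)
```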
This PR affects both DAG parsing and task execution. Although it does not introduce an optimisation per se, it makes the partial parse feature implemented in #800 available to more users.

Closes: #722

I updated the documentation in PR #898.

Some future optimisation steps related to caching, to be addressed in separate PRs:

i. Change how we create mocked profiles: create the file itself in the same way, referencing an environment variable with the same name, and only change the value of that environment variable (#924)
ii. Extend caching to the `profiles.yml` created by Cosmos in the newly introduced `tmp/cosmos`, without the need to recreate it every time (#925).
iii. Extend caching to the Airflow DAG/Task group as a pickle file - this approach is more generic and would work for every type of DAG parsing and executor. (#926)
iv. Support persisting/fetching the cache from remote storage so we don't have to replicate it for every Airflow scheduler and worker node. (#927)
v. Cache the dbt deps lock file / avoid installing dbt packages every time. We can leverage `package-lock.yml`, introduced in dbt 1.7 (https://docs.getdbt.com/reference/commands/deps#predictable-package-installs), but ideally, we'd have a strategy to support older versions of dbt as well. (#930)
vi. Support caching `partial_parse.msgpack` even when vars change: https://medium.com/@sebastian.daum89/how-to-speed-up-single-dbt-invocations-when-using-changing-dbt-variables-b9d91ce3fb0d
vii. Support partial parsing in Docker and Kubernetes Cosmos executors (#929)
viii. Centralise all the Airflow-based config into Cosmos settings.py & create a dedicated docs page containing information about these settings (#928)

**How to validate this change**

Run the performance benchmark against this branch and `main`, checking the value of `/tmp/performance_results.txt`. Example of commands run locally:

```
# Setup
AIRFLOW_HOME=`pwd` AIRFLOW_CONN_AIRFLOW_DB="postgres://postgres:postgres@0.0.0.0:5432/postgres" PYTHONPATH=`pwd` AIRFLOW_HOME=`pwd` AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT=20000 AIRFLOW__CORE__DAG_FILE_PROCESSOR_TIMEOUT=20000 hatch run tests.py3.11-2.7:test-performance-setup

# Run test for 100 dbt models per DAG:
MODEL_COUNT=100 AIRFLOW_HOME=`pwd` AIRFLOW_CONN_AIRFLOW_DB="postgres://postgres:postgres@0.0.0.0:5432/postgres" PYTHONPATH=`pwd` AIRFLOW_HOME=`pwd` AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT=20000 AIRFLOW__CORE__DAG_FILE_PROCESSOR_TIMEOUT=20000 hatch run tests.py3.11-2.7:test-performance
```

An example of output when running 100 models with the `main` branch:

```
NUM_MODELS=100
TIME=114.18614888191223
MODELS_PER_SECOND=0.8757629623135543
DBT_VERSION=1.7.13
```

And with the current PR:

```
NUM_MODELS=100
TIME=75.17766404151917
MODELS_PER_SECOND=1.33018232576064
DBT_VERSION=1.7.13
```
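As a sanity check on the numbers above: 114.19 s / 75.18 s ≈ 1.52, i.e. the 100-model run time drops by 1 - 75.18/114.19 ≈ 34%, which matches the improvement quoted in the summary.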
---
 cosmos/cache.py                     | 124 ++++++++++++++++++++++++++++
 cosmos/config.py                    |   6 +-
 cosmos/constants.py                 |   2 +
 cosmos/converter.py                 |  19 ++---
 cosmos/dbt/graph.py                 |  29 +++++--
 cosmos/dbt/project.py               |  13 ++-
 cosmos/operators/base.py            |   6 ++
 cosmos/operators/local.py           |  26 ++++--
 cosmos/settings.py                  |  11 +++
 dev/dags/basic_cosmos_task_group.py |  11 ++-
 dev/dags/cosmos_profile_mapping.py  |   6 +-
 pyproject.toml                      |   2 +-
 tests/dbt/test_graph.py             |  43 +++++++++-
 tests/dbt/test_project.py           |  26 +-----
 tests/operators/test_local.py       |  30 +++++++
 tests/operators/test_virtualenv.py  |   3 +
 tests/test_cache.py                 |  66 +++++++++++++++
 tests/test_converter.py             |  73 ++++++++++++++++
 18 files changed, 426 insertions(+), 70 deletions(-)
 create mode 100644 cosmos/cache.py
 create mode 100644 cosmos/settings.py
 create mode 100644 tests/test_cache.py

diff --git a/cosmos/cache.py b/cosmos/cache.py
new file mode 100644
index 000000000..3c2086c7a
--- /dev/null
+++ b/cosmos/cache.py
@@ -0,0 +1,124 @@
+from __future__ import annotations
+
+import shutil
+from pathlib import Path
+
+from airflow.models.dag import DAG
+from airflow.utils.task_group import TaskGroup
+
+from cosmos import settings
+from cosmos.constants import DBT_MANIFEST_FILE_NAME, DBT_TARGET_DIR_NAME
+from cosmos.dbt.project import get_partial_parse_path
+
+
+# It was considered to create a cache identifier based on the dbt project path, as opposed
+# to where it is used in Airflow. However, we could have concurrency issues if the same
+# dbt cached directory was being used by different dbt task groups or DAGs within the same
+# node. For this reason, as a starting point, the cache is identified by where it is used.
+# This can be reviewed in the future.
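+#
+# For example (illustrative ids, not from this patch): a DbtDag with dag_id "my_dag" is
+# cached under "<cache_dir>/my_dag", while a DbtTaskGroup "my_tg" within it is cached under
+# "<cache_dir>/my_dag__my_tg"; "." separators in nested group ids are replaced by "__".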
+def _create_cache_identifier(dag: DAG, task_group: TaskGroup | None) -> str:
+    """
+    Given a DAG and an (optional) TaskGroup, create the identifier for caching.
+
+    :param dag: The Cosmos DbtDag being cached
+    :param task_group: (optional) The Cosmos DbtTaskGroup being cached
+    :return: Unique identifier representing the cache
+    """
+    if task_group:
+        if task_group.dag_id is not None:
+            cache_identifiers_list = [task_group.dag_id]
+        if task_group.group_id is not None:
+            cache_identifiers_list.extend([task_group.group_id.replace(".", "__")])
+        cache_identifier = "__".join(cache_identifiers_list)
+    else:
+        cache_identifier = dag.dag_id
+
+    return cache_identifier
+
+
+def _obtain_cache_dir_path(cache_identifier: str, base_dir: Path = settings.cache_dir) -> Path:
+    """
+    Return a directory used to cache a specific Cosmos DbtDag or DbtTaskGroup. If the directory
+    does not exist, create it.
+
+    :param cache_identifier: Unique key used as a cache identifier
+    :param base_dir: Root directory where cache will be stored
+    :return: Path to directory used to cache this specific Cosmos DbtDag or DbtTaskGroup
+    """
+    cache_dir_path = base_dir / cache_identifier
+    tmp_target_dir = cache_dir_path / DBT_TARGET_DIR_NAME
+    tmp_target_dir.mkdir(parents=True, exist_ok=True)
+    return cache_dir_path
+
+
+def _get_timestamp(path: Path) -> float:
+    """
+    Return the timestamp of a path or 0, if it does not exist.
+
+    :param path: Path to the file or directory of interest
+    :return: File or directory timestamp
+    """
+    try:
+        timestamp = path.stat().st_mtime
+    except FileNotFoundError:
+        timestamp = 0
+    return timestamp
+
+
+def _get_latest_partial_parse(dbt_project_path: Path, cache_dir: Path) -> Path | None:
+    """
+    Return the path to the latest partial parse file, if defined.
+
+    :param dbt_project_path: Original dbt project path
+    :param cache_dir: Path to the Cosmos project cache directory
+    :return: Either return the Path to the latest partial parse file, or None.
+    """
+    project_partial_parse_path = get_partial_parse_path(dbt_project_path)
+    cosmos_cached_partial_parse_filepath = get_partial_parse_path(cache_dir)
+
+    age_project_partial_parse = _get_timestamp(project_partial_parse_path)
+    age_cosmos_cached_partial_parse_filepath = _get_timestamp(cosmos_cached_partial_parse_filepath)
+
+    if age_project_partial_parse and age_cosmos_cached_partial_parse_filepath:
+        if age_project_partial_parse > age_cosmos_cached_partial_parse_filepath:
+            return project_partial_parse_path
+        else:
+            return cosmos_cached_partial_parse_filepath
+    elif age_project_partial_parse:
+        return project_partial_parse_path
+    elif age_cosmos_cached_partial_parse_filepath:
+        return cosmos_cached_partial_parse_filepath
+
+    return None
+
+
+def _update_partial_parse_cache(latest_partial_parse_filepath: Path, cache_dir: Path) -> None:
+    """
+    Update the cache to have the latest partial parse file contents.
+
+    :param latest_partial_parse_filepath: Path to the most up-to-date partial parse file
+    :param cache_dir: Path to the Cosmos project cache directory
+    """
+    cache_path = get_partial_parse_path(cache_dir)
+    manifest_path = get_partial_parse_path(cache_dir).parent / DBT_MANIFEST_FILE_NAME
+    latest_manifest_filepath = latest_partial_parse_filepath.parent / DBT_MANIFEST_FILE_NAME
+
+    shutil.copy(str(latest_partial_parse_filepath), str(cache_path))
+    shutil.copy(str(latest_manifest_filepath), str(manifest_path))
+
+
+def _copy_partial_parse_to_project(partial_parse_filepath: Path, project_path: Path) -> None:
+    """
+    Update target dbt project directory to have the latest partial parse file contents.
+
+    :param partial_parse_filepath: Path to the most up-to-date partial parse file
+    :param project_path: Path to the target dbt project directory
+    """
+    target_partial_parse_file = get_partial_parse_path(project_path)
+    tmp_target_dir = project_path / DBT_TARGET_DIR_NAME
+    tmp_target_dir.mkdir(exist_ok=True)
+
+    source_manifest_filepath = partial_parse_filepath.parent / DBT_MANIFEST_FILE_NAME
+    target_manifest_filepath = target_partial_parse_file.parent / DBT_MANIFEST_FILE_NAME
+    shutil.copy(str(partial_parse_filepath), str(target_partial_parse_file))
+    shutil.copy(str(source_manifest_filepath), str(target_manifest_filepath))
diff --git a/cosmos/config.py b/cosmos/config.py
index 729e95c75..64a7acd08 100644
--- a/cosmos/config.py
+++ b/cosmos/config.py
@@ -54,6 +54,7 @@ class RenderConfig:
     :param env_vars: (Deprecated since Cosmos 1.3 use ProjectConfig.env_vars) A dictionary of environment variables for rendering. Only supported when using ``LoadMode.DBT_LS``.
     :param dbt_project_path: Configures the DBT project location accessible on the airflow controller for DAG rendering. Mutually Exclusive with ProjectConfig.dbt_project_path. Required when using ``load_method=LoadMode.DBT_LS`` or ``load_method=LoadMode.CUSTOM``.
     :param dbt_ls_path: Configures the location of an output of ``dbt ls``. Required when using ``load_method=LoadMode.DBT_LS_FILE``.
+    :param enable_mock_profile: Allows enabling/disabling mock profiles. Enabled by default. Mock profiles are useful for parsing Cosmos DAGs in the CI, but should be disabled to benefit from partial parsing (since Cosmos 1.4).
""" emit_datasets: bool = True @@ -68,8 +69,8 @@ class RenderConfig: env_vars: dict[str, str] | None = None dbt_project_path: InitVar[str | Path | None] = None dbt_ls_path: Path | None = None - project_path: Path | None = field(init=False) + enable_mock_profile: bool = True def __post_init__(self, dbt_project_path: str | Path | None) -> None: if self.env_vars: @@ -288,7 +289,8 @@ def ensure_profile( with tempfile.TemporaryDirectory() as temp_dir: temp_file = Path(temp_dir) / DEFAULT_PROFILES_FILE_NAME logger.info( - "Creating temporary profiles.yml at %s with the following contents:\n%s", + "Creating temporary profiles.yml with use_mock_values=%s at %s with the following contents:\n%s", + use_mock_values, temp_file, profile_contents, ) diff --git a/cosmos/constants.py b/cosmos/constants.py index 1db78d15b..bea5e25eb 100644 --- a/cosmos/constants.py +++ b/cosmos/constants.py @@ -7,11 +7,13 @@ DBT_PROFILE_PATH = Path(os.path.expanduser("~")).joinpath(".dbt/profiles.yml") DEFAULT_DBT_PROFILE_NAME = "cosmos_profile" DEFAULT_DBT_TARGET_NAME = "cosmos_target" +DEFAULT_COSMOS_CACHE_DIR_NAME = "cosmos" DBT_LOG_PATH_ENVVAR = "DBT_LOG_PATH" DBT_LOG_DIR_NAME = "logs" DBT_TARGET_PATH_ENVVAR = "DBT_TARGET_PATH" DBT_TARGET_DIR_NAME = "target" DBT_PARTIAL_PARSE_FILE_NAME = "partial_parse.msgpack" +DBT_MANIFEST_FILE_NAME = "manifest.json" DBT_LOG_FILENAME = "dbt.log" DBT_BINARY_NAME = "dbt" diff --git a/cosmos/converter.py b/cosmos/converter.py index f9511ab82..08a44b676 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -11,6 +11,7 @@ from airflow.models.dag import DAG from airflow.utils.task_group import TaskGroup +from cosmos import cache, settings from cosmos.airflow.graph import build_airflow_graph from cosmos.config import ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig from cosmos.constants import ExecutionMode @@ -214,8 +215,6 @@ def __init__( validate_initial_user_config(execution_config, profile_config, project_config, render_config, operator_args) - # If we are using the old interface, we should migrate it to the new interface - # This is safe to do now since we have validated which config interface we're using if project_config.dbt_project_path: execution_config, render_config = migrate_to_new_interface(execution_config, project_config, render_config) @@ -224,21 +223,16 @@ def __init__( env_vars = project_config.env_vars or operator_args.get("env") dbt_vars = project_config.dbt_vars or operator_args.get("vars") - # Previously, we were creating a cosmos.dbt.project.DbtProject - # DbtProject has now been replaced with ProjectConfig directly - # since the interface of the two classes were effectively the same - # Under this previous implementation, we were passing: - # - name, root dir, models dir, snapshots dir and manifest path - # Internally in the dbtProject class, we were defaulting the profile_path - # To be root dir/profiles.yml - # To keep this logic working, if converter is given no ProfileConfig, - # we can create a default retaining this value to preserve this functionality. - # We may want to consider defaulting this value in our actual ProjceConfig class? 
+ cache_dir = None + if settings.enable_cache: + cache_dir = cache._obtain_cache_dir_path(cache_identifier=cache._create_cache_identifier(dag, task_group)) + self.dbt_graph = DbtGraph( project=project_config, render_config=render_config, execution_config=execution_config, profile_config=profile_config, + cache_dir=cache_dir, dbt_vars=dbt_vars, ) self.dbt_graph.load(method=render_config.load_method, execution_mode=execution_config.execution_mode) @@ -251,6 +245,7 @@ def __init__( "emit_datasets": render_config.emit_datasets, "env": env_vars, "vars": dbt_vars, + "cache_dir": cache_dir, } if execution_config.dbt_executable_path: task_args["dbt_executable_path"] = execution_config.dbt_executable_path diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index 160738fb5..09c00c6d1 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -11,6 +11,7 @@ import yaml +from cosmos import cache from cosmos.config import ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig from cosmos.constants import ( DBT_LOG_DIR_NAME, @@ -23,7 +24,7 @@ LoadMode, ) from cosmos.dbt.parser.project import LegacyDbtProject -from cosmos.dbt.project import copy_msgpack_for_partial_parse, create_symlinks, environ +from cosmos.dbt.project import create_symlinks, environ, get_partial_parse_path from cosmos.dbt.selector import select_nodes from cosmos.log import get_logger @@ -98,7 +99,7 @@ def is_freshness_effective(freshness: dict[str, Any]) -> bool: def run_command(command: list[str], tmp_dir: Path, env_vars: dict[str, str]) -> str: """Run a command in a subprocess, returning the stdout.""" logger.info("Running command: `%s`", " ".join(command)) - logger.info("Environment variable keys: %s", env_vars.keys()) + logger.debug("Environment variable keys: %s", env_vars.keys()) process = Popen( command, stdout=PIPE, @@ -164,6 +165,7 @@ def __init__( render_config: RenderConfig = RenderConfig(), execution_config: ExecutionConfig = ExecutionConfig(), profile_config: ProfileConfig | None = None, + cache_dir: Path | None = None, # dbt_vars only supported for LegacyDbtProject dbt_vars: dict[str, str] | None = None, ): @@ -171,6 +173,7 @@ def __init__( self.render_config = render_config self.profile_config = profile_config self.execution_config = execution_config + self.cache_dir = cache_dir self.dbt_vars = dbt_vars or {} def load( @@ -285,14 +288,19 @@ def load_via_dbt_ls(self) -> None: f"Content of the dbt project dir {self.render_config.project_path}: `{os.listdir(self.render_config.project_path)}`" ) tmpdir_path = Path(tmpdir) - create_symlinks(self.render_config.project_path, tmpdir_path, self.render_config.dbt_deps) - if self.project.partial_parse: - copy_msgpack_for_partial_parse(self.render_config.project_path, tmpdir_path) + abs_project_path = self.render_config.project_path.absolute() + create_symlinks(abs_project_path, tmpdir_path, self.render_config.dbt_deps) - with self.profile_config.ensure_profile(use_mock_values=True) as profile_values, environ( - self.project.env_vars or self.render_config.env_vars or {} - ): + if self.project.partial_parse and self.cache_dir: + latest_partial_parse = cache._get_latest_partial_parse(abs_project_path, self.cache_dir) + logger.info("Partial parse is enabled and the latest partial parse file is %s", latest_partial_parse) + if latest_partial_parse is not None: + cache._copy_partial_parse_to_project(latest_partial_parse, tmpdir_path) + + with self.profile_config.ensure_profile( + use_mock_values=self.render_config.enable_mock_profile + ) as profile_values, 
environ(self.project.env_vars or self.render_config.env_vars or {}): (profile_path, env_vars) = profile_values env = os.environ.copy() env.update(env_vars) @@ -323,6 +331,11 @@ def load_via_dbt_ls(self) -> None: self.nodes = nodes self.filtered_nodes = nodes + if self.project.partial_parse and self.cache_dir: + partial_parse_file = get_partial_parse_path(tmpdir_path) + if partial_parse_file.exists(): + cache._update_partial_parse_cache(partial_parse_file, self.cache_dir) + def load_via_dbt_ls_file(self) -> None: """ This is between dbt ls and full manifest. It allows to use the output (needs to be json output) of the dbt ls as a diff --git a/cosmos/dbt/project.py b/cosmos/dbt/project.py index ad328a332..4a3b036b3 100644 --- a/cosmos/dbt/project.py +++ b/cosmos/dbt/project.py @@ -20,14 +20,11 @@ def create_symlinks(project_path: Path, tmp_dir: Path, ignore_dbt_packages: bool os.symlink(project_path / child_name, tmp_dir / child_name) -def copy_msgpack_for_partial_parse(project_path: Path, tmp_dir: Path) -> None: - partial_parse_file = Path(project_path) / DBT_TARGET_DIR_NAME / DBT_PARTIAL_PARSE_FILE_NAME - - if partial_parse_file.exists(): - tmp_target_dir = tmp_dir / DBT_TARGET_DIR_NAME - tmp_target_dir.mkdir(exist_ok=True) - - shutil.copy(str(partial_parse_file), str(tmp_target_dir / DBT_PARTIAL_PARSE_FILE_NAME)) +def get_partial_parse_path(project_dir_path: Path) -> Path: + """ + Return the partial parse (partial_parse.msgpack) path for a given dbt project directory. + """ + return project_dir_path / DBT_TARGET_DIR_NAME / DBT_PARTIAL_PARSE_FILE_NAME @contextmanager diff --git a/cosmos/operators/base.py b/cosmos/operators/base.py index b6e1797d8..f9f4645b6 100644 --- a/cosmos/operators/base.py +++ b/cosmos/operators/base.py @@ -2,6 +2,8 @@ import os from abc import ABCMeta, abstractmethod +from functools import cached_property +from pathlib import Path from typing import Any, Sequence, Tuple import yaml @@ -10,6 +12,7 @@ from airflow.utils.operator_helpers import context_to_airflow_vars from airflow.utils.strings import to_boolean +from cosmos import cache from cosmos.dbt.executable import get_system_dbt from cosmos.log import get_logger @@ -61,6 +64,7 @@ class AbstractDbtBaseOperator(BaseOperator, metaclass=ABCMeta): (i.e. 
/home/astro/.pyenv/versions/dbt_venv/bin/dbt) :param dbt_cmd_flags: List of flags to pass to dbt command :param dbt_cmd_global_flags: List of dbt global flags to be passed to the dbt command + :param cache_dir: Directory used to cache Cosmos/dbt artifacts in Airflow worker nodes """ template_fields: Sequence[str] = ("env", "select", "exclude", "selector", "vars", "models") @@ -108,6 +112,7 @@ def __init__( dbt_executable_path: str = get_system_dbt(), dbt_cmd_flags: list[str] | None = None, dbt_cmd_global_flags: list[str] | None = None, + cache_dir: Path | None = None, **kwargs: Any, ) -> None: self.project_dir = project_dir @@ -135,6 +140,7 @@ def __init__( self.dbt_executable_path = dbt_executable_path self.dbt_cmd_flags = dbt_cmd_flags self.dbt_cmd_global_flags = dbt_cmd_global_flags or [] + self.cache_dir = cache_dir super().__init__(**kwargs) def get_env(self, context: Context) -> dict[str, str | bytes | os.PathLike[Any]]: diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index b39de10f8..4a34da13f 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -18,7 +18,9 @@ from airflow.utils.session import NEW_SESSION, create_session, provide_session from attr import define +from cosmos import cache from cosmos.constants import InvocationMode +from cosmos.dbt.project import get_partial_parse_path try: from airflow.datasets import Dataset @@ -49,7 +51,7 @@ parse_number_of_warnings_dbt_runner, parse_number_of_warnings_subprocess, ) -from cosmos.dbt.project import change_working_directory, copy_msgpack_for_partial_parse, create_symlinks, environ +from cosmos.dbt.project import change_working_directory, create_symlinks, environ from cosmos.hooks.subprocess import ( FullOutputSubprocessHook, FullOutputSubprocessResult, @@ -261,7 +263,6 @@ def run_dbt_runner(self, command: list[str], env: dict[str, str], cwd: str) -> d # Exclude the dbt executable path from the command cli_args = command[1:] - logger.info("Trying to run dbtRunner with:\n %s\n in %s", cli_args, cwd) with change_working_directory(cwd), environ(env): @@ -282,16 +283,21 @@ def run_command( self._discover_invocation_mode() with tempfile.TemporaryDirectory() as tmp_project_dir: + logger.info( "Cloning project to writable temp directory %s from %s", tmp_project_dir, self.project_dir, ) + tmp_dir_path = Path(tmp_project_dir) env = {k: str(v) for k, v in env.items()} - create_symlinks(Path(self.project_dir), Path(tmp_project_dir), self.install_deps) + create_symlinks(Path(self.project_dir), tmp_dir_path, self.install_deps) - if self.partial_parse: - copy_msgpack_for_partial_parse(Path(self.project_dir), Path(tmp_project_dir)) + if self.partial_parse and self.cache_dir is not None: + latest_partial_parse = cache._get_latest_partial_parse(Path(self.project_dir), self.cache_dir) + logger.info("Partial parse is enabled and the latest partial parse file is %s", latest_partial_parse) + if latest_partial_parse is not None: + cache._copy_partial_parse_to_project(latest_partial_parse, tmp_dir_path) with self.profile_config.ensure_profile() as profile_values: (profile_path, env_vars) = profile_values @@ -319,14 +325,15 @@ def run_command( full_cmd = cmd + flags - logger.info("Using environment variables keys: %s", env.keys()) + logger.debug("Using environment variables keys: %s", env.keys()) + result = self.invoke_dbt( command=full_cmd, env=env, cwd=tmp_project_dir, ) if is_openlineage_available: - self.calculate_openlineage_events_completes(env, Path(tmp_project_dir)) + 
self.calculate_openlineage_events_completes(env, tmp_dir_path) context[ "task_instance" ].openlineage_events_completes = self.openlineage_events_completes # type: ignore @@ -338,6 +345,11 @@ def run_command( logger.info("Outlets: %s", outlets) self.register_dataset(inlets, outlets) + if self.partial_parse and self.cache_dir: + partial_parse_file = get_partial_parse_path(tmp_dir_path) + if partial_parse_file.exists(): + cache._update_partial_parse_cache(partial_parse_file, self.cache_dir) + self.store_compiled_sql(tmp_project_dir, context) self.handle_exception(result) if self.callback: diff --git a/cosmos/settings.py b/cosmos/settings.py new file mode 100644 index 000000000..35e235edc --- /dev/null +++ b/cosmos/settings.py @@ -0,0 +1,11 @@ +import tempfile +from pathlib import Path + +from airflow.configuration import conf + +from cosmos.constants import DEFAULT_COSMOS_CACHE_DIR_NAME + +# In MacOS users may want to set the envvar `TMPDIR` if they do not want the value of the temp directory to change +DEFAULT_CACHE_DIR = Path(tempfile.gettempdir(), DEFAULT_COSMOS_CACHE_DIR_NAME) +cache_dir = Path(conf.get("cosmos", "cache_dir", fallback=DEFAULT_CACHE_DIR) or DEFAULT_CACHE_DIR) +enable_cache = conf.get("cosmos", "enable_cache", fallback=True) diff --git a/dev/dags/basic_cosmos_task_group.py b/dev/dags/basic_cosmos_task_group.py index 55842ee10..f230e87d4 100644 --- a/dev/dags/basic_cosmos_task_group.py +++ b/dev/dags/basic_cosmos_task_group.py @@ -25,7 +25,9 @@ ), ) -shared_execution_config = ExecutionConfig(invocation_mode=InvocationMode.DBT_RUNNER) +shared_execution_config = ExecutionConfig( + invocation_mode=InvocationMode.SUBPROCESS, +) @dag( @@ -44,7 +46,7 @@ def basic_cosmos_task_group() -> None: project_config=ProjectConfig( (DBT_ROOT_PATH / "jaffle_shop").as_posix(), ), - render_config=RenderConfig(select=["path:seeds/raw_customers.csv"]), + render_config=RenderConfig(select=["path:seeds/raw_customers.csv"], enable_mock_profile=False), execution_config=shared_execution_config, operator_args={"install_deps": True}, profile_config=profile_config, @@ -56,7 +58,10 @@ def basic_cosmos_task_group() -> None: project_config=ProjectConfig( (DBT_ROOT_PATH / "jaffle_shop").as_posix(), ), - render_config=RenderConfig(select=["path:seeds/raw_orders.csv"]), + render_config=RenderConfig( + select=["path:seeds/raw_orders.csv"], + enable_mock_profile=False, # This is necessary to benefit from partial parsing when using ProfileMapping + ), execution_config=shared_execution_config, operator_args={"install_deps": True}, profile_config=profile_config, diff --git a/dev/dags/cosmos_profile_mapping.py b/dev/dags/cosmos_profile_mapping.py index 6570467e4..3c9a503ba 100644 --- a/dev/dags/cosmos_profile_mapping.py +++ b/dev/dags/cosmos_profile_mapping.py @@ -11,12 +11,15 @@ from airflow.decorators import dag from airflow.operators.empty import EmptyOperator -from cosmos import DbtTaskGroup, ProfileConfig, ProjectConfig +from cosmos import DbtTaskGroup, ExecutionConfig, ProfileConfig, ProjectConfig +from cosmos.constants import InvocationMode from cosmos.profiles import get_automatic_profile_mapping DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt" DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH)) +execution_config = ExecutionConfig(invocation_mode=InvocationMode.DBT_RUNNER) + @dag( schedule_interval="@daily", @@ -30,6 +33,7 @@ def cosmos_profile_mapping() -> None: pre_dbt = EmptyOperator(task_id="pre_dbt") jaffle_shop = DbtTaskGroup( + execution_config=execution_config, 
project_config=ProjectConfig( DBT_ROOT_PATH / "jaffle_shop", ), diff --git a/pyproject.toml b/pyproject.toml index 0c26fa799..efad8e043 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -199,7 +199,7 @@ line-length = 120 [tool.ruff.lint] select = ["C901", "I"] [tool.ruff.lint.mccabe] -max-complexity = 8 +max-complexity = 10 [tool.distutils.bdist_wheel] universal = true diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index f72bbb146..22cd5c617 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -8,7 +8,7 @@ import yaml from cosmos.config import CosmosConfigException, ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig -from cosmos.constants import DbtResourceType, ExecutionMode +from cosmos.constants import DBT_TARGET_DIR_NAME, DbtResourceType, ExecutionMode from cosmos.dbt.graph import ( CosmosLoadDbtException, DbtGraph, @@ -482,7 +482,9 @@ def test_load_via_dbt_ls_without_dbt_deps(postgres_profile_config): @pytest.mark.integration -def test_load_via_dbt_ls_without_dbt_deps_and_preinstalled_dbt_packages(tmp_dbt_project_dir, postgres_profile_config): +def test_load_via_dbt_ls_without_dbt_deps_and_preinstalled_dbt_packages( + tmp_dbt_project_dir, postgres_profile_config, caplog +): local_flags = [ "--project-dir", tmp_dbt_project_dir / DBT_PROJECT_NAME, @@ -515,7 +517,42 @@ def test_load_via_dbt_ls_without_dbt_deps_and_preinstalled_dbt_packages(tmp_dbt_ profile_config=postgres_profile_config, ) - dbt_graph.load_via_dbt_ls() # does not raise exception + assert dbt_graph.load_via_dbt_ls() is None # Doesn't raise any exceptions + + +@pytest.mark.integration +def test_load_via_dbt_ls_caching_partial_parsing(tmp_dbt_project_dir, postgres_profile_config, caplog, tmp_path): + """ + When using RenderConfig.enable_mock_profile=False and defining DbtGraph.cache_dir, + Cosmos should leverage dbt partial parsing. 
+    """
+    import logging
+
+    caplog.set_level(logging.DEBUG)
+
+    project_config = ProjectConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME)
+    render_config = RenderConfig(
+        dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, dbt_deps=True, enable_mock_profile=False
+    )
+    execution_config = ExecutionConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME)
+    dbt_graph = DbtGraph(
+        project=project_config,
+        render_config=render_config,
+        execution_config=execution_config,
+        profile_config=postgres_profile_config,
+        cache_dir=tmp_path,
+    )
+
+    (tmp_path / DBT_TARGET_DIR_NAME).mkdir(parents=True, exist_ok=True)
+
+    # The first time dbt ls is run, partial parsing was not cached, so we don't benefit from it
+    dbt_graph.load_via_dbt_ls()
+    assert "Unable to do partial parsing" in caplog.text
+
+    # From the second time we run dbt ls onwards, we benefit from partial parsing
+    caplog.clear()
+    dbt_graph.load_via_dbt_ls()  # should not raise an exception
+    assert not "Unable to do partial parsing" in caplog.text


 @pytest.mark.integration
diff --git a/tests/dbt/test_project.py b/tests/dbt/test_project.py
index 6f9e2cb84..09ab1a735 100644
--- a/tests/dbt/test_project.py
+++ b/tests/dbt/test_project.py
@@ -4,7 +4,7 @@

 import pytest

-from cosmos.dbt.project import change_working_directory, copy_msgpack_for_partial_parse, create_symlinks, environ
+from cosmos.dbt.project import change_working_directory, create_symlinks, environ

 DBT_PROJECTS_ROOT_DIR = Path(__file__).parent.parent.parent / "dev/dags/dbt"

@@ -20,30 +20,6 @@ def test_create_symlinks(tmp_path):
     assert child.name not in ("logs", "target", "profiles.yml", "dbt_packages")


-@pytest.mark.parametrize("exists", [True, False])
-def test_copy_manifest_for_partial_parse(tmp_path, exists):
-    project_path = tmp_path / "project"
-    target_path = project_path / "target"
-    partial_parse_file = target_path / "partial_parse.msgpack"
-
-    target_path.mkdir(parents=True)
-
-    if exists:
-        partial_parse_file.write_bytes(b"")
-
-    tmp_dir = tmp_path / "tmp_dir"
-    tmp_dir.mkdir()
-
-    copy_msgpack_for_partial_parse(project_path, tmp_dir)
-
-    tmp_partial_parse_file = tmp_dir / "target" / "partial_parse.msgpack"
-
-    if exists:
-        assert tmp_partial_parse_file.exists()
-    else:
-        assert not tmp_partial_parse_file.exists()
-
-
 @patch.dict(os.environ, {"VAR1": "value1", "VAR2": "value2"})
 def test_environ_context_manager():
     # Define the expected environment variables
diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py
index 250b044fa..80c6c58a4 100644
--- a/tests/operators/test_local.py
+++ b/tests/operators/test_local.py
@@ -3,6 +3,7 @@
 import shutil
 import sys
 import tempfile
+from datetime import datetime
 from pathlib import Path
 from unittest.mock import MagicMock, call, patch
@@ -15,6 +16,7 @@
 from packaging import version
 from pendulum import datetime

+from cosmos import cache
 from cosmos.config import ProfileConfig
 from cosmos.constants import InvocationMode
 from cosmos.dbt.parser.output import (
@@ -422,6 +424,33 @@ def test_run_operator_dataset_inlets_and_outlets():
     assert test_operator.outlets == []


+@pytest.mark.integration
+def test_run_operator_caches_partial_parsing(caplog, tmp_path):
+    caplog.set_level(logging.DEBUG)
+    with DAG("test-partial-parsing", start_date=datetime(2022, 1, 1)) as dag:
+        seed_operator = DbtSeedLocalOperator(
+            profile_config=real_profile_config,
+            project_dir=DBT_PROJ_DIR,
+            task_id="seed",
+            dbt_cmd_flags=["--select", "raw_customers"],
+            install_deps=True,
+            append_env=True,
+
cache_dir=cache._obtain_cache_dir_path("test-partial-parsing", tmp_path), + invocation_mode=InvocationMode.SUBPROCESS, + ) + seed_operator + + run_test_dag(dag) + + # Unable to do partial parsing because saved manifest not found. Starting full parse. + assert "Unable to do partial parsing" in caplog.text + + caplog.clear() + run_test_dag(dag) + + assert not "Unable to do partial parsing" in caplog.text + + def test_dbt_base_operator_no_partial_parse() -> None: dbt_base_operator = ConcreteDbtLocalBaseOperator( @@ -757,6 +786,7 @@ def test_operator_execute_deps_parameters( "dev", ] task = DbtRunLocalOperator( + dag=DAG("sample_dag", start_date=datetime(2024, 4, 16)), profile_config=real_profile_config, task_id="my-task", project_dir=DBT_PROJ_DIR, diff --git a/tests/operators/test_virtualenv.py b/tests/operators/test_virtualenv.py index 5866347ab..acf3c72af 100644 --- a/tests/operators/test_virtualenv.py +++ b/tests/operators/test_virtualenv.py @@ -1,5 +1,7 @@ +from datetime import datetime from unittest.mock import MagicMock, patch +from airflow.models import DAG from airflow.models.connection import Connection from cosmos.config import ProfileConfig @@ -45,6 +47,7 @@ def test_run_command( schema="fake_schema", ) venv_operator = ConcreteDbtVirtualenvBaseOperator( + dag=DAG("sample_dag", start_date=datetime(2024, 4, 16)), profile_config=profile_config, task_id="fake_task", install_deps=True, diff --git a/tests/test_cache.py b/tests/test_cache.py new file mode 100644 index 000000000..2898475d0 --- /dev/null +++ b/tests/test_cache.py @@ -0,0 +1,66 @@ +import time +from datetime import datetime + +import pytest +from airflow import DAG +from airflow.utils.task_group import TaskGroup + +from cosmos.cache import _create_cache_identifier, _get_latest_partial_parse +from cosmos.constants import DBT_PARTIAL_PARSE_FILE_NAME, DBT_TARGET_DIR_NAME + +START_DATE = datetime(2024, 4, 16) +example_dag = DAG("dag", start_date=START_DATE) + + +@pytest.mark.parametrize( + "dag, task_group, result_identifier", + [ + (example_dag, None, "dag"), + (None, TaskGroup(dag=example_dag, group_id="inner_tg"), "dag__inner_tg"), + ( + None, + TaskGroup( + dag=example_dag, group_id="child_tg", parent_group=TaskGroup(dag=example_dag, group_id="parent_tg") + ), + "dag__parent_tg__child_tg", + ), + ( + None, + TaskGroup( + dag=example_dag, + group_id="child_tg", + parent_group=TaskGroup( + dag=example_dag, group_id="mum_tg", parent_group=TaskGroup(dag=example_dag, group_id="nana_tg") + ), + ), + "dag__nana_tg__mum_tg__child_tg", + ), + ], +) +def test_create_cache_identifier(dag, task_group, result_identifier): + assert _create_cache_identifier(dag, task_group) == result_identifier + + +def test_get_latest_partial_parse(tmp_path): + old_tmp_dir = tmp_path / "old" + old_tmp_target_dir = old_tmp_dir / DBT_TARGET_DIR_NAME + old_tmp_target_dir.mkdir(parents=True, exist_ok=True) + old_partial_parse_filepath = old_tmp_target_dir / DBT_PARTIAL_PARSE_FILE_NAME + old_partial_parse_filepath.touch() + + # This is necessary in the CI, but not on local MacOS dev env, since the files + # were being created too quickly and sometimes had the same st_mtime + time.sleep(1) + + new_tmp_dir = tmp_path / "new" + new_tmp_target_dir = new_tmp_dir / DBT_TARGET_DIR_NAME + new_tmp_target_dir.mkdir(parents=True, exist_ok=True) + new_partial_parse_filepath = new_tmp_target_dir / DBT_PARTIAL_PARSE_FILE_NAME + new_partial_parse_filepath.touch() + + assert _get_latest_partial_parse(old_tmp_dir, new_tmp_dir) == new_partial_parse_filepath + assert 
_get_latest_partial_parse(new_tmp_dir, old_tmp_dir) == new_partial_parse_filepath
+    assert _get_latest_partial_parse(old_tmp_dir, old_tmp_dir) == old_partial_parse_filepath
+    assert _get_latest_partial_parse(old_tmp_dir, tmp_path) == old_partial_parse_filepath
+    assert _get_latest_partial_parse(tmp_path, old_tmp_dir) == old_partial_parse_filepath
+    assert _get_latest_partial_parse(tmp_path, tmp_path) is None
diff --git a/tests/test_converter.py b/tests/test_converter.py
index 10dc37f13..bef2dc06d 100644
--- a/tests/test_converter.py
+++ b/tests/test_converter.py
@@ -1,3 +1,4 @@
+import tempfile
 from datetime import datetime
 from pathlib import Path
 from unittest.mock import MagicMock, patch
@@ -152,6 +153,7 @@ def test_converter_creates_dag_with_seed(mock_load_dbt_graph, execution_mode, op
         profiles_yml_filepath=SAMPLE_PROFILE_YML,
     )
     converter = DbtToAirflowConverter(
+        dag=DAG("sample_dag", start_date=datetime(2024, 4, 16)),
         nodes=nodes,
         project_config=project_config,
         profile_config=profile_config,
@@ -185,6 +187,7 @@ def test_converter_creates_dag_with_project_path_str(mock_load_dbt_graph, execut
         profiles_yml_filepath=SAMPLE_PROFILE_YML,
     )
     converter = DbtToAirflowConverter(
+        dag=DAG("sample_dag", start_date=datetime(2024, 4, 16)),
         nodes=nodes,
         project_config=project_config,
         profile_config=profile_config,
@@ -470,6 +473,75 @@ def test_converter_invocation_mode_added_to_task_args(
     assert "invocation_mode" not in kwargs["task_args"]


+@patch("cosmos.config.ProjectConfig.validate_project")
+@patch("cosmos.converter.validate_initial_user_config")
+@patch("cosmos.converter.DbtGraph")
+@patch("cosmos.converter.build_airflow_graph")
+def test_converter_uses_cache_dir(
+    mock_build_airflow_graph,
+    mock_dbt_graph,
+    mock_user_config,
+    mock_validate_project,
+):
+    """Tests that the DbtGraph and Airflow task args contain the expected cache dir."""
+    project_config = ProjectConfig(project_name="fake-project", dbt_project_path="/some/project/path")
+    execution_config = ExecutionConfig()
+    render_config = RenderConfig(enable_mock_profile=False)
+    profile_config = MagicMock()
+
+    with DAG("test-id", start_date=datetime(2024, 1, 1)) as dag:
+        DbtToAirflowConverter(
+            dag=dag,
+            nodes=nodes,
+            project_config=project_config,
+            profile_config=profile_config,
+            execution_config=execution_config,
+            render_config=render_config,
+            operator_args={},
+        )
+    task_args_cache_dir = mock_build_airflow_graph.call_args[1]["task_args"]["cache_dir"]
+    dbt_graph_cache_dir = mock_dbt_graph.call_args[1]["cache_dir"]
+
+    assert Path(tempfile.gettempdir()) in task_args_cache_dir.parents
+    assert task_args_cache_dir.parent.stem == "cosmos"
+    assert task_args_cache_dir.stem == "test-id"
+    assert task_args_cache_dir == dbt_graph_cache_dir
+
+
+@patch("cosmos.settings.enable_cache", False)
+@patch("cosmos.config.ProjectConfig.validate_project")
+@patch("cosmos.converter.validate_initial_user_config")
+@patch("cosmos.converter.DbtGraph")
+@patch("cosmos.converter.build_airflow_graph")
+def test_converter_disable_cache_sets_cache_dir_to_none(
+    mock_build_airflow_graph,
+    mock_dbt_graph,
+    mock_user_config,
+    mock_validate_project,
+):
+    """Tests that the cache dir is set to None when caching is disabled."""
+    project_config = ProjectConfig(project_name="fake-project", dbt_project_path="/some/project/path")
+    execution_config = ExecutionConfig()
+    render_config = RenderConfig(enable_mock_profile=False)
+    profile_config = MagicMock()
+
+    with DAG("test-id", start_date=datetime(2024, 1, 1)) as dag:
+
DbtToAirflowConverter( + dag=dag, + nodes=nodes, + project_config=project_config, + profile_config=profile_config, + execution_config=execution_config, + render_config=render_config, + operator_args={}, + ) + task_args_cache_dir = mock_build_airflow_graph.call_args[1]["task_args"]["cache_dir"] + dbt_graph_cache_dir = mock_dbt_graph.call_args[1]["cache_dir"] + + assert dbt_graph_cache_dir is None + assert task_args_cache_dir == dbt_graph_cache_dir + + @pytest.mark.parametrize( "execution_mode,operator_args", [ @@ -491,6 +563,7 @@ def test_converter_contains_dbt_graph(mock_load_dbt_graph, execution_mode, opera profiles_yml_filepath=SAMPLE_PROFILE_YML, ) converter = DbtToAirflowConverter( + dag=DAG("sample_dag", start_date=datetime(2024, 4, 16)), nodes=nodes, project_config=project_config, profile_config=profile_config,