From 7b9e1e6ee8b66ac76225e23f7704ee039dc1c14a Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Fri, 7 Jun 2024 11:12:48 +0100 Subject: [PATCH 1/4] Only run `dbt deps` when there are dependencies (#1030) As of Cosmos 1.4, Cosmos will attempt to run dbt deps even if there are no `dependencies.yml` or `packages.yml` in the dbt project directory. This causes an unnecessary overhead in creating subprocesses without any benefit. This problem was initially spotted by @AlgirdasDubickas, who created a pull request proposing a solution to the problem: https://github.com/astronomer/astronomer-cosmos/pull/893/ Despite the original PR becoming stale, the problem it addresses remains relevant. This PR proposes a different implementation to solve the same problem. It addresses the issue from a rendering perspective (converting a dbt project into an Airflow DAG using `LoadMode.DBT_LS`) and an execution perspective (when Airflow worker nodes run/trigger dbt commands to be run when using `ExecutionMode.LOCAL` or `ExecutionMode.VIRTUALENV`). 
Co-authored-by: AlgirdasDubickas <123624084+AlgirdasDubickas@users.noreply.github.com> (cherry picked from commit 3a63bee4a6569bb80f925f70125710bd0f4da9eb) --- cosmos/constants.py | 1 + cosmos/dbt/graph.py | 6 ++++-- cosmos/dbt/project.py | 30 +++++++++++++++++++++++++++++- cosmos/operators/local.py | 6 ++++-- tests/dbt/test_project.py | 22 +++++++++++++++++++++- tests/operators/test_local.py | 7 +++++++ 6 files changed, 66 insertions(+), 6 deletions(-) diff --git a/cosmos/constants.py b/cosmos/constants.py index b356d5542..92bf883b2 100644 --- a/cosmos/constants.py +++ b/cosmos/constants.py @@ -15,6 +15,7 @@ DBT_TARGET_DIR_NAME = "target" DBT_PARTIAL_PARSE_FILE_NAME = "partial_parse.msgpack" DBT_MANIFEST_FILE_NAME = "manifest.json" +DBT_DEPENDENCIES_FILE_NAMES = {"packages.yml", "dependencies.yml"} DBT_LOG_FILENAME = "dbt.log" DBT_BINARY_NAME = "dbt" diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index c37932caf..b38469b08 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -24,7 +24,7 @@ LoadMode, ) from cosmos.dbt.parser.project import LegacyDbtProject -from cosmos.dbt.project import create_symlinks, environ, get_partial_parse_path +from cosmos.dbt.project import create_symlinks, environ, get_partial_parse_path, has_non_empty_dependencies_file from cosmos.dbt.selector import select_nodes from cosmos.log import get_logger @@ -285,7 +285,9 @@ def load_via_dbt_ls(self) -> None: env[DBT_LOG_PATH_ENVVAR] = str(self.log_dir) env[DBT_TARGET_PATH_ENVVAR] = str(self.target_dir) - if self.render_config.dbt_deps: + if self.render_config.dbt_deps and has_non_empty_dependencies_file( + Path(self.render_config.project_path) + ): deps_command = [dbt_cmd, "deps"] deps_command.extend(self.local_flags) stdout = run_command(deps_command, tmpdir_path, env) diff --git a/cosmos/dbt/project.py b/cosmos/dbt/project.py index c1c7aa080..d8750cd44 100644 --- a/cosmos/dbt/project.py +++ b/cosmos/dbt/project.py @@ -5,7 +5,35 @@ from pathlib import Path from typing import 
Generator -from cosmos.constants import DBT_LOG_DIR_NAME, DBT_PARTIAL_PARSE_FILE_NAME, DBT_TARGET_DIR_NAME +from cosmos.constants import ( + DBT_DEPENDENCIES_FILE_NAMES, + DBT_LOG_DIR_NAME, + DBT_PARTIAL_PARSE_FILE_NAME, + DBT_TARGET_DIR_NAME, +) +from cosmos.log import get_logger + +logger = get_logger() + + +def has_non_empty_dependencies_file(project_path: Path) -> bool: + """ + Check if the dbt project has dependencies.yml or packages.yml. + + :param project_path: Path to the project + :returns: True or False + """ + project_dir = Path(project_path) + has_deps = False + for filename in DBT_DEPENDENCIES_FILE_NAMES: + filepath = project_dir / filename + if filepath.exists() and filepath.stat().st_size > 0: + has_deps = True + break + + if not has_deps: + logger.info(f"Project {project_path} does not have {DBT_DEPENDENCIES_FILE_NAMES}") + return has_deps def create_symlinks(project_path: Path, tmp_dir: Path, ignore_dbt_packages: bool) -> None: diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index c62f708e8..27d377916 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -18,7 +18,7 @@ from cosmos import cache from cosmos.constants import InvocationMode -from cosmos.dbt.project import get_partial_parse_path +from cosmos.dbt.project import get_partial_parse_path, has_non_empty_dependencies_file from cosmos.exceptions import AirflowCompatibilityError from cosmos.settings import LINEAGE_NAMESPACE @@ -126,7 +126,6 @@ def __init__( **kwargs: Any, ) -> None: self.profile_config = profile_config - self.install_deps = install_deps self.callback = callback self.compiled_sql = "" self.should_store_compiled_sql = should_store_compiled_sql @@ -146,6 +145,9 @@ def __init__( # as it can break existing DAGs. 
self.append_env = append_env + # We should not spend time trying to install deps if the project doesn't have any dependencies + self.install_deps = install_deps and has_non_empty_dependencies_file(Path(self.project_dir)) + @cached_property def subprocess_hook(self) -> FullOutputSubprocessHook: """Returns hook for running the bash command.""" diff --git a/tests/dbt/test_project.py b/tests/dbt/test_project.py index f55525a43..df625182f 100644 --- a/tests/dbt/test_project.py +++ b/tests/dbt/test_project.py @@ -2,7 +2,9 @@ from pathlib import Path from unittest.mock import patch -from cosmos.dbt.project import change_working_directory, create_symlinks, environ +import pytest + +from cosmos.dbt.project import change_working_directory, create_symlinks, environ, has_non_empty_dependencies_file DBT_PROJECTS_ROOT_DIR = Path(__file__).parent.parent.parent / "dev/dags/dbt" @@ -49,3 +51,21 @@ def test_change_working_directory(mock_chdir): # Check if os.chdir is called with the previous working directory mock_chdir.assert_called_with(os.getcwd()) + + +@pytest.mark.parametrize("filename", ["packages.yml", "dependencies.yml"]) +def test_has_non_empty_dependencies_file_is_true(tmpdir, filename): + filepath = Path(tmpdir) / filename + filepath.write_text("content") + assert has_non_empty_dependencies_file(tmpdir) + + +@pytest.mark.parametrize("filename", ["packages.yml", "dependencies.yml"]) +def test_has_non_empty_dependencies_file_is_false(tmpdir, filename): + filepath = Path(tmpdir) / filename + filepath.touch() + assert not has_non_empty_dependencies_file(tmpdir) + + +def test_has_non_empty_dependencies_file_is_false_in_empty_dir(tmpdir): + assert not has_non_empty_dependencies_file(tmpdir) diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 5513b1c4b..f90237082 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -80,6 +80,13 @@ class ConcreteDbtLocalBaseOperator(DbtLocalBaseOperator): base_cmd = ["cmd"] +def 
test_install_deps_in_empty_dir_becomes_false(tmpdir): + dbt_base_operator = ConcreteDbtLocalBaseOperator( + profile_config=profile_config, task_id="my-task", project_dir=tmpdir, install_deps=True + ) + assert not dbt_base_operator.install_deps + + def test_dbt_base_operator_add_global_flags() -> None: dbt_base_operator = ConcreteDbtLocalBaseOperator( profile_config=profile_config, From 67071a1a314db4a98225f3500c2f9630ddf251da Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Fri, 7 Jun 2024 18:10:36 +0530 Subject: [PATCH 2/4] Bring back `dataset` as a required field for BigQuery profile (#1033) In PR #1017, we attempted to remove `dataset` from the required fields list for the BigQuery profile. However, we realised that this is failing BigQuery dbt operations as it indeed is a required field. Hence, bring back the same as a required field. This is also necessary for building the mock profile where we construct the profile by taking into consideration only the required fields. Closes: #1031 (cherry picked from commit 803776abbeaf1f1f8ea739f7e73c324e7e8edebc) --- cosmos/profiles/bigquery/service_account_keyfile_dict.py | 3 +++ .../profiles/bigquery/test_bq_service_account_keyfile_dict.py | 1 + 2 files changed, 4 insertions(+) diff --git a/cosmos/profiles/bigquery/service_account_keyfile_dict.py b/cosmos/profiles/bigquery/service_account_keyfile_dict.py index 17858d7bb..480db669b 100644 --- a/cosmos/profiles/bigquery/service_account_keyfile_dict.py +++ b/cosmos/profiles/bigquery/service_account_keyfile_dict.py @@ -20,8 +20,11 @@ class GoogleCloudServiceAccountDictProfileMapping(BaseProfileMapping): dbt_profile_type: str = "bigquery" dbt_profile_method: str = "service-account-json" + # Do not remove dataset as a required field from the list below. Although it's observed that it's not a required + # field for some databases like Postgres, it's required for BigQuery. 
required_fields = [ "project", + "dataset", "keyfile_json", ] diff --git a/tests/profiles/bigquery/test_bq_service_account_keyfile_dict.py b/tests/profiles/bigquery/test_bq_service_account_keyfile_dict.py index 6f0d60b8d..d30c90021 100755 --- a/tests/profiles/bigquery/test_bq_service_account_keyfile_dict.py +++ b/tests/profiles/bigquery/test_bq_service_account_keyfile_dict.py @@ -96,6 +96,7 @@ def test_mock_profile(mock_bigquery_conn_with_dict: Connection): "type": "bigquery", "method": "service-account-json", "project": "mock_value", + "dataset": "mock_value", "threads": 1, "keyfile_json": None, } From 96003e5884f8b61aff76e9a29f18c97eab19db0c Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Fri, 7 Jun 2024 14:25:00 +0100 Subject: [PATCH 3/4] Fix docs so it does not reference non-existing `get_dbt_dataset` (#1034) [The documentation](https://astronomer.github.io/astronomer-cosmos/configuration/scheduling.html) was outdated. The method `get_dbt_dataset` no longer exists. It used to exist in older versions of Cosmos (before 1.1) when the URIs respected the format: `Dataset(f"DBT://{connection_id.upper()}/{project_name.upper()}/{model_name.upper()}")` More information on why we changed this: https://github.com/astronomer/astronomer-cosmos/issues/305 Closes: #1032 (cherry picked from commit c47e1049cf4aa6457591a610e12c5dca97e1d169) --- .github/ISSUE_TEMPLATE/01-bug.yml | 2 +- .github/ISSUE_TEMPLATE/02-feature.yml | 3 ++- docs/configuration/scheduling.rst | 19 ++++++++++++------- 3 files changed, 15 insertions(+), 9 deletions(-) diff --git a/.github/ISSUE_TEMPLATE/01-bug.yml b/.github/ISSUE_TEMPLATE/01-bug.yml index 658d0b9cb..4a5517338 100644 --- a/.github/ISSUE_TEMPLATE/01-bug.yml +++ b/.github/ISSUE_TEMPLATE/01-bug.yml @@ -1,7 +1,7 @@ --- name: Bug Report description: File a bug report. 
-title: "[Bug]: " +title: "[Bug] " labels: ["bug", "triage-needed"] body: - type: markdown diff --git a/.github/ISSUE_TEMPLATE/02-feature.yml b/.github/ISSUE_TEMPLATE/02-feature.yml index e179d357d..f8cd9e24d 100644 --- a/.github/ISSUE_TEMPLATE/02-feature.yml +++ b/.github/ISSUE_TEMPLATE/02-feature.yml @@ -1,7 +1,8 @@ --- name: Feature request description: Suggest an idea for this project -labels: ["enhancement", "needs-triage"] +title: "[Feature] " +labels: ["enhancement", "triage-needed"] body: - type: markdown attributes: diff --git a/docs/configuration/scheduling.rst b/docs/configuration/scheduling.rst index a1275ee19..d96930395 100644 --- a/docs/configuration/scheduling.rst +++ b/docs/configuration/scheduling.rst @@ -24,11 +24,17 @@ To schedule a dbt project on a time-based schedule, you can use Airflow's schedu Data-Aware Scheduling --------------------- -By default, Cosmos emits `Airflow Datasets `_ when running dbt projects. This allows you to use Airflow's data-aware scheduling capabilities to schedule your dbt projects. Cosmos emits datasets in the following format: +Apache Airflow 2.4 introduced the concept of `scheduling based on Datasets `_. + +By default, if Airflow 2.4 or higher is used, Cosmos emits `Airflow Datasets `_ when running dbt projects. This allows you to use Airflow's data-aware scheduling capabilities to schedule your dbt projects. Cosmos emits datasets using the OpenLineage URI format, as detailed in the `OpenLineage Naming Convention `_. + +Cosmos calculates these URIs during the task execution, by using the library `OpenLineage Integration Common `_. + +This block illustrates a Cosmos-generated dataset for Postgres: .. 
code-block:: python - Dataset("DBT://{connection_id}/{project_name}/{model_name}") + Dataset("postgres://host:5432/database.schema.table") For example, let's say you have: @@ -36,11 +42,13 @@ For example, let's say you have: - A dbt project (``project_one``) with a model called ``my_model`` that runs daily - A second dbt project (``project_two``) with a model called ``my_other_model`` that you want to run immediately after ``my_model`` +We are assuming that the Database used is Postgres, the host is ``host``, the database is ``database`` and the schema is ``schema``. + Then, you can use Airflow's data-aware scheduling capabilities to schedule ``my_other_model`` to run after ``my_model``. For example, you can use the following DAGs: .. code-block:: python - from cosmos import DbtDag, get_dbt_dataset + from cosmos import DbtDag project_one = DbtDag( # ... @@ -49,10 +57,7 @@ Then, you can use Airflow's data-aware scheduling capabilities to schedule ``my_ ) project_two = DbtDag( - # for airflow <=2.3 - # schedule=[get_dbt_dataset("my_conn", "project_one", "my_model")], - # for airflow > 2.3 - schedule=[get_dbt_dataset("my_conn", "project_one", "my_model")], + schedule=[Dataset("postgres://host:5432/database.schema.my_model")], dbt_project_name="project_two", ) From a9c2dbac1445bb75c0b3852aa3ad49df32bcc543 Mon Sep 17 00:00:00 2001 From: Pankaj Koti Date: Fri, 7 Jun 2024 19:08:02 +0530 Subject: [PATCH 4/4] Release 1.4.3 --- CHANGELOG.rst | 16 ++++++++++++++++ cosmos/__init__.py | 2 +- 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 2e6331eda..7757d5fb5 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,6 +1,22 @@ Changelog ========= +1.4.3 (2024-06-07) +------------------ + +Bug fixes + +* Bring back ``dataset`` as a required field for BigQuery profile by @pankajkoti in #1033 + +Enhancements + +* Only run ``dbt deps`` when there are dependencies by @tatiana in #1030 + +Docs + +* Fix docs so it does not reference 
non-existing ``get_dbt_dataset`` by @tatiana in #1034 + + 1.4.2 (2024-06-06) ------------------ diff --git a/cosmos/__init__.py b/cosmos/__init__.py index 7a73e722e..100649bfb 100644 --- a/cosmos/__init__.py +++ b/cosmos/__init__.py @@ -5,7 +5,7 @@ Contains dags, task groups, and operators. """ -__version__ = "1.4.2" +__version__ = "1.4.3" from cosmos.airflow.dag import DbtDag