From 48055d8d5bdc16352b038ff37c9fddad483c5ea6 Mon Sep 17 00:00:00 2001 From: Pankaj Singh <98807258+pankajastro@users.noreply.github.com> Date: Wed, 20 Nov 2024 14:50:11 +0530 Subject: [PATCH] Add dbt clone operator (#1326) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This PR introduces the DbtCloneOperator. For more details, refer to the dbt documentation: https://docs.getdbt.com/reference/commands/clone. ## Testing **Airflow DAG** ```python from datetime import datetime from airflow import DAG from cosmos import DbtSeedLocalOperator, DbtRunLocalOperator, DbtCloneLocalOperator, ProfileConfig DBT_PROJ_DIR="/usr/local/airflow/dbt/jaffle_shop" profile_config1=ProfileConfig( profile_name="bigquery_dev", target_name="dev", profiles_yml_filepath="/usr/local/airflow/dbt/jaffle_shop/profiles.yml" ) profile_config2=ProfileConfig( profile_name="bigquery_clone", target_name="dev", profiles_yml_filepath="/usr/local/airflow/dbt/jaffle_shop/profiles.yml" ) with DAG("test-id-1", start_date=datetime(2024, 1, 1), catchup=False) as dag: seed_operator = DbtSeedLocalOperator( profile_config=profile_config1, project_dir=DBT_PROJ_DIR, task_id="seed", dbt_cmd_flags=["--select", "raw_customers"], install_deps=True, append_env=True, ) run_operator = DbtRunLocalOperator( profile_config=profile_config1, project_dir=DBT_PROJ_DIR, task_id="run", dbt_cmd_flags=["--models", "stg_customers"], install_deps=True, append_env=True, ) clone_operator = DbtCloneLocalOperator( profile_config=profile_config2, project_dir=DBT_PROJ_DIR, task_id="clone", dbt_cmd_flags=["--models", "stg_customers", "--state", "/usr/local/airflow/dbt/jaffle_shop/target"], install_deps=True, append_env=True, ) seed_operator >> run_operator >> clone_operator ``` **DBT Profile** ``` bigquery_dev: target: dev outputs: dev: type: bigquery method: service-account project: astronomer-dag-authoring dataset: bq_dev threads: 4 # Must be a value of 1 or greater keyfile: /usr/local/airflow/include/key.json location: US bigquery_clone: target: dev outputs: dev: type: bigquery method: service-account project: astronomer-dag-authoring dataset: bq_clone threads: 4 # Must be a value of 1 or greater keyfile: /usr/local/airflow/include/key.json location: US ``` **Airflow DAG Run** Screenshot 2024-11-15 at 6 06 50 PM **BQ data WH** Screenshot 2024-11-15 at 6 04 29 PM ## Related Issue(s) closes: https://github.com/astronomer/astronomer-cosmos/issues/1268 closes: https://github.com/astronomer/astronomer-cosmos/issues/878 ## Breaking Change? No ## Limitation - The `dbt clone` command was introduced in dbt-core 1.6.0, so this feature is only available to users with dbt-core version 1.6 or higher https://github.com/dbt-labs/dbt-core/blob/1.6.latest/CHANGELOG.md - Users should ensure their database is supported for cloning operations. --- cosmos/__init__.py | 12 +++++ cosmos/operators/airflow_async.py | 5 ++ cosmos/operators/aws_eks.py | 10 ++++ cosmos/operators/azure_container_instance.py | 10 ++++ cosmos/operators/base.py | 25 ++++++++++ cosmos/operators/docker.py | 10 ++++ cosmos/operators/gcp_cloud_run_job.py | 10 ++++ cosmos/operators/kubernetes.py | 8 +++ cosmos/operators/local.py | 10 ++++ cosmos/operators/virtualenv.py | 10 ++++ dev/dags/example_operators.py | 50 +++++++++++++++++++ docs/getting_started/index.rst | 1 + docs/getting_started/operators.rst | 24 +++++++++ tests/operators/test_aws_eks.py | 2 + .../test_azure_container_instance.py | 2 + tests/operators/test_docker.py | 2 + tests/operators/test_gcp_cloud_run_job.py | 2 + tests/operators/test_kubernetes.py | 2 + tests/operators/test_local.py | 19 +++++++ tests/operators/test_virtualenv.py | 15 +++++- tests/test_example_dags.py | 2 + tests/test_example_dags_no_connections.py | 1 + 22 files changed, 231 insertions(+), 1 deletion(-) create mode 100644 dev/dags/example_operators.py create mode 100644 docs/getting_started/operators.rst diff --git a/cosmos/__init__.py b/cosmos/__init__.py index 546ffefde..884a90659 100644 --- a/cosmos/__init__.py +++ b/cosmos/__init__.py @@ -30,6 +30,7 @@ from cosmos.operators.lazy_load import MissingPackage from cosmos.operators.local import ( DbtBuildLocalOperator, + DbtCloneLocalOperator, DbtDepsLocalOperator, DbtLSLocalOperator, DbtRunLocalOperator, @@ -44,6 +45,7 @@ try: from cosmos.operators.docker import ( DbtBuildDockerOperator, + DbtCloneDockerOperator, DbtLSDockerOperator, DbtRunDockerOperator, DbtRunOperationDockerOperator, @@ -65,6 +67,7 @@ try: from cosmos.operators.kubernetes import ( DbtBuildKubernetesOperator, + DbtCloneKubernetesOperator, DbtLSKubernetesOperator, DbtRunKubernetesOperator, DbtRunOperationKubernetesOperator, @@ -106,6 +109,7 @@ try: from cosmos.operators.azure_container_instance import ( DbtBuildAzureContainerInstanceOperator, + DbtCloneAzureContainerInstanceOperator, DbtLSAzureContainerInstanceOperator, DbtRunAzureContainerInstanceOperator, DbtRunOperationAzureContainerInstanceOperator, @@ -142,6 +146,7 @@ try: from cosmos.operators.aws_eks import ( DbtBuildAwsEksOperator, + DbtCloneAwsEksOperator, DbtLSAwsEksOperator, DbtRunAwsEksOperator, DbtRunOperationAwsEksOperator, @@ -170,6 +175,7 @@ try: from cosmos.operators.gcp_cloud_run_job import ( DbtBuildGcpCloudRunJobOperator, + DbtCloneGcpCloudRunJobOperator, DbtLSGcpCloudRunJobOperator, DbtRunGcpCloudRunJobOperator, DbtRunOperationGcpCloudRunJobOperator, @@ -217,6 +223,7 @@ "DbtResourceType", # Local Execution Mode "DbtBuildLocalOperator", + "DbtCloneLocalOperator", "DbtDepsLocalOperator", # deprecated, to be delete in Cosmos 2.x "DbtLSLocalOperator", "DbtRunLocalOperator", @@ -226,6 +233,7 @@ "DbtTestLocalOperator", # Docker Execution Mode "DbtBuildDockerOperator", + "DbtCloneDockerOperator", "DbtLSDockerOperator", "DbtRunDockerOperator", "DbtRunOperationDockerOperator", @@ -234,6 +242,7 @@ "DbtTestDockerOperator", # Kubernetes Execution Mode "DbtBuildKubernetesOperator", + "DbtCloneKubernetesOperator", "DbtLSKubernetesOperator", "DbtRunKubernetesOperator", "DbtRunOperationKubernetesOperator", @@ -242,6 +251,7 @@ "DbtTestKubernetesOperator", # Azure Container Instance Execution Mode "DbtBuildAzureContainerInstanceOperator", + "DbtCloneAzureContainerInstanceOperator", "DbtLSAzureContainerInstanceOperator", "DbtRunAzureContainerInstanceOperator", "DbtRunOperationAzureContainerInstanceOperator", @@ -250,6 +260,7 @@ "DbtTestAzureContainerInstanceOperator", # AWS EKS Execution Mode "DbtBuildAwsEksOperator", + "DbtCloneAwsEksOperator", "DbtLSAwsEksOperator", "DbtRunAwsEksOperator", "DbtRunOperationAwsEksOperator", @@ -258,6 +269,7 @@ "DbtTestAwsEksOperator", # GCP Cloud Run Job Execution Mode "DbtBuildGcpCloudRunJobOperator", + "DbtCloneGcpCloudRunJobOperator", "DbtLSGcpCloudRunJobOperator", "DbtRunGcpCloudRunJobOperator", "DbtRunOperationGcpCloudRunJobOperator", diff --git a/cosmos/operators/airflow_async.py b/cosmos/operators/airflow_async.py index a7f30a330..ac5b774c4 100644 --- a/cosmos/operators/airflow_async.py +++ b/cosmos/operators/airflow_async.py @@ -14,6 +14,7 @@ from cosmos.operators.base import AbstractDbtBaseOperator from cosmos.operators.local import ( DbtBuildLocalOperator, + DbtCloneLocalOperator, DbtCompileLocalOperator, DbtLocalBaseOperator, DbtLSLocalOperator, @@ -188,3 +189,7 @@ class DbtRunOperationAirflowAsyncOperator(DbtBaseAirflowAsyncOperator, DbtRunOpe class DbtCompileAirflowAsyncOperator(DbtBaseAirflowAsyncOperator, DbtCompileLocalOperator): # type: ignore pass + + +class DbtCloneAirflowAsyncOperator(DbtBaseAirflowAsyncOperator, DbtCloneLocalOperator): + pass diff --git a/cosmos/operators/aws_eks.py b/cosmos/operators/aws_eks.py index 1c194a3e4..7f20eda9a 100644 --- a/cosmos/operators/aws_eks.py +++ b/cosmos/operators/aws_eks.py @@ -8,6 +8,7 @@ from cosmos.operators.kubernetes import ( DbtBuildKubernetesOperator, + DbtCloneKubernetesOperator, DbtKubernetesBaseOperator, DbtLSKubernetesOperator, DbtRunKubernetesOperator, @@ -160,3 +161,12 @@ class DbtRunOperationAwsEksOperator(DbtAwsEksBaseOperator, DbtRunOperationKubern def __init__(self, *args: Any, **kwargs: Any) -> None: super().__init__(*args, **kwargs) + + +class DbtCloneAwsEksOperator(DbtAwsEksBaseOperator, DbtCloneKubernetesOperator): + """ + Executes a dbt core clone command. + """ + + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) diff --git a/cosmos/operators/azure_container_instance.py b/cosmos/operators/azure_container_instance.py index d3c8ebfc3..7f335bd99 100644 --- a/cosmos/operators/azure_container_instance.py +++ b/cosmos/operators/azure_container_instance.py @@ -8,6 +8,7 @@ from cosmos.operators.base import ( AbstractDbtBaseOperator, DbtBuildMixin, + DbtCloneMixin, DbtLSMixin, DbtRunMixin, DbtRunOperationMixin, @@ -167,3 +168,12 @@ class DbtRunOperationAzureContainerInstanceOperator(DbtRunOperationMixin, DbtAzu def __init__(self, *args: Any, **kwargs: Any) -> None: super().__init__(*args, **kwargs) + + +class DbtCloneAzureContainerInstanceOperator(DbtCloneMixin, DbtAzureContainerInstanceBaseOperator): + """ + Executes a dbt core clone command. + """ + + def __init__(self, *args: Any, **kwargs: Any): + super().__init__(*args, **kwargs) diff --git a/cosmos/operators/base.py b/cosmos/operators/base.py index ed7969ebd..52fb98bac 100644 --- a/cosmos/operators/base.py +++ b/cosmos/operators/base.py @@ -438,3 +438,28 @@ class DbtCompileMixin: base_cmd = ["compile"] ui_color = "#877c7c" + + +class DbtCloneMixin: + """Mixin for dbt clone command.""" + + base_cmd = ["clone"] + ui_color = "#83a300" + + def __init__(self, full_refresh: bool | str = False, **kwargs: Any) -> None: + self.full_refresh = full_refresh + super().__init__(**kwargs) + + def add_cmd_flags(self) -> list[str]: + flags = [] + + if isinstance(self.full_refresh, str): + # Handle template fields when render_template_as_native_obj=False + full_refresh = to_boolean(self.full_refresh) + else: + full_refresh = self.full_refresh + + if full_refresh is True: + flags.append("--full-refresh") + + return flags diff --git a/cosmos/operators/docker.py b/cosmos/operators/docker.py index 6f0956237..05671b4d0 100644 --- a/cosmos/operators/docker.py +++ b/cosmos/operators/docker.py @@ -7,6 +7,7 @@ from cosmos.operators.base import ( AbstractDbtBaseOperator, DbtBuildMixin, + DbtCloneMixin, DbtLSMixin, DbtRunMixin, DbtRunOperationMixin, @@ -148,3 +149,12 @@ class DbtRunOperationDockerOperator(DbtRunOperationMixin, DbtDockerBaseOperator) def __init__(self, *args: Any, **kwargs: Any) -> None: super().__init__(*args, **kwargs) + + +class DbtCloneDockerOperator(DbtCloneMixin, DbtDockerBaseOperator): + """ + Executes a dbt core clone command. + """ + + def __init__(self, *args: Any, **kwargs: Any): + super().__init__(*args, **kwargs) diff --git a/cosmos/operators/gcp_cloud_run_job.py b/cosmos/operators/gcp_cloud_run_job.py index 76570d56a..ef47db2cc 100644 --- a/cosmos/operators/gcp_cloud_run_job.py +++ b/cosmos/operators/gcp_cloud_run_job.py @@ -10,6 +10,7 @@ from cosmos.operators.base import ( AbstractDbtBaseOperator, DbtBuildMixin, + DbtCloneMixin, DbtLSMixin, DbtRunMixin, DbtRunOperationMixin, @@ -180,3 +181,12 @@ class DbtRunOperationGcpCloudRunJobOperator(DbtRunOperationMixin, DbtGcpCloudRun def __init__(self, *args: Any, **kwargs: Any) -> None: super().__init__(*args, **kwargs) + + +class DbtCloneGcpCloudRunJobOperator(DbtCloneMixin, DbtGcpCloudRunJobBaseOperator): + """ + Executes a dbt core clone command. + """ + + def __init__(self, *args: Any, **kwargs: Any): + super().__init__(*args, **kwargs) diff --git a/cosmos/operators/kubernetes.py b/cosmos/operators/kubernetes.py index 452932f07..f86925fde 100644 --- a/cosmos/operators/kubernetes.py +++ b/cosmos/operators/kubernetes.py @@ -11,6 +11,7 @@ from cosmos.operators.base import ( AbstractDbtBaseOperator, DbtBuildMixin, + DbtCloneMixin, DbtLSMixin, DbtRunMixin, DbtRunOperationMixin, @@ -260,3 +261,10 @@ class DbtRunOperationKubernetesOperator(DbtRunOperationMixin, DbtKubernetesBaseO def __init__(self, *args: Any, **kwargs: Any) -> None: super().__init__(*args, **kwargs) + + +class DbtCloneKubernetesOperator(DbtCloneMixin, DbtKubernetesBaseOperator): + """Executes a dbt core clone command.""" + + def __init__(self, *args: Any, **kwargs: Any): + super().__init__(*args, **kwargs) diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index 618d9e944..bf47ab4aa 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -70,6 +70,7 @@ from cosmos.operators.base import ( AbstractDbtBaseOperator, DbtBuildMixin, + DbtCloneMixin, DbtCompileMixin, DbtLSMixin, DbtRunMixin, @@ -1009,3 +1010,12 @@ class DbtCompileLocalOperator(DbtCompileMixin, DbtLocalBaseOperator): def __init__(self, *args: Any, **kwargs: Any) -> None: kwargs["should_upload_compiled_sql"] = True super().__init__(*args, **kwargs) + + +class DbtCloneLocalOperator(DbtCloneMixin, DbtLocalBaseOperator): + """ + Executes a dbt core clone command. + """ + + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) diff --git a/cosmos/operators/virtualenv.py b/cosmos/operators/virtualenv.py index 0b06b2a81..3bd54da99 100644 --- a/cosmos/operators/virtualenv.py +++ b/cosmos/operators/virtualenv.py @@ -17,6 +17,7 @@ from cosmos.log import get_logger from cosmos.operators.local import ( DbtBuildLocalOperator, + DbtCloneLocalOperator, DbtDocsLocalOperator, DbtLocalBaseOperator, DbtLSLocalOperator, @@ -286,3 +287,12 @@ class DbtDocsVirtualenvOperator(DbtVirtualenvBaseOperator, DbtDocsLocalOperator) def __init__(self, *args: Any, **kwargs: Any): super().__init__(*args, **kwargs) + + +class DbtCloneVirtualenvOperator(DbtVirtualenvBaseOperator, DbtCloneLocalOperator): + """ + Executes a dbt core clone command. + """ + + def __init__(self, *args: Any, **kwargs: Any): + super().__init__(*args, **kwargs) diff --git a/dev/dags/example_operators.py b/dev/dags/example_operators.py new file mode 100644 index 000000000..1c8624a34 --- /dev/null +++ b/dev/dags/example_operators.py @@ -0,0 +1,50 @@ +import os +from datetime import datetime +from pathlib import Path + +from airflow import DAG + +from cosmos import DbtCloneLocalOperator, DbtRunLocalOperator, DbtSeedLocalOperator, ProfileConfig + +DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt" +DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH)) +DBT_PROJ_DIR = DBT_ROOT_PATH / "jaffle_shop" +DBT_PROFILE_PATH = DBT_PROJ_DIR / "profiles.yml" +DBT_ARTIFACT = DBT_PROJ_DIR / "target" + +profile_config = ProfileConfig( + profile_name="default", + target_name="dev", + profiles_yml_filepath=DBT_PROFILE_PATH, +) + +with DAG("example_operators", start_date=datetime(2024, 1, 1), catchup=False) as dag: + seed_operator = DbtSeedLocalOperator( + profile_config=profile_config, + project_dir=DBT_PROJ_DIR, + task_id="seed", + dbt_cmd_flags=["--select", "raw_customers"], + install_deps=True, + append_env=True, + ) + run_operator = DbtRunLocalOperator( + profile_config=profile_config, + project_dir=DBT_PROJ_DIR, + task_id="run", + dbt_cmd_flags=["--models", "stg_customers"], + install_deps=True, + append_env=True, + ) + + # [START clone_example] + clone_operator = DbtCloneLocalOperator( + profile_config=profile_config, + project_dir=DBT_PROJ_DIR, + task_id="clone", + dbt_cmd_flags=["--models", "stg_customers", "--state", DBT_ARTIFACT], + install_deps=True, + append_env=True, + ) + # [END clone_example] + + seed_operator >> run_operator >> clone_operator diff --git a/docs/getting_started/index.rst b/docs/getting_started/index.rst index ed1952793..958f115e1 100644 --- a/docs/getting_started/index.rst +++ b/docs/getting_started/index.rst @@ -14,6 +14,7 @@ Azure Container Instance Execution Mode GCP Cloud Run Job Execution Mode dbt and Airflow Similar Concepts + Operators Getting Started diff --git a/docs/getting_started/operators.rst b/docs/getting_started/operators.rst new file mode 100644 index 000000000..691a0eb31 --- /dev/null +++ b/docs/getting_started/operators.rst @@ -0,0 +1,24 @@ +.. _operators: + +Operators +========= + +Cosmos exposes individual operators that correspond to specific dbt commands, which can be used just like traditional +`Apache Airflow® `_ operators. Cosmos names these operators using the format ``DbtOperator``. For example, ``DbtBuildLocalOperator``. + +Clone +----- + +Requirement + +* Cosmos >= 1.8.0 +* dbt-core >= 1.6.0 + +The ``DbtCloneLocalOperator`` implement `dbt clone `_ command. + +Example of how to use + +.. literalinclude:: ../../dev/dags/example_operators.py + :language: python + :start-after: [START clone_example] + :end-before: [END clone_example] diff --git a/tests/operators/test_aws_eks.py b/tests/operators/test_aws_eks.py index 35717a061..bca007c4d 100644 --- a/tests/operators/test_aws_eks.py +++ b/tests/operators/test_aws_eks.py @@ -5,6 +5,7 @@ from cosmos.operators.aws_eks import ( DbtBuildAwsEksOperator, + DbtCloneAwsEksOperator, DbtLSAwsEksOperator, DbtRunAwsEksOperator, DbtSeedAwsEksOperator, @@ -44,6 +45,7 @@ def test_dbt_kubernetes_build_command(): "test": DbtTestAwsEksOperator(**base_kwargs), "build": DbtBuildAwsEksOperator(**base_kwargs), "seed": DbtSeedAwsEksOperator(**base_kwargs), + "clone": DbtCloneAwsEksOperator(**base_kwargs), } for command_name, command_operator in result_map.items(): diff --git a/tests/operators/test_azure_container_instance.py b/tests/operators/test_azure_container_instance.py index c57466619..4f1bdfaee 100644 --- a/tests/operators/test_azure_container_instance.py +++ b/tests/operators/test_azure_container_instance.py @@ -7,6 +7,7 @@ from cosmos.operators.azure_container_instance import ( DbtAzureContainerInstanceBaseOperator, DbtBuildAzureContainerInstanceOperator, + DbtCloneAzureContainerInstanceOperator, DbtLSAzureContainerInstanceOperator, DbtRunAzureContainerInstanceOperator, DbtSeedAzureContainerInstanceOperator, @@ -127,6 +128,7 @@ def test_dbt_azure_container_instance_operator_check_environment_variables( "run": DbtRunAzureContainerInstanceOperator(**base_kwargs), "test": DbtTestAzureContainerInstanceOperator(**base_kwargs), "seed": DbtSeedAzureContainerInstanceOperator(**base_kwargs), + "clone": DbtCloneAzureContainerInstanceOperator(**base_kwargs), } diff --git a/tests/operators/test_docker.py b/tests/operators/test_docker.py index 2cfb6b835..ba2ed43c9 100644 --- a/tests/operators/test_docker.py +++ b/tests/operators/test_docker.py @@ -7,6 +7,7 @@ from cosmos.operators.docker import ( DbtBuildDockerOperator, + DbtCloneDockerOperator, DbtLSDockerOperator, DbtRunDockerOperator, DbtSeedDockerOperator, @@ -113,6 +114,7 @@ def test_dbt_docker_operator_get_env(p_context_to_airflow_vars: MagicMock, base_ "test": DbtTestDockerOperator(**base_kwargs), "build": DbtBuildDockerOperator(**base_kwargs), "seed": DbtSeedDockerOperator(**base_kwargs), + "clone": DbtCloneDockerOperator(**base_kwargs), } diff --git a/tests/operators/test_gcp_cloud_run_job.py b/tests/operators/test_gcp_cloud_run_job.py index 08b7ba999..9cdd96bdb 100644 --- a/tests/operators/test_gcp_cloud_run_job.py +++ b/tests/operators/test_gcp_cloud_run_job.py @@ -10,6 +10,7 @@ try: from cosmos.operators.gcp_cloud_run_job import ( DbtBuildGcpCloudRunJobOperator, + DbtCloneGcpCloudRunJobOperator, DbtGcpCloudRunJobBaseOperator, DbtLSGcpCloudRunJobOperator, DbtRunGcpCloudRunJobOperator, @@ -173,6 +174,7 @@ def test_dbt_gcp_cloud_run_job_build_command(): "build": DbtBuildGcpCloudRunJobOperator(**BASE_KWARGS), "snapshot": DbtSnapshotGcpCloudRunJobOperator(**BASE_KWARGS), "source": DbtSourceGcpCloudRunJobOperator(**BASE_KWARGS), + "clone": DbtCloneGcpCloudRunJobOperator(**BASE_KWARGS), "run-operation": DbtRunOperationGcpCloudRunJobOperator(macro_name="some-macro", **BASE_KWARGS), } diff --git a/tests/operators/test_kubernetes.py b/tests/operators/test_kubernetes.py index 51375f66b..e6ccdc4d7 100644 --- a/tests/operators/test_kubernetes.py +++ b/tests/operators/test_kubernetes.py @@ -10,6 +10,7 @@ from cosmos.operators.kubernetes import ( DbtBuildKubernetesOperator, + DbtCloneKubernetesOperator, DbtLSKubernetesOperator, DbtRunKubernetesOperator, DbtSeedKubernetesOperator, @@ -128,6 +129,7 @@ def test_dbt_kubernetes_operator_get_env(p_context_to_airflow_vars: MagicMock, b "test": DbtTestKubernetesOperator(**base_kwargs), "build": DbtBuildKubernetesOperator(**base_kwargs), "seed": DbtSeedKubernetesOperator(**base_kwargs), + "clone": DbtCloneKubernetesOperator(**base_kwargs), } diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 2de6ca1e3..1f065fd3e 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -29,6 +29,7 @@ from cosmos.hooks.subprocess import FullOutputSubprocessResult from cosmos.operators.local import ( DbtBuildLocalOperator, + DbtCloneLocalOperator, DbtCompileLocalOperator, DbtDocsAzureStorageLocalOperator, DbtDocsGCSLocalOperator, @@ -804,6 +805,11 @@ def test_store_compiled_sql() -> None: {"full_refresh": True}, {"context": {}, "env": {}, "cmd_flags": ["run", "--full-refresh"]}, ), + ( + DbtCloneLocalOperator, + {"full_refresh": True}, + {"context": {}, "env": {}, "cmd_flags": ["clone", "--full-refresh"]}, + ), ( DbtTestLocalOperator, {}, @@ -1161,6 +1167,19 @@ def test_dbt_compile_local_operator_initialisation(): assert "compile" in operator.base_cmd +def test_dbt_clone_local_operator_initialisation(): + operator = DbtCloneLocalOperator( + profile_config=profile_config, + project_dir=DBT_PROJ_DIR, + task_id="clone", + dbt_cmd_flags=["--state", "/usr/local/airflow/dbt/jaffle_shop/target"], + install_deps=True, + append_env=True, + ) + + assert "clone" in operator.base_cmd + + @patch("cosmos.operators.local.remote_target_path", new="s3://some-bucket/target") @patch("cosmos.settings.AIRFLOW_IO_AVAILABLE", new=False) def test_configure_remote_target_path_object_storage_unavailable_on_earlier_airflow_versions(): diff --git a/tests/operators/test_virtualenv.py b/tests/operators/test_virtualenv.py index fdc76f321..5c950f478 100644 --- a/tests/operators/test_virtualenv.py +++ b/tests/operators/test_virtualenv.py @@ -15,7 +15,7 @@ from cosmos.config import ProfileConfig from cosmos.constants import InvocationMode from cosmos.exceptions import CosmosValueError -from cosmos.operators.virtualenv import DbtVirtualenvBaseOperator +from cosmos.operators.virtualenv import DbtCloneVirtualenvOperator, DbtVirtualenvBaseOperator from cosmos.profiles import PostgresUserPasswordProfileMapping AIRFLOW_VERSION = Version(airflow.__version__) @@ -376,3 +376,16 @@ def test_integration_virtualenv_operator(caplog): assert "Trying to run the command:\n ['/tmp/persistent-venv2/bin/dbt', 'deps'" in caplog.text assert "Trying to run the command:\n ['/tmp/persistent-venv2/bin/dbt', 'seed'" in caplog.text + + +def test_dbt_clone_virtualenv_operator_initialisation(): + operator = DbtCloneVirtualenvOperator( + profile_config=profile_config, + project_dir=DBT_PROJ_DIR, + task_id="clone", + dbt_cmd_flags=["--state", "/usr/local/airflow/dbt/jaffle_shop/target"], + install_deps=True, + append_env=True, + ) + + assert "clone" in operator.base_cmd diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py index 4be51a176..e647bc2fa 100644 --- a/tests/test_example_dags.py +++ b/tests/test_example_dags.py @@ -79,6 +79,8 @@ def get_dag_bag() -> DagBag: file.writelines(["example_cosmos_sources.py\n"]) if DBT_VERSION < Version("1.6.0"): file.writelines(["example_model_version.py\n"]) + file.writelines(["example_clone.py\n"]) + if DBT_VERSION < Version("1.5.0"): file.writelines(["example_source_rendering.py\n"]) diff --git a/tests/test_example_dags_no_connections.py b/tests/test_example_dags_no_connections.py index 70cfbc041..d18d21730 100644 --- a/tests/test_example_dags_no_connections.py +++ b/tests/test_example_dags_no_connections.py @@ -55,6 +55,7 @@ def get_dag_bag() -> DagBag: if DBT_VERSION < Version("1.6.0"): file.writelines(["example_model_version.py\n"]) + file.writelines(["example_clone.py\n"]) # cosmos_profile_mapping uses the automatic profile rendering from an Airflow connection. # so we can't parse that without live connections for file_name in ["cosmos_profile_mapping.py"]: