Add dbt clone operator (#1326)
This PR introduces the `DbtClone<ExecutionMode>Operator` family (for example,
`DbtCloneLocalOperator`), which runs the `dbt clone` command. For more details,
refer to the dbt documentation: https://docs.getdbt.com/reference/commands/clone.

## Testing

**Airflow DAG**
```python
from datetime import datetime

from airflow import DAG
from cosmos import DbtSeedLocalOperator, DbtRunLocalOperator, DbtCloneLocalOperator, ProfileConfig

DBT_PROJ_DIR = "/usr/local/airflow/dbt/jaffle_shop"

# Source profile: seed and run write to the bq_dev dataset.
profile_config1 = ProfileConfig(
    profile_name="bigquery_dev",
    target_name="dev",
    profiles_yml_filepath="/usr/local/airflow/dbt/jaffle_shop/profiles.yml",
)

# Destination profile: clone writes to the bq_clone dataset.
profile_config2 = ProfileConfig(
    profile_name="bigquery_clone",
    target_name="dev",
    profiles_yml_filepath="/usr/local/airflow/dbt/jaffle_shop/profiles.yml",
)


with DAG("test-id-1", start_date=datetime(2024, 1, 1), catchup=False) as dag:
    seed_operator = DbtSeedLocalOperator(
        profile_config=profile_config1,
        project_dir=DBT_PROJ_DIR,
        task_id="seed",
        dbt_cmd_flags=["--select", "raw_customers"],
        install_deps=True,
        append_env=True,
    )
    run_operator = DbtRunLocalOperator(
        profile_config=profile_config1,
        project_dir=DBT_PROJ_DIR,
        task_id="run",
        dbt_cmd_flags=["--models", "stg_customers"],
        install_deps=True,
        append_env=True,
    )

    clone_operator = DbtCloneLocalOperator(
        profile_config=profile_config2,
        project_dir=DBT_PROJ_DIR,
        task_id="clone",
        dbt_cmd_flags=["--models", "stg_customers", "--state", "/usr/local/airflow/dbt/jaffle_shop/target"],
        install_deps=True,
        append_env=True,
    )

    seed_operator >> run_operator >> clone_operator
``` 

**DBT Profile**
```yaml
bigquery_dev:
  target: dev
  outputs:
    dev:
      type: bigquery
      method: service-account
      project: astronomer-dag-authoring
      dataset: bq_dev
      threads: 4 # Must be a value of 1 or greater
      keyfile: /usr/local/airflow/include/key.json
      location: US


bigquery_clone:
  target: dev
  outputs:
    dev:
      type: bigquery
      method: service-account
      project: astronomer-dag-authoring
      dataset: bq_clone
      threads: 4 # Must be a value of 1 or greater
      keyfile: /usr/local/airflow/include/key.json
      location: US
``` 

**Airflow DAG Run**

<img width="1660" alt="Screenshot 2024-11-15 at 6 06 50 PM"
src="https://github.com/user-attachments/assets/4a3af37e-3f6c-4859-814f-aeff3a252ac6">


**BQ data WH**

<img width="1454" alt="Screenshot 2024-11-15 at 6 04 29 PM"
src="https://github.com/user-attachments/assets/69b45f57-3ff5-43d9-a8f1-bb04e7ad5735">


## Related Issue(s)

closes: #1268
closes: #878

## Breaking Change?
No

## Limitation
- The `dbt clone` command was introduced in dbt-core 1.6.0, so this
feature is only available to users with dbt-core 1.6 or higher (see
https://github.com/dbt-labs/dbt-core/blob/1.6.latest/CHANGELOG.md).
- Users should ensure that their data warehouse supports cloning
operations.
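
The version limitation can be checked before a DAG relies on the clone operator. The following is a minimal sketch, not part of this PR: the helper names `parse_major_minor`, `supports_clone`, and `installed_dbt_supports_clone` are hypothetical, and it assumes version strings of the form `major.minor.patch` (for example `1.6.0` or `1.8.0b1`).

```python
from __future__ import annotations

from importlib import metadata


def parse_major_minor(version: str) -> tuple[int, int]:
    """Return (major, minor) from a version string such as '1.6.0'."""
    major, minor = version.split(".")[:2]
    return int(major), int(minor)


def supports_clone(version: str) -> bool:
    """`dbt clone` first shipped in dbt-core 1.6.0."""
    return parse_major_minor(version) >= (1, 6)


def installed_dbt_supports_clone() -> bool:
    """Check the dbt-core distribution installed in the current environment."""
    try:
        return supports_clone(metadata.version("dbt-core"))
    except metadata.PackageNotFoundError:
        # dbt-core is not installed alongside this interpreter.
        return False
```

A check like this only reflects the environment in which it runs. For the Docker, Kubernetes, and other container-based execution modes, the dbt-core version that matters is the one inside the image.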
pankajastro authored Nov 20, 2024
1 parent 573a90a commit 48055d8
Showing 22 changed files with 231 additions and 1 deletion.
12 changes: 12 additions & 0 deletions cosmos/__init__.py
@@ -30,6 +30,7 @@
from cosmos.operators.lazy_load import MissingPackage
from cosmos.operators.local import (
    DbtBuildLocalOperator,
    DbtCloneLocalOperator,
    DbtDepsLocalOperator,
    DbtLSLocalOperator,
    DbtRunLocalOperator,
@@ -44,6 +45,7 @@
try:
    from cosmos.operators.docker import (
        DbtBuildDockerOperator,
        DbtCloneDockerOperator,
        DbtLSDockerOperator,
        DbtRunDockerOperator,
        DbtRunOperationDockerOperator,
@@ -65,6 +67,7 @@
try:
    from cosmos.operators.kubernetes import (
        DbtBuildKubernetesOperator,
        DbtCloneKubernetesOperator,
        DbtLSKubernetesOperator,
        DbtRunKubernetesOperator,
        DbtRunOperationKubernetesOperator,
@@ -106,6 +109,7 @@
try:
    from cosmos.operators.azure_container_instance import (
        DbtBuildAzureContainerInstanceOperator,
        DbtCloneAzureContainerInstanceOperator,
        DbtLSAzureContainerInstanceOperator,
        DbtRunAzureContainerInstanceOperator,
        DbtRunOperationAzureContainerInstanceOperator,
@@ -142,6 +146,7 @@
try:
    from cosmos.operators.aws_eks import (
        DbtBuildAwsEksOperator,
        DbtCloneAwsEksOperator,
        DbtLSAwsEksOperator,
        DbtRunAwsEksOperator,
        DbtRunOperationAwsEksOperator,
@@ -170,6 +175,7 @@
try:
    from cosmos.operators.gcp_cloud_run_job import (
        DbtBuildGcpCloudRunJobOperator,
        DbtCloneGcpCloudRunJobOperator,
        DbtLSGcpCloudRunJobOperator,
        DbtRunGcpCloudRunJobOperator,
        DbtRunOperationGcpCloudRunJobOperator,
@@ -217,6 +223,7 @@
    "DbtResourceType",
    # Local Execution Mode
    "DbtBuildLocalOperator",
    "DbtCloneLocalOperator",
    "DbtDepsLocalOperator",  # deprecated, to be delete in Cosmos 2.x
    "DbtLSLocalOperator",
    "DbtRunLocalOperator",
@@ -226,6 +233,7 @@
    "DbtTestLocalOperator",
    # Docker Execution Mode
    "DbtBuildDockerOperator",
    "DbtCloneDockerOperator",
    "DbtLSDockerOperator",
    "DbtRunDockerOperator",
    "DbtRunOperationDockerOperator",
@@ -234,6 +242,7 @@
    "DbtTestDockerOperator",
    # Kubernetes Execution Mode
    "DbtBuildKubernetesOperator",
    "DbtCloneKubernetesOperator",
    "DbtLSKubernetesOperator",
    "DbtRunKubernetesOperator",
    "DbtRunOperationKubernetesOperator",
@@ -242,6 +251,7 @@
    "DbtTestKubernetesOperator",
    # Azure Container Instance Execution Mode
    "DbtBuildAzureContainerInstanceOperator",
    "DbtCloneAzureContainerInstanceOperator",
    "DbtLSAzureContainerInstanceOperator",
    "DbtRunAzureContainerInstanceOperator",
    "DbtRunOperationAzureContainerInstanceOperator",
@@ -250,6 +260,7 @@
    "DbtTestAzureContainerInstanceOperator",
    # AWS EKS Execution Mode
    "DbtBuildAwsEksOperator",
    "DbtCloneAwsEksOperator",
    "DbtLSAwsEksOperator",
    "DbtRunAwsEksOperator",
    "DbtRunOperationAwsEksOperator",
@@ -258,6 +269,7 @@
    "DbtTestAwsEksOperator",
    # GCP Cloud Run Job Execution Mode
    "DbtBuildGcpCloudRunJobOperator",
    "DbtCloneGcpCloudRunJobOperator",
    "DbtLSGcpCloudRunJobOperator",
    "DbtRunGcpCloudRunJobOperator",
    "DbtRunOperationGcpCloudRunJobOperator",
5 changes: 5 additions & 0 deletions cosmos/operators/airflow_async.py
@@ -14,6 +14,7 @@
from cosmos.operators.base import AbstractDbtBaseOperator
from cosmos.operators.local import (
    DbtBuildLocalOperator,
    DbtCloneLocalOperator,
    DbtCompileLocalOperator,
    DbtLocalBaseOperator,
    DbtLSLocalOperator,
@@ -188,3 +189,7 @@ class DbtRunOperationAirflowAsyncOperator(DbtBaseAirflowAsyncOperator, DbtRunOpe

class DbtCompileAirflowAsyncOperator(DbtBaseAirflowAsyncOperator, DbtCompileLocalOperator):  # type: ignore
    pass


class DbtCloneAirflowAsyncOperator(DbtBaseAirflowAsyncOperator, DbtCloneLocalOperator):
    pass
10 changes: 10 additions & 0 deletions cosmos/operators/aws_eks.py
@@ -8,6 +8,7 @@

from cosmos.operators.kubernetes import (
    DbtBuildKubernetesOperator,
    DbtCloneKubernetesOperator,
    DbtKubernetesBaseOperator,
    DbtLSKubernetesOperator,
    DbtRunKubernetesOperator,
@@ -160,3 +161,12 @@ class DbtRunOperationAwsEksOperator(DbtAwsEksBaseOperator, DbtRunOperationKubern

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)


class DbtCloneAwsEksOperator(DbtAwsEksBaseOperator, DbtCloneKubernetesOperator):
    """
    Executes a dbt core clone command.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
10 changes: 10 additions & 0 deletions cosmos/operators/azure_container_instance.py
@@ -8,6 +8,7 @@
from cosmos.operators.base import (
    AbstractDbtBaseOperator,
    DbtBuildMixin,
    DbtCloneMixin,
    DbtLSMixin,
    DbtRunMixin,
    DbtRunOperationMixin,
@@ -167,3 +168,12 @@ class DbtRunOperationAzureContainerInstanceOperator(DbtRunOperationMixin, DbtAzu

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)


class DbtCloneAzureContainerInstanceOperator(DbtCloneMixin, DbtAzureContainerInstanceBaseOperator):
    """
    Executes a dbt core clone command.
    """

    def __init__(self, *args: Any, **kwargs: Any):
        super().__init__(*args, **kwargs)
25 changes: 25 additions & 0 deletions cosmos/operators/base.py
@@ -438,3 +438,28 @@ class DbtCompileMixin:

    base_cmd = ["compile"]
    ui_color = "#877c7c"


class DbtCloneMixin:
    """Mixin for dbt clone command."""

    base_cmd = ["clone"]
    ui_color = "#83a300"

    def __init__(self, full_refresh: bool | str = False, **kwargs: Any) -> None:
        self.full_refresh = full_refresh
        super().__init__(**kwargs)

    def add_cmd_flags(self) -> list[str]:
        flags = []

        if isinstance(self.full_refresh, str):
            # Handle template fields when render_template_as_native_obj=False
            full_refresh = to_boolean(self.full_refresh)
        else:
            full_refresh = self.full_refresh

        if full_refresh is True:
            flags.append("--full-refresh")

        return flags
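
To make the branching in `DbtCloneMixin.add_cmd_flags` easier to follow, here is a standalone sketch of the same logic that runs without Cosmos or Airflow. It is an illustration, not the library's code: `to_boolean_sketch` is a hypothetical stand-in for the `to_boolean` helper used in the diff, and the set of strings it treats as true is an assumption that may differ from the real helper.

```python
from __future__ import annotations

# Assumption: strings treated as true by the stand-in helper below.
TRUTHY = {"1", "true", "t", "yes", "y", "on"}


def to_boolean_sketch(value: str) -> bool:
    """Hypothetical stand-in for the `to_boolean` helper referenced in the diff."""
    return value.strip().lower() in TRUTHY


def clone_cmd_flags(full_refresh: bool | str = False) -> list[str]:
    """Mirror the mixin: strings are coerced first, then only True adds the flag."""
    if isinstance(full_refresh, str):
        # A templated field arrives as a string when
        # render_template_as_native_obj=False.
        full_refresh = to_boolean_sketch(full_refresh)
    return ["--full-refresh"] if full_refresh is True else []
```

Under these assumptions, `clone_cmd_flags(True)` and `clone_cmd_flags("True")` both yield `["--full-refresh"]`, while `clone_cmd_flags(False)` and `clone_cmd_flags("false")` yield an empty list.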
10 changes: 10 additions & 0 deletions cosmos/operators/docker.py
@@ -7,6 +7,7 @@
from cosmos.operators.base import (
    AbstractDbtBaseOperator,
    DbtBuildMixin,
    DbtCloneMixin,
    DbtLSMixin,
    DbtRunMixin,
    DbtRunOperationMixin,
@@ -148,3 +149,12 @@ class DbtRunOperationDockerOperator(DbtRunOperationMixin, DbtDockerBaseOperator)

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)


class DbtCloneDockerOperator(DbtCloneMixin, DbtDockerBaseOperator):
    """
    Executes a dbt core clone command.
    """

    def __init__(self, *args: Any, **kwargs: Any):
        super().__init__(*args, **kwargs)
10 changes: 10 additions & 0 deletions cosmos/operators/gcp_cloud_run_job.py
@@ -10,6 +10,7 @@
from cosmos.operators.base import (
    AbstractDbtBaseOperator,
    DbtBuildMixin,
    DbtCloneMixin,
    DbtLSMixin,
    DbtRunMixin,
    DbtRunOperationMixin,
@@ -180,3 +181,12 @@ class DbtRunOperationGcpCloudRunJobOperator(DbtRunOperationMixin, DbtGcpCloudRun

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)


class DbtCloneGcpCloudRunJobOperator(DbtCloneMixin, DbtGcpCloudRunJobBaseOperator):
    """
    Executes a dbt core clone command.
    """

    def __init__(self, *args: Any, **kwargs: Any):
        super().__init__(*args, **kwargs)
8 changes: 8 additions & 0 deletions cosmos/operators/kubernetes.py
@@ -11,6 +11,7 @@
from cosmos.operators.base import (
    AbstractDbtBaseOperator,
    DbtBuildMixin,
    DbtCloneMixin,
    DbtLSMixin,
    DbtRunMixin,
    DbtRunOperationMixin,
@@ -260,3 +261,10 @@ class DbtRunOperationKubernetesOperator(DbtRunOperationMixin, DbtKubernetesBaseO

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)


class DbtCloneKubernetesOperator(DbtCloneMixin, DbtKubernetesBaseOperator):
    """Executes a dbt core clone command."""

    def __init__(self, *args: Any, **kwargs: Any):
        super().__init__(*args, **kwargs)
10 changes: 10 additions & 0 deletions cosmos/operators/local.py
@@ -70,6 +70,7 @@
from cosmos.operators.base import (
    AbstractDbtBaseOperator,
    DbtBuildMixin,
    DbtCloneMixin,
    DbtCompileMixin,
    DbtLSMixin,
    DbtRunMixin,
@@ -1009,3 +1010,12 @@ class DbtCompileLocalOperator(DbtCompileMixin, DbtLocalBaseOperator):
    def __init__(self, *args: Any, **kwargs: Any) -> None:
        kwargs["should_upload_compiled_sql"] = True
        super().__init__(*args, **kwargs)


class DbtCloneLocalOperator(DbtCloneMixin, DbtLocalBaseOperator):
    """
    Executes a dbt core clone command.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
10 changes: 10 additions & 0 deletions cosmos/operators/virtualenv.py
@@ -17,6 +17,7 @@
from cosmos.log import get_logger
from cosmos.operators.local import (
    DbtBuildLocalOperator,
    DbtCloneLocalOperator,
    DbtDocsLocalOperator,
    DbtLocalBaseOperator,
    DbtLSLocalOperator,
@@ -286,3 +287,12 @@ class DbtDocsVirtualenvOperator(DbtVirtualenvBaseOperator, DbtDocsLocalOperator)

    def __init__(self, *args: Any, **kwargs: Any):
        super().__init__(*args, **kwargs)


class DbtCloneVirtualenvOperator(DbtVirtualenvBaseOperator, DbtCloneLocalOperator):
    """
    Executes a dbt core clone command.
    """

    def __init__(self, *args: Any, **kwargs: Any):
        super().__init__(*args, **kwargs)
50 changes: 50 additions & 0 deletions dev/dags/example_operators.py
@@ -0,0 +1,50 @@
import os
from datetime import datetime
from pathlib import Path

from airflow import DAG

from cosmos import DbtCloneLocalOperator, DbtRunLocalOperator, DbtSeedLocalOperator, ProfileConfig

DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))
DBT_PROJ_DIR = DBT_ROOT_PATH / "jaffle_shop"
DBT_PROFILE_PATH = DBT_PROJ_DIR / "profiles.yml"
DBT_ARTIFACT = DBT_PROJ_DIR / "target"

profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profiles_yml_filepath=DBT_PROFILE_PATH,
)

with DAG("example_operators", start_date=datetime(2024, 1, 1), catchup=False) as dag:
    seed_operator = DbtSeedLocalOperator(
        profile_config=profile_config,
        project_dir=DBT_PROJ_DIR,
        task_id="seed",
        dbt_cmd_flags=["--select", "raw_customers"],
        install_deps=True,
        append_env=True,
    )
    run_operator = DbtRunLocalOperator(
        profile_config=profile_config,
        project_dir=DBT_PROJ_DIR,
        task_id="run",
        dbt_cmd_flags=["--models", "stg_customers"],
        install_deps=True,
        append_env=True,
    )

    # [START clone_example]
    clone_operator = DbtCloneLocalOperator(
        profile_config=profile_config,
        project_dir=DBT_PROJ_DIR,
        task_id="clone",
        dbt_cmd_flags=["--models", "stg_customers", "--state", DBT_ARTIFACT],
        install_deps=True,
        append_env=True,
    )
    # [END clone_example]

    seed_operator >> run_operator >> clone_operator
1 change: 1 addition & 0 deletions docs/getting_started/index.rst
@@ -14,6 +14,7 @@
    Azure Container Instance Execution Mode <azure-container-instance>
    GCP Cloud Run Job Execution Mode <gcp-cloud-run-job>
    dbt and Airflow Similar Concepts <dbt-airflow-concepts>
    Operators <operators>


Getting Started
24 changes: 24 additions & 0 deletions docs/getting_started/operators.rst
@@ -0,0 +1,24 @@
.. _operators:

Operators
=========

Cosmos exposes individual operators that correspond to specific dbt commands, which can be used just like traditional
`Apache Airflow® <https://airflow.apache.org/>`_ operators. Cosmos names these operators using the format ``Dbt<dbt-command><execution-mode>Operator``. For example, ``DbtBuildLocalOperator``.

Clone
-----

Requirement

* Cosmos >= 1.8.0
* dbt-core >= 1.6.0

The ``DbtCloneLocalOperator`` implement `dbt clone <https://docs.getdbt.com/reference/commands/clone>`_ command.

Example of how to use

.. literalinclude:: ../../dev/dags/example_operators.py
    :language: python
    :start-after: [START clone_example]
    :end-before: [END clone_example]
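
The naming format described in `operators.rst`, `Dbt<dbt-command><execution-mode>Operator`, can be illustrated with a small sketch. The helper `operator_class_name` is hypothetical and only composes strings; it does not import Cosmos, and it assumes both parts are already written in the CamelCase form used by the class names in this commit.

```python
def operator_class_name(dbt_command: str, execution_mode: str) -> str:
    """Compose a Cosmos operator class name from its two CamelCase parts."""
    return f"Dbt{dbt_command}{execution_mode}Operator"


# Execution-mode spellings taken from the clone classes added in this commit.
CLONE_EXECUTION_MODES = [
    "Local",
    "Virtualenv",
    "Docker",
    "Kubernetes",
    "AwsEks",
    "AzureContainerInstance",
    "GcpCloudRunJob",
    "AirflowAsync",
]

clone_operator_names = [
    operator_class_name("Clone", mode) for mode in CLONE_EXECUTION_MODES
]
```

Each generated name matches a class defined in the diff above, for example `DbtCloneLocalOperator` and `DbtCloneAwsEksOperator`.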
2 changes: 2 additions & 0 deletions tests/operators/test_aws_eks.py
@@ -5,6 +5,7 @@

from cosmos.operators.aws_eks import (
    DbtBuildAwsEksOperator,
    DbtCloneAwsEksOperator,
    DbtLSAwsEksOperator,
    DbtRunAwsEksOperator,
    DbtSeedAwsEksOperator,
@@ -44,6 +45,7 @@ def test_dbt_kubernetes_build_command():
        "test": DbtTestAwsEksOperator(**base_kwargs),
        "build": DbtBuildAwsEksOperator(**base_kwargs),
        "seed": DbtSeedAwsEksOperator(**base_kwargs),
        "clone": DbtCloneAwsEksOperator(**base_kwargs),
    }

    for command_name, command_operator in result_map.items():