-
Notifications
You must be signed in to change notification settings - Fork 180
/
Copy pathbasic_cosmos_task_group.py
80 lines (66 loc) · 2.41 KB
/
basic_cosmos_task_group.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
"""
An example DAG that uses Cosmos to render a dbt project as an Airflow TaskGroup.
"""
import os
from datetime import datetime
from pathlib import Path
from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator
from cosmos import DbtTaskGroup, ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig
from cosmos.constants import InvocationMode
from cosmos.profiles import PostgresUserPasswordProfileMapping
# Fallback location of the dbt projects: a "dbt" directory sitting next to this file.
DEFAULT_DBT_ROOT_PATH = Path(__file__).parent.joinpath("dbt")
# Effective dbt root; the DBT_ROOT_PATH environment variable overrides the fallback.
DBT_ROOT_PATH = Path(os.environ.get("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))
# dbt profile shared by every task group in this file: profile "default", target "dev",
# with the profile mapping built from the "example_conn" connection id and the
# "public" schema.
profile_config = ProfileConfig(
    profile_mapping=PostgresUserPasswordProfileMapping(
        conn_id="example_conn",
        profile_args={"schema": "public"},
    ),
    profile_name="default",
    target_name="dev",
)
# Single execution configuration reused by all task groups below; it selects the
# SUBPROCESS invocation mode.
shared_execution_config = ExecutionConfig(invocation_mode=InvocationMode.SUBPROCESS)
@dag(
    # NOTE(review): newer Airflow releases deprecate ``schedule_interval`` in favour of
    # ``schedule`` - confirm the supported Airflow range before switching.
    schedule_interval="@daily",
    start_date=datetime(2023, 1, 1),
    catchup=False,
)
def basic_cosmos_task_group() -> None:
    """
    Render two seed selections of the ``jaffle_shop`` dbt project as Cosmos TaskGroups.

    The ``customers`` and ``orders`` groups both sit between the ``pre_dbt`` and
    ``post_dbt`` placeholder tasks and share the module-level profile and execution
    configuration.
    """
    start = EmptyOperator(task_id="pre_dbt")

    # Both groups render the same dbt project directory.
    jaffle_shop_dir = (DBT_ROOT_PATH / "jaffle_shop").as_posix()

    # Customers: passes a dbt variable, forwards the PURGE environment variable
    # (defaulting to "0") and names the "purge" Airflow variable for dbt ls cache purging.
    customers_project = ProjectConfig(jaffle_shop_dir, dbt_vars={"var": "2"})
    customers_render = RenderConfig(
        select=["path:seeds/raw_customers.csv"],
        enable_mock_profile=False,
        env_vars={"PURGE": os.getenv("PURGE", "0")},
        airflow_vars_to_purge_dbt_ls_cache=["purge"],
    )
    customers_group = DbtTaskGroup(
        group_id="customers",
        project_config=customers_project,
        render_config=customers_render,
        execution_config=shared_execution_config,
        operator_args={"install_deps": True},
        profile_config=profile_config,
        default_args={"retries": 2},
    )

    # Orders: same project, restricted to the raw_orders seed.
    orders_project = ProjectConfig(jaffle_shop_dir)
    orders_render = RenderConfig(
        select=["path:seeds/raw_orders.csv"],
        # Disabling the mock profile is necessary to benefit from partial parsing
        # when using ProfileMapping.
        enable_mock_profile=False,
    )
    orders_group = DbtTaskGroup(
        group_id="orders",
        project_config=orders_project,
        render_config=orders_render,
        execution_config=shared_execution_config,
        operator_args={"install_deps": True},
        profile_config=profile_config,
        default_args={"retries": 2},
    )

    end = EmptyOperator(task_id="post_dbt")

    # Fan out from pre_dbt to both groups, then fan back in to post_dbt.
    start >> [customers_group, orders_group] >> end
# Call the @dag-decorated function at module level so the DAG is created when this
# file is imported.
basic_cosmos_task_group()