Add helper functions for uploading target directory artifacts to remote cloud storages #1389

Merged: 10 commits, Dec 17, 2024
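At a glance, the helpers added in ``cosmos/io.py`` are meant to be wired as operator callbacks. A minimal sketch of that wiring, with a placeholder profiles path, connection ID, and bucket name (see ``dev/dags/example_operators.py`` below for the full example):

from pathlib import Path

from cosmos import DbtSeedLocalOperator, ProfileConfig
from cosmos.io import upload_to_aws_s3

# Placeholder profile and upload settings, for illustration only.
profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profiles_yml_filepath=Path("/path/to/profiles.yml"),
)

seed_operator = DbtSeedLocalOperator(
    task_id="seed",
    project_dir="/path/to/jaffle_shop",
    profile_config=profile_config,
    # Upload the dbt target directory to S3 after the task runs.
    callback=upload_to_aws_s3,
    callback_args={"aws_conn_id": "aws_s3_conn", "bucket_name": "cosmos-artifacts-upload"},
)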
1 change: 1 addition & 0 deletions cosmos/constants.py
@@ -10,6 +10,7 @@
DEFAULT_DBT_PROFILE_NAME = "cosmos_profile"
DEFAULT_DBT_TARGET_NAME = "cosmos_target"
DEFAULT_COSMOS_CACHE_DIR_NAME = "cosmos"
DEFAULT_TARGET_PATH = "target"
DBT_LOG_PATH_ENVVAR = "DBT_LOG_PATH"
DBT_LOG_DIR_NAME = "logs"
DBT_TARGET_PATH_ENVVAR = "DBT_TARGET_PATH"
217 changes: 217 additions & 0 deletions cosmos/io.py
@@ -0,0 +1,217 @@
from __future__ import annotations

import os
from pathlib import Path
from typing import Any
from urllib.parse import urlparse

from cosmos import settings
from cosmos.constants import DEFAULT_TARGET_PATH, FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP
from cosmos.exceptions import CosmosValueError
from cosmos.settings import remote_target_path, remote_target_path_conn_id


def upload_to_aws_s3(
project_dir: str,
bucket_name: str,
aws_conn_id: str | None = None,
source_subpath: str = DEFAULT_TARGET_PATH,
**kwargs: Any,
) -> None:
"""
Helper function, usable as an operator callback, demonstrating how to upload files to AWS S3.

:param project_dir: Path of the cloned project directory which Cosmos tasks work from.
:param bucket_name: Name of the S3 bucket to upload to.
:param aws_conn_id: AWS connection ID to use when uploading files.
:param source_subpath: Sub-path within the project directory to upload files from.
"""
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

target_dir = f"{project_dir}/{source_subpath}"
aws_conn_id = aws_conn_id if aws_conn_id else S3Hook.default_conn_name
hook = S3Hook(aws_conn_id=aws_conn_id)
context = kwargs["context"]

# Iterate over the files in the target dir and upload them to S3
for dirpath, _, filenames in os.walk(target_dir):
for filename in filenames:
s3_key = (
f"{context['dag'].dag_id}"
f"/{context['run_id']}"
f"/{context['task_instance'].task_id}"
f"/{context['task_instance']._try_number}"
f"{dirpath.split(project_dir)[-1]}/{filename}"
)
hook.load_file(
filename=f"{dirpath}/{filename}",
bucket_name=bucket_name,
key=s3_key,
replace=True,
)


def upload_to_gcp_gs(
project_dir: str,
bucket_name: str,
gcp_conn_id: str | None = None,
source_subpath: str = DEFAULT_TARGET_PATH,
**kwargs: Any,
) -> None:
"""
Helper function, usable as an operator callback, demonstrating how to upload files to Google Cloud Storage (GCS).

:param project_dir: Path of the cloned project directory which Cosmos tasks work from.
:param bucket_name: Name of the GCS bucket to upload to.
:param gcp_conn_id: GCP connection ID to use when uploading files.
:param source_subpath: Sub-path within the project directory to upload files from.
"""
from airflow.providers.google.cloud.hooks.gcs import GCSHook

target_dir = f"{project_dir}/{source_subpath}"
gcp_conn_id = gcp_conn_id if gcp_conn_id else GCSHook.default_conn_name
hook = GCSHook(gcp_conn_id=gcp_conn_id)
context = kwargs["context"]

# Iterate over the files in the target dir and upload them to GCS
for dirpath, _, filenames in os.walk(target_dir):
for filename in filenames:
object_name = (
f"{context['dag'].dag_id}"
f"/{context['run_id']}"
f"/{context['task_instance'].task_id}"
f"/{context['task_instance']._try_number}"
f"{dirpath.split(project_dir)[-1]}/{filename}"
)
hook.upload(
filename=f"{dirpath}/{filename}",
bucket_name=bucket_name,
object_name=object_name,
)


def upload_to_azure_wasb(
project_dir: str,
container_name: str,
azure_conn_id: str | None = None,
source_subpath: str = DEFAULT_TARGET_PATH,
**kwargs: Any,
) -> None:
"""
Helper function, usable as an operator callback, demonstrating how to upload files to Azure Blob Storage (WASB).

:param project_dir: Path of the cloned project directory which Cosmos tasks work from.
:param container_name: Name of the Azure WASB container to upload files to.
:param azure_conn_id: Azure connection ID to use when uploading files.
:param source_subpath: Sub-path within the project directory to upload files from.
"""
from airflow.providers.microsoft.azure.hooks.wasb import WasbHook

target_dir = f"{project_dir}/{source_subpath}"
azure_conn_id = azure_conn_id if azure_conn_id else WasbHook.default_conn_name
hook = WasbHook(wasb_conn_id=azure_conn_id)
context = kwargs["context"]

# Iterate over the files in the target dir and upload them to the WASB container
for dirpath, _, filenames in os.walk(target_dir):
for filename in filenames:
blob_name = (
f"{context['dag'].dag_id}"
f"/{context['run_id']}"
f"/{context['task_instance'].task_id}"
f"/{context['task_instance']._try_number}"
f"{dirpath.split(project_dir)[-1]}/{filename}"
)
hook.load_file(
file_path=f"{dirpath}/{filename}",
container_name=container_name,
blob_name=blob_name,
overwrite=True,
)


def _configure_remote_target_path() -> tuple[Path, str] | tuple[None, None]:
"""Configure the remote target path if it is provided."""
from airflow.version import version as airflow_version

if not remote_target_path:
return None, None

_configured_target_path = None

target_path_str = str(remote_target_path)

remote_conn_id = remote_target_path_conn_id
if not remote_conn_id:
target_path_schema = urlparse(target_path_str).scheme
remote_conn_id = FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP.get(target_path_schema, None) # type: ignore[assignment]
if remote_conn_id is None:
return None, None

if not settings.AIRFLOW_IO_AVAILABLE:
raise CosmosValueError(
f"You're trying to specify remote target path {target_path_str}, but the required "
f"Object Storage feature is unavailable in Airflow version {airflow_version}. Please upgrade to "
"Airflow 2.8 or later."
)

from airflow.io.path import ObjectStoragePath

_configured_target_path = ObjectStoragePath(target_path_str, conn_id=remote_conn_id)

if not _configured_target_path.exists(): # type: ignore[no-untyped-call]
_configured_target_path.mkdir(parents=True, exist_ok=True)

return _configured_target_path, remote_conn_id


def _construct_dest_file_path(
dest_target_dir: Path,
file_path: str,
source_target_dir: Path,
source_subpath: str,
**kwargs: Any,
) -> str:
"""
Construct the destination path for the artifact files to be uploaded to the remote store.
"""
dest_target_dir_str = str(dest_target_dir).rstrip("/")

context = kwargs["context"]
task_run_identifier = (
f"{context['dag'].dag_id}"
f"/{context['run_id']}"
f"/{context['task_instance'].task_id}"
f"/{context['task_instance']._try_number}"
)
rel_path = os.path.relpath(file_path, source_target_dir).lstrip("/")

return f"{dest_target_dir_str}/{task_run_identifier}/{source_subpath}/{rel_path}"


def upload_to_cloud_storage(project_dir: str, source_subpath: str = DEFAULT_TARGET_PATH, **kwargs: Any) -> None:
"""
Helper function, usable as an operator callback, demonstrating how to upload files to remote object stores. It
requires Airflow >= 2.8 and leverages the Cosmos configurations ``remote_target_path`` and
``remote_target_path_conn_id`` when they are set.

:param project_dir: Path of the cloned project directory which Cosmos tasks work from.
:param source_subpath: Sub-path within the project directory to upload files from.
"""
dest_target_dir, dest_conn_id = _configure_remote_target_path()

if not dest_target_dir:
raise CosmosValueError("You're trying to upload artifact files, but the remote target path is not configured.")

from airflow.io.path import ObjectStoragePath

source_target_dir = Path(project_dir) / source_subpath
files = [str(file) for file in source_target_dir.rglob("*") if file.is_file()]
for file_path in files:
dest_file_path = _construct_dest_file_path(
dest_target_dir, file_path, source_target_dir, source_subpath, **kwargs
)
dest_object_storage_path = ObjectStoragePath(dest_file_path, conn_id=dest_conn_id)
ObjectStoragePath(file_path).copy(dest_object_storage_path)
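All of the helpers above key uploaded files by DAG id, run id, task id, and try number (see ``_construct_dest_file_path``). A minimal sketch of the resulting object key, using hypothetical identifier values:

# Sketch of the key layout produced by the helpers above, with hypothetical values.
dag_id = "cosmos_callback_dag"
run_id = "manual__2024-12-17T00:00:00+00:00"
task_id = "seed"
try_number = 1
rel_path = "target/run_results.json"  # artifact path relative to the project directory

key = f"{dag_id}/{run_id}/{task_id}/{try_number}/{rel_path}"
print(key)
# cosmos_callback_dag/manual__2024-12-17T00:00:00+00:00/seed/1/target/run_results.json

For ``upload_to_cloud_storage`` the same key is appended to the configured ``remote_target_path`` base path.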
7 changes: 5 additions & 2 deletions cosmos/operators/local.py
@@ -141,6 +141,7 @@ def __init__(
invocation_mode: InvocationMode | None = None,
install_deps: bool = False,
callback: Callable[[str], None] | None = None,
callback_args: dict[str, Any] | None = None,
should_store_compiled_sql: bool = True,
should_upload_compiled_sql: bool = False,
append_env: bool = True,
@@ -149,6 +150,7 @@
self.task_id = task_id
self.profile_config = profile_config
self.callback = callback
self.callback_args = callback_args or {}
self.compiled_sql = ""
self.freshness = ""
self.should_store_compiled_sql = should_store_compiled_sql
@@ -500,9 +502,10 @@ def run_command(
self.store_freshness_json(tmp_project_dir, context)
self.store_compiled_sql(tmp_project_dir, context)
self.upload_compiled_sql(tmp_project_dir, context)
- self.handle_exception(result)
if self.callback:
- self.callback(tmp_project_dir)
+ self.callback_args.update({"context": context})
+ self.callback(tmp_project_dir, **self.callback_args)
+ self.handle_exception(result)

return result

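With the change above, a callback receives the temporary project directory positionally plus the contents of ``callback_args`` as keyword arguments, with the Airflow ``context`` injected under the ``context`` key. A minimal sketch of a custom callback compatible with that contract (the function name and the ``prefix`` argument are illustrative, not part of this PR):

from typing import Any


def my_artifact_callback(project_dir: str, **kwargs: Any) -> None:
    """Illustrative callback: report which task run produced artifacts in project_dir."""
    context = kwargs["context"]  # injected by DbtLocalBaseOperator.run_command
    prefix = kwargs.get("prefix", "target")  # passed through callback_args (hypothetical)
    ti = context["task_instance"]
    print(f"Task {ti.task_id} (try {ti.try_number}) produced artifacts under {project_dir}/{prefix}")

It would be wired with ``callback=my_artifact_callback`` and ``callback_args={"prefix": "target"}`` on any of the local operators.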
60 changes: 60 additions & 0 deletions dev/dags/cosmos_callback_dag.py
@@ -0,0 +1,60 @@
"""
An example DAG that uses Cosmos to render a dbt project into an Airflow DAG and upload the dbt artifacts produced by each task to remote storage via a callback.
"""

import os
from datetime import datetime
from pathlib import Path

from cosmos import DbtDag, ProfileConfig, ProjectConfig
from cosmos.io import upload_to_cloud_storage
from cosmos.profiles import PostgresUserPasswordProfileMapping

DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))

profile_config = ProfileConfig(
profile_name="default",
target_name="dev",
profile_mapping=PostgresUserPasswordProfileMapping(
conn_id="example_conn",
profile_args={"schema": "public"},
disable_event_tracking=True,
),
)

# [START cosmos_callback_example]
cosmos_callback_dag = DbtDag(
# dbt/cosmos-specific parameters
project_config=ProjectConfig(
DBT_ROOT_PATH / "jaffle_shop",
),
profile_config=profile_config,
operator_args={
"install_deps": True, # install any necessary dependencies before running any dbt command
"full_refresh": True, # used only in dbt commands that support this flag
# --------------------------------------------------------------
# Callback function to upload files using Airflow Object Storage and the Cosmos remote_target_path setting (Airflow 2.8 and above)
"callback": upload_to_cloud_storage,
# --------------------------------------------------------------
# Callback function to upload files to AWS S3, works for Airflow < 2.8 too
# "callback": upload_to_aws_s3,
# "callback_args": {"aws_conn_id": "aws_s3_conn", "bucket_name": "cosmos-artifacts-upload"},
# --------------------------------------------------------------
# Callback function to upload files to Google Cloud Storage (GCS); also works on Airflow < 2.8
# "callback": upload_to_gcp_gs,
# "callback_args": {"gcp_conn_id": "gcp_gs_conn", "bucket_name": "cosmos-artifacts-upload"},
# --------------------------------------------------------------
# Callback function to upload files to Azure WASB, works for Airflow < 2.8 too
# "callback": upload_to_azure_wasb,
# "callback_args": {"azure_conn_id": "azure_wasb_conn", "container_name": "cosmos-artifacts-upload"},
# --------------------------------------------------------------
},
# normal dag parameters
schedule_interval="@daily",
start_date=datetime(2023, 1, 1),
catchup=False,
dag_id="cosmos_callback_dag",
default_args={"retries": 2},
)
# [END cosmos_callback_example]
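``upload_to_cloud_storage`` only takes effect once ``remote_target_path`` (and, optionally, ``remote_target_path_conn_id``) is configured. Assuming these settings are read from the ``[cosmos]`` Airflow config section, like other Cosmos remote-path settings, they could be supplied via environment variables; a sketch with placeholder bucket and connection ID (verify the exact setting names against the Cosmos documentation):

import os

# Assumed environment-variable names following Airflow's AIRFLOW__<SECTION>__<KEY>
# convention for the [cosmos] config section; values are placeholders.
os.environ["AIRFLOW__COSMOS__REMOTE_TARGET_PATH"] = "s3://cosmos-artifacts-upload/target_dir/"
os.environ["AIRFLOW__COSMOS__REMOTE_TARGET_PATH_CONN_ID"] = "aws_s3_conn"

In practice these would be set in the scheduler and worker environment (or in airflow.cfg) rather than in DAG code, so that ``cosmos.settings`` picks them up at import time.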
41 changes: 41 additions & 0 deletions dev/dags/example_operators.py
@@ -1,10 +1,13 @@
import os
from datetime import datetime
from pathlib import Path
from typing import Any

from airflow import DAG
from airflow.operators.python import PythonOperator

from cosmos import DbtCloneLocalOperator, DbtRunLocalOperator, DbtSeedLocalOperator, ProfileConfig
from cosmos.io import upload_to_aws_s3

DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))
@@ -18,15 +21,52 @@
profiles_yml_filepath=DBT_PROFILE_PATH,
)


def check_s3_file(bucket_name: str, file_key: str, aws_conn_id: str = "aws_default", **context: Any) -> bool:
"""Check if a file exists in the given S3 bucket."""
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

s3_key = f"{context['dag'].dag_id}/{context['run_id']}/seed/0/{file_key}"
print(f"Checking if file {s3_key} exists in S3 bucket...")
hook = S3Hook(aws_conn_id=aws_conn_id)
return hook.check_for_key(key=s3_key, bucket_name=bucket_name)


with DAG("example_operators", start_date=datetime(2024, 1, 1), catchup=False) as dag:
# [START single_operator_callback]
seed_operator = DbtSeedLocalOperator(
profile_config=profile_config,
project_dir=DBT_PROJ_DIR,
task_id="seed",
dbt_cmd_flags=["--select", "raw_customers"],
install_deps=True,
append_env=True,
# --------------------------------------------------------------
# Callback function to upload artifacts to AWS S3
callback=upload_to_aws_s3,
callback_args={"aws_conn_id": "aws_s3_conn", "bucket_name": "cosmos-artifacts-upload"},
# --------------------------------------------------------------
# Callback function to upload artifacts to Google Cloud Storage (GCS)
# callback=upload_to_gcp_gs,
# callback_args={"gcp_conn_id": "gcp_gs_conn", "bucket_name": "cosmos-artifacts-upload"},
# --------------------------------------------------------------
# Callback function to upload artifacts to Azure WASB
# callback=upload_to_azure_wasb,
# callback_args={"azure_conn_id": "azure_wasb_conn", "container_name": "cosmos-artifacts-upload"},
# --------------------------------------------------------------
)
# [END single_operator_callback]

check_file_uploaded_task = PythonOperator(
task_id="check_file_uploaded_task",
python_callable=check_s3_file,
op_kwargs={
"aws_conn_id": "aws_s3_conn",
"bucket_name": "cosmos-artifacts-upload",
"file_key": "target/run_results.json",
},
)

run_operator = DbtRunLocalOperator(
profile_config=profile_config,
project_dir=DBT_PROJ_DIR,
Expand All @@ -48,3 +88,4 @@
# [END clone_example]

seed_operator >> run_operator >> clone_operator
seed_operator >> check_file_uploaded_task
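A similar verification step could be written for the GCS helper; a sketch assuming the Google provider's ``GCSHook.exists`` method and the key layout produced by ``upload_to_gcp_gs`` (connection ID and bucket name are placeholders):

from typing import Any


def check_gcs_file(bucket_name: str, file_key: str, gcp_conn_id: str = "google_cloud_default", **context: Any) -> bool:
    """Check if an uploaded artifact exists in the given GCS bucket (illustrative)."""
    from airflow.providers.google.cloud.hooks.gcs import GCSHook

    object_name = f"{context['dag'].dag_id}/{context['run_id']}/seed/0/{file_key}"
    hook = GCSHook(gcp_conn_id=gcp_conn_id)
    return hook.exists(bucket_name=bucket_name, object_name=object_name)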