Commit
Add helper functions for uploading target directory artifacts to remote cloud storages (#1389)

This PR introduces helper functions that can be passed as callbacks for Cosmos tasks to run after task execution. These helpers upload artifacts (from the project's target directory) to various cloud storage providers, including AWS S3, Google Cloud Storage (GCS), Azure WASB, and general remote object stores using Airflow’s ObjectStoragePath.

## Key Changes

Adds a `cosmos/io.py` module that includes the following helper functions:

1. `upload_to_aws_s3`
   - Uploads artifact files from a task’s local target directory to an AWS S3 bucket.
   - Prefixes the uploaded file paths with DAG metadata (dag_id, run_id, task_id, and try number).
   - Uses S3Hook from the airflow.providers.amazon.aws module.
2. `upload_to_gcp_gs`
   - Uploads artifact files from a task’s local target directory to a Google Cloud Storage (GCS) bucket.
   - Prefixes the GCS object names with the same DAG-related context for better traceability.
   - Uses GCSHook from airflow.providers.google.cloud.
3. `upload_to_azure_wasb`
   - Uploads artifact files from a task’s local target directory to an Azure Blob Storage container.
   - Structures blob names with the same metadata: dag_id, run_id, task_id, and try number.
   - Uses WasbHook from the airflow.providers.microsoft.azure module.
4. `upload_to_cloud_storage`
   - A generic helper that uploads artifacts from a task’s local target directory to remote object stores configured via Airflow’s ObjectStoragePath (an Airflow 2.8+ feature).
   - Relies on the Cosmos settings `remote_target_path` and `remote_target_path_conn_id`.
   - Constructs destination file paths that include the DAG metadata for clear organization.

These helper functions can be passed as the `callback` argument to `DbtDag` or to your `Dag` instance, as demonstrated in the example DAGs `dev/dags/cosmos_callback_dag.py` and `dev/dags/example_operators.py` respectively. You can also pass `callback_args`, as shown in the example DAGs. The helpers are only examples of how callback functions can be written and passed to your operators/DAGs to be executed after task completion. Using them as a reference, you can write your own callback functions and pass those instead.

## Limitations

1. This PR has been tested and is currently supported only in `ExecutionMode.LOCAL`. We encourage the community to contribute by adding callback support for other execution modes as needed, using the implementation for `ExecutionMode.LOCAL` as a reference.

closes: #1350
closes: #976
closes: #867
closes: #801
closes: #1292
closes: #851
closes: #1351
related: #1293
related: #1349
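The commit message notes that the helpers are meant as templates for your own callbacks. A minimal sketch of such a custom callback follows, assuming the same calling convention as the helpers in this commit (project directory as the first argument, Airflow context available as `kwargs["context"]`). The function name `copy_artifacts_to_local_archive`, the `archive_root` parameter, and the `"target"` default are illustrative assumptions, and the sketch copies to a local directory rather than a cloud store so that it runs without any provider package.

```python
from __future__ import annotations

import shutil
from pathlib import Path
from typing import Any


def copy_artifacts_to_local_archive(
    project_dir: str,
    archive_root: str,  # hypothetical parameter, would be supplied through callback_args
    source_subpath: str = "target",  # assumed default; the helpers use DEFAULT_TARGET_PATH
    **kwargs: Any,
) -> list[str]:
    """Hypothetical callback: copy artifact files into a per-run folder under archive_root."""
    context = kwargs["context"]
    task_instance = context["task_instance"]

    # Same idea as the cloud helpers: namespace the files by DAG, run and task.
    run_prefix = Path(archive_root) / context["dag"].dag_id / context["run_id"] / task_instance.task_id
    source_dir = Path(project_dir) / source_subpath

    copied: list[str] = []
    for file in sorted(source_dir.rglob("*")):
        if file.is_file():
            dest = run_prefix / source_subpath / file.relative_to(source_dir)
            dest.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy2(file, dest)
            copied.append(str(dest))
    return copied
```

Under these assumptions it would be wired up like the helpers, for example `"callback": copy_artifacts_to_local_archive` together with `"callback_args": {"archive_root": "/some/path"}`.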
1 parent cbd8622, commit 0000f80
Showing 9 changed files with 545 additions and 3 deletions.
cosmos/io.py
@@ -0,0 +1,217 @@
from __future__ import annotations

import os
from pathlib import Path
from typing import Any
from urllib.parse import urlparse

from cosmos import settings
from cosmos.constants import DEFAULT_TARGET_PATH, FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP
from cosmos.exceptions import CosmosValueError
from cosmos.settings import remote_target_path, remote_target_path_conn_id


def upload_to_aws_s3(
    project_dir: str,
    bucket_name: str,
    aws_conn_id: str | None = None,
    source_subpath: str = DEFAULT_TARGET_PATH,
    **kwargs: Any,
) -> None:
    """
    Helper function demonstrating how to upload files to AWS S3 that can be used as a callback.

    :param project_dir: Path of the cloned project directory which Cosmos tasks work from.
    :param bucket_name: Name of the S3 bucket to upload to.
    :param aws_conn_id: AWS connection ID to use when uploading files.
    :param source_subpath: Path of the source directory sub-path to upload files from.
    """
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook

    target_dir = f"{project_dir}/{source_subpath}"
    aws_conn_id = aws_conn_id if aws_conn_id else S3Hook.default_conn_name
    hook = S3Hook(aws_conn_id=aws_conn_id)
    context = kwargs["context"]

    # Iterate over the files in the target dir and upload them to S3
    for dirpath, _, filenames in os.walk(target_dir):
        for filename in filenames:
            s3_key = (
                f"{context['dag'].dag_id}"
                f"/{context['run_id']}"
                f"/{context['task_instance'].task_id}"
                f"/{context['task_instance']._try_number}"
                f"{dirpath.split(project_dir)[-1]}/{filename}"
            )
            hook.load_file(
                filename=f"{dirpath}/{filename}",
                bucket_name=bucket_name,
                key=s3_key,
                replace=True,
            )


def upload_to_gcp_gs(
    project_dir: str,
    bucket_name: str,
    gcp_conn_id: str | None = None,
    source_subpath: str = DEFAULT_TARGET_PATH,
    **kwargs: Any,
) -> None:
    """
    Helper function demonstrating how to upload files to GCP GS that can be used as a callback.

    :param project_dir: Path of the cloned project directory which Cosmos tasks work from.
    :param bucket_name: Name of the GCP GS bucket to upload to.
    :param gcp_conn_id: GCP connection ID to use when uploading files.
    :param source_subpath: Path of the source directory sub-path to upload files from.
    """
    from airflow.providers.google.cloud.hooks.gcs import GCSHook

    target_dir = f"{project_dir}/{source_subpath}"
    gcp_conn_id = gcp_conn_id if gcp_conn_id else GCSHook.default_conn_name
    hook = GCSHook(gcp_conn_id=gcp_conn_id)
    context = kwargs["context"]

    # Iterate over the files in the target dir and upload them to GCP GS
    for dirpath, _, filenames in os.walk(target_dir):
        for filename in filenames:
            object_name = (
                f"{context['dag'].dag_id}"
                f"/{context['run_id']}"
                f"/{context['task_instance'].task_id}"
                f"/{context['task_instance']._try_number}"
                f"{dirpath.split(project_dir)[-1]}/{filename}"
            )
            hook.upload(
                filename=f"{dirpath}/{filename}",
                bucket_name=bucket_name,
                object_name=object_name,
            )


def upload_to_azure_wasb(
    project_dir: str,
    container_name: str,
    azure_conn_id: str | None = None,
    source_subpath: str = DEFAULT_TARGET_PATH,
    **kwargs: Any,
) -> None:
    """
    Helper function demonstrating how to upload files to Azure WASB that can be used as a callback.

    :param project_dir: Path of the cloned project directory which Cosmos tasks work from.
    :param container_name: Name of the Azure WASB container to upload files to.
    :param azure_conn_id: Azure connection ID to use when uploading files.
    :param source_subpath: Path of the source directory sub-path to upload files from.
    """
    from airflow.providers.microsoft.azure.hooks.wasb import WasbHook

    target_dir = f"{project_dir}/{source_subpath}"
    azure_conn_id = azure_conn_id if azure_conn_id else WasbHook.default_conn_name
    hook = WasbHook(wasb_conn_id=azure_conn_id)
    context = kwargs["context"]

    # Iterate over the files in the target dir and upload them to WASB container
    for dirpath, _, filenames in os.walk(target_dir):
        for filename in filenames:
            blob_name = (
                f"{context['dag'].dag_id}"
                f"/{context['run_id']}"
                f"/{context['task_instance'].task_id}"
                f"/{context['task_instance']._try_number}"
                f"{dirpath.split(project_dir)[-1]}/{filename}"
            )
            hook.load_file(
                file_path=f"{dirpath}/{filename}",
                container_name=container_name,
                blob_name=blob_name,
                overwrite=True,
            )


def _configure_remote_target_path() -> tuple[Path, str] | tuple[None, None]:
    """Configure the remote target path if it is provided."""
    from airflow.version import version as airflow_version

    if not remote_target_path:
        return None, None

    _configured_target_path = None

    target_path_str = str(remote_target_path)

    remote_conn_id = remote_target_path_conn_id
    if not remote_conn_id:
        # Fall back to the default connection ID registered for the URL scheme
        target_path_schema = urlparse(target_path_str).scheme
        remote_conn_id = FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP.get(target_path_schema, None)  # type: ignore[assignment]
    if remote_conn_id is None:
        return None, None

    if not settings.AIRFLOW_IO_AVAILABLE:
        raise CosmosValueError(
            f"You're trying to specify remote target path {target_path_str}, but the required "
            f"Object Storage feature is unavailable in Airflow version {airflow_version}. Please upgrade to "
            "Airflow 2.8 or later."
        )

    from airflow.io.path import ObjectStoragePath

    _configured_target_path = ObjectStoragePath(target_path_str, conn_id=remote_conn_id)

    if not _configured_target_path.exists():  # type: ignore[no-untyped-call]
        _configured_target_path.mkdir(parents=True, exist_ok=True)

    return _configured_target_path, remote_conn_id


def _construct_dest_file_path(
    dest_target_dir: Path,
    file_path: str,
    source_target_dir: Path,
    source_subpath: str,
    **kwargs: Any,
) -> str:
    """
    Construct the destination path for the artifact files to be uploaded to the remote store.
    """
    dest_target_dir_str = str(dest_target_dir).rstrip("/")

    context = kwargs["context"]
    task_run_identifier = (
        f"{context['dag'].dag_id}"
        f"/{context['run_id']}"
        f"/{context['task_instance'].task_id}"
        f"/{context['task_instance']._try_number}"
    )
    rel_path = os.path.relpath(file_path, source_target_dir).lstrip("/")

    return f"{dest_target_dir_str}/{task_run_identifier}/{source_subpath}/{rel_path}"


def upload_to_cloud_storage(project_dir: str, source_subpath: str = DEFAULT_TARGET_PATH, **kwargs: Any) -> None:
    """
    Helper function demonstrating how to upload files to remote object stores that can be used as a callback.

    This example requires Airflow >= 2.8. It uses the Cosmos setting ``remote_target_path`` and, when set,
    ``remote_target_path_conn_id``.

    :param project_dir: Path of the cloned project directory which Cosmos tasks work from.
    :param source_subpath: Path of the source directory sub-path to upload files from.
    """
    dest_target_dir, dest_conn_id = _configure_remote_target_path()

    if not dest_target_dir:
        raise CosmosValueError("You're trying to upload artifact files, but the remote target path is not configured.")

    from airflow.io.path import ObjectStoragePath

    source_target_dir = Path(project_dir) / f"{source_subpath}"
    files = [str(file) for file in source_target_dir.rglob("*") if file.is_file()]
    for file_path in files:
        dest_file_path = _construct_dest_file_path(
            dest_target_dir, file_path, source_target_dir, source_subpath, **kwargs
        )
        dest_object_storage_path = ObjectStoragePath(dest_file_path, conn_id=dest_conn_id)
        ObjectStoragePath(file_path).copy(dest_object_storage_path)
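All four helpers place each file under a `dag_id/run_id/task_id/try_number/` prefix followed by the file's path relative to the project. The three provider-specific helpers derive that relative part with `dirpath.split(project_dir)[-1]`, while `_construct_dest_file_path` uses `os.path.relpath`. The following is a small standalone sketch of the same layout built with `os.path.relpath`; `build_object_key` is a hypothetical name used only for illustration and is not part of `cosmos/io.py`.

```python
from __future__ import annotations

import os


def build_object_key(
    dag_id: str,
    run_id: str,
    task_id: str,
    try_number: int,
    project_dir: str,
    file_path: str,
) -> str:
    """Hypothetical helper: return the remote key for one local artifact file."""
    # Path of the file relative to the project directory, e.g. "target/manifest.json"
    rel_path = os.path.relpath(file_path, project_dir)
    # Object store keys use forward slashes regardless of the local OS separator
    rel_posix = rel_path.replace(os.sep, "/")
    return f"{dag_id}/{run_id}/{task_id}/{try_number}/{rel_posix}"


key = build_object_key(
    "my_dag", "manual__2024", "run_model", 1, "/tmp/proj", "/tmp/proj/target/manifest.json"
)
print(key)  # my_dag/manual__2024/run_model/1/target/manifest.json
```

Because the try number is part of the prefix, a retried task writes to a different location and does not overwrite the artifacts of the earlier attempt.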
dev/dags/cosmos_callback_dag.py
@@ -0,0 +1,60 @@
"""
An example DAG that uses Cosmos to render a dbt project into an Airflow DAG.
"""

import os
from datetime import datetime
from pathlib import Path

from cosmos import DbtDag, ProfileConfig, ProjectConfig
from cosmos.io import upload_to_cloud_storage
from cosmos.profiles import PostgresUserPasswordProfileMapping

DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))

profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=PostgresUserPasswordProfileMapping(
        conn_id="example_conn",
        profile_args={"schema": "public"},
        disable_event_tracking=True,
    ),
)

# [START cosmos_callback_example]
cosmos_callback_dag = DbtDag(
    # dbt/cosmos-specific parameters
    project_config=ProjectConfig(
        DBT_ROOT_PATH / "jaffle_shop",
    ),
    profile_config=profile_config,
    operator_args={
        "install_deps": True,  # install any necessary dependencies before running any dbt command
        "full_refresh": True,  # used only in dbt commands that support this flag
        # --------------------------------------------------------------
        # Callback function to upload files using Airflow Object storage and Cosmos remote_target_path setting on Airflow 2.8 and above
        "callback": upload_to_cloud_storage,
        # --------------------------------------------------------------
        # Callback function to upload files to AWS S3, works for Airflow < 2.8 too
        # "callback": upload_to_aws_s3,
        # "callback_args": {"aws_conn_id": "aws_s3_conn", "bucket_name": "cosmos-artifacts-upload"},
        # --------------------------------------------------------------
        # Callback function to upload files to GCP GS, works for Airflow < 2.8 too
        # "callback": upload_to_gcp_gs,
        # "callback_args": {"gcp_conn_id": "gcp_gs_conn", "bucket_name": "cosmos-artifacts-upload"},
        # --------------------------------------------------------------
        # Callback function to upload files to Azure WASB, works for Airflow < 2.8 too
        # "callback": upload_to_azure_wasb,
        # "callback_args": {"azure_conn_id": "azure_wasb_conn", "container_name": "cosmos-artifacts-upload"},
        # --------------------------------------------------------------
    },
    # normal dag parameters
    schedule_interval="@daily",
    start_date=datetime(2023, 1, 1),
    catchup=False,
    dag_id="cosmos_callback_dag",
    default_args={"retries": 2},
)
# [END cosmos_callback_example]
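The diff shown here does not include the operator code that invokes the callback, so the following is only an assumption inferred from the helper signatures: the project directory is passed as the first positional argument, the entries of `callback_args` are expanded as keyword arguments, and the Airflow context arrives as the keyword `context`. `run_callback` and `recorder` are hypothetical names used to illustrate that flow without Airflow installed.

```python
from __future__ import annotations

from typing import Any, Callable


def run_callback(
    callback: Callable[..., None],
    project_dir: str,
    context: dict[str, Any],
    callback_args: dict[str, Any] | None = None,
) -> None:
    """Hypothetical sketch of how an operator could call a user-supplied callback."""
    # callback_args must not contain a "context" key, otherwise Python raises
    # a TypeError for the duplicated keyword argument.
    callback(project_dir, **(callback_args or {}), context=context)


calls: list[tuple[str, str, str]] = []


def recorder(project_dir: str, bucket_name: str, **kwargs: Any) -> None:
    # Stand-in for an upload helper: record what it was called with.
    calls.append((project_dir, bucket_name, kwargs["context"]["run_id"]))


run_callback(recorder, "/tmp/proj", {"run_id": "r1"}, {"bucket_name": "cosmos-artifacts-upload"})
```

Seen this way, `callback_args` in the example DAG supplies the provider-specific keyword parameters such as `bucket_name` or `container_name`, while the task itself supplies the project directory and the context.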