Skip to content

Commit

Permalink
Add operator to check that artifact was uploaded using callback
Browse files Browse the repository at this point in the history
  • Loading branch information
pankajkoti committed Dec 17, 2024
1 parent de9e2c0 commit fbd1f3d
Showing 1 changed file with 24 additions and 0 deletions.
24 changes: 24 additions & 0 deletions dev/dags/example_operators.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import os
from datetime import datetime
from pathlib import Path
from typing import Any

from airflow import DAG
from airflow.operators.python import PythonOperator

from cosmos import DbtCloneLocalOperator, DbtRunLocalOperator, DbtSeedLocalOperator, ProfileConfig
from cosmos.io import upload_artifacts_to_aws_s3
Expand All @@ -19,6 +21,17 @@
profiles_yml_filepath=DBT_PROFILE_PATH,
)


def check_s3_file(bucket_name: str, file_key: str, aws_conn_id: str = "aws_default", **context: Any) -> bool:
"""Check if a file exists in the given S3 bucket."""
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

s3_key = f"{context['dag'].dag_id}/{context['run_id']}/seed/0/{file_key}"
print(f"Checking if file {s3_key} exists in S3 bucket...")
hook = S3Hook(aws_conn_id=aws_conn_id)
return hook.check_for_key(key=s3_key, bucket_name=bucket_name)


with DAG("example_operators", start_date=datetime(2024, 1, 1), catchup=False) as dag:
# [START single_operator_callback]
seed_operator = DbtSeedLocalOperator(
Expand All @@ -43,6 +56,16 @@
)
# [END single_operator_callback]

check_file_uploaded_task = PythonOperator(
task_id="check_file_uploaded_task",
python_callable=check_s3_file,
op_kwargs={
"aws_conn_id": "aws_s3_conn",
"bucket_name": "cosmos-artifacts-upload",
"file_key": "target/run_results.json",
},
)

run_operator = DbtRunLocalOperator(
profile_config=profile_config,
project_dir=DBT_PROJ_DIR,
Expand All @@ -64,3 +87,4 @@
# [END clone_example]

seed_operator >> run_operator >> clone_operator
seed_operator >> check_file_uploaded_task

0 comments on commit fbd1f3d

Please sign in to comment.