diff --git a/tests/dags/README.md b/tests/dags/README.md
new file mode 100644
index 0000000..79bf68c
--- /dev/null
+++ b/tests/dags/README.md
@@ -0,0 +1 @@
+This folder contains future "DAG tests", which should be run in CI against running OCI images. At the moment we have not had enough time to finish these integration tests, and it is one of the highest priorities.
diff --git a/tests/dags/test_msql_reflect_otherdb.py b/tests/dags/test_msql_reflect_otherdb.py
new file mode 100644
index 0000000..aa4892d
--- /dev/null
+++ b/tests/dags/test_msql_reflect_otherdb.py
@@ -0,0 +1,30 @@
+from __future__ import annotations
+
+import datetime
+
+from airflow import DAG
+from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
+from dagcellent.operators.sql_reflect import SQLReflectOperator
+
+CONN_ID = "mssql_test"
+DAG_ID = __file__.removesuffix(".py").split("/")[-1]
+
+with DAG(
+    dag_id=DAG_ID,
+    start_date=datetime.datetime(2020, 2, 2),
+    schedule="@once",
+    catchup=False,
+) as dag:
+
+    reflect_table = SQLReflectOperator(
+        task_id="reflect_database", conn_id=CONN_ID, table_name="kaka", database="model"
+    )
+
+    execute = SQLExecuteQueryOperator(
+        task_id="execute_query",
+        conn_id=CONN_ID,
+        sql=reflect_table.output,
+        database="model",
+    )
+
+    reflect_table >> execute
diff --git a/tests/dags/test_msql_reflect_schema.py b/tests/dags/test_msql_reflect_schema.py
new file mode 100644
index 0000000..9bf7e2f
--- /dev/null
+++ b/tests/dags/test_msql_reflect_schema.py
@@ -0,0 +1,33 @@
+from __future__ import annotations
+
+import datetime
+
+from airflow import DAG
+from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
+from dagcellent.operators.sql_reflect import SQLReflectOperator
+
+CONN_ID = "mssql_test"
+DAG_ID = __file__.removesuffix(".py").split("/")[-1]
+
+with DAG(
+    dag_id=DAG_ID,
+    start_date=datetime.datetime(2020, 2, 2),
+    schedule="@once",
+    catchup=False,
+) as dag:
+    reflect_table = SQLReflectOperator(
+        task_id="reflect_database",
+        conn_id=CONN_ID,
+        schema="guest",
+        table_name="schema_test",
+        database="model",
+    )
+
+    execute = SQLExecuteQueryOperator(
+        task_id="execute_query",
+        conn_id=CONN_ID,
+        sql=reflect_table.output,
+        database="model",
+    )
+
+    reflect_table >> execute
diff --git a/tests/dags/test_mssql.py b/tests/dags/test_mssql.py
index 91c0740..15dfd15 100644
--- a/tests/dags/test_mssql.py
+++ b/tests/dags/test_mssql.py
@@ -5,7 +5,7 @@
 from airflow import DAG
 from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
 
-DAG_ID = "mssql_operator_dag"
+DAG_ID = __file__.removesuffix(".py").split("/")[-1]
 CONN_ID = "mssql_test"
 
 with DAG(
diff --git a/tests/dags/test_mssql_reflect.py b/tests/dags/test_mssql_reflect.py
index 9ece270..3cf2143 100644
--- a/tests/dags/test_mssql_reflect.py
+++ b/tests/dags/test_mssql_reflect.py
@@ -10,9 +10,10 @@
 from dagcellent.operators.sql_reflect import SQLReflectOperator
 
 CONN_ID = "mssql_test"
+DAG_ID = __file__.removesuffix(".py").split("/")[-1]
 
 with DAG(
-    dag_id="mssql_reflect",
+    dag_id=DAG_ID,
     description=__doc__,
     start_date=datetime.datetime(2020, 2, 2),
     schedule="@once",
diff --git a/tests/dags/test_psql.py b/tests/dags/test_psql.py
index 396998d..1ed9c26 100644
--- a/tests/dags/test_psql.py
+++ b/tests/dags/test_psql.py
@@ -5,7 +5,7 @@
 from airflow import DAG
 from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
 
-DAG_ID = "postgres_operator_dag"
+DAG_ID = __file__.removesuffix(".py").split("/")[-1]
 CONN_ID = "postgres_test"
 
 with DAG(
diff --git a/tests/dags/test_psql_reflect.py b/tests/dags/test_psql_reflect.py
index 6743860..9d5439f 100644
--- a/tests/dags/test_psql_reflect.py
+++ b/tests/dags/test_psql_reflect.py
@@ -7,9 +7,10 @@
 from dagcellent.operators.sql_reflect import SQLReflectOperator
 
 CONN_ID = "postgres_test"
+DAG_ID = __file__.removesuffix(".py").split("/")[-1]
 
 with DAG(
-    dag_id="psql_reflect",
+    dag_id=DAG_ID,
     description=__doc__,
     start_date=datetime.datetime(2020, 2, 2),
     schedule="@once",
diff --git a/tests/dags/test_psql_s3.py b/tests/dags/test_psql_s3.py
new file mode 100644
index 0000000..d04741f
--- /dev/null
+++ b/tests/dags/test_psql_s3.py
@@ -0,0 +1,45 @@
+"""Test SQL reflection on PostgreSQL."""
+from __future__ import annotations
+
+import datetime
+
+from airflow import DAG
+from dagcellent.operators import SqlToS3Operator
+from dagcellent.operators.sql_reflect import SQLReflectOperator
+
+DAG_ID = __file__.removesuffix(".py").split("/")[-1]
+CONN_ID = "postgres_test"
+AWS_CONN_ID = "dummy"
+S3_BUCKET = "dummy"
+
+with DAG(
+    dag_id=DAG_ID,
+    description=__doc__,
+    start_date=datetime.datetime(2020, 2, 2),
+    schedule="@once",
+    catchup=False,
+) as dag:
+    reflect_table = SQLReflectOperator(
+        table_name="ats",
+        task_id="reflect_database",
+        conn_id=CONN_ID,
+    )
+
+    sql_to_s3 = SqlToS3Operator(
+        task_id="to_s3",
+        query=reflect_table.output,
+        database="doesnt matter",
+        sql_conn_id=CONN_ID,
+        s3_bucket=S3_BUCKET,
+        s3_key="airflow/wms_test/lea_inbounddocument/full_load.parquet",
+        aws_conn_id=AWS_CONN_ID,
+        file_format="parquet",
+        replace=True,
+        pd_kwargs={
+            "engine": "pyarrow",
+            "version": "2.6",
+            "coerce_timestamps": "us",
+        },
+    )
+
+    reflect_table >> sql_to_s3  # pyright: ignore[reportUnusedExpression]
diff --git a/tests/dags/test_sql_reflect.py b/tests/dags/test_sql_reflect.py
index 51f7885..c5c47d6 100644
--- a/tests/dags/test_sql_reflect.py
+++ b/tests/dags/test_sql_reflect.py
@@ -5,10 +5,11 @@
 from airflow import DAG
 
 from dagcellent.operators.sql_reflect import SQLReflectOperator
+DAG_ID = __file__.removesuffix(".py").split("/")[-1]
 CONN_ID = "mssql_test"
 
 with DAG(
-    dag_id="mssql_reflect",
+    dag_id=DAG_ID,
     start_date=datetime.datetime(2020, 2, 2),
     schedule="@once",
     catchup=False,