test: add tests and wip integration tests
timonviola committed Nov 14, 2024
1 parent 3d54e19 commit 51557be
Showing 9 changed files with 117 additions and 5 deletions.
1 change: 1 addition & 0 deletions tests/dags/README.md
@@ -0,0 +1 @@
This folder contains future "DAG tests", which should be run in CI against running OCI images. At the moment we have not had enough time to finish these integration tests, and completing them is one of the highest priorities.
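
A minimal sketch of what one of these DAG tests could look like, assuming the DAG files live under tests/dags and that a plain pytest run inside the OCI image is the CI entry point; the fixture name and folder path are illustrative only and are not part of this commit:

from __future__ import annotations

import pytest
from airflow.models import DagBag


@pytest.fixture(scope="session")
def dag_bag() -> DagBag:
    # Parse every DAG file under tests/dags; skip Airflow's bundled examples.
    return DagBag(dag_folder="tests/dags", include_examples=False)


def test_dags_import_cleanly(dag_bag: DagBag) -> None:
    # Any entry here means a DAG file raised an exception during import.
    assert dag_bag.import_errors == {}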
30 changes: 30 additions & 0 deletions tests/dags/test_msql_reflect_otherdb.py
@@ -0,0 +1,30 @@
from __future__ import annotations

import datetime

from airflow import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from dagcellent.operators.sql_reflect import SQLReflectOperator

CONN_ID = "mssql_test"
DAG_ID = __file__.rstrip(".py").split("/")[-1]

with DAG(
    dag_id=DAG_ID,
    start_date=datetime.datetime(2020, 2, 2),
    schedule="@once",
    catchup=False,
) as dag:

    reflect_table = SQLReflectOperator(
        task_id="reflect_database", conn_id=CONN_ID, table_name="kaka", database="model"
    )

    execute = SQLExecuteQueryOperator(
        task_id="execute_query",
        conn_id=CONN_ID,
        sql=reflect_table.output,
        database="model",
    )

    reflect_table >> execute
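
A side note on the DAG_ID pattern used in these files: str.rstrip(".py") strips any trailing ".", "p", or "y" characters rather than the ".py" suffix, so a DAG file named e.g. copy.py would be mangled to "co". It happens to be harmless for the filenames in this commit; a suffix-safe variant (illustrative only, not part of the diff) would be:

from pathlib import Path

# Path(...).stem drops only the final extension, never trailing letters:
# Path("tests/dags/test_msql_reflect_otherdb.py").stem == "test_msql_reflect_otherdb"
DAG_ID = Path(__file__).stem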
33 changes: 33 additions & 0 deletions tests/dags/test_msql_reflect_schema.py
@@ -0,0 +1,33 @@
from __future__ import annotations

import datetime

from airflow import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from dagcellent.operators.sql_reflect import SQLReflectOperator

CONN_ID = "mssql_test"
DAG_ID = __file__.rstrip(".py").split("/")[-1]

with DAG(
    dag_id=DAG_ID,
    start_date=datetime.datetime(2020, 2, 2),
    schedule="@once",
    catchup=False,
) as dag:
    reflect_table = SQLReflectOperator(
        task_id="reflect_database",
        conn_id=CONN_ID,
        schema="guest",
        table_name="schema_test",
        database="model",
    )

    execute = SQLExecuteQueryOperator(
        task_id="execute_query",
        conn_id=CONN_ID,
        sql=reflect_table.output,
        database="model",
    )

    reflect_table >> execute
2 changes: 1 addition & 1 deletion tests/dags/test_mssql.py
@@ -5,7 +5,7 @@
from airflow import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

DAG_ID = "mssql_operator_dag"
DAG_ID = __file__.rstrip(".py").split("/")[-1]
CONN_ID = "mssql_test"

with DAG(
3 changes: 2 additions & 1 deletion tests/dags/test_mssql_reflect.py
@@ -10,9 +10,10 @@
from dagcellent.operators.sql_reflect import SQLReflectOperator

CONN_ID = "mssql_test"
DAG_ID = __file__.rstrip(".py").split("/")[-1]

with DAG(
dag_id="mssql_reflect",
dag_id=DAG_ID,
description=__doc__,
start_date=datetime.datetime(2020, 2, 2),
schedule="@once",
2 changes: 1 addition & 1 deletion tests/dags/test_psql.py
@@ -5,7 +5,7 @@
from airflow import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

DAG_ID = "postgres_operator_dag"
DAG_ID = __file__.rstrip(".py").split("/")[-1]
CONN_ID = "postgres_test"

with DAG(
3 changes: 2 additions & 1 deletion tests/dags/test_psql_reflect.py
@@ -7,9 +7,10 @@
from dagcellent.operators.sql_reflect import SQLReflectOperator

CONN_ID = "postgres_test"
DAG_ID = __file__.rstrip(".py").split("/")[-1]

with DAG(
dag_id="psql_reflect",
dag_id=DAG_ID,
description=__doc__,
start_date=datetime.datetime(2020, 2, 2),
schedule="@once",
45 changes: 45 additions & 0 deletions tests/dags/test_psql_s3.py
@@ -0,0 +1,45 @@
"""Test SQL reflection on PostrgeSQL."""
from __future__ import annotations

import datetime

from airflow import DAG
from dagcellent.operators import SqlToS3Operator
from dagcellent.operators.sql_reflect import SQLReflectOperator

DAG_ID = __file__.rstrip(".py").split("/")[-1]
CONN_ID = "postgres_test"
AWS_CONN_ID = "dummy"
S3_BUCKET = "dummy"

with DAG(
    dag_id=DAG_ID,
    description=__doc__,
    start_date=datetime.datetime(2020, 2, 2),
    schedule="@once",
    catchup=False,
) as dag:
    reflect_table = SQLReflectOperator(
        table_name="ats",
        task_id="reflect_database",
        conn_id=CONN_ID,
    )

    sql_to_s3 = SqlToS3Operator(
        task_id="to_s3",
        query=reflect_table.output,
        database="doesnt matter",
        sql_conn_id=CONN_ID,
        s3_bucket=S3_BUCKET,
        s3_key="airflow/wms_test/lea_inbounddocument/full_load.parquet",
        aws_conn_id=AWS_CONN_ID,
        file_format="parquet",
        replace=True,
        pd_kwargs={
            "engine": "pyarrow",
            "version": "2.6",
            "coerce_timestamps": "us",
        },
    )

    reflect_table >> sql_to_s3  # pyright: ignore[reportUnusedExpression]
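
For context on the pd_kwargs above: assuming dagcellent's SqlToS3Operator forwards them to the pandas Parquet writer the way the Airflow Amazon provider's SqlToS3Operator does, they amount to the call sketched below (the DataFrame and output path are placeholders):

import pandas as pd

df = pd.DataFrame({"id": [1, 2]})

# pyarrow engine, Parquet format version 2.6, timestamps coerced to microseconds.
df.to_parquet(
    "full_load.parquet",
    engine="pyarrow",
    version="2.6",
    coerce_timestamps="us",
)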
3 changes: 2 additions & 1 deletion tests/dags/test_sql_reflect.py
@@ -5,10 +5,11 @@
from airflow import DAG
from dagcellent.operators.sql_reflect import SQLReflectOperator

DAG_ID = __file__.rstrip(".py").split("/")[-1]
CONN_ID = "mssql_test"

with DAG(
dag_id="mssql_reflect",
dag_id=DAG_ID,
start_date=datetime.datetime(2020, 2, 2),
schedule="@once",
catchup=False,
