diff --git a/airflow/providers/teradata/hooks/teradata.py b/airflow/providers/teradata/hooks/teradata.py index 3afc32bc74746..c5070d1daff39 100644 --- a/airflow/providers/teradata/hooks/teradata.py +++ b/airflow/providers/teradata/hooks/teradata.py @@ -32,6 +32,17 @@ if TYPE_CHECKING: from airflow.models.connection import Connection +PARAM_TYPES = {bool, float, int, str} + + +def _map_param(value): + if value in PARAM_TYPES: + # In this branch, value is a Python type; calling it produces + # an instance of the type which is understood by the Teradata driver + # in the out parameter mapping mechanism. + value = value() + return value + class TeradataHook(DbApiHook): """General hook for interacting with Teradata SQL Database. @@ -187,3 +198,58 @@ def get_ui_field_behaviour() -> dict: "password": "dbc", }, } + + def callproc( + self, + identifier: str, + autocommit: bool = False, + parameters: list | dict | None = None, + ) -> list | dict | tuple | None: + """ + Call the stored procedure identified by the provided string. + + Any OUT parameters must be provided with a value of either the + expected Python type (e.g., `int`) or an instance of that type. + + :param identifier: stored procedure name + :param autocommit: What to set the connection's autocommit setting to + before executing the query. + :param parameters: The `IN`, `OUT` and `INOUT` parameters for Teradata + stored procedure + + The return value is a list or mapping that includes parameters in + both directions; the actual return type depends on the type of the + provided `parameters` argument. + + """ + if parameters is None: + parameters = [] + + args = ",".join("?" for name in parameters) + + sql = f"{{CALL {identifier}({(args)})}}" + + def handler(cursor): + records = cursor.fetchall() + + if records is None: + return + if isinstance(records, list): + return [row for row in records] + + if isinstance(records, dict): + return {n: v for (n, v) in records.items()} + raise TypeError(f"Unexpected results: {records}") + + result = self.run( + sql, + autocommit=autocommit, + parameters=( + [_map_param(value) for (name, value) in parameters.items()] + if isinstance(parameters, dict) + else [_map_param(value) for value in parameters] + ), + handler=handler, + ) + + return result diff --git a/airflow/providers/teradata/operators/teradata.py b/airflow/providers/teradata/operators/teradata.py index ae87fd785487f..00cd7a86c7d0b 100644 --- a/airflow/providers/teradata/operators/teradata.py +++ b/airflow/providers/teradata/operators/teradata.py @@ -17,11 +17,15 @@ # under the License. from __future__ import annotations -from typing import Sequence +from typing import TYPE_CHECKING, Sequence +from airflow.models import BaseOperator from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator from airflow.providers.teradata.hooks.teradata import TeradataHook +if TYPE_CHECKING: + from airflow.utils.context import Context + class TeradataOperator(SQLExecuteQueryOperator): """ @@ -34,15 +38,15 @@ class TeradataOperator(SQLExecuteQueryOperator): :ref:`howto/operator:TeradataOperator` :param sql: the SQL query to be executed as a single string, or a list of str (sql statements) - :param conn_id: reference to a predefined database + :param teradata_conn_id: reference to a predefined database :param autocommit: if True, each command is automatically committed.(default value: False) :param parameters: (optional) the parameters to render the SQL query with. :param schema: The Teradata database to connect to. 
""" template_fields: Sequence[str] = ( - "parameters", "sql", + "parameters", ) template_ext: Sequence[str] = (".sql",) template_fields_renderers = {"sql": "sql"} @@ -50,7 +54,7 @@ class TeradataOperator(SQLExecuteQueryOperator): def __init__( self, - conn_id: str = TeradataHook.default_conn_name, + teradata_conn_id: str = TeradataHook.default_conn_name, schema: str | None = None, **kwargs, ) -> None: @@ -61,4 +65,39 @@ def __init__( **hook_params, } super().__init__(**kwargs) - self.conn_id = conn_id + self.conn_id = teradata_conn_id + + +class TeradataStoredProcedureOperator(BaseOperator): + """ + Executes stored procedure in a specific Teradata database. + + :param procedure: name of stored procedure to call (templated) + :param teradata_conn_id: The :ref:`Teradata connection id ` + reference to a specific Teradata database. + :param parameters: (optional, templated) the parameters provided in the call + + """ + + template_fields: Sequence[str] = ( + "procedure", + "parameters", + ) + ui_color = "#ededed" + + def __init__( + self, + *, + procedure: str, + teradata_conn_id: str = TeradataHook.default_conn_name, + parameters: dict | list | None = None, + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.teradata_conn_id = teradata_conn_id + self.procedure = procedure + self.parameters = parameters + + def execute(self, context: Context): + hook = TeradataHook(teradata_conn_id=self.teradata_conn_id) + return hook.callproc(self.procedure, autocommit=True, parameters=self.parameters) diff --git a/airflow/providers/teradata/provider.yaml b/airflow/providers/teradata/provider.yaml index 657a66aa7cea9..ea57c384ec90a 100644 --- a/airflow/providers/teradata/provider.yaml +++ b/airflow/providers/teradata/provider.yaml @@ -35,6 +35,14 @@ dependencies: - teradatasqlalchemy>=17.20.0.0 - teradatasql>=17.20.0.28 +additional-extras: + - name: microsoft.azure + dependencies: + - apache-airflow-providers-microsoft-azure + - name: amazon + dependencies: + - apache-airflow-providers-amazon + integrations: - integration-name: Teradata external-doc-url: https://www.teradata.com/ @@ -58,6 +66,14 @@ transfers: target-integration-name: Teradata python-module: airflow.providers.teradata.transfers.teradata_to_teradata how-to-guide: /docs/apache-airflow-providers-teradata/operators/teradata_to_teradata.rst + - source-integration-name: Microsoft Azure Blob Storage + target-integration-name: Teradata + python-module: airflow.providers.teradata.transfers.azure_blob_to_teradata + how-to-guide: /docs/apache-airflow-providers-teradata/operators/azure_blob_to_teradata.rst + - source-integration-name: Amazon Simple Storage Service (S3) + target-integration-name: Teradata + python-module: airflow.providers.teradata.transfers.s3_to_teradata + how-to-guide: /docs/apache-airflow-providers-teradata/operators/s3_to_teradata.rst connection-types: - hook-class-name: airflow.providers.teradata.hooks.teradata.TeradataHook diff --git a/airflow/providers/teradata/transfers/azure_blob_to_teradata.py b/airflow/providers/teradata/transfers/azure_blob_to_teradata.py new file mode 100644 index 0000000000000..416b4e7136cb0 --- /dev/null +++ b/airflow/providers/teradata/transfers/azure_blob_to_teradata.py @@ -0,0 +1,103 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from textwrap import dedent +from typing import TYPE_CHECKING, Sequence + +from airflow.models import BaseOperator + +try: + from airflow.providers.microsoft.azure.hooks.wasb import WasbHook +except ModuleNotFoundError as e: + from airflow.exceptions import AirflowOptionalProviderFeatureException + + raise AirflowOptionalProviderFeatureException(e) + +from airflow.providers.teradata.hooks.teradata import TeradataHook + +if TYPE_CHECKING: + from airflow.utils.context import Context + + +class AzureBlobStorageToTeradataOperator(BaseOperator): + """ + + Loads CSV, JSON and Parquet format data from Azure Blob Storage to Teradata. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:AzureBlobStorageToTeradataOperator` + + :param blob_source_key: The URI format specifying the location of the Azure blob object store.(templated) + The URI format is `/az/YOUR-STORAGE-ACCOUNT.blob.core.windows.net/YOUR-CONTAINER/YOUR-BLOB-LOCATION`. + Refer to + https://docs.teradata.com/search/documents?query=native+object+store&sort=last_update&virtual-field=title_only&content-lang=en-US + :param azure_conn_id: The Airflow WASB connection used for azure blob credentials. + :param teradata_table: The name of the teradata table to which the data is transferred.(templated) + :param teradata_conn_id: The connection ID used to connect to Teradata + :ref:`Teradata connection ` + + Note that ``blob_source_key`` and ``teradata_table`` are + templated, so you can use variables in them if you wish. 
+ """ + + template_fields: Sequence[str] = ("blob_source_key", "teradata_table") + ui_color = "#e07c24" + + def __init__( + self, + *, + blob_source_key: str, + azure_conn_id: str = "azure_default", + teradata_table: str, + teradata_conn_id: str = "teradata_default", + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.blob_source_key = blob_source_key + self.azure_conn_id = azure_conn_id + self.teradata_table = teradata_table + self.teradata_conn_id = teradata_conn_id + + def execute(self, context: Context) -> None: + self.log.info( + "transferring data from %s to teradata table %s...", self.blob_source_key, self.teradata_table + ) + azure_hook = WasbHook(wasb_conn_id=self.azure_conn_id) + conn = azure_hook.get_connection(self.azure_conn_id) + # Obtaining the Azure client ID and Azure secret in order to access a specified Blob container + access_id = conn.login if conn.login is not None else "" + access_secret = conn.password if conn.password is not None else "" + teradata_hook = TeradataHook(teradata_conn_id=self.teradata_conn_id) + sql = dedent(f""" + CREATE MULTISET TABLE {self.teradata_table} AS + ( + SELECT * FROM ( + LOCATION = '{self.blob_source_key}' + ACCESS_ID= '{access_id}' + ACCESS_KEY= '{access_secret}' + ) AS d + ) WITH DATA + """).rstrip() + try: + teradata_hook.run(sql, True) + except Exception as ex: + self.log.error(str(ex)) + raise + self.log.info("The transfer of data from Azure Blob to Teradata was successful") diff --git a/airflow/providers/teradata/transfers/s3_to_teradata.py b/airflow/providers/teradata/transfers/s3_to_teradata.py new file mode 100644 index 0000000000000..f7998ea861135 --- /dev/null +++ b/airflow/providers/teradata/transfers/s3_to_teradata.py @@ -0,0 +1,109 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from textwrap import dedent +from typing import TYPE_CHECKING, Sequence + +from airflow.models import BaseOperator + +try: + from airflow.providers.amazon.aws.hooks.s3 import S3Hook +except ModuleNotFoundError as e: + from airflow.exceptions import AirflowOptionalProviderFeatureException + + raise AirflowOptionalProviderFeatureException(e) +from airflow.providers.teradata.hooks.teradata import TeradataHook + +if TYPE_CHECKING: + from airflow.utils.context import Context + + +class S3ToTeradataOperator(BaseOperator): + """ + Loads CSV, JSON and Parquet format data from Amazon S3 to Teradata. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:S3ToTeradataOperator` + + :param s3_source_key: The URI format specifying the location of the S3 bucket.(templated) + The URI format is /s3/YOUR-BUCKET.s3.amazonaws.com/YOUR-BUCKET-NAME. 
+ Refer to + https://docs.teradata.com/search/documents?query=native+object+store&sort=last_update&virtual-field=title_only&content-lang=en-US + :param public_bucket: Specifies whether the provided S3 bucket is public. If the bucket is public, + it means that anyone can access the objects within it via a URL without requiring authentication. + If the bucket is private and authentication is not provided, the operator will throw an exception. + :param teradata_table: The name of the teradata table to which the data is transferred.(templated) + :param aws_conn_id: The Airflow AWS connection used for AWS credentials. + :param teradata_conn_id: The connection ID used to connect to Teradata + :ref:`Teradata connection `. + + Note that ``s3_source_key`` and ``teradata_table`` are + templated, so you can use variables in them if you wish. + """ + + template_fields: Sequence[str] = ("s3_source_key", "teradata_table") + ui_color = "#e07c24" + + def __init__( + self, + *, + s3_source_key: str, + public_bucket: bool = False, + teradata_table: str, + aws_conn_id: str = "aws_default", + teradata_conn_id: str = "teradata_default", + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.s3_source_key = s3_source_key + self.public_bucket = public_bucket + self.teradata_table = teradata_table + self.aws_conn_id = aws_conn_id + self.teradata_conn_id = teradata_conn_id + + def execute(self, context: Context) -> None: + self.log.info( + "transferring data from %s to teradata table %s...", self.s3_source_key, self.teradata_table + ) + + s3_hook = S3Hook(aws_conn_id=self.aws_conn_id) + access_key = "" + access_secret = "" + if not self.public_bucket: + credentials = s3_hook.get_credentials() + access_key = credentials.access_key + access_secret = credentials.secret_key + teradata_hook = TeradataHook(teradata_conn_id=self.teradata_conn_id) + sql = dedent(f""" + CREATE MULTISET TABLE {self.teradata_table} AS + ( + SELECT * FROM ( + LOCATION = '{self.s3_source_key}' + ACCESS_ID= '{access_key}' + ACCESS_KEY= '{access_secret}' + ) AS d + ) WITH DATA + """).rstrip() + try: + teradata_hook.run(sql, True) + except Exception as ex: + self.log.error(str(ex)) + raise + self.log.info("The transfer of data from S3 to Teradata was successful") diff --git a/dev/breeze/tests/test_selective_checks.py b/dev/breeze/tests/test_selective_checks.py index 81789de714ab2..f7daed619a1fc 100644 --- a/dev/breeze/tests/test_selective_checks.py +++ b/dev/breeze/tests/test_selective_checks.py @@ -503,7 +503,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str): { "affected-providers-list-as-string": "amazon apache.hive cncf.kubernetes " "common.sql exasol ftp google http imap microsoft.azure " - "mongo mysql openlineage postgres salesforce ssh", + "mongo mysql openlineage postgres salesforce ssh teradata", "all-python-versions": "['3.8']", "all-python-versions-list-as-string": "3.8", "python-versions": "['3.8']", @@ -519,7 +519,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str): "run-amazon-tests": "true", "parallel-test-types-list-as-string": "Always Providers[amazon] " "Providers[apache.hive,cncf.kubernetes,common.sql,exasol,ftp,http," - "imap,microsoft.azure,mongo,mysql,openlineage,postgres,salesforce,ssh] Providers[google]", + "imap,microsoft.azure,mongo,mysql,openlineage,postgres,salesforce,ssh,teradata] Providers[google]", "needs-mypy": "true", "mypy-folders": "['providers']", }, @@ -553,7 +553,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: 
str): { "affected-providers-list-as-string": "amazon apache.hive cncf.kubernetes " "common.sql exasol ftp google http imap microsoft.azure " - "mongo mysql openlineage postgres salesforce ssh", + "mongo mysql openlineage postgres salesforce ssh teradata", "all-python-versions": "['3.8']", "all-python-versions-list-as-string": "3.8", "python-versions": "['3.8']", @@ -569,7 +569,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str): "upgrade-to-newer-dependencies": "false", "parallel-test-types-list-as-string": "Always Providers[amazon] " "Providers[apache.hive,cncf.kubernetes,common.sql,exasol,ftp,http," - "imap,microsoft.azure,mongo,mysql,openlineage,postgres,salesforce,ssh] Providers[google]", + "imap,microsoft.azure,mongo,mysql,openlineage,postgres,salesforce,ssh,teradata] Providers[google]", "needs-mypy": "true", "mypy-folders": "['providers']", }, diff --git a/docs/apache-airflow-providers-teradata/operators/azure_blob_to_teradata.rst b/docs/apache-airflow-providers-teradata/operators/azure_blob_to_teradata.rst new file mode 100644 index 0000000000000..0ee9a7bb32595 --- /dev/null +++ b/docs/apache-airflow-providers-teradata/operators/azure_blob_to_teradata.rst @@ -0,0 +1,73 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + +.. _howto/operator:AzureBlobStorageToTeradataOperator: + + +================================== +AzureBlobStorageToTeradataOperator +================================== + +The purpose of ``AzureBlobStorageToTeradataOperator`` is to define tasks involving CSV, JSON and Parquet +format data transfer from an Azure Blob Storage to Teradata table. +Use the :class:`AzureBlobStorageToTeradataOperator ` +to transfer data from an Azure Blob Storage to Teradata. + + +Transferring data in CSV format from Azure Blob Storage to Teradata +------------------------------------------------------------------- + +An example usage of the AzureBlobStorageToTeradataOperator to transfer CSV data format from Azure Blob Storage +to teradata table is as follows: + +.. exampleinclude:: /../../tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py + :language: python + :start-after: [START azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_csv] + :end-before: [END azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_csv] + +Transferring data in JSON format from Azure Blob Storage to Teradata +-------------------------------------------------------------------- + +An example usage of the AzureBlobStorageToTeradataOperator to transfer JSON data format from Azure Blob Storage +to teradata table is as follows: + +.. 
exampleinclude:: /../../tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py + :language: python + :start-after: [START azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_json] + :end-before: [END azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_json] + +Transferring data in PARQUET format from Azure Blob Storage to Teradata +----------------------------------------------------------------------- + +An example usage of the AzureBlobStorageToTeradataOperator to transfer PARQUET data format from Azure Blob Storage +to teradata table is as follows: + +.. exampleinclude:: /../../tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py + :language: python + :start-after: [START azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_parquet] + :end-before: [END azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_parquet] + +The complete ``AzureBlobStorageToTeradataOperator`` Operator DAG +---------------------------------------------------------------- + +When we put everything together, our DAG should look like this: + +.. exampleinclude:: /../../tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py + :language: python + :start-after: [START azure_blob_to_teradata_transfer_operator_howto_guide] + :end-before: [END azure_blob_to_teradata_transfer_operator_howto_guide] diff --git a/docs/apache-airflow-providers-teradata/operators/s3_to_teradata.rst b/docs/apache-airflow-providers-teradata/operators/s3_to_teradata.rst new file mode 100644 index 0000000000000..a6ecbc6f146b8 --- /dev/null +++ b/docs/apache-airflow-providers-teradata/operators/s3_to_teradata.rst @@ -0,0 +1,77 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + +.. _howto/operator:S3ToTeradataOperator: + + +============================ +S3ToTeradataOperator +============================ + +The purpose of ``S3ToTeradataOperator`` is to define tasks involving CSV, JSON and Parquet +format data transfer from an AWS Simple Storage Service (S3) to Teradata table. This operator uses +Teradata READ_NOS feature to transfer data from an AWS Simple Storage Service (S3) to Teradata table. +READ_NOS is a table operator in Teradata Vantage that allows users to list external files at a specified location. +For more details, see `READ_NOS Functionality `_ + +Use the :class:`S3ToTeradataOperator ` +to transfer data from S3 to Teradata. + + .. note:: + The current version of ``S3ToTeradataOperator`` does not support accessing AWS S3 with Security Token Service (STS) temporary credentials. Instead, it exclusively supports accessing with long-term credentials. 
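An illustrative sketch (not part of this patch): based on the ``S3ToTeradataOperator`` signature introduced above, a minimal task definition inside a DAG could look like the following. The bucket URI and table name are assumed placeholder values, taken from the example DAG that accompanies this change.

.. code-block:: python

    from airflow.providers.teradata.transfers.s3_to_teradata import S3ToTeradataOperator

    transfer_data_csv = S3ToTeradataOperator(
        task_id="transfer_data_s3_to_teradata_csv",
        # assumed public-bucket URI in the Teradata object-store format
        s3_source_key="/s3/td-usgs-public.s3.amazonaws.com/CSVDATA/09394500/2018/06/",
        public_bucket=True,
        teradata_table="example_s3_teradata_csv",  # assumed target table
        aws_conn_id="aws_default",
        teradata_conn_id="teradata_default",
    )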
+ + +Transferring data in CSV format from S3 to Teradata +--------------------------------------------------- + +An example usage of the S3ToTeradataOperator to transfer CSV data format from S3 to teradata table is as follows: + +.. exampleinclude:: /../../tests/system/providers/teradata/example_s3_to_teradata_transfer.py + :language: python + :start-after: [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_csv] + :end-before: [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_csv] + +Transferring data in JSON format from S3 to Teradata +---------------------------------------------------- + +An example usage of the S3ToTeradataOperator to transfer JSON data format from S3 to teradata table is as follows: + +.. exampleinclude:: /../../tests/system/providers/teradata/example_s3_to_teradata_transfer.py + :language: python + :start-after: [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_json] + :end-before: [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_json] + +Transferring data in PARQUET format from S3 to Teradata +------------------------------------------------------- + +An example usage of the S3ToTeradataOperator to transfer PARQUET data format from S3 to teradata table is as follows: + +.. exampleinclude:: /../../tests/system/providers/teradata/example_s3_to_teradata_transfer.py + :language: python + :start-after: [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_parquet] + :end-before: [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_parquet] + +The complete ``S3ToTeradataOperator`` Operator DAG +-------------------------------------------------- + +When we put everything together, our DAG should look like this: + +.. exampleinclude:: /../../tests/system/providers/teradata/example_s3_to_teradata_transfer.py + :language: python + :start-after: [START s3_to_teradata_transfer_operator_howto_guide] + :end-before: [END s3_to_teradata_transfer_operator_howto_guide] diff --git a/docs/apache-airflow-providers-teradata/operators/teradata.rst b/docs/apache-airflow-providers-teradata/operators/teradata.rst index e894d37ade66c..6fd7d371a7b09 100644 --- a/docs/apache-airflow-providers-teradata/operators/teradata.rst +++ b/docs/apache-airflow-providers-teradata/operators/teradata.rst @@ -113,3 +113,117 @@ When we put everything together, our DAG should look like this: :language: python :start-after: [START teradata_operator_howto_guide] :end-before: [END teradata_operator_howto_guide] + +TeradataStoredProcedureOperator +=============================== + +The purpose of TeradataStoredProcedureOperator is to define tasks involving executing teradata +stored procedures. + +Execute a Stored Procedure in a Teradata database +------------------------------------------------- + +To execute a Stored Procedure in an Teradata, use the +:class:`~airflow.providers.teradata.operators.teradata.TeradataStoredProcedureOperator`. + +Assume a stored procedure exists in the database that looks like this: + + .. code-block:: sql + + REPLACE PROCEDURE TEST_PROCEDURE ( + IN val_in INTEGER, + INOUT val_in_out INTEGER, + OUT val_out INTEGER, + OUT value_str_out varchar(100) + ) + BEGIN + set val_out = val_in * 2; + set val_in_out = val_in_out * 4; + set value_str_out = 'string output'; + END; + / + +This stored procedure takes an integer argument, val_in, as input. 
+It operates with a single inout argument, val_in_out, which serves as both input and output. +Additionally, it returns an integer argument, val_out, and a string argument, value_str_out. + +This stored procedure can be invoked using +:class:`~airflow.providers.teradata.operators.teradata.TeradataStoredProcedureOperator` in various manners. + +One approach involves passing parameters positionally as a list, with output parameters specified as Python data types: + +.. exampleinclude:: /../../tests/system/providers/teradata/example_teradata_call_sp.py + :language: python + :start-after: [START howto_call_teradata_stored_procedure_operator_with_types] + :end-before: [END howto_call_teradata_stored_procedure_operator_with_types] + +Alternatively, parameters can be passed positionally as a list, with output parameters designated as placeholders: + +.. exampleinclude:: /../../tests/system/providers/teradata/example_teradata_call_sp.py + :language: python + :start-after: [START howto_call_teradata_stored_procedure_operator_with_place_holder] + :end-before: [END howto_call_teradata_stored_procedure_operator_with_place_holder] + +Another method entails passing parameters positionally as a dictionary: + +.. exampleinclude:: /../../tests/system/providers/teradata/example_teradata_call_sp.py + :language: python + :start-after: [START howto_call_teradata_stored_procedure_operator_with_dict_input] + :end-before: [END howto_call_teradata_stored_procedure_operator_with_dict_input] + +Assume a stored procedure exists in the database that looks like this: + + .. code-block:: sql + + REPLACE PROCEDURE GetTimestampOutParameter (OUT out_timestamp TIMESTAMP) + BEGIN + -- Assign current timestamp to the OUT parameter + SET out_timestamp = CURRENT_TIMESTAMP; + END; + / + +This stored procedure yields a singular timestamp argument, out_timestamp, and is callable through +:class:`~airflow.providers.teradata.operators.teradata.TeradataStoredProcedureOperator` +with parameters passed positionally as a list: + +.. exampleinclude:: /../../tests/system/providers/teradata/example_teradata_call_sp.py + :language: python + :start-after: [START howto_call_teradata_stored_procedure_operator_timestamp] + :end-before: [END howto_call_teradata_stored_procedure_operator_timestamp] + + +Assume a stored procedure exists in the database that looks like this: + + .. code-block:: sql + + REPLACE PROCEDURE + TEST_PROCEDURE (IN val_in INTEGER, OUT val_out INTEGER) + BEGIN + DECLARE cur1 CURSOR WITH RETURN FOR SELECT * from DBC.DBCINFO ORDER BY 1 ; + DECLARE cur2 CURSOR WITH RETURN FOR SELECT infodata, infokey from DBC.DBCINFO order by 1 ; + open cur1 ; + open cur2 ; + set val_out = val_in * 2; + END; + / + +This stored procedure takes a single integer argument, val_in, as input and produces a single integer argument, val_out. +Additionally, it yields two cursors representing the outputs of select queries. +This stored procedure can be invoked using +:class:`~airflow.providers.teradata.operators.teradata.TeradataStoredProcedureOperator` +with parameters passed positionally as a list: + +.. 
exampleinclude:: /../../tests/system/providers/teradata/example_teradata_call_sp.py + :language: python + :start-after: [START howto_teradata_stored_procedure_operator_with_in_out_dynamic_result] + :end-before: [END howto_teradata_stored_procedure_operator_with_in_out_dynamic_result] + +The complete TeradataStoredProcedureOperator DAG +------------------------------------------------ + +When we put everything together, our DAG should look like this: + +.. exampleinclude:: /../../tests/system/providers/teradata/example_teradata_call_sp.py + :language: python + :start-after: [START howto_teradata_operator_for_sp] + :end-before: [END howto_teradata_operator_for_sp] diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index 06400ac0c3bd4..8e4b964535537 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -806,6 +806,7 @@ initcontainers initdb initialisation initialising +inout InsecureClient InspectContentResponse InspectTemplate diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json index b577aeb7c8819..7f29984d329b5 100644 --- a/generated/provider_dependencies.json +++ b/generated/provider_dependencies.json @@ -1244,7 +1244,9 @@ "devel-deps": [], "plugins": [], "cross-providers-deps": [ - "common.sql" + "amazon", + "common.sql", + "microsoft.azure" ], "excluded-python-versions": [], "state": "ready" diff --git a/tests/providers/teradata/hooks/test_teradata.py b/tests/providers/teradata/hooks/test_teradata.py index af77d66a636fe..689ab871d9f81 100644 --- a/tests/providers/teradata/hooks/test_teradata.py +++ b/tests/providers/teradata/hooks/test_teradata.py @@ -264,3 +264,14 @@ def test_bulk_insert_rows_no_rows(self): rows = [] with pytest.raises(ValueError): self.test_db_hook.bulk_insert_rows("table", rows) + + def test_callproc_dict(self): + parameters = {"a": 1, "b": 2, "c": 3} + + class bindvar(int): + def getvalue(self): + return self + + self.cur.fetchall.return_value = {k: bindvar(v) for k, v in parameters.items()} + result = self.test_db_hook.callproc("proc", True, parameters) + assert result == parameters diff --git a/tests/providers/teradata/operators/test_teradata.py b/tests/providers/teradata/operators/test_teradata.py index 3d491a66758b4..0c614b42b262d 100644 --- a/tests/providers/teradata/operators/test_teradata.py +++ b/tests/providers/teradata/operators/test_teradata.py @@ -23,7 +23,7 @@ from airflow.exceptions import AirflowException from airflow.providers.common.sql.hooks.sql import fetch_all_handler from airflow.providers.teradata.hooks.teradata import TeradataHook -from airflow.providers.teradata.operators.teradata import TeradataOperator +from airflow.providers.teradata.operators.teradata import TeradataOperator, TeradataStoredProcedureOperator class TestTeradataOperator: @@ -65,7 +65,9 @@ def test_execute(self, mock_get_db_hook): context = "test_context" task_id = "test_task_id" - operator = TeradataOperator(sql=sql, conn_id=teradata_conn_id, parameters=parameters, task_id=task_id) + operator = TeradataOperator( + sql=sql, teradata_conn_id=teradata_conn_id, parameters=parameters, task_id=task_id + ) operator.execute(context=context) mock_get_db_hook.return_value.run.assert_called_once_with( sql=sql, @@ -88,7 +90,9 @@ def test_teradata_operator_test_multi(self, mock_get_db_hook): context = "test_context" task_id = "test_task_id" - operator = TeradataOperator(sql=sql, conn_id=teradata_conn_id, parameters=parameters, task_id=task_id) + operator = TeradataOperator( + sql=sql, 
teradata_conn_id=teradata_conn_id, parameters=parameters, task_id=task_id + ) operator.execute(context=context) mock_get_db_hook.return_value.run.assert_called_once_with( sql=sql, @@ -97,3 +101,29 @@ def test_teradata_operator_test_multi(self, mock_get_db_hook): handler=fetch_all_handler, return_last=True, ) + + +class TestTeradataStoredProcedureOperator: + @mock.patch.object(TeradataHook, "run", autospec=TeradataHook.run) + def test_execute(self, mock_run): + procedure = "test" + teradata_conn_id = "teradata_default" + parameters = {"parameter": "value"} + context = "test_context" + task_id = "test_task_id" + + operator = TeradataStoredProcedureOperator( + procedure=procedure, + teradata_conn_id=teradata_conn_id, + parameters=parameters, + task_id=task_id, + ) + result = operator.execute(context=context) + assert result is mock_run.return_value + mock_run.assert_called_once_with( + mock.ANY, + "{CALL test(?)}", + autocommit=True, + parameters=["value"], + handler=mock.ANY, + ) diff --git a/tests/providers/teradata/transfers/test_azure_blob_to_teradata.py b/tests/providers/teradata/transfers/test_azure_blob_to_teradata.py new file mode 100644 index 0000000000000..a9f0fd7f46e02 --- /dev/null +++ b/tests/providers/teradata/transfers/test_azure_blob_to_teradata.py @@ -0,0 +1,60 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
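# Illustrative sketch (not part of the patch): the CALL statement and bind
# values that TeradataHook.callproc assembles, mirroring the hook code added
# above. Passing a Python type (e.g. int or str) marks an OUT parameter;
# _map_param instantiates it (int() -> 0, str() -> "") so the Teradata driver
# can treat it as an output slot.
PARAM_TYPES = {bool, float, int, str}


def _map_param(value):
    # Types become empty instances; concrete values pass through unchanged.
    return value() if value in PARAM_TYPES else value


def build_call(identifier, parameters):
    # One "?" placeholder per parameter, wrapped in the ODBC-style CALL syntax.
    placeholders = ",".join("?" for _ in parameters)
    sql = f"{{CALL {identifier}({placeholders})}}"
    values = parameters.values() if isinstance(parameters, dict) else parameters
    return sql, [_map_param(v) for v in values]


# Matches the assertion in TestTeradataStoredProcedureOperator above:
assert build_call("test", {"parameter": "value"}) == ("{CALL test(?)}", ["value"])
# Mixed IN/OUT parameters as used in the stored-procedure documentation example:
assert build_call("TEST_PROCEDURE", [3, 1, int, str]) == ("{CALL TEST_PROCEDURE(?,?,?,?)}", [3, 1, 0, ""])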
+from __future__ import annotations + +from unittest import mock + +from airflow.providers.teradata.transfers.azure_blob_to_teradata import AzureBlobStorageToTeradataOperator + +AZURE_CONN_ID = "wasb_default" +TERADATA_CONN_ID = "teradata_default" +BLOB_SOURCE_KEY = "az/test" +TERADATA_TABLE = "test" +TASK_ID = "transfer_file" + + +class TestAzureBlobStorageToTeradataOperator: + def test_init(self): + operator = AzureBlobStorageToTeradataOperator( + azure_conn_id=AZURE_CONN_ID, + teradata_conn_id=TERADATA_CONN_ID, + teradata_table=TERADATA_TABLE, + blob_source_key=BLOB_SOURCE_KEY, + task_id=TASK_ID, + ) + + assert operator.azure_conn_id == AZURE_CONN_ID + assert operator.blob_source_key == BLOB_SOURCE_KEY + assert operator.teradata_conn_id == TERADATA_CONN_ID + assert operator.teradata_table == TERADATA_TABLE + assert operator.task_id == TASK_ID + + @mock.patch("airflow.providers.teradata.transfers.azure_blob_to_teradata.TeradataHook") + @mock.patch("airflow.providers.teradata.transfers.azure_blob_to_teradata.WasbHook") + def test_execute(self, mock_hook_wasb, mock_hook_teradata): + op = AzureBlobStorageToTeradataOperator( + azure_conn_id=AZURE_CONN_ID, + teradata_conn_id=TERADATA_CONN_ID, + teradata_table=TERADATA_TABLE, + blob_source_key=BLOB_SOURCE_KEY, + task_id=TASK_ID, + ) + op.execute(context=None) + mock_hook_wasb.assert_called_once_with(wasb_conn_id=AZURE_CONN_ID) + mock_hook_teradata.assert_called_once_with(teradata_conn_id=TERADATA_CONN_ID) + sql = "SQL" + mock_hook_teradata.run(sql) diff --git a/tests/providers/teradata/transfers/test_s3_to_teradata.py b/tests/providers/teradata/transfers/test_s3_to_teradata.py new file mode 100644 index 0000000000000..f88cacfb084f8 --- /dev/null +++ b/tests/providers/teradata/transfers/test_s3_to_teradata.py @@ -0,0 +1,79 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
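# Illustrative sketch (not part of the patch): the CREATE TABLE AS ... WITH DATA
# statement that both transfer operators assemble before handing it to
# TeradataHook.run(). The table name and object-store URI are assumed
# placeholder values; empty ACCESS_ID/ACCESS_KEY corresponds to the
# public-bucket case.
from textwrap import dedent


def build_transfer_sql(table: str, source_uri: str, access_id: str = "", access_secret: str = "") -> str:
    return dedent(
        f"""
        CREATE MULTISET TABLE {table} AS
        (
            SELECT * FROM (
                LOCATION = '{source_uri}'
                ACCESS_ID= '{access_id}'
                ACCESS_KEY= '{access_secret}'
            ) AS d
        ) WITH DATA
        """
    ).rstrip()


# Example: SQL generated for a public S3 bucket transfer.
print(build_transfer_sql("example_s3_teradata_csv", "/s3/td-usgs-public.s3.amazonaws.com/CSVDATA/09394500/2018/06/"))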
+from __future__ import annotations + +from datetime import datetime +from unittest import mock + +from boto3.session import Session + +from airflow.models.connection import Connection +from airflow.providers.teradata.transfers.s3_to_teradata import S3ToTeradataOperator + +DEFAULT_DATE = datetime(2024, 1, 1) + +AWS_CONN_ID = "aws_default" +TERADATA_CONN_ID = "teradata_default" +S3_SOURCE_KEY = "aws/test" +TERADATA_TABLE = "test" +TASK_ID = "transfer_file" + + +class TestS3ToTeradataTransfer: + def test_init(self): + operator = S3ToTeradataOperator( + s3_source_key=S3_SOURCE_KEY, + teradata_table=TERADATA_TABLE, + aws_conn_id=AWS_CONN_ID, + teradata_conn_id=TERADATA_CONN_ID, + task_id=TASK_ID, + dag=None, + ) + + assert operator.aws_conn_id == AWS_CONN_ID + assert operator.s3_source_key == S3_SOURCE_KEY + assert operator.teradata_conn_id == TERADATA_CONN_ID + assert operator.teradata_table == TERADATA_TABLE + assert operator.task_id == TASK_ID + + @mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook.get_connection") + @mock.patch("airflow.models.connection.Connection") + @mock.patch("boto3.session.Session") + @mock.patch("airflow.providers.teradata.hooks.teradata.TeradataHook.run") + def test_execute(self, mock_run, mock_session, mock_connection, mock_hook): + access_key = "aws_access_key_id" + access_secret = "aws_secret_access_key" + mock_session.return_value = Session(access_key, access_secret) + mock_session.return_value.access_key = access_key + mock_session.return_value.secret_key = access_secret + mock_session.return_value.token = None + + mock_connection.return_value = Connection() + mock_hook.return_value = Connection() + + op = S3ToTeradataOperator( + s3_source_key=S3_SOURCE_KEY, + teradata_table=TERADATA_TABLE, + aws_conn_id=AWS_CONN_ID, + teradata_conn_id=TERADATA_CONN_ID, + task_id=TASK_ID, + dag=None, + ) + op.execute(None) + + assert mock_run.call_count == 1 diff --git a/tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py b/tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py new file mode 100644 index 0000000000000..5d961550de59d --- /dev/null +++ b/tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py @@ -0,0 +1,143 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Example Airflow DAG to show usage of AzureBlobStorageToTeradataOperator + +The transfer operator moves files in CSV, JSON, and PARQUET formats from Azure Blob storage +to Teradata tables. In the example Directed Acyclic Graph (DAG) below, it assumes Airflow +Connections with the IDs `teradata_default` and `wasb_default` already exist. The DAG creates +tables using data from the Azure Blob location, reports the number of rows inserted into +the table, and subsequently drops the table. 
+""" + +from __future__ import annotations + +import datetime +import os + +import pytest + +from airflow import DAG + +try: + from airflow.providers.teradata.operators.teradata import TeradataOperator + from airflow.providers.teradata.transfers.azure_blob_to_teradata import AzureBlobStorageToTeradataOperator +except ImportError: + pytest.skip("Teradata provider apache-airflow-provider-teradata not available", allow_module_level=True) + +# [START azure_blob_to_teradata_transfer_operator_howto_guide] + +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") +DAG_ID = "example_azure_blob_to_teradata_transfer_operator" +CONN_ID = "teradata_default" + +with DAG( + dag_id=DAG_ID, + start_date=datetime.datetime(2020, 2, 2), + schedule="@once", + catchup=False, + default_args={"teradata_conn_id": CONN_ID}, +) as dag: + # [START azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_csv] + transfer_data_csv = AzureBlobStorageToTeradataOperator( + task_id="transfer_data_blob_to_teradata_csv", + blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/CSVDATA/09380000/2018/06/", + teradata_table="example_blob_teradata_csv", + azure_conn_id="wasb_default", + trigger_rule="all_done", + ) + # [END azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_csv] + # [START azure_blob_to_teradata_transfer_operator_howto_guide_read_data_table_csv] + read_data_table_csv = TeradataOperator( + task_id="read_data_table_csv", + sql="SELECT count(1) from example_blob_teradata_csv;", + ) + # [END azure_blob_to_teradata_transfer_operator_howto_guide_read_data_table_csv] + # [START azure_blob_to_teradata_transfer_operator_howto_guide_drop_table_csv] + drop_table_csv = TeradataOperator( + task_id="drop_table_csv", + sql="DROP TABLE example_blob_teradata_csv;", + ) + # [END azure_blob_to_teradata_transfer_operator_howto_guide_drop_table_csv] + # [START azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_json] + transfer_data_json = AzureBlobStorageToTeradataOperator( + task_id="transfer_data_blob_to_teradata_json", + blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/JSONDATA/09380000/2018/06/", + teradata_table="example_blob_teradata_json", + azure_conn_id="wasb_default", + trigger_rule="all_done", + ) + # [END azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_json] + # [START azure_blob_to_teradata_transfer_operator_howto_guide_read_data_table_json] + read_data_table_json = TeradataOperator( + task_id="read_data_table_json", + sql="SELECT count(1) from example_blob_teradata_json;", + ) + # [END azure_blob_to_teradata_transfer_operator_howto_guide_read_data_table_json] + # [START azure_blob_to_teradata_transfer_operator_howto_guide_drop_table_json] + drop_table_json = TeradataOperator( + task_id="drop_table_json", + sql="DROP TABLE example_blob_teradata_json;", + ) + # [END azure_blob_to_teradata_transfer_operator_howto_guide_drop_table_json] + # [START azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_parquet] + transfer_data_parquet = AzureBlobStorageToTeradataOperator( + task_id="transfer_data_blob_to_teradata_parquet", + blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/PARQUETDATA/09394500/2018/06/", + teradata_table="example_blob_teradata_parquet", + azure_conn_id="wasb_default", + teradata_conn_id="teradata_default", + trigger_rule="all_done", + ) + # [END 
azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_parquet] + # [START azure_blob_to_teradata_transfer_operator_howto_guide_read_data_table_parquet] + read_data_table_parquet = TeradataOperator( + task_id="read_data_table_parquet", + sql="SELECT count(1) from example_blob_teradata_parquet;", + ) + # [END azure_blob_to_teradata_transfer_operator_howto_guide_read_data_table_parquet] + # [START azure_blob_to_teradata_transfer_operator_howto_guide_drop_table_parquet] + drop_table_parquet = TeradataOperator( + task_id="drop_table_parquet", + sql="DROP TABLE example_blob_teradata_parquet;", + ) + # [END azure_blob_to_teradata_transfer_operator_howto_guide_drop_table_parquet] + + ( + transfer_data_csv + >> transfer_data_json + >> transfer_data_parquet + >> read_data_table_csv + >> read_data_table_json + >> read_data_table_parquet + >> drop_table_csv + >> drop_table_json + >> drop_table_parquet + ) + # [END azure_blob_to_teradata_transfer_operator_howto_guide] + + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "tearDown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) diff --git a/tests/system/providers/teradata/example_s3_to_teradata_transfer.py b/tests/system/providers/teradata/example_s3_to_teradata_transfer.py new file mode 100644 index 0000000000000..fc5e2627393f8 --- /dev/null +++ b/tests/system/providers/teradata/example_s3_to_teradata_transfer.py @@ -0,0 +1,147 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Example Airflow DAG to show usage of S3StorageToTeradataOperator. + +The transfer operator moves files in CSV, JSON, and PARQUET formats from S3 +to Teradata tables. In the example Directed Acyclic Graph (DAG) below, it assumes Airflow +Connections with the IDs `teradata_default` and `aws_default` already exist. The DAG creates +tables using data from the S3, reports the number of rows inserted into +the table, and subsequently drops the table. 
+""" + +from __future__ import annotations + +import datetime +import os + +import pytest + +from airflow import DAG +from airflow.providers.teradata.transfers.s3_to_teradata import S3ToTeradataOperator + +try: + from airflow.providers.teradata.operators.teradata import TeradataOperator +except ImportError: + pytest.skip("Teradata provider apache-airflow-provider-teradata not available", allow_module_level=True) + +# [START s3_to_teradata_transfer_operator_howto_guide] + + +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") +DAG_ID = "example_s3_to_teradata_transfer_operator" +CONN_ID = "teradata_default" + +with DAG( + dag_id=DAG_ID, + start_date=datetime.datetime(2020, 2, 2), + schedule="@once", + catchup=False, + default_args={"teradata_conn_id": CONN_ID}, +) as dag: + # [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_csv] + transfer_data_csv = S3ToTeradataOperator( + task_id="transfer_data_s3_to_teradata_csv", + s3_source_key="/s3/td-usgs-public.s3.amazonaws.com/CSVDATA/09394500/2018/06/", + public_bucket=True, + teradata_table="example_s3_teradata_csv", + aws_conn_id="aws_default", + trigger_rule="all_done", + ) + # [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_csv] + # [START s3_to_teradata_transfer_operator_howto_guide_read_data_table_csv] + read_data_table_csv = TeradataOperator( + task_id="read_data_table_csv", + conn_id=CONN_ID, + sql="SELECT * from example_s3_teradata_csv;", + ) + # [END s3_to_teradata_transfer_operator_howto_guide_read_data_table_csv] + # [START s3_to_teradata_transfer_operator_howto_guide_drop_table_csv] + drop_table_csv = TeradataOperator( + task_id="drop_table_csv", + conn_id=CONN_ID, + sql="DROP TABLE example_s3_teradata_csv;", + ) + # [END s3_to_teradata_transfer_operator_howto_guide_drop_table_csv] + # [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_json] + transfer_data_json = S3ToTeradataOperator( + task_id="transfer_data_s3_to_teradata_json", + s3_source_key="/s3/td-usgs-public.s3.amazonaws.com/JSONDATA/09394500/2018/06/", + public_bucket=True, + teradata_table="example_s3_teradata_json", + aws_conn_id="aws_default", + trigger_rule="all_done", + ) + # [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_json] + # [START s3_to_teradata_transfer_operator_howto_guide_read_data_table_json] + read_data_table_json = TeradataOperator( + task_id="read_data_table_json", + sql="SELECT * from example_s3_teradata_json;", + ) + # [END s3_to_teradata_transfer_operator_howto_guide_read_data_table_json] + # [START s3_to_teradata_transfer_operator_howto_guide_drop_table_json] + drop_table_json = TeradataOperator( + task_id="drop_table_json", + sql="DROP TABLE example_s3_teradata_json;", + ) + # [END s3_to_teradata_transfer_operator_howto_guide_drop_table_json] + # [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_parquet] + transfer_data_parquet = S3ToTeradataOperator( + task_id="transfer_data_s3_to_teradata_parquet", + s3_source_key="/s3/td-usgs-public.s3.amazonaws.com/PARQUETDATA/09394500/2018/06/", + public_bucket=True, + teradata_table="example_s3_teradata_parquet", + aws_conn_id="aws_default", + trigger_rule="all_done", + ) + # [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_parquet] + # [START s3_to_teradata_transfer_operator_howto_guide_read_data_table_parquet] + read_data_table_parquet = TeradataOperator( + task_id="read_data_table_parquet", + sql="SELECT * from 
example_s3_teradata_parquet;", + ) + # [END s3_to_teradata_transfer_operator_howto_guide_read_data_table_parquet] + # [START s3_to_teradata_transfer_operator_howto_guide_drop_table_parquet] + drop_table_parquet = TeradataOperator( + task_id="drop_table_parquet", + sql="DROP TABLE example_s3_teradata_parquet;", + ) + # [END s3_to_teradata_transfer_operator_howto_guide_drop_table_parquet] + ( + transfer_data_csv + >> transfer_data_json + >> transfer_data_parquet + >> read_data_table_csv + >> read_data_table_json + >> read_data_table_parquet + >> drop_table_csv + >> drop_table_json + >> drop_table_parquet + ) + # [END s3_to_teradata_transfer_operator_howto_guide] + + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "tearDown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) diff --git a/tests/system/providers/teradata/example_ssl_teradata.py b/tests/system/providers/teradata/example_ssl_teradata.py index 600f1d6b89685..1673bd791a79b 100644 --- a/tests/system/providers/teradata/example_ssl_teradata.py +++ b/tests/system/providers/teradata/example_ssl_teradata.py @@ -16,7 +16,11 @@ # specific language governing permissions and limitations # under the License. """ -Example use of Teradata related operators. +Example Airflow DAG to show usage of TeradataOperator with SSL teradata connection. + +This DAG assumes Airflow Connection with connection id `teradata_ssl_default` already exists in locally. It +shows how to use create, update, delete and select teradata statements with TeradataOperator as tasks in +airflow dags using TeradataStoredProcedureOperator. 
""" from __future__ import annotations @@ -42,14 +46,15 @@ # the Teradata Operator ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") -DAG_ID = "example_teradata" +DAG_ID = "example_ssl_teradata" +CONN_ID = "teradata_ssl_default" with DAG( dag_id=DAG_ID, start_date=datetime.datetime(2020, 2, 2), schedule="@once", catchup=False, - default_args={"conn_id": "teradata_ssl_default"}, + default_args={"teradata_conn_id": CONN_ID}, ) as dag: # [START teradata_operator_howto_guide_create_country_table] create_country_table = TeradataOperator( @@ -85,23 +90,23 @@ # [START teradata_operator_howto_guide_get_all_countries] get_all_countries = TeradataOperator( task_id="get_all_countries", - sql=r"""SELECT * FROM SSL_Country;""", + sql=r"SELECT * FROM SSL_Country;", ) # [END teradata_operator_howto_guide_get_all_countries] # [START teradata_operator_howto_guide_params_passing_get_query] get_countries_from_continent = TeradataOperator( task_id="get_countries_from_continent", - sql=r"""SELECT * FROM SSL_Country where {{ params.column }}='{{ params.value }}';""", + sql=r"SELECT * FROM SSL_Country where {{ params.column }}='{{ params.value }}';", params={"column": "continent", "value": "Asia"}, ) # [END teradata_operator_howto_guide_params_passing_get_query] # [START teradata_operator_howto_guide_drop_country_table] drop_country_table = TeradataOperator( - task_id="drop_country_table", sql=r"""DROP TABLE SSL_Country;""", dag=dag + task_id="drop_country_table", sql=r"DROP TABLE SSL_Country;", dag=dag ) # [END teradata_operator_howto_guide_drop_country_table] # [START teradata_operator_howto_guide_drop_users_table] - drop_users_table = TeradataOperator(task_id="drop_users_table", sql=r"""DROP TABLE SSL_Users;""", dag=dag) + drop_users_table = TeradataOperator(task_id="drop_users_table", sql=r"DROP TABLE SSL_Users;", dag=dag) # [END teradata_operator_howto_guide_drop_users_table] ( diff --git a/tests/system/providers/teradata/example_teradata.py b/tests/system/providers/teradata/example_teradata.py index 2e470a877a0da..1fd587cdf8f71 100644 --- a/tests/system/providers/teradata/example_teradata.py +++ b/tests/system/providers/teradata/example_teradata.py @@ -16,7 +16,11 @@ # specific language governing permissions and limitations # under the License. """ -Example use of Teradata related operators. +Example Airflow DAG to show usage of TeradataOperator. + +This DAG assumes Airflow Connection with connection id `teradata_default` already exists in locally. It +shows how to use create, update, delete and select teradata statements with TeradataOperator as tasks in +airflow dags using TeradataStoredProcedureOperator. 
""" from __future__ import annotations @@ -38,13 +42,14 @@ ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") DAG_ID = "example_teradata" +CONN_ID = "teradata_default" with DAG( dag_id=DAG_ID, start_date=datetime.datetime(2020, 2, 2), schedule="@once", catchup=False, - default_args={"conn_id": "teradata_default"}, + default_args={"teradata_conn_id": CONN_ID}, ) as dag: # [START teradata_operator_howto_guide_create_table] create_table = TeradataOperator( @@ -84,44 +89,34 @@ # [START teradata_operator_howto_guide_get_all_countries] get_all_countries = TeradataOperator( task_id="get_all_countries", - sql=r""" - SELECT * FROM Country; - """, + sql=r"SELECT * FROM Country;", ) # [END teradata_operator_howto_guide_get_all_countries] # [START teradata_operator_howto_guide_params_passing_get_query] get_countries_from_continent = TeradataOperator( task_id="get_countries_from_continent", - sql=r""" - SELECT * FROM Country WHERE {{ params.column }}='{{ params.value }}'; - """, + sql=r"SELECT * FROM Country WHERE {{ params.column }}='{{ params.value }}';", params={"column": "continent", "value": "Asia"}, ) # [END teradata_operator_howto_guide_params_passing_get_query] # [START teradata_operator_howto_guide_drop_country_table] drop_country_table = TeradataOperator( task_id="drop_country_table", - sql=r""" - DROP TABLE Country; - """, + sql=r"DROP TABLE Country;", dag=dag, ) # [END teradata_operator_howto_guide_drop_country_table] # [START teradata_operator_howto_guide_drop_users_table] drop_users_table = TeradataOperator( task_id="drop_users_table", - sql=r""" - DROP TABLE Users; - """, + sql=r"DROP TABLE Users;", dag=dag, ) # [END teradata_operator_howto_guide_drop_users_table] # [START teradata_operator_howto_guide_create_schema] create_schema = TeradataOperator( task_id="create_schema", - sql=r""" - CREATE DATABASE airflow_temp AS PERM=10e6; - """, + sql=r"CREATE DATABASE airflow_temp AS PERM=10e6;", ) # [END teradata_operator_howto_guide_create_schema] # [START teradata_operator_howto_guide_create_table_with_schema] @@ -140,9 +135,7 @@ # [START teradata_operator_howto_guide_drop_schema_table] drop_schema_table = TeradataOperator( task_id="drop_schema_table", - sql=r""" - DROP TABLE schema_table; - """, + sql=r"DROP TABLE schema_table;", dag=dag, schema="airflow_temp", ) @@ -150,9 +143,7 @@ # [START teradata_operator_howto_guide_drop_schema] drop_schema = TeradataOperator( task_id="drop_schema", - sql=r""" - DROP DATABASE airflow_temp; - """, + sql=r"DROP DATABASE airflow_temp;", dag=dag, ) diff --git a/tests/system/providers/teradata/example_teradata_call_sp.py b/tests/system/providers/teradata/example_teradata_call_sp.py new file mode 100644 index 0000000000000..98ce85fffdfdc --- /dev/null +++ b/tests/system/providers/teradata/example_teradata_call_sp.py @@ -0,0 +1,174 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. +"""Example Airflow DAG to show stored procedure creation and execution on a Teradata database using +TeradataStoredProcedureOperator. + +This DAG assumes the Airflow Connection with connection id `teradata_sp_call` already exists locally. It +shows how to create and execute stored procedures as tasks in Airflow DAGs using +TeradataStoredProcedureOperator.""" +
+from __future__ import annotations +
+from datetime import datetime +
+import pytest +
+from airflow import DAG +
+try: +    from airflow.providers.teradata.operators.teradata import ( +        TeradataOperator, +        TeradataStoredProcedureOperator, +    ) +except ImportError: +    pytest.skip("Teradata provider apache-airflow-providers-teradata not available", allow_module_level=True) +
+# [START howto_teradata_operator_for_sp] +CONN_ID = "teradata_sp_call" +DAG_ID = "example_teradata_call_sp" +
+with DAG( +    dag_id=DAG_ID, +    max_active_runs=1, +    max_active_tasks=3, +    catchup=False, +    default_args={"teradata_conn_id": CONN_ID}, +    schedule="@once", +    start_date=datetime(2023, 1, 1), +) as dag: +    # [START howto_teradata_stored_procedure_operator_with_in_inout_out] +    # [START howto_create_teradata_stored_procedure_operator_with_in_inout] +    create_sp_in_inout = TeradataOperator( +        task_id="create_sp_in_inout", +        sql=r"""REPLACE PROCEDURE TEST_PROCEDURE ( +        IN val_in INTEGER, +        INOUT val_in_out INTEGER, +        OUT val_out INTEGER, +        OUT value_str_out varchar(100) +        ) +        BEGIN +        set val_out = val_in * 2; +        set val_in_out = val_in_out * 4; +        set value_str_out = 'string output'; +        END; +        """, +    ) +    # [END howto_create_teradata_stored_procedure_operator_with_in_inout] +    # [START howto_call_teradata_stored_procedure_operator_with_types] +    opr_sp_types = TeradataStoredProcedureOperator( +        task_id="opr_sp_types", +        procedure="TEST_PROCEDURE", +        parameters=[3, 1, int, str], +    ) +    # [END howto_call_teradata_stored_procedure_operator_with_types] +    # [START howto_call_teradata_stored_procedure_operator_with_place_holder] +    opr_sp_place_holder = TeradataStoredProcedureOperator( +        task_id="opr_sp_place_holder", +        procedure="TEST_PROCEDURE", +        parameters=[3, 1, "?", "?"], +    ) +    # [END howto_call_teradata_stored_procedure_operator_with_place_holder] +    # [START howto_call_teradata_stored_procedure_operator_with_dict_input] +    opr_sp_dict = TeradataStoredProcedureOperator( +        task_id="opr_sp_dict", +        procedure="TEST_PROCEDURE", +        parameters={"val_in": 3, "val_in_out": 1, "val_out": int, "value_str_out": str}, +    ) +    # [END howto_call_teradata_stored_procedure_operator_with_dict_input] +    # [END howto_teradata_stored_procedure_operator_with_in_inout_out] +    # [START howto_create_teradata_stored_procedure_operator_timestamp] +    create_sp_timestamp = TeradataOperator( +        task_id="create_sp_timestamp", +        sql=r"""REPLACE PROCEDURE GetTimestampOutParameter (OUT out_timestamp TIMESTAMP) +        BEGIN +        -- Assign current timestamp to the OUT parameter +        SET out_timestamp = CURRENT_TIMESTAMP; +        END; +        """, +    ) +    # [END howto_create_teradata_stored_procedure_operator_timestamp] +    # [START howto_call_teradata_stored_procedure_operator_timestamp] +    opr_sp_timestamp = TeradataStoredProcedureOperator( +        task_id="opr_sp_timestamp", +        procedure="GetTimestampOutParameter", +        parameters=["?"], +    ) +    # [END howto_call_teradata_stored_procedure_operator_timestamp] +    # [START howto_teradata_stored_procedure_operator_with_in_out_dynamic_result] +    create_sp_param_dr = TeradataOperator( +        task_id="create_sp_param_dr", +
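+        # The procedure created below declares and opens two cursors WITH RETURN
+        # ("dynamic result sets 2"), so the call returns two result sets in
+        # addition to the INOUT/OUT parameter values.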
sql=r"""replace procedure examplestoredproc (in p1 integer, inout p2 integer, out p3 integer) + dynamic result sets 2 + begin + declare cur1 cursor with return for select * from dbc.dbcinfo order by 1 ; + declare cur2 cursor with return for select infodata, infokey from dbc.dbcinfo order by 1 ; + open cur1 ; + open cur2 ; + set p2 = p1 + p2 ; + set p3 = p1 * p2 ; + end ; + """, + ) + # [END howto_teradata_stored_procedure_operator_with_in_out_dynamic_result] + # [START howto_call_teradata_stored_procedure_operator_with_in_out_dynamic_result] + opr_sp_param_dr = TeradataStoredProcedureOperator( + task_id="opr_sp_param_dr", + procedure="examplestoredproc", + parameters=[3, 2, int], + ) + # [END howto_call_teradata_stored_procedure_operator_with_in_out_dynamic_result] + # [START howto_teradata_stored_procedure_operator_drop] + drop_sp = TeradataOperator( + task_id="drop_sp", + sql=r"drop procedure examplestoredproc;", + ) + drop_sp_test = TeradataOperator( + task_id="drop_sp_test", + sql=r"drop procedure TEST_PROCEDURE;", + ) + drop_sp_timestamp = TeradataOperator( + task_id="drop_sp_timestamp", + sql=r"drop procedure GetTimestampOutParameter;", + ) + # [END howto_teradata_stored_procedure_operator_drop] + ( + create_sp_in_inout + >> opr_sp_types + >> opr_sp_dict + >> opr_sp_place_holder + >> create_sp_param_dr + >> opr_sp_param_dr + >> drop_sp + >> drop_sp_test + >> create_sp_timestamp + >> opr_sp_timestamp + >> drop_sp_timestamp + ) + + # [END howto_teradata_operator_for_sp] + + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "tearDown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) diff --git a/tests/system/providers/teradata/example_teradata_to_teradata_transfer.py b/tests/system/providers/teradata/example_teradata_to_teradata_transfer.py index d4d4014ff4ff3..ac2517a33f519 100644 --- a/tests/system/providers/teradata/example_teradata_to_teradata_transfer.py +++ b/tests/system/providers/teradata/example_teradata_to_teradata_transfer.py @@ -16,7 +16,7 @@ # specific language governing permissions and limitations # under the License. """ -Example Airflow DAG to show usage of teradata to teradata transfer operator +Example Airflow DAG to show usage of teradata to teradata transfer operator. The transfer operator connects to source teradata server, runs query to fetch data from source and inserts that data into destination teradata database server. It assumes tables already exists. 
@@ -51,12 +51,11 @@ start_date=datetime.datetime(2020, 2, 2), schedule="@once", catchup=False, - default_args={"conn_id": "teradata_default"}, + default_args={"teradata_conn_id": CONN_ID}, ) as dag: # [START teradata_to_teradata_transfer_operator_howto_guide_create_src_table] create_src_table = TeradataOperator( task_id="create_src_table", - conn_id=CONN_ID, sql=""" CREATE TABLE my_users_src, FALLBACK ( @@ -76,7 +75,6 @@ # [START teradata_to_teradata_transfer_operator_howto_guide_create_dest_table] create_dest_table = TeradataOperator( task_id="create_dest_table", - conn_id=CONN_ID, sql=""" CREATE TABLE my_users_dest, FALLBACK ( @@ -96,7 +94,6 @@ # [START teradata_to_teradata_transfer_operator_howto_guide_insert_data_src] insert_data_src = TeradataOperator( task_id="insert_data_src", - conn_id=CONN_ID, sql=""" INSERT INTO my_users_src(user_name) VALUES ('User1'); INSERT INTO my_users_src(user_name) VALUES ('User2'); @@ -107,10 +104,7 @@ # [START teradata_to_teradata_transfer_operator_howto_guide_read_data_src] read_data_src = TeradataOperator( task_id="read_data_src", - conn_id=CONN_ID, - sql=""" - SELECT TOP 10 * from my_users_src order by user_id desc; - """, + sql="SELECT TOP 10 * from my_users_src order by user_id desc;", ) # [END teradata_to_teradata_transfer_operator_howto_guide_read_data_src] # [START teradata_to_teradata_transfer_operator_howto_guide_transfer_data] @@ -127,28 +121,19 @@ # [START teradata_to_teradata_transfer_operator_howto_guide_read_data_dest] read_data_dest = TeradataOperator( task_id="read_data_dest", - conn_id=CONN_ID, - sql=""" - SELECT TOP 10 * from my_users_dest order by user_id desc; - """, + sql="SELECT TOP 10 * from my_users_dest order by user_id desc;", ) # [END teradata_to_teradata_transfer_operator_howto_guide_read_data_dest] # [START teradata_to_teradata_transfer_operator_howto_guide_drop_src_table] drop_src_table = TeradataOperator( task_id="drop_src_table", - conn_id=CONN_ID, - sql=""" - DROP TABLE my_users_src; - """, + sql="DROP TABLE my_users_src;", ) # [END teradata_to_teradata_transfer_operator_howto_guide_drop_src_table] # [START teradata_to_teradata_transfer_operator_howto_guide_drop_dest_table] drop_dest_table = TeradataOperator( task_id="drop_dest_table", - conn_id=CONN_ID, - sql=""" - DROP TABLE my_users_dest; - """, + sql="DROP TABLE my_users_dest;", ) # [END teradata_to_teradata_transfer_operator_howto_guide_drop_dest_table] (