From d610e279e2fe732e09adc38c4f7c27253eca7fc0 Mon Sep 17 00:00:00 2001 From: Satish Ch Date: Thu, 20 Jun 2024 13:00:02 +0530 Subject: [PATCH] Added support of teradata authorization object for cloud transfer operators to teradata. 1. Added teradata authorization object for authorization in transfer operators 2. Added security token support in s3toteradata transfer operator --- .../providers/teradata/operators/teradata.py | 2 +- .../transfers/azure_blob_to_teradata.py | 46 +++++++++---- .../teradata/transfers/s3_to_teradata.py | 28 +++++--- .../operators/azure_blob_to_teradata.rst | 67 ++++++++++++++++++- .../operators/s3_to_teradata.rst | 10 ++- ...example_azure_blob_to_teradata_transfer.py | 64 +++++++++++++++++- .../example_s3_to_teradata_transfer.py | 63 +++++++++++++++-- 7 files changed, 244 insertions(+), 36 deletions(-) diff --git a/airflow/providers/teradata/operators/teradata.py b/airflow/providers/teradata/operators/teradata.py index 00cd7a86c7d0b..c15fc290385d8 100644 --- a/airflow/providers/teradata/operators/teradata.py +++ b/airflow/providers/teradata/operators/teradata.py @@ -31,7 +31,7 @@ class TeradataOperator(SQLExecuteQueryOperator): """ General Teradata Operator to execute queries on Teradata Database. - Executes sql statements in the Teradata SQL Database using teradatasql jdbc driver + Executes sql statements in the Teradata SQL Database using Teradata Python SQL Driver .. seealso:: For more information on how to use this operator, take a look at the guide: diff --git a/airflow/providers/teradata/transfers/azure_blob_to_teradata.py b/airflow/providers/teradata/transfers/azure_blob_to_teradata.py index 416b4e7136cb0..8fc95122f1c63 100644 --- a/airflow/providers/teradata/transfers/azure_blob_to_teradata.py +++ b/airflow/providers/teradata/transfers/azure_blob_to_teradata.py @@ -48,10 +48,17 @@ class AzureBlobStorageToTeradataOperator(BaseOperator): The URI format is `/az/YOUR-STORAGE-ACCOUNT.blob.core.windows.net/YOUR-CONTAINER/YOUR-BLOB-LOCATION`. Refer to https://docs.teradata.com/search/documents?query=native+object+store&sort=last_update&virtual-field=title_only&content-lang=en-US + :param public_bucket: Specifies whether the provided blob container is public. If the blob container is public, + it means that anyone can access the objects within it via a URL without requiring authentication. + If the bucket is private and authentication is not provided, the operator will throw an exception. :param azure_conn_id: The Airflow WASB connection used for azure blob credentials. :param teradata_table: The name of the teradata table to which the data is transferred.(templated) :param teradata_conn_id: The connection ID used to connect to Teradata :ref:`Teradata connection ` + :param teradata_authorization_name: The name of Teradata Authorization Database Object, + is used to control who can access an Azure Blob object store. + Refer to + https://docs.teradata.com/r/Enterprise_IntelliFlex_VMware/Teradata-VantageTM-Native-Object-Store-Getting-Started-Guide-17.20/Setting-Up-Access/Controlling-Foreign-Table-Access-with-an-AUTHORIZATION-Object Note that ``blob_source_key`` and ``teradata_table`` are templated, so you can use variables in them if you wish. @@ -64,37 +71,48 @@ def __init__( self, *, blob_source_key: str, + public_bucket: bool = False, azure_conn_id: str = "azure_default", teradata_table: str, teradata_conn_id: str = "teradata_default", + teradata_authorization_name: str = "", **kwargs, ) -> None: super().__init__(**kwargs) self.blob_source_key = blob_source_key + self.public_bucket = public_bucket self.azure_conn_id = azure_conn_id self.teradata_table = teradata_table self.teradata_conn_id = teradata_conn_id + self.teradata_authorization_name = teradata_authorization_name def execute(self, context: Context) -> None: self.log.info( "transferring data from %s to teradata table %s...", self.blob_source_key, self.teradata_table ) - azure_hook = WasbHook(wasb_conn_id=self.azure_conn_id) - conn = azure_hook.get_connection(self.azure_conn_id) - # Obtaining the Azure client ID and Azure secret in order to access a specified Blob container - access_id = conn.login if conn.login is not None else "" - access_secret = conn.password if conn.password is not None else "" teradata_hook = TeradataHook(teradata_conn_id=self.teradata_conn_id) + credentials_part = "ACCESS_ID= '' ACCESS_KEY= ''" + if not self.public_bucket: + # Accessing data directly from the Azure Blob Storage and creating permanent table inside the + # database + if self.teradata_authorization_name: + credentials_part = f"AUTHORIZATION={self.teradata_authorization_name}" + else: + # Obtaining the Azure client ID and Azure secret in order to access a specified Blob container + azure_hook = WasbHook(wasb_conn_id=self.azure_conn_id) + conn = azure_hook.get_connection(self.azure_conn_id) + access_id = conn.login + access_secret = conn.password + credentials_part = f"ACCESS_ID= '{access_id}' ACCESS_KEY= '{access_secret}'" sql = dedent(f""" - CREATE MULTISET TABLE {self.teradata_table} AS - ( - SELECT * FROM ( - LOCATION = '{self.blob_source_key}' - ACCESS_ID= '{access_id}' - ACCESS_KEY= '{access_secret}' - ) AS d - ) WITH DATA - """).rstrip() + CREATE MULTISET TABLE {self.teradata_table} AS + ( + SELECT * FROM ( + LOCATION = '{self.blob_source_key}' + {credentials_part} + ) AS d + ) WITH DATA + """).rstrip() try: teradata_hook.run(sql, True) except Exception as ex: diff --git a/airflow/providers/teradata/transfers/s3_to_teradata.py b/airflow/providers/teradata/transfers/s3_to_teradata.py index f7998ea861135..d0bca09165b11 100644 --- a/airflow/providers/teradata/transfers/s3_to_teradata.py +++ b/airflow/providers/teradata/transfers/s3_to_teradata.py @@ -53,6 +53,10 @@ class S3ToTeradataOperator(BaseOperator): :param aws_conn_id: The Airflow AWS connection used for AWS credentials. :param teradata_conn_id: The connection ID used to connect to Teradata :ref:`Teradata connection `. + :param teradata_authorization_name: The name of Teradata Authorization Database Object, + is used to control who can access an S3 object store. + Refer to + https://docs.teradata.com/r/Enterprise_IntelliFlex_VMware/Teradata-VantageTM-Native-Object-Store-Getting-Started-Guide-17.20/Setting-Up-Access/Controlling-Foreign-Table-Access-with-an-AUTHORIZATION-Object Note that ``s3_source_key`` and ``teradata_table`` are templated, so you can use variables in them if you wish. @@ -69,6 +73,7 @@ def __init__( teradata_table: str, aws_conn_id: str = "aws_default", teradata_conn_id: str = "teradata_default", + teradata_authorization_name: str = "", **kwargs, ) -> None: super().__init__(**kwargs) @@ -77,6 +82,7 @@ def __init__( self.teradata_table = teradata_table self.aws_conn_id = aws_conn_id self.teradata_conn_id = teradata_conn_id + self.teradata_authorization_name = teradata_authorization_name def execute(self, context: Context) -> None: self.log.info( @@ -84,20 +90,26 @@ def execute(self, context: Context) -> None: ) s3_hook = S3Hook(aws_conn_id=self.aws_conn_id) - access_key = "" - access_secret = "" - if not self.public_bucket: - credentials = s3_hook.get_credentials() - access_key = credentials.access_key - access_secret = credentials.secret_key teradata_hook = TeradataHook(teradata_conn_id=self.teradata_conn_id) + credentials_part = "ACCESS_ID= '' ACCESS_KEY= ''" + if not self.public_bucket: + # Accessing data directly from the S3 bucket and creating permanent table inside the database + if self.teradata_authorization_name: + credentials_part = f"AUTHORIZATION={self.teradata_authorization_name}" + else: + credentials = s3_hook.get_credentials() + access_key = credentials.access_key + access_secret = credentials.secret_key + credentials_part = f"ACCESS_ID= '{access_key}' ACCESS_KEY= '{access_secret}'" + token = credentials.token + if token: + credentials_part = credentials_part + f" SESSION_TOKEN = '{token}'" sql = dedent(f""" CREATE MULTISET TABLE {self.teradata_table} AS ( SELECT * FROM ( LOCATION = '{self.s3_source_key}' - ACCESS_ID= '{access_key}' - ACCESS_KEY= '{access_secret}' + {credentials_part} ) AS d ) WITH DATA """).rstrip() diff --git a/docs/apache-airflow-providers-teradata/operators/azure_blob_to_teradata.rst b/docs/apache-airflow-providers-teradata/operators/azure_blob_to_teradata.rst index 0ee9a7bb32595..194eabd0cd001 100644 --- a/docs/apache-airflow-providers-teradata/operators/azure_blob_to_teradata.rst +++ b/docs/apache-airflow-providers-teradata/operators/azure_blob_to_teradata.rst @@ -26,8 +26,69 @@ AzureBlobStorageToTeradataOperator The purpose of ``AzureBlobStorageToTeradataOperator`` is to define tasks involving CSV, JSON and Parquet format data transfer from an Azure Blob Storage to Teradata table. Use the :class:`AzureBlobStorageToTeradataOperator ` -to transfer data from an Azure Blob Storage to Teradata. +to transfer data from an Azure Blob Storage to Teradata.This operator leverages the Teradata +`READ_NOS `_ feature +to import data in CSV, JSON, and Parquet formats from Azure Blob Storage into Teradata. +This operator accesses data directly from the object store and generates permanent tables +within the database using READ_NOS and CREATE TABLE AS functionalities with below SQL statement. +.. code-block:: sql + + CREATE MULTISET TABLE multiset_table_name AS ( + SELECT * + FROM ( + LOCATION='YOUR-OBJECT-STORE-URI' + AUTHORIZATION=authorization_object + ) AS d + ) WITH DATA; + +It facilitates data loading from both public and private object storage. For private object storage, access to the object +store can be granted via either Teradata Authorization database object or Object Store Login and Object Store Key +defined with Azure Blob Storage connection in Airflow. Conversely, for data transfer from public object storage, +no authorization or access credentials are required. + +* Teradata Authorization database object access type can be used with ``teradata_authorization_name`` parameter of ``AzureBlobStorageToTeradataOperator`` +* Object Store Access Key ID and Access Key Secret access type can be used with ``azure_conn_id`` parameter of ``S3ToTeradataOperator`` + +https://docs.teradata.com/r/Enterprise_IntelliFlex_VMware/Teradata-VantageTM-Native-Object-Store-Getting-Started-Guide-17.20/Setting-Up-Access/Setting-Access-Privileges + +.. note:: + Teradata Authorization database object takes precedence if both access types defined. + +Transferring data from public Azure Blob Storage to Teradata +------------------------------------------------------------ + +An example usage of the AzureBlobStorageToTeradataOperator to transfer CSV data format from public Azure Blob Storage to teradata table is as follows: + +.. exampleinclude:: /../../tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py + :language: python + :start-after: [START azure_blob__to_teradata_transfer_operator_howto_guide_transfer_data_public_blob_to_teradata_csv] + :end-before: [END azure_blob__to_teradata_transfer_operator_howto_guide_transfer_data_public_blob_to_teradata_csv] + +Transferring data from private Azure Blob Storage to Teradata with AWS connection +--------------------------------------------------------------------------------- + +An example usage of the AzureBlobStorageToTeradataOperator to transfer CSV data format from private S3 object store to teradata with AWS credentials defined as +AWS connection: + +.. exampleinclude:: /../../tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py + :language: python + :start-after: [START azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_access_blob_to_teradata_csv] + :end-before: [END azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_access_blob_to_teradata_csv] + +Transferring data from private Azure Blob Storage to Teradata with Teradata Authorization Object +------------------------------------------------------------------------------------------------ +Teradata authorization database object is used to control who can access an external object store. Teradata authorization +database object should exists in Teradata database to use it in transferring data from S3 to Teradata. Refer +`Authentication for External Object Stores in Teradata `_ + +An example usage of the AzureBlobStorageToTeradataOperator to transfer CSV data format from private S3 object store to teradata with +Authorization database object defined in Teradata. + +.. exampleinclude:: /../../tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py + :language: python + :start-after: [START azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_authorization_blob_to_teradata_csv] + :end-before: [END azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_authorization_blob_to_teradata_csv] Transferring data in CSV format from Azure Blob Storage to Teradata ------------------------------------------------------------------- @@ -37,8 +98,8 @@ to teradata table is as follows: .. exampleinclude:: /../../tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py :language: python - :start-after: [START azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_csv] - :end-before: [END azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_csv] + :start-after: [START azure_blob__to_teradata_transfer_operator_howto_guide_transfer_data_public_blob_to_teradata_csv] + :end-before: [END azure_blob__to_teradata_transfer_operator_howto_guide_transfer_data_public_blob_to_teradata_csv] Transferring data in JSON format from Azure Blob Storage to Teradata -------------------------------------------------------------------- diff --git a/docs/apache-airflow-providers-teradata/operators/s3_to_teradata.rst b/docs/apache-airflow-providers-teradata/operators/s3_to_teradata.rst index a6ecbc6f146b8..da52e2841bfa3 100644 --- a/docs/apache-airflow-providers-teradata/operators/s3_to_teradata.rst +++ b/docs/apache-airflow-providers-teradata/operators/s3_to_teradata.rst @@ -30,7 +30,11 @@ READ_NOS is a table operator in Teradata Vantage that allows users to list exter For more details, see `READ_NOS Functionality `_ Use the :class:`S3ToTeradataOperator ` -to transfer data from S3 to Teradata. +to transfer data from S3 to Teradata. This operator leverages the Teradata +`READ_NOS `_ feature +to import data in CSV, JSON, and Parquet formats from S3 into Teradata. +This operator accesses data directly from the object store and generates permanent tables +within the database using READ_NOS and CREATE TABLE AS functionalities with below SQL statement. .. note:: The current version of ``S3ToTeradataOperator`` does not support accessing AWS S3 with Security Token Service (STS) temporary credentials. Instead, it exclusively supports accessing with long-term credentials. @@ -43,8 +47,8 @@ An example usage of the S3ToTeradataOperator to transfer CSV data format from S3 .. exampleinclude:: /../../tests/system/providers/teradata/example_s3_to_teradata_transfer.py :language: python - :start-after: [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_csv] - :end-before: [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_csv] + :start-after: [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_public_s3_to_teradata_csv] + :end-before: [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_public_s3_to_teradata_csv] Transferring data in JSON format from S3 to Teradata ---------------------------------------------------- diff --git a/tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py b/tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py index 5d961550de59d..8fbdd70d476c8 100644 --- a/tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py +++ b/tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py @@ -53,15 +53,17 @@ catchup=False, default_args={"teradata_conn_id": CONN_ID}, ) as dag: - # [START azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_csv] + # [START azure_blob__to_teradata_transfer_operator_howto_guide_transfer_data_public_blob_to_teradata_csv] transfer_data_csv = AzureBlobStorageToTeradataOperator( task_id="transfer_data_blob_to_teradata_csv", blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/CSVDATA/09380000/2018/06/", + public_bucket=True, teradata_table="example_blob_teradata_csv", + teradata_conn_id="teradata_default", azure_conn_id="wasb_default", trigger_rule="all_done", ) - # [END azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_csv] + # [END azure_blob__to_teradata_transfer_operator_howto_guide_transfer_data_public_blob_to_teradata_csv] # [START azure_blob_to_teradata_transfer_operator_howto_guide_read_data_table_csv] read_data_table_csv = TeradataOperator( task_id="read_data_table_csv", @@ -74,11 +76,61 @@ sql="DROP TABLE example_blob_teradata_csv;", ) # [END azure_blob_to_teradata_transfer_operator_howto_guide_drop_table_csv] + # [START azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_access_blob_to_teradata_csv] + transfer_key_data_csv = AzureBlobStorageToTeradataOperator( + task_id="transfer_key_data_blob_to_teradata_csv", + blob_source_key="/az/airflowteradata.blob.core.windows.net/csvdata/", + teradata_table="example_blob_teradata_csv", + azure_conn_id="wasb_default", + teradata_conn_id="teradata_default", + trigger_rule="all_done", + ) + # [END azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_access_blob_to_teradata_csv] + # [START azure_blob_to_teradata_transfer_operator_howto_guide_read_data_table_csv] + read_key_data_table_csv = TeradataOperator( + task_id="read_key_data_table_csv", + conn_id=CONN_ID, + sql="SELECT count(1) from example_blob_teradata_csv;", + ) + # [END azure_blob_to_teradata_transfer_operator_howto_guide_read_data_table_csv] + # [START azure_blob_to_teradata_transfer_operator_howto_guide_drop_table_csv] + drop_key_table_csv = TeradataOperator( + task_id="drop_key_table_csv", + conn_id=CONN_ID, + sql="DROP TABLE example_blob_teradata_csv;", + ) + # [END azure_blob_to_teradata_transfer_operator_howto_guide_drop_table_csv] + # [START azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_authorization_blob_to_teradata_csv] + transfer_auth_data_csv = AzureBlobStorageToTeradataOperator( + task_id="transfer_auth_data_blob_to_teradata_csv", + blob_source_key="/az/airflowteradata.blob.core.windows.net/csvdata/", + teradata_table="example_blob_teradata_csv", + teradata_authorization_name="azure_authorization", + teradata_conn_id="teradata_default", + trigger_rule="all_done", + ) + # [END azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_authorization_blob_to_teradata_csv] + # [START azure_blob_to_teradata_transfer_operator_howto_guide_read_data_table_csv] + read_auth_data_table_csv = TeradataOperator( + task_id="read_auth_data_table_csv", + conn_id=CONN_ID, + sql="SELECT count(1) from example_blob_teradata_csv;", + ) + # [END azure_blob_to_teradata_transfer_operator_howto_guide_read_data_table_csv] + # [START azure_blob_to_teradata_transfer_operator_howto_guide_drop_table_csv] + drop_auth_table_csv = TeradataOperator( + task_id="drop_auth_table_csv", + conn_id=CONN_ID, + sql="DROP TABLE example_blob_teradata_csv;", + ) + # [END azure_blob_to_teradata_transfer_operator_howto_guide_drop_table_csv] # [START azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_json] transfer_data_json = AzureBlobStorageToTeradataOperator( task_id="transfer_data_blob_to_teradata_json", blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/JSONDATA/09380000/2018/06/", teradata_table="example_blob_teradata_json", + public_bucket=True, + teradata_conn_id="teradata_default", azure_conn_id="wasb_default", trigger_rule="all_done", ) @@ -100,7 +152,7 @@ task_id="transfer_data_blob_to_teradata_parquet", blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/PARQUETDATA/09394500/2018/06/", teradata_table="example_blob_teradata_parquet", - azure_conn_id="wasb_default", + public_bucket=True, teradata_conn_id="teradata_default", trigger_rule="all_done", ) @@ -128,6 +180,12 @@ >> drop_table_csv >> drop_table_json >> drop_table_parquet + >> transfer_key_data_csv + >> read_key_data_table_csv + >> drop_key_table_csv + >> transfer_auth_data_csv + >> read_auth_data_table_csv + >> drop_auth_table_csv ) # [END azure_blob_to_teradata_transfer_operator_howto_guide] diff --git a/tests/system/providers/teradata/example_s3_to_teradata_transfer.py b/tests/system/providers/teradata/example_s3_to_teradata_transfer.py index fc5e2627393f8..df5155877508c 100644 --- a/tests/system/providers/teradata/example_s3_to_teradata_transfer.py +++ b/tests/system/providers/teradata/example_s3_to_teradata_transfer.py @@ -54,7 +54,7 @@ catchup=False, default_args={"teradata_conn_id": CONN_ID}, ) as dag: - # [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_csv] + # [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_public_s3_to_teradata_csv] transfer_data_csv = S3ToTeradataOperator( task_id="transfer_data_s3_to_teradata_csv", s3_source_key="/s3/td-usgs-public.s3.amazonaws.com/CSVDATA/09394500/2018/06/", @@ -63,7 +63,7 @@ aws_conn_id="aws_default", trigger_rule="all_done", ) - # [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_csv] + # [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_public_s3_to_teradata_csv] # [START s3_to_teradata_transfer_operator_howto_guide_read_data_table_csv] read_data_table_csv = TeradataOperator( task_id="read_data_table_csv", @@ -78,6 +78,54 @@ sql="DROP TABLE example_s3_teradata_csv;", ) # [END s3_to_teradata_transfer_operator_howto_guide_drop_table_csv] + # [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_access_s3_to_teradata_csv] + transfer_key_data_csv = S3ToTeradataOperator( + task_id="transfer_key_data_s3_to_teradata_key_csv", + s3_source_key="/s3/airflowteradatatest.s3.ap-southeast-2.amazonaws.com/", + teradata_table="example_s3_teradata_csv", + aws_conn_id="aws_default", + teradata_conn_id="teradata_default", + trigger_rule="all_done", + ) + # [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_access_s3_to_teradata_csv] + # [START s3_to_teradata_transfer_operator_howto_guide_read_data_table_csv] + read_key_data_table_csv = TeradataOperator( + task_id="read_key_data_table_csv", + conn_id=CONN_ID, + sql="SELECT * from example_s3_teradata_csv;", + ) + # [END s3_to_teradata_transfer_operator_howto_guide_read_data_table_csv] + # [START s3_to_teradata_transfer_operator_howto_guide_drop_table_csv] + drop_key_table_csv = TeradataOperator( + task_id="drop_key_table_csv", + conn_id=CONN_ID, + sql="DROP TABLE example_s3_teradata_csv;", + ) + # [END s3_to_teradata_transfer_operator_howto_guide_drop_table_csv] + # [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_authorization_s3_to_teradata_csv] + transfer_auth_data_csv = S3ToTeradataOperator( + task_id="transfer_auth_data_s3_to_teradata_auth_csv", + s3_source_key="/s3/teradata-download.s3.us-east-1.amazonaws.com/DevTools/csv/", + teradata_table="example_s3_teradata_csv", + teradata_authorization_name="aws_authorization", + teradata_conn_id="teradata_default", + trigger_rule="all_done", + ) + # [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_authorization_s3_to_teradata_csv] + # [START s3_to_teradata_transfer_operator_howto_guide_read_data_table_csv] + read_auth_data_table_csv = TeradataOperator( + task_id="read_auth_data_table_csv", + conn_id=CONN_ID, + sql="SELECT * from example_s3_teradata_csv;", + ) + # [END s3_to_teradata_transfer_operator_howto_guide_read_data_table_csv] + # [START s3_to_teradata_transfer_operator_howto_guide_drop_table_csv] + drop_auth_table_csv = TeradataOperator( + task_id="drop_auth_table_csv", + conn_id=CONN_ID, + sql="DROP TABLE example_s3_teradata_csv;", + ) + # [END s3_to_teradata_transfer_operator_howto_guide_drop_table_csv] # [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_json] transfer_data_json = S3ToTeradataOperator( task_id="transfer_data_s3_to_teradata_json", @@ -116,12 +164,13 @@ sql="SELECT * from example_s3_teradata_parquet;", ) # [END s3_to_teradata_transfer_operator_howto_guide_read_data_table_parquet] - # [START s3_to_teradata_transfer_operator_howto_guide_drop_table_parquet] + # [START s3_to_teradata_transfer_operator_howto_guide_drop_table] drop_table_parquet = TeradataOperator( task_id="drop_table_parquet", sql="DROP TABLE example_s3_teradata_parquet;", ) - # [END s3_to_teradata_transfer_operator_howto_guide_drop_table_parquet] + + # [END s3_to_teradata_transfer_operator_howto_guide_drop_table] ( transfer_data_csv >> transfer_data_json @@ -132,6 +181,12 @@ >> drop_table_csv >> drop_table_json >> drop_table_parquet + >> transfer_key_data_csv + >> read_key_data_table_csv + >> drop_key_table_csv + >> transfer_auth_data_csv + >> read_auth_data_table_csv + >> drop_auth_table_csv ) # [END s3_to_teradata_transfer_operator_howto_guide]