Added support of teradata authorization object for cloud transfer operator #46

Merged · 1 commit · Jun 21, 2024
2 changes: 1 addition & 1 deletion airflow/providers/teradata/operators/teradata.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class TeradataOperator(SQLExecuteQueryOperator):
"""
General Teradata Operator to execute queries on Teradata Database.

Executes sql statements in the Teradata SQL Database using teradatasql jdbc driver
Executes SQL statements in the Teradata SQL Database using the Teradata SQL Driver for Python

.. seealso::
For more information on how to use this operator, take a look at the guide:
Expand Down
46 changes: 32 additions & 14 deletions airflow/providers/teradata/transfers/azure_blob_to_teradata.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,17 @@ class AzureBlobStorageToTeradataOperator(BaseOperator):
The URI format is `/az/YOUR-STORAGE-ACCOUNT.blob.core.windows.net/YOUR-CONTAINER/YOUR-BLOB-LOCATION`.
Refer to
https://docs.teradata.com/search/documents?query=native+object+store&sort=last_update&virtual-field=title_only&content-lang=en-US
:param public_bucket: Specifies whether the provided blob container is public. If the blob container is public,
it means that anyone can access the objects within it via a URL without requiring authentication.
If the bucket is private and authentication is not provided, the operator will throw an exception.
:param azure_conn_id: The Airflow WASB connection used for azure blob credentials.
:param teradata_table: The name of the Teradata table to which the data is transferred. (templated)
:param teradata_conn_id: The connection ID used to connect to Teradata
:ref:`Teradata connection <howto/connection:Teradata>`
:param teradata_authorization_name: The name of the Teradata Authorization Database Object
used to control who can access the Azure Blob object store.
Refer to
https://docs.teradata.com/r/Enterprise_IntelliFlex_VMware/Teradata-VantageTM-Native-Object-Store-Getting-Started-Guide-17.20/Setting-Up-Access/Controlling-Foreign-Table-Access-with-an-AUTHORIZATION-Object

Note that ``blob_source_key`` and ``teradata_table`` are
templated, so you can use variables in them if you wish.
Expand All @@ -64,37 +71,48 @@ def __init__(
self,
*,
blob_source_key: str,
public_bucket: bool = False,
azure_conn_id: str = "azure_default",
teradata_table: str,
teradata_conn_id: str = "teradata_default",
teradata_authorization_name: str = "",
**kwargs,
) -> None:
super().__init__(**kwargs)
self.blob_source_key = blob_source_key
self.public_bucket = public_bucket
self.azure_conn_id = azure_conn_id
self.teradata_table = teradata_table
self.teradata_conn_id = teradata_conn_id
self.teradata_authorization_name = teradata_authorization_name

def execute(self, context: Context) -> None:
self.log.info(
"transferring data from %s to teradata table %s...", self.blob_source_key, self.teradata_table
)
azure_hook = WasbHook(wasb_conn_id=self.azure_conn_id)
conn = azure_hook.get_connection(self.azure_conn_id)
# Obtaining the Azure client ID and Azure secret in order to access a specified Blob container
access_id = conn.login if conn.login is not None else ""
access_secret = conn.password if conn.password is not None else ""
teradata_hook = TeradataHook(teradata_conn_id=self.teradata_conn_id)
credentials_part = "ACCESS_ID= '' ACCESS_KEY= ''"
if not self.public_bucket:
# Accessing data directly from the Azure Blob Storage and creating permanent table inside the
# database
if self.teradata_authorization_name:
credentials_part = f"AUTHORIZATION={self.teradata_authorization_name}"
else:
# Obtaining the Azure client ID and Azure secret in order to access a specified Blob container
azure_hook = WasbHook(wasb_conn_id=self.azure_conn_id)
conn = azure_hook.get_connection(self.azure_conn_id)
access_id = conn.login
access_secret = conn.password
credentials_part = f"ACCESS_ID= '{access_id}' ACCESS_KEY= '{access_secret}'"
sql = dedent(f"""
CREATE MULTISET TABLE {self.teradata_table} AS
(
SELECT * FROM (
LOCATION = '{self.blob_source_key}'
ACCESS_ID= '{access_id}'
ACCESS_KEY= '{access_secret}'
) AS d
) WITH DATA
""").rstrip()
CREATE MULTISET TABLE {self.teradata_table} AS
(
SELECT * FROM (
LOCATION = '{self.blob_source_key}'
{credentials_part}
) AS d
) WITH DATA
""").rstrip()
try:
teradata_hook.run(sql, True)
except Exception as ex:
Expand Down
28 changes: 20 additions & 8 deletions airflow/providers/teradata/transfers/s3_to_teradata.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ class S3ToTeradataOperator(BaseOperator):
:param aws_conn_id: The Airflow AWS connection used for AWS credentials.
:param teradata_conn_id: The connection ID used to connect to Teradata
:ref:`Teradata connection <howto/connection:Teradata>`.
:param teradata_authorization_name: The name of the Teradata Authorization Database Object
used to control who can access the S3 object store.
Refer to
https://docs.teradata.com/r/Enterprise_IntelliFlex_VMware/Teradata-VantageTM-Native-Object-Store-Getting-Started-Guide-17.20/Setting-Up-Access/Controlling-Foreign-Table-Access-with-an-AUTHORIZATION-Object

Note that ``s3_source_key`` and ``teradata_table`` are
templated, so you can use variables in them if you wish.
Expand All @@ -69,6 +73,7 @@ def __init__(
teradata_table: str,
aws_conn_id: str = "aws_default",
teradata_conn_id: str = "teradata_default",
teradata_authorization_name: str = "",
**kwargs,
) -> None:
super().__init__(**kwargs)
Expand All @@ -77,27 +82,34 @@ def __init__(
self.teradata_table = teradata_table
self.aws_conn_id = aws_conn_id
self.teradata_conn_id = teradata_conn_id
self.teradata_authorization_name = teradata_authorization_name

def execute(self, context: Context) -> None:
self.log.info(
"transferring data from %s to teradata table %s...", self.s3_source_key, self.teradata_table
)

s3_hook = S3Hook(aws_conn_id=self.aws_conn_id)
access_key = ""
access_secret = ""
if not self.public_bucket:
credentials = s3_hook.get_credentials()
access_key = credentials.access_key
access_secret = credentials.secret_key
teradata_hook = TeradataHook(teradata_conn_id=self.teradata_conn_id)
credentials_part = "ACCESS_ID= '' ACCESS_KEY= ''"
if not self.public_bucket:
# Accessing data directly from the S3 bucket and creating permanent table inside the database
if self.teradata_authorization_name:
credentials_part = f"AUTHORIZATION={self.teradata_authorization_name}"
else:
credentials = s3_hook.get_credentials()
access_key = credentials.access_key
access_secret = credentials.secret_key
credentials_part = f"ACCESS_ID= '{access_key}' ACCESS_KEY= '{access_secret}'"
token = credentials.token
if token:
credentials_part = credentials_part + f" SESSION_TOKEN = '{token}'"
sql = dedent(f"""
CREATE MULTISET TABLE {self.teradata_table} AS
(
SELECT * FROM (
LOCATION = '{self.s3_source_key}'
ACCESS_ID= '{access_key}'
ACCESS_KEY= '{access_secret}'
{credentials_part}
) AS d
) WITH DATA
""").rstrip()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,69 @@ AzureBlobStorageToTeradataOperator
The purpose of ``AzureBlobStorageToTeradataOperator`` is to define tasks involving CSV, JSON and Parquet
format data transfer from an Azure Blob Storage to Teradata table.
Use the :class:`AzureBlobStorageToTeradataOperator <airflow.providers.teradata.transfers.azure_blob_to_teradata>`
to transfer data from an Azure Blob Storage to Teradata.
to transfer data from Azure Blob Storage to Teradata. This operator leverages the Teradata
`READ_NOS <https://docs.teradata.com/r/Enterprise_IntelliFlex_VMware/Teradata-VantageTM-Native-Object-Store-Getting-Started-Guide-17.20/Welcome-to-Native-Object-Store>`_ feature
to import data in CSV, JSON, and Parquet formats from Azure Blob Storage into Teradata.
It accesses data directly from the object store and creates permanent tables
in the database using the READ_NOS and CREATE TABLE AS functionalities, as in the SQL statement below:

.. code-block:: sql

CREATE MULTISET TABLE multiset_table_name AS (
SELECT *
FROM (
LOCATION='YOUR-OBJECT-STORE-URI'
AUTHORIZATION=authorization_object
) AS d
) WITH DATA;

It supports loading data from both public and private object storage. For private object storage, access
can be granted either through a Teradata Authorization database object or through the Object Store Login and
Object Store Key defined in the Azure Blob Storage (WASB) connection in Airflow. For public object storage,
no authorization or access credentials are required.

* Teradata Authorization database object access type can be used with ``teradata_authorization_name`` parameter of ``AzureBlobStorageToTeradataOperator``
* Object Store Access Key ID and Access Key Secret access type can be used with the ``azure_conn_id`` parameter of ``AzureBlobStorageToTeradataOperator``

Refer to
https://docs.teradata.com/r/Enterprise_IntelliFlex_VMware/Teradata-VantageTM-Native-Object-Store-Getting-Started-Guide-17.20/Setting-Up-Access/Setting-Access-Privileges

.. note::
The Teradata Authorization database object takes precedence if both access types are defined.
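
The precedence between the two access types mirrors the operator's internal logic. Below is a minimal sketch of that resolution; ``build_credentials_part`` is an illustrative helper name, not part of the provider API:

```python
# Sketch of how the operator resolves the READ_NOS credentials clause.
# The function name is illustrative only, not a provider API.
def build_credentials_part(
    public_bucket: bool,
    authorization_name: str = "",
    access_id: str = "",
    access_key: str = "",
) -> str:
    # Public buckets need no credentials: empty ACCESS_ID/ACCESS_KEY.
    credentials_part = "ACCESS_ID= '' ACCESS_KEY= ''"
    if not public_bucket:
        if authorization_name:
            # The Authorization database object takes precedence over keys.
            credentials_part = f"AUTHORIZATION={authorization_name}"
        else:
            credentials_part = f"ACCESS_ID= '{access_id}' ACCESS_KEY= '{access_key}'"
    return credentials_part
```

With both an Authorization object and connection credentials configured, the Authorization object wins; a public bucket short-circuits to empty credentials.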

Transferring data from public Azure Blob Storage to Teradata
------------------------------------------------------------

An example usage of the AzureBlobStorageToTeradataOperator to transfer data in CSV format from public Azure Blob Storage to a Teradata table is as follows:

.. exampleinclude:: /../../tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py
:language: python
:start-after: [START azure_blob__to_teradata_transfer_operator_howto_guide_transfer_data_public_blob_to_teradata_csv]
:end-before: [END azure_blob__to_teradata_transfer_operator_howto_guide_transfer_data_public_blob_to_teradata_csv]

Transferring data from private Azure Blob Storage to Teradata with an Azure connection
--------------------------------------------------------------------------------------

An example usage of the AzureBlobStorageToTeradataOperator to transfer data in CSV format from a private Azure
Blob Storage container to Teradata, with Azure credentials defined in an Airflow WASB connection:

.. exampleinclude:: /../../tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py
:language: python
:start-after: [START azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_access_blob_to_teradata_csv]
:end-before: [END azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_access_blob_to_teradata_csv]

Transferring data from private Azure Blob Storage to Teradata with Teradata Authorization Object
------------------------------------------------------------------------------------------------
A Teradata Authorization database object controls who can access an external object store. The Authorization
database object must already exist in the Teradata database before it can be used to transfer data from Azure
Blob Storage to Teradata. Refer to
`Authentication for External Object Stores in Teradata <https://docs.teradata.com/r/Enterprise_IntelliFlex_VMware/Teradata-VantageTM-Native-Object-Store-Getting-Started-Guide-17.20/Authentication-for-External-Object-Stores>`_

An example usage of the AzureBlobStorageToTeradataOperator to transfer data in CSV format from a private Azure
Blob Storage container to Teradata, with an Authorization database object defined in Teradata:

.. exampleinclude:: /../../tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py
:language: python
:start-after: [START azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_authorization_blob_to_teradata_csv]
:end-before: [END azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_authorization_blob_to_teradata_csv]
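
Under the hood, the operator renders a READ_NOS ``CREATE TABLE AS`` statement with the ``AUTHORIZATION`` clause substituted for access keys. A minimal sketch of that rendering follows; ``render_ctas``, ``example_table``, the container URI, and ``MyAuthObject`` are hypothetical names for illustration, not provider APIs:

```python
from textwrap import dedent


def render_ctas(table: str, location: str, authorization_name: str) -> str:
    # Same CREATE TABLE AS shape the operator emits, with the
    # AUTHORIZATION clause in place of ACCESS_ID/ACCESS_KEY.
    return dedent(f"""
        CREATE MULTISET TABLE {table} AS
        (
            SELECT * FROM (
                LOCATION = '{location}'
                AUTHORIZATION={authorization_name}
            ) AS d
        ) WITH DATA
        """).rstrip()


# All three arguments below are hypothetical placeholders.
sql = render_ctas(
    "example_table",
    "/az/examplestorage.blob.core.windows.net/container/data.csv",
    "MyAuthObject",
)
```

The resulting statement creates a permanent table populated directly from the object store in a single round trip.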

Transferring data in CSV format from Azure Blob Storage to Teradata
-------------------------------------------------------------------
Expand All @@ -37,8 +98,8 @@ to teradata table is as follows:

.. exampleinclude:: /../../tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py
:language: python
:start-after: [START azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_csv]
:end-before: [END azure_blob_to_teradata_transfer_operator_howto_guide_transfer_data_blob_to_teradata_csv]
:start-after: [START azure_blob__to_teradata_transfer_operator_howto_guide_transfer_data_public_blob_to_teradata_csv]
:end-before: [END azure_blob__to_teradata_transfer_operator_howto_guide_transfer_data_public_blob_to_teradata_csv]

Transferring data in JSON format from Azure Blob Storage to Teradata
--------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@ READ_NOS is a table operator in Teradata Vantage that allows users to list exter
For more details, see `READ_NOS Functionality <https://docs.teradata.com/r/Enterprise_IntelliFlex_VMware/Teradata-VantageTM-Native-Object-Store-Getting-Started-Guide-17.20/Reading-Data/Examples-For-DBAs-and-Advanced-Users/Loading-External-Data-into-the-Database/Loading-External-Data-into-the-Database-Using-READ_NOS-and-CREATE-TABLE-AS>`_

Use the :class:`S3ToTeradataOperator <airflow.providers.teradata.transfers.s3_to_teradata>`
to transfer data from S3 to Teradata.
to transfer data from S3 to Teradata. This operator leverages the Teradata
`READ_NOS <https://docs.teradata.com/r/Enterprise_IntelliFlex_VMware/Teradata-VantageTM-Native-Object-Store-Getting-Started-Guide-17.20/Welcome-to-Native-Object-Store>`_ feature
to import data in CSV, JSON, and Parquet formats from S3 into Teradata.
It accesses data directly from the object store and creates permanent tables
in the database using the READ_NOS and CREATE TABLE AS functionalities.
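
As with the Azure Blob Storage transfer, the generated statement takes the READ_NOS ``CREATE TABLE AS`` shape; the identifiers below are placeholders:

.. code-block:: sql

    CREATE MULTISET TABLE multiset_table_name AS (
        SELECT *
        FROM (
            LOCATION='YOUR-OBJECT-STORE-URI'
            AUTHORIZATION=authorization_object
        ) AS d
    ) WITH DATA;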

.. note::
The current version of ``S3ToTeradataOperator`` does not support accessing AWS S3 with Security Token Service (STS) temporary credentials. Instead, it exclusively supports accessing with long-term credentials.
Expand All @@ -43,8 +47,8 @@ An example usage of the S3ToTeradataOperator to transfer CSV data format from S3

.. exampleinclude:: /../../tests/system/providers/teradata/example_s3_to_teradata_transfer.py
:language: python
:start-after: [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_csv]
:end-before: [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_csv]
:start-after: [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_public_s3_to_teradata_csv]
:end-before: [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_public_s3_to_teradata_csv]

Transferring data in JSON format from S3 to Teradata
----------------------------------------------------
Expand Down