-
Notifications
You must be signed in to change notification settings - Fork 14.8k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Added support of Teradata Compute Cluster Provision, Decommission, Su…
…spend and Resume operations (#40509) Support added to Teradata Provider about Teradata Compute Cluster feature Provisioning compute cluster instance Decommissioning compute cluster instance Resume and Suspend of compute cluster instance
- Loading branch information
1 parent
5b2becb
commit 8ced563
Showing
13 changed files
with
2,049 additions
and
0 deletions.
There are no files selected for viewing
513 changes: 513 additions & 0 deletions
513
airflow/providers/teradata/operators/teradata_compute_cluster.py
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
# Licensed to the Apache Software Foundation (ASF) under one | ||
# or more contributor license agreements. See the NOTICE file | ||
# distributed with this work for additional information | ||
# regarding copyright ownership. The ASF licenses this file | ||
# to you under the Apache License, Version 2.0 (the | ||
# "License"); you may not use this file except in compliance | ||
# with the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, | ||
# software distributed under the License is distributed on an | ||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
# KIND, either express or implied. See the License for the | ||
# specific language governing permissions and limitations | ||
# under the License. |
155 changes: 155 additions & 0 deletions
155
airflow/providers/teradata/triggers/teradata_compute_cluster.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,155 @@ | ||
# Licensed to the Apache Software Foundation (ASF) under one | ||
# or more contributor license agreements. See the NOTICE file | ||
# distributed with this work for additional information | ||
# regarding copyright ownership. The ASF licenses this file | ||
# to you under the Apache License, Version 2.0 (the | ||
# "License"); you may not use this file except in compliance | ||
# with the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, | ||
# software distributed under the License is distributed on an | ||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
# KIND, either express or implied. See the License for the | ||
# specific language governing permissions and limitations | ||
# under the License. | ||
from __future__ import annotations | ||
|
||
import asyncio | ||
from typing import Any, AsyncIterator | ||
|
||
from airflow.exceptions import AirflowException | ||
from airflow.providers.common.sql.hooks.sql import fetch_one_handler | ||
from airflow.providers.teradata.hooks.teradata import TeradataHook | ||
from airflow.providers.teradata.utils.constants import Constants | ||
from airflow.triggers.base import BaseTrigger, TriggerEvent | ||
|
||
|
||
class TeradataComputeClusterSyncTrigger(BaseTrigger): | ||
""" | ||
Fetch the status of the suspend or resume operation for the specified compute cluster. | ||
:param teradata_conn_id: The :ref:`Teradata connection id <howto/connection:teradata>` | ||
reference to a specific Teradata database. | ||
:param compute_profile_name: Name of the Compute Profile to manage. | ||
:param compute_group_name: Name of compute group to which compute profile belongs. | ||
:param opr_type: Compute cluster operation - SUSPEND/RESUME | ||
:param poll_interval: polling period in minutes to check for the status | ||
""" | ||
|
||
def __init__( | ||
self, | ||
teradata_conn_id: str, | ||
compute_profile_name: str, | ||
compute_group_name: str | None = None, | ||
operation_type: str | None = None, | ||
poll_interval: float | None = None, | ||
): | ||
super().__init__() | ||
self.teradata_conn_id = teradata_conn_id | ||
self.compute_profile_name = compute_profile_name | ||
self.compute_group_name = compute_group_name | ||
self.operation_type = operation_type | ||
self.poll_interval = poll_interval | ||
|
||
def serialize(self) -> tuple[str, dict[str, Any]]: | ||
"""Serialize TeradataComputeClusterSyncTrigger arguments and classpath.""" | ||
return ( | ||
"airflow.providers.teradata.triggers.teradata_compute_cluster.TeradataComputeClusterSyncTrigger", | ||
{ | ||
"teradata_conn_id": self.teradata_conn_id, | ||
"compute_profile_name": self.compute_profile_name, | ||
"compute_group_name": self.compute_group_name, | ||
"operation_type": self.operation_type, | ||
"poll_interval": self.poll_interval, | ||
}, | ||
) | ||
|
||
async def run(self) -> AsyncIterator[TriggerEvent]: | ||
"""Wait for Compute Cluster operation to complete.""" | ||
try: | ||
while True: | ||
status = await self.get_status() | ||
if status is None or len(status) == 0: | ||
self.log.info(Constants.CC_GRP_PRP_NON_EXISTS_MSG) | ||
raise AirflowException(Constants.CC_GRP_PRP_NON_EXISTS_MSG) | ||
if ( | ||
self.operation_type == Constants.CC_SUSPEND_OPR | ||
or self.operation_type == Constants.CC_CREATE_SUSPEND_OPR | ||
): | ||
if status == Constants.CC_SUSPEND_DB_STATUS: | ||
break | ||
elif ( | ||
self.operation_type == Constants.CC_RESUME_OPR | ||
or self.operation_type == Constants.CC_CREATE_OPR | ||
): | ||
if status == Constants.CC_RESUME_DB_STATUS: | ||
break | ||
if self.poll_interval is not None: | ||
self.poll_interval = float(self.poll_interval) | ||
else: | ||
self.poll_interval = float(Constants.CC_POLL_INTERVAL) | ||
await asyncio.sleep(self.poll_interval) | ||
if ( | ||
self.operation_type == Constants.CC_SUSPEND_OPR | ||
or self.operation_type == Constants.CC_CREATE_SUSPEND_OPR | ||
): | ||
if status == Constants.CC_SUSPEND_DB_STATUS: | ||
yield TriggerEvent( | ||
{ | ||
"status": "success", | ||
"message": Constants.CC_OPR_SUCCESS_STATUS_MSG | ||
% (self.compute_profile_name, self.operation_type), | ||
} | ||
) | ||
else: | ||
yield TriggerEvent( | ||
{ | ||
"status": "error", | ||
"message": Constants.CC_OPR_FAILURE_STATUS_MSG | ||
% (self.compute_profile_name, self.operation_type), | ||
} | ||
) | ||
elif ( | ||
self.operation_type == Constants.CC_RESUME_OPR | ||
or self.operation_type == Constants.CC_CREATE_OPR | ||
): | ||
if status == Constants.CC_RESUME_DB_STATUS: | ||
yield TriggerEvent( | ||
{ | ||
"status": "success", | ||
"message": Constants.CC_OPR_SUCCESS_STATUS_MSG | ||
% (self.compute_profile_name, self.operation_type), | ||
} | ||
) | ||
else: | ||
yield TriggerEvent( | ||
{ | ||
"status": "error", | ||
"message": Constants.CC_OPR_FAILURE_STATUS_MSG | ||
% (self.compute_profile_name, self.operation_type), | ||
} | ||
) | ||
else: | ||
yield TriggerEvent({"status": "error", "message": "Invalid operation"}) | ||
except Exception as e: | ||
yield TriggerEvent({"status": "error", "message": str(e)}) | ||
except asyncio.CancelledError: | ||
self.log.error(Constants.CC_OPR_TIMEOUT_ERROR, self.operation_type) | ||
|
||
async def get_status(self) -> str: | ||
"""Return compute cluster SUSPEND/RESUME operation status.""" | ||
sql = ( | ||
"SEL ComputeProfileState FROM DBC.ComputeProfilesVX WHERE UPPER(ComputeProfileName) = UPPER('" | ||
+ self.compute_profile_name | ||
+ "')" | ||
) | ||
if self.compute_group_name: | ||
sql += " AND UPPER(ComputeGroupName) = UPPER('" + self.compute_group_name + "')" | ||
hook = TeradataHook(teradata_conn_id=self.teradata_conn_id) | ||
result_set = hook.run(sql, handler=fetch_one_handler) | ||
status = "" | ||
if isinstance(result_set, list) and isinstance(result_set[0], str): | ||
status = str(result_set[0]) | ||
return status |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
# Licensed to the Apache Software Foundation (ASF) under one | ||
# or more contributor license agreements. See the NOTICE file | ||
# distributed with this work for additional information | ||
# regarding copyright ownership. The ASF licenses this file | ||
# to you under the Apache License, Version 2.0 (the | ||
# "License"); you may not use this file except in compliance | ||
# with the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, | ||
# software distributed under the License is distributed on an | ||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
# KIND, either express or implied. See the License for the | ||
# specific language governing permissions and limitations | ||
# under the License. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
# Licensed to the Apache Software Foundation (ASF) under one | ||
# or more contributor license agreements. See the NOTICE file | ||
# distributed with this work for additional information | ||
# regarding copyright ownership. The ASF licenses this file | ||
# to you under the Apache License, Version 2.0 (the | ||
# "License"); you may not use this file except in compliance | ||
# with the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, | ||
# software distributed under the License is distributed on an | ||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
# KIND, either express or implied. See the License for the | ||
# specific language governing permissions and limitations | ||
# under the License. | ||
from __future__ import annotations | ||
|
||
|
||
class Constants: | ||
"""Define constants for Teradata Provider.""" | ||
|
||
CC_CREATE_OPR = "CREATE" | ||
CC_CREATE_SUSPEND_OPR = "CREATE_SUSPEND" | ||
CC_DROP_OPR = "DROP" | ||
CC_SUSPEND_OPR = "SUSPEND" | ||
CC_RESUME_OPR = "RESUME" | ||
CC_INITIALIZE_DB_STATUS = "Initializing" | ||
CC_SUSPEND_DB_STATUS = "Suspended" | ||
CC_RESUME_DB_STATUS = "Running" | ||
CC_OPR_SUCCESS_STATUS_MSG = "Compute Cluster %s %s operation completed successfully." | ||
CC_OPR_FAILURE_STATUS_MSG = "Compute Cluster %s %s operation has failed." | ||
CC_OPR_INITIALIZING_STATUS_MSG = "The environment is currently initializing. Please wait." | ||
CC_OPR_EMPTY_PROFILE_ERROR_MSG = "Please provide a valid name for the compute cluster profile." | ||
CC_GRP_PRP_NON_EXISTS_MSG = "The specified Compute cluster is not present or The user doesn't have permission to access compute cluster." | ||
CC_GRP_PRP_UN_AUTHORIZED_MSG = "The %s operation is not authorized for the user." | ||
CC_GRP_LAKE_SUPPORT_ONLY_MSG = "Compute Groups is supported only on Vantage Cloud Lake." | ||
CC_OPR_TIMEOUT_ERROR = ( | ||
"There is an issue with the %s operation. Kindly consult the administrator for assistance." | ||
) | ||
CC_GRP_PRP_EXISTS_MSG = "The specified Compute cluster is already exists." | ||
CC_OPR_EMPTY_COPY_PROFILE_ERROR_MSG = ( | ||
"Please provide a valid name for the source and target compute profile." | ||
) | ||
CC_OPR_TIME_OUT = 1200 | ||
CC_POLL_INTERVAL = 60 |
107 changes: 107 additions & 0 deletions
107
docs/apache-airflow-providers-teradata/operators/compute_cluster.rst
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
.. Licensed to the Apache Software Foundation (ASF) under one | ||
or more contributor license agreements. See the NOTICE file | ||
distributed with this work for additional information | ||
regarding copyright ownership. The ASF licenses this file | ||
to you under the Apache License, Version 2.0 (the | ||
"License"); you may not use this file except in compliance | ||
with the License. You may obtain a copy of the License at | ||
.. http://www.apache.org/licenses/LICENSE-2.0 | ||
.. Unless required by applicable law or agreed to in writing, | ||
software distributed under the License is distributed on an | ||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
KIND, either express or implied. See the License for the | ||
specific language governing permissions and limitations | ||
under the License. | ||
.. _howto/operator:TeradataComputeClusterProvisionOperator: | ||
|
||
|
||
======================================= | ||
TeradataComputeClusterProvisionOperator | ||
======================================= | ||
|
||
The purpose of ``TeradataComputeClusterProvisionOperator`` is to provision the new Teradata Vantage Cloud Lake | ||
Compute Cluster with specified Compute Group Name and Compute Profile Name. | ||
Use the :class:`TeradataComputeClusterProvisionOperator <airflow.providers.teradata.operators.teradata_compute_cluster>` | ||
to provision the new Compute Cluster in Teradata Vantage Cloud Lake. | ||
|
||
|
||
|
||
An example usage of the TeradataComputeClusterProvisionOperator to provision the new Compute Cluster in | ||
Teradata Vantage Cloud Lake is as follows: | ||
|
||
.. exampleinclude:: /../../tests/system/providers/teradata/example_teradata_compute_cluster.py | ||
:language: python | ||
:start-after: [START teradata_vantage_lake_compute_cluster_provision_howto_guide] | ||
:end-before: [END teradata_vantage_lake_compute_cluster_provision_howto_guide] | ||
|
||
|
||
.. _howto/operator:TeradataComputeClusterDecommissionOperator: | ||
|
||
|
||
========================================== | ||
TeradataComputeClusterDecommissionOperator | ||
========================================== | ||
|
||
The purpose of ``TeradataComputeClusterDecommissionOperator`` is to decommission the specified Teradata Vantage Cloud Lake | ||
Compute Cluster. | ||
Use the :class:`TeradataComputeClusterProvisionOperator <airflow.providers.teradata.operators.teradata_compute_cluster>` | ||
to decommission the specified Teradata Vantage Cloud Lake Compute Cluster. | ||
|
||
|
||
|
||
An example usage of the TeradataComputeClusterDecommissionOperator to decommission the specified Teradata Vantage Cloud | ||
Lake Compute Cluster is as follows: | ||
|
||
.. exampleinclude:: /../../tests/system/providers/teradata/example_teradata_compute_cluster.py | ||
:language: python | ||
:start-after: [START teradata_vantage_lake_compute_cluster_decommission_howto_guide] | ||
:end-before: [END teradata_vantage_lake_compute_cluster_decommission_howto_guide] | ||
|
||
|
||
.. _howto/operator:TeradataComputeClusterResumeOperator: | ||
|
||
|
||
===================================== | ||
TeradataComputeClusterResumeOperator | ||
===================================== | ||
|
||
The purpose of ``TeradataComputeClusterResumeOperator`` is to start the Teradata Vantage Cloud Lake | ||
Compute Cluster of specified Compute Group Name and Compute Profile Name. | ||
Use the :class:`TeradataComputeClusterResumeOperator <airflow.providers.teradata.operators.teradata_compute_cluster>` | ||
to start the specified Compute Cluster in Teradata Vantage Cloud Lake. | ||
|
||
|
||
|
||
An example usage of the TeradataComputeClusterSuspendOperator to start the specified Compute Cluster in | ||
Teradata Vantage Cloud Lake is as follows: | ||
|
||
.. exampleinclude:: /../../tests/system/providers/teradata/example_teradata_compute_cluster.py | ||
:language: python | ||
:start-after: [START teradata_vantage_lake_compute_cluster_resume_howto_guide] | ||
:end-before: [END teradata_vantage_lake_compute_cluster_resume_howto_guide] | ||
|
||
.. _howto/operator:TeradataComputeClusterSuspendOperator: | ||
|
||
|
||
===================================== | ||
TeradataComputeClusterSuspendOperator | ||
===================================== | ||
|
||
The purpose of ``TeradataComputeClusterSuspendOperator`` is to suspend the Teradata Vantage Cloud Lake | ||
Compute Cluster of specified Compute Group Name and Compute Profile Name. | ||
Use the :class:`TeradataComputeClusterSuspendOperator <airflow.providers.teradata.operators.teradata_compute_cluster>` | ||
to suspend the specified Compute Cluster in Teradata Vantage Cloud Lake. | ||
|
||
|
||
|
||
An example usage of the TeradataComputeClusterSuspendOperator to suspend the specified Compute Cluster in | ||
Teradata Vantage Cloud Lake is as follows: | ||
|
||
.. exampleinclude:: /../../tests/system/providers/teradata/example_teradata_compute_cluster.py | ||
:language: python | ||
:start-after: [START teradata_vantage_lake_compute_cluster_suspend_howto_guide] | ||
:end-before: [END teradata_vantage_lake_compute_cluster_suspend_howto_guide] |
Oops, something went wrong.