class RedshiftDeleteClusterOperator(BaseOperator):
    """
    Delete an AWS Redshift cluster.

    The operator asks ``RedshiftHook.delete_cluster`` to delete the cluster and,
    when ``wait_for_completion`` is set, keeps polling ``RedshiftHook.cluster_status``
    until it reports ``"cluster_not_found"``.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:RedshiftDeleteClusterOperator`

    :param cluster_identifier: unique identifier of the cluster to delete
    :param skip_final_cluster_snapshot: whether to skip creating a final snapshot of the
        cluster before it is deleted. The default value is ``True``. When ``False``,
        ``final_cluster_snapshot_identifier`` must be provided.
    :param final_cluster_snapshot_identifier: name of the final cluster snapshot
    :param wait_for_completion: whether to wait for the cluster deletion to finish.
        The default value is ``True``
    :param aws_conn_id: aws connection to use. The default value is ``"aws_default"``
    :param poll_interval: Time (in seconds) to wait between two consecutive calls to check
        cluster state. The default value is ``30.0``
    :raises ValueError: from ``execute`` when a final snapshot is requested
        (``skip_final_cluster_snapshot=False``) without a snapshot identifier
    """

    template_fields: Sequence[str] = ("cluster_identifier",)
    ui_color = "#eeaa11"
    ui_fgcolor = "#ffffff"

    def __init__(
        self,
        *,
        cluster_identifier: str,
        skip_final_cluster_snapshot: bool = True,
        final_cluster_snapshot_identifier: Optional[str] = None,
        wait_for_completion: bool = True,
        aws_conn_id: str = "aws_default",
        poll_interval: float = 30.0,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.cluster_identifier = cluster_identifier
        self.skip_final_cluster_snapshot = skip_final_cluster_snapshot
        self.final_cluster_snapshot_identifier = final_cluster_snapshot_identifier
        self.wait_for_completion = wait_for_completion
        self.redshift_hook = RedshiftHook(aws_conn_id=aws_conn_id)
        self.poll_interval = poll_interval

    def execute(self, context: 'Context'):
        """Delete the cluster and, if requested, block until it no longer exists."""
        # The Redshift DeleteCluster API requires a snapshot identifier whenever the
        # final snapshot is not skipped. Fail here with a clear message instead of
        # sending a request the service is going to reject.
        if not self.skip_final_cluster_snapshot and not self.final_cluster_snapshot_identifier:
            raise ValueError(
                "final_cluster_snapshot_identifier must be provided "
                "when skip_final_cluster_snapshot is False"
            )

        self.delete_cluster()

        if not self.wait_for_completion:
            return

        # NOTE: the loop has no attempt limit of its own; it only ends once the hook
        # reports that the cluster is gone.
        cluster_status: str = self.check_status()
        while cluster_status != "cluster_not_found":
            self.log.info(
                "cluster status is %s. Sleeping for %s seconds.", cluster_status, self.poll_interval
            )
            time.sleep(self.poll_interval)
            cluster_status = self.check_status()

    def delete_cluster(self) -> None:
        """Forward the deletion request, with the snapshot settings, to the Redshift hook."""
        self.redshift_hook.delete_cluster(
            cluster_identifier=self.cluster_identifier,
            skip_final_cluster_snapshot=self.skip_final_cluster_snapshot,
            final_cluster_snapshot_identifier=self.final_cluster_snapshot_identifier,
        )

    def check_status(self) -> str:
        """Return the status the Redshift hook currently reports for the cluster."""
        return self.redshift_hook.cluster_status(self.cluster_identifier)
class TestDeleteClusterOperator:
    """Unit tests for ``RedshiftDeleteClusterOperator`` with the Redshift hook mocked out."""

    @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftHook.cluster_status")
    @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftHook.get_conn")
    def test_delete_cluster_with_wait_for_completion(self, mock_get_conn, mock_cluster_status):
        """The cluster is deleted and its status is polled when waiting is enabled."""
        # Reporting the cluster as already gone makes the wait loop exit immediately.
        mock_cluster_status.return_value = 'cluster_not_found'
        redshift_operator = RedshiftDeleteClusterOperator(
            task_id="task_test", cluster_identifier="test_cluster", aws_conn_id="aws_conn_test"
        )
        redshift_operator.execute(None)
        mock_get_conn.return_value.delete_cluster.assert_called_once_with(
            ClusterIdentifier='test_cluster',
            SkipFinalClusterSnapshot=True,
            FinalClusterSnapshotIdentifier='',
        )
        # Exactly one status check: the first poll already says "cluster_not_found".
        mock_cluster_status.assert_called_once_with("test_cluster")

    @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftHook.cluster_status")
    @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftHook.get_conn")
    def test_delete_cluster_without_wait_for_completion(self, mock_get_conn, mock_cluster_status):
        """The cluster is deleted and its status is never polled when waiting is disabled."""
        redshift_operator = RedshiftDeleteClusterOperator(
            task_id="task_test",
            cluster_identifier="test_cluster",
            aws_conn_id="aws_conn_test",
            wait_for_completion=False,
        )
        redshift_operator.execute(None)
        mock_get_conn.return_value.delete_cluster.assert_called_once_with(
            ClusterIdentifier='test_cluster',
            SkipFinalClusterSnapshot=True,
            FinalClusterSnapshotIdentifier='',
        )

        # The operator polls through RedshiftHook.cluster_status, so that is the mock
        # that must stay untouched. An attribute on the connection mock would never
        # be called either way and so could not detect unwanted polling.
        mock_cluster_status.assert_not_called()