From dd69a0af427da38d0ce6b43ceb1105bf1e431b16 Mon Sep 17 00:00:00 2001 From: Pankaj Date: Sat, 7 May 2022 19:01:26 +0530 Subject: [PATCH 1/5] Add RedshiftDeleteClusterOperator support --- .../example_dags/example_redshift_cluster.py | 9 ++++ .../amazon/aws/operators/redshift_cluster.py | 53 +++++++++++++++++++ .../operators/redshift_cluster.rst | 14 +++++ .../aws/operators/test_redshift_cluster.py | 25 +++++++++ 4 files changed, 101 insertions(+) diff --git a/airflow/providers/amazon/aws/example_dags/example_redshift_cluster.py b/airflow/providers/amazon/aws/example_dags/example_redshift_cluster.py index cbeed2da34d66..dc6deead3c618 100644 --- a/airflow/providers/amazon/aws/example_dags/example_redshift_cluster.py +++ b/airflow/providers/amazon/aws/example_dags/example_redshift_cluster.py @@ -23,6 +23,7 @@ from airflow.models.baseoperator import chain from airflow.providers.amazon.aws.operators.redshift_cluster import ( RedshiftCreateClusterOperator, + RedshiftDeleteClusterOperator, RedshiftPauseClusterOperator, RedshiftResumeClusterOperator, ) @@ -80,10 +81,18 @@ ) # [END howto_operator_redshift_resume_cluster] + # [START howto_operator_redshift_delete_cluster] + task_delete_cluster = RedshiftDeleteClusterOperator( + task_id="delete_cluster", + cluster_identifier=REDSHIFT_CLUSTER_IDENTIFIER, + ) + # [END howto_operator_redshift_delete_cluster] + chain( task_create_cluster, task_wait_cluster_available, task_pause_cluster, task_wait_cluster_paused, task_resume_cluster, + task_delete_cluster, ) diff --git a/airflow/providers/amazon/aws/operators/redshift_cluster.py b/airflow/providers/amazon/aws/operators/redshift_cluster.py index 80a1feba1d614..2c59529c433a6 100644 --- a/airflow/providers/amazon/aws/operators/redshift_cluster.py +++ b/airflow/providers/amazon/aws/operators/redshift_cluster.py @@ -14,6 +14,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +import time from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence from airflow.models import BaseOperator @@ -317,3 +318,55 @@ def execute(self, context: 'Context'): self.log.warning( "Unable to pause cluster since cluster is currently in status: %s", cluster_state ) + + +class RedshiftDeleteClusterOperator(BaseOperator): + """ + Delete an AWS Redshift cluster. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:RedshiftDeleteClusterOperator` + + :param cluster_identifier: unique identifier of a cluster + :param skip_final_cluster_snapshot: determines cluster snapshot creation + :param final_cluster_snapshot_identifier: name of final cluster snapshot + :param aws_conn_id: aws connection to use + :param poll_interval: Time (in seconds) to wait between two consecutive calls to check cluster state + """ + + template_fields: Sequence[str] = ("cluster_identifier",) + ui_color = "#eeaa11" + ui_fgcolor = "#ffffff" + + def __init__( + self, + *, + cluster_identifier: str, + skip_final_cluster_snapshot: bool = True, + final_cluster_snapshot_identifier: Optional[str] = None, + aws_conn_id: str = "aws_default", + poll_interval: float = 30.0, + **kwargs, + ): + super().__init__(**kwargs) + self.cluster_identifier = cluster_identifier + self.skip_final_cluster_snapshot = skip_final_cluster_snapshot + self.final_cluster_snapshot_identifier = final_cluster_snapshot_identifier + self.aws_conn_id = aws_conn_id + self.poll_interval = poll_interval + + def execute(self, context: 'Context'): + redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id) + redshift_hook.delete_cluster( + cluster_identifier=self.cluster_identifier, + skip_final_cluster_snapshot=self.skip_final_cluster_snapshot, + final_cluster_snapshot_identifier=self.final_cluster_snapshot_identifier, + ) + cluster_status: str = redshift_hook.cluster_status(self.cluster_identifier) + while cluster_status != "cluster_not_found": + self.log.info( + "cluster status is %s. Sleeping for %s seconds.", cluster_status, self.poll_interval + ) + time.sleep(self.poll_interval) + cluster_status = redshift_hook.cluster_status(self.cluster_identifier) diff --git a/docs/apache-airflow-providers-amazon/operators/redshift_cluster.rst b/docs/apache-airflow-providers-amazon/operators/redshift_cluster.rst index a32f43dcfc34f..219db2b2befe6 100644 --- a/docs/apache-airflow-providers-amazon/operators/redshift_cluster.rst +++ b/docs/apache-airflow-providers-amazon/operators/redshift_cluster.rst @@ -89,6 +89,20 @@ To pause an 'available' Amazon Redshift Cluster you can use :start-after: [START howto_operator_redshift_pause_cluster] :end-before: [END howto_operator_redshift_pause_cluster] +.. _howto/operator:RedshiftDeleteClusterOperator: + +Delete an Amazon Redshift Cluster +""""""""""""""""""""""""""""""""" + +To delete an Amazon Redshift Cluster +:class:`RedshiftDeleteClusterOperator ` + +.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_redshift_cluster.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_redshift_delete_cluster] + :end-before: [END howto_operator_redshift_delete_cluster] + Reference ^^^^^^^^^ diff --git a/tests/providers/amazon/aws/operators/test_redshift_cluster.py b/tests/providers/amazon/aws/operators/test_redshift_cluster.py index 494d0281b3cb0..6c9e9b2a9bedb 100644 --- a/tests/providers/amazon/aws/operators/test_redshift_cluster.py +++ b/tests/providers/amazon/aws/operators/test_redshift_cluster.py @@ -19,6 +19,7 @@ from airflow.providers.amazon.aws.operators.redshift_cluster import ( RedshiftCreateClusterOperator, + RedshiftDeleteClusterOperator, RedshiftPauseClusterOperator, RedshiftResumeClusterOperator, ) @@ -127,3 +128,27 @@ def test_pause_cluster_not_called_when_cluster_is_not_available(self, mock_get_c ) redshift_operator.execute(None) mock_get_conn.return_value.pause_cluster.assert_not_called() + + +class TestDeleteClusterOperator: + def test_init(self): + redshift_operator = RedshiftDeleteClusterOperator( + task_id="task_test", cluster_identifier="test_cluster", aws_conn_id="aws_conn_test" + ) + assert redshift_operator.task_id == "task_test" + assert redshift_operator.cluster_identifier == "test_cluster" + assert redshift_operator.aws_conn_id == "aws_conn_test" + + @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftHook.cluster_status") + @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftHook.get_conn") + def test_delete_cluster(self, mock_get_conn, mock_cluster_status): + mock_cluster_status.return_value = 'cluster_not_found' + redshift_operator = RedshiftDeleteClusterOperator( + task_id="task_test", cluster_identifier="test_cluster", aws_conn_id="aws_conn_test" + ) + redshift_operator.execute(None) + mock_get_conn.return_value.delete_cluster.assert_called_once_with( + ClusterIdentifier='test_cluster', + SkipFinalClusterSnapshot=True, + FinalClusterSnapshotIdentifier='', + ) From a0bb72228814f0f015137b9973735b54822e3b99 Mon Sep 17 00:00:00 2001 From: Pankaj Date: Tue, 10 May 2022 13:26:50 +0530 Subject: [PATCH 2/5] Add wait_for_completion param in operator --- .../amazon/aws/operators/redshift_cluster.py | 20 ++++++++++++------- .../operators/redshift_cluster.rst | 2 +- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/airflow/providers/amazon/aws/operators/redshift_cluster.py b/airflow/providers/amazon/aws/operators/redshift_cluster.py index 2c59529c433a6..b42921e23fcd6 100644 --- a/airflow/providers/amazon/aws/operators/redshift_cluster.py +++ b/airflow/providers/amazon/aws/operators/redshift_cluster.py @@ -331,6 +331,8 @@ class RedshiftDeleteClusterOperator(BaseOperator): :param cluster_identifier: unique identifier of a cluster :param skip_final_cluster_snapshot: determines cluster snapshot creation :param final_cluster_snapshot_identifier: name of final cluster snapshot + :param wait_for_completion: Whether wait for cluster deletion or not + The default value is ``True`` :param aws_conn_id: aws connection to use :param poll_interval: Time (in seconds) to wait between two consecutive calls to check cluster state """ @@ -345,6 +347,7 @@ def __init__( cluster_identifier: str, skip_final_cluster_snapshot: bool = True, final_cluster_snapshot_identifier: Optional[str] = None, + wait_for_completion: bool = True, aws_conn_id: str = "aws_default", poll_interval: float = 30.0, **kwargs, @@ -353,6 +356,7 @@ def __init__( self.cluster_identifier = cluster_identifier self.skip_final_cluster_snapshot = skip_final_cluster_snapshot self.final_cluster_snapshot_identifier = final_cluster_snapshot_identifier + self.wait_for_completion = wait_for_completion self.aws_conn_id = aws_conn_id self.poll_interval = poll_interval @@ -363,10 +367,12 @@ def execute(self, context: 'Context'): skip_final_cluster_snapshot=self.skip_final_cluster_snapshot, final_cluster_snapshot_identifier=self.final_cluster_snapshot_identifier, ) - cluster_status: str = redshift_hook.cluster_status(self.cluster_identifier) - while cluster_status != "cluster_not_found": - self.log.info( - "cluster status is %s. Sleeping for %s seconds.", cluster_status, self.poll_interval - ) - time.sleep(self.poll_interval) - cluster_status = redshift_hook.cluster_status(self.cluster_identifier) + + if self.wait_for_completion: + cluster_status: str = redshift_hook.cluster_status(self.cluster_identifier) + while cluster_status != "cluster_not_found": + self.log.info( + "cluster status is %s. Sleeping for %s seconds.", cluster_status, self.poll_interval + ) + time.sleep(self.poll_interval) + cluster_status = redshift_hook.cluster_status(self.cluster_identifier) diff --git a/docs/apache-airflow-providers-amazon/operators/redshift_cluster.rst b/docs/apache-airflow-providers-amazon/operators/redshift_cluster.rst index 219db2b2befe6..d999052e5f713 100644 --- a/docs/apache-airflow-providers-amazon/operators/redshift_cluster.rst +++ b/docs/apache-airflow-providers-amazon/operators/redshift_cluster.rst @@ -94,7 +94,7 @@ To pause an 'available' Amazon Redshift Cluster you can use Delete an Amazon Redshift Cluster """"""""""""""""""""""""""""""""" -To delete an Amazon Redshift Cluster +To delete an Amazon Redshift Cluster you can use :class:`RedshiftDeleteClusterOperator ` .. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_redshift_cluster.py From 3c7777e1d210f477a44c3c01cb8cf4efc76d842b Mon Sep 17 00:00:00 2001 From: Pankaj Date: Tue, 10 May 2022 22:21:17 +0530 Subject: [PATCH 3/5] Wrap cluster_status and delete_cluster hook call in operator --- .../amazon/aws/operators/redshift_cluster.py | 23 +++++++++++-------- .../aws/operators/test_redshift_cluster.py | 1 - 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/airflow/providers/amazon/aws/operators/redshift_cluster.py b/airflow/providers/amazon/aws/operators/redshift_cluster.py index b42921e23fcd6..e9b1f23b6dbbf 100644 --- a/airflow/providers/amazon/aws/operators/redshift_cluster.py +++ b/airflow/providers/amazon/aws/operators/redshift_cluster.py @@ -357,22 +357,27 @@ def __init__( self.skip_final_cluster_snapshot = skip_final_cluster_snapshot self.final_cluster_snapshot_identifier = final_cluster_snapshot_identifier self.wait_for_completion = wait_for_completion - self.aws_conn_id = aws_conn_id + self.redshift_hook = RedshiftHook(aws_conn_id=aws_conn_id) self.poll_interval = poll_interval def execute(self, context: 'Context'): - redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id) - redshift_hook.delete_cluster( - cluster_identifier=self.cluster_identifier, - skip_final_cluster_snapshot=self.skip_final_cluster_snapshot, - final_cluster_snapshot_identifier=self.final_cluster_snapshot_identifier, - ) + self.delete_cluster() if self.wait_for_completion: - cluster_status: str = redshift_hook.cluster_status(self.cluster_identifier) + cluster_status: str = self.check_status() while cluster_status != "cluster_not_found": self.log.info( "cluster status is %s. Sleeping for %s seconds.", cluster_status, self.poll_interval ) time.sleep(self.poll_interval) - cluster_status = redshift_hook.cluster_status(self.cluster_identifier) + cluster_status = self.check_status() + + def delete_cluster(self) -> None: + self.redshift_hook.delete_cluster( + cluster_identifier=self.cluster_identifier, + skip_final_cluster_snapshot=self.skip_final_cluster_snapshot, + final_cluster_snapshot_identifier=self.final_cluster_snapshot_identifier, + ) + + def check_status(self) -> str: + return self.redshift_hook.cluster_status(self.cluster_identifier) diff --git a/tests/providers/amazon/aws/operators/test_redshift_cluster.py b/tests/providers/amazon/aws/operators/test_redshift_cluster.py index 6c9e9b2a9bedb..72b10024aa1bb 100644 --- a/tests/providers/amazon/aws/operators/test_redshift_cluster.py +++ b/tests/providers/amazon/aws/operators/test_redshift_cluster.py @@ -137,7 +137,6 @@ def test_init(self): ) assert redshift_operator.task_id == "task_test" assert redshift_operator.cluster_identifier == "test_cluster" - assert redshift_operator.aws_conn_id == "aws_conn_test" @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftHook.cluster_status") @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftHook.get_conn") From 7989cfedbb3227158f589d84569523dc2cea2d11 Mon Sep 17 00:00:00 2001 From: Pankaj Date: Wed, 11 May 2022 11:32:16 +0530 Subject: [PATCH 4/5] Add unit test when wait_for_completion set to false --- .../aws/operators/test_redshift_cluster.py | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/tests/providers/amazon/aws/operators/test_redshift_cluster.py b/tests/providers/amazon/aws/operators/test_redshift_cluster.py index 72b10024aa1bb..afca482568726 100644 --- a/tests/providers/amazon/aws/operators/test_redshift_cluster.py +++ b/tests/providers/amazon/aws/operators/test_redshift_cluster.py @@ -140,7 +140,7 @@ def test_init(self): @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftHook.cluster_status") @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftHook.get_conn") - def test_delete_cluster(self, mock_get_conn, mock_cluster_status): + def test_delete_cluster_with_wait_for_completion(self, mock_get_conn, mock_cluster_status): mock_cluster_status.return_value = 'cluster_not_found' redshift_operator = RedshiftDeleteClusterOperator( task_id="task_test", cluster_identifier="test_cluster", aws_conn_id="aws_conn_test" @@ -151,3 +151,20 @@ def test_delete_cluster(self, mock_get_conn, mock_cluster_status): SkipFinalClusterSnapshot=True, FinalClusterSnapshotIdentifier='', ) + + @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftHook.get_conn") + def test_delete_cluster_without_wait_for_completion(self, mock_get_conn): + redshift_operator = RedshiftDeleteClusterOperator( + task_id="task_test", + cluster_identifier="test_cluster", + aws_conn_id="aws_conn_test", + wait_for_completion=False, + ) + redshift_operator.execute(None) + mock_get_conn.return_value.delete_cluster.assert_called_once_with( + ClusterIdentifier='test_cluster', + SkipFinalClusterSnapshot=True, + FinalClusterSnapshotIdentifier='', + ) + + mock_get_conn.return_value.cluster_status.assert_not_called() From a306aadd027495228353293ef3fa38efa41da859 Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Wed, 11 May 2022 17:11:30 +0100 Subject: [PATCH 5/5] Apply suggestions from code review --- .../amazon/aws/operators/test_redshift_cluster.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/tests/providers/amazon/aws/operators/test_redshift_cluster.py b/tests/providers/amazon/aws/operators/test_redshift_cluster.py index afca482568726..e29b397211b95 100644 --- a/tests/providers/amazon/aws/operators/test_redshift_cluster.py +++ b/tests/providers/amazon/aws/operators/test_redshift_cluster.py @@ -131,13 +131,6 @@ def test_pause_cluster_not_called_when_cluster_is_not_available(self, mock_get_c class TestDeleteClusterOperator: - def test_init(self): - redshift_operator = RedshiftDeleteClusterOperator( - task_id="task_test", cluster_identifier="test_cluster", aws_conn_id="aws_conn_test" - ) - assert redshift_operator.task_id == "task_test" - assert redshift_operator.cluster_identifier == "test_cluster" - @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftHook.cluster_status") @mock.patch("airflow.providers.amazon.aws.hooks.redshift.RedshiftHook.get_conn") def test_delete_cluster_with_wait_for_completion(self, mock_get_conn, mock_cluster_status):