From d0d3c59bce368911d405bf190d941e336522c938 Mon Sep 17 00:00:00 2001 From: Pankaj Date: Fri, 26 Aug 2022 01:58:57 +0530 Subject: [PATCH] Add RedshiftDeleteClusterSnapshotOperator --- .../amazon/aws/hooks/redshift_cluster.py | 15 ++++++ .../amazon/aws/operators/redshift_cluster.py | 51 +++++++++++++++++++ .../operators/redshift_cluster.rst | 14 +++++ .../aws/operators/test_redshift_cluster.py | 44 ++++++++++++++++ .../amazon/aws/example_redshift_cluster.py | 12 ++++- 5 files changed, 135 insertions(+), 1 deletion(-) diff --git a/airflow/providers/amazon/aws/hooks/redshift_cluster.py b/airflow/providers/amazon/aws/hooks/redshift_cluster.py index 648d33ed02b40..54b4b87b635c8 100644 --- a/airflow/providers/amazon/aws/hooks/redshift_cluster.py +++ b/airflow/providers/amazon/aws/hooks/redshift_cluster.py @@ -153,3 +153,18 @@ def create_cluster_snapshot( ManualSnapshotRetentionPeriod=retention_period, ) return response['Snapshot'] if response['Snapshot'] else None + + def get_cluster_snapshot_status(self, snapshot_identifier: str, cluster_identifier: str): + try: + response = self.get_conn().describe_cluster_snapshots( + ClusterIdentifier=cluster_identifier, + SnapshotIdentifier=snapshot_identifier, + ) + snapshot = response.get("Snapshots")[0] + snapshot_status: str = snapshot.get("Status") + return snapshot_status + except ClientError as exception: + if exception.response.get("Error", {}).get("Code", "") == "ClusterSnapshotNotFound": + return "cluster_snapshot_not_found" + else: + raise exception diff --git a/airflow/providers/amazon/aws/operators/redshift_cluster.py b/airflow/providers/amazon/aws/operators/redshift_cluster.py index 94cb74c213599..c09ba5211b445 100644 --- a/airflow/providers/amazon/aws/operators/redshift_cluster.py +++ b/airflow/providers/amazon/aws/operators/redshift_cluster.py @@ -308,6 +308,57 @@ def execute(self, context: "Context") -> Any: ) +class RedshiftDeleteClusterSnapshotOperator(BaseOperator): + """ + Deletes the specified manual snapshot + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:RedshiftDeleteClusterSnapshotOperator` + + :param snapshot_identifier: A unique identifier for the snapshot that you are requesting + :param cluster_identifier: The unique identifier of the cluster the snapshot was created from + :param wait_for_completion: Whether wait for cluster deletion or not + The default value is ``True`` + :param aws_conn_id: The Airflow connection used for AWS credentials. + The default connection id is ``aws_default`` + :param poll_interval: Time (in seconds) to wait between two consecutive calls to check snapshot state + """ + + def __init__( + self, + *, + snapshot_identifier: str, + cluster_identifier: str, + wait_for_completion: bool = True, + aws_conn_id: str = "aws_default", + poll_interval: int = 10, + **kwargs, + ): + super().__init__(**kwargs) + self.snapshot_identifier = snapshot_identifier + self.cluster_identifier = cluster_identifier + self.wait_for_completion = wait_for_completion + self.poll_interval = poll_interval + self.redshift_hook = RedshiftHook(aws_conn_id=aws_conn_id) + + def execute(self, context: "Context") -> Any: + self.redshift_hook.get_conn().delete_cluster_snapshot( + SnapshotClusterIdentifier=self.cluster_identifier, + SnapshotIdentifier=self.snapshot_identifier, + ) + + if self.wait_for_completion: + while self.get_status() != "cluster_snapshot_not_found": + time.sleep(self.poll_interval) + + def get_status(self) -> str: + return self.redshift_hook.get_cluster_snapshot_status( + snapshot_identifier=self.snapshot_identifier, + cluster_identifier=self.cluster_identifier, + ) + + class RedshiftResumeClusterOperator(BaseOperator): """ Resume a paused AWS Redshift Cluster diff --git a/docs/apache-airflow-providers-amazon/operators/redshift_cluster.rst b/docs/apache-airflow-providers-amazon/operators/redshift_cluster.rst index 842944f6e1b11..6dfe6989d415e 100644 --- a/docs/apache-airflow-providers-amazon/operators/redshift_cluster.rst +++ b/docs/apache-airflow-providers-amazon/operators/redshift_cluster.rst @@ -88,6 +88,20 @@ To create Amazon Redshift cluster snapshot you can use :start-after: [START howto_operator_redshift_create_cluster_snapshot] :end-before: [END howto_operator_redshift_create_cluster_snapshot] +.. _howto/operator:RedshiftDeleteClusterSnapshotOperator: + +Delete an Amazon Redshift cluster snapshot +========================================== + +To delete Amazon Redshift cluster snapshot you can use +:class:`RedshiftDeleteClusterSnapshotOperator ` + +.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_redshift_cluster.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_redshift_delete_cluster_snapshot] + :end-before: [END howto_operator_redshift_delete_cluster_snapshot] + .. _howto/operator:RedshiftDeleteClusterOperator: Delete an Amazon Redshift cluster diff --git a/tests/providers/amazon/aws/operators/test_redshift_cluster.py b/tests/providers/amazon/aws/operators/test_redshift_cluster.py index 4b594a0603d4b..acf863fe00cdf 100644 --- a/tests/providers/amazon/aws/operators/test_redshift_cluster.py +++ b/tests/providers/amazon/aws/operators/test_redshift_cluster.py @@ -24,6 +24,7 @@ RedshiftCreateClusterOperator, RedshiftCreateClusterSnapshotOperator, RedshiftDeleteClusterOperator, + RedshiftDeleteClusterSnapshotOperator, RedshiftPauseClusterOperator, RedshiftResumeClusterOperator, ) @@ -152,6 +153,49 @@ def test_create_cluster_snapshot_with_wait(self, mock_get_conn, mock_cluster_sta ) +class TestRedshiftDeleteClusterSnapshotOperator: + @mock.patch( + "airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.get_cluster_snapshot_status" + ) + @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.get_conn") + def test_delete_cluster_snapshot_wait(self, mock_get_conn, mock_get_cluster_snapshot_status): + mock_get_cluster_snapshot_status.return_value = 'cluster_snapshot_not_found' + delete_snapshot = RedshiftDeleteClusterSnapshotOperator( + task_id="test_snapshot", + cluster_identifier="test_cluster", + snapshot_identifier="test_snapshot", + ) + delete_snapshot.execute(None) + mock_get_conn.return_value.delete_cluster_snapshot.assert_called_once_with( + SnapshotClusterIdentifier='test_cluster', + SnapshotIdentifier="test_snapshot", + ) + + mock_get_cluster_snapshot_status.assert_called_once_with( + cluster_identifier="test_cluster", + snapshot_identifier="test_snapshot", + ) + + @mock.patch( + "airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.get_cluster_snapshot_status" + ) + @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.get_conn") + def test_delete_cluster_snapshot(self, mock_get_conn, mock_get_cluster_snapshot_status): + delete_snapshot = RedshiftDeleteClusterSnapshotOperator( + task_id="test_snapshot", + cluster_identifier="test_cluster", + snapshot_identifier="test_snapshot", + wait_for_completion=False, + ) + delete_snapshot.execute(None) + mock_get_conn.return_value.delete_cluster_snapshot.assert_called_once_with( + SnapshotClusterIdentifier='test_cluster', + SnapshotIdentifier="test_snapshot", + ) + + mock_get_cluster_snapshot_status.assert_not_called() + + class TestResumeClusterOperator: def test_init(self): redshift_operator = RedshiftResumeClusterOperator( diff --git a/tests/system/providers/amazon/aws/example_redshift_cluster.py b/tests/system/providers/amazon/aws/example_redshift_cluster.py index 69f2a50b7cf9f..7c7b9070dea76 100644 --- a/tests/system/providers/amazon/aws/example_redshift_cluster.py +++ b/tests/system/providers/amazon/aws/example_redshift_cluster.py @@ -25,6 +25,7 @@ RedshiftCreateClusterOperator, RedshiftCreateClusterSnapshotOperator, RedshiftDeleteClusterOperator, + RedshiftDeleteClusterSnapshotOperator, RedshiftPauseClusterOperator, RedshiftResumeClusterOperator, ) @@ -95,10 +96,18 @@ cluster_identifier=REDSHIFT_CLUSTER_IDENTIFIER, snapshot_identifier=REDSHIFT_CLUSTER_SNAPSHOT_IDENTIFIER, retention_period=1, - poll_interval=5, + wait_for_completion=True, ) # [END howto_operator_redshift_create_cluster_snapshot] + # [START howto_operator_redshift_delete_cluster_snapshot] + task_delete_cluster_snapshot = RedshiftDeleteClusterSnapshotOperator( + task_id='delete_cluster_snapshot', + cluster_identifier=REDSHIFT_CLUSTER_IDENTIFIER, + snapshot_identifier=REDSHIFT_CLUSTER_SNAPSHOT_IDENTIFIER, + ) + # [END howto_operator_redshift_delete_cluster_snapshot] + # [START howto_operator_redshift_delete_cluster] task_delete_cluster = RedshiftDeleteClusterOperator( task_id="delete_cluster", @@ -114,6 +123,7 @@ task_wait_cluster_paused, task_resume_cluster, task_create_cluster_snapshot, + task_delete_cluster_snapshot, task_delete_cluster, )