Skip to content

Commit

Permalink
Add RedshiftDeleteClusterSnapshotOperator (#25975)
Browse files Browse the repository at this point in the history
* Add RedshiftDeleteClusterSnapshotOperator

* Review feedback
  • Loading branch information
pankajastro authored Aug 27, 2022
1 parent 695e1a5 commit c9c89e5
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 1 deletion.
18 changes: 18 additions & 0 deletions airflow/providers/amazon/aws/hooks/redshift_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,3 +153,21 @@ def create_cluster_snapshot(
ManualSnapshotRetentionPeriod=retention_period,
)
return response['Snapshot'] if response['Snapshot'] else None

def get_cluster_snapshot_status(self, snapshot_identifier: str, cluster_identifier: str):
"""
Return Redshift cluster snapshot status. If cluster snapshot not found return ``None``
:param snapshot_identifier: A unique identifier for the snapshot that you are requesting
:param cluster_identifier: The unique identifier of the cluster the snapshot was created from
"""
try:
response = self.get_conn().describe_cluster_snapshots(
ClusterIdentifier=cluster_identifier,
SnapshotIdentifier=snapshot_identifier,
)
snapshot = response.get("Snapshots")[0]
snapshot_status: str = snapshot.get("Status")
return snapshot_status
except self.get_conn().exceptions.ClusterSnapshotNotFoundFault:
return None
51 changes: 51 additions & 0 deletions airflow/providers/amazon/aws/operators/redshift_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,57 @@ def execute(self, context: "Context") -> Any:
)


class RedshiftDeleteClusterSnapshotOperator(BaseOperator):
"""
Deletes the specified manual snapshot
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:RedshiftDeleteClusterSnapshotOperator`
:param snapshot_identifier: A unique identifier for the snapshot that you are requesting
:param cluster_identifier: The unique identifier of the cluster the snapshot was created from
:param wait_for_completion: Whether wait for cluster deletion or not
The default value is ``True``
:param aws_conn_id: The Airflow connection used for AWS credentials.
The default connection id is ``aws_default``
:param poll_interval: Time (in seconds) to wait between two consecutive calls to check snapshot state
"""

def __init__(
self,
*,
snapshot_identifier: str,
cluster_identifier: str,
wait_for_completion: bool = True,
aws_conn_id: str = "aws_default",
poll_interval: int = 10,
**kwargs,
):
super().__init__(**kwargs)
self.snapshot_identifier = snapshot_identifier
self.cluster_identifier = cluster_identifier
self.wait_for_completion = wait_for_completion
self.poll_interval = poll_interval
self.redshift_hook = RedshiftHook(aws_conn_id=aws_conn_id)

def execute(self, context: "Context") -> Any:
self.redshift_hook.get_conn().delete_cluster_snapshot(
SnapshotClusterIdentifier=self.cluster_identifier,
SnapshotIdentifier=self.snapshot_identifier,
)

if self.wait_for_completion:
while self.get_status() is not None:
time.sleep(self.poll_interval)

def get_status(self) -> str:
return self.redshift_hook.get_cluster_snapshot_status(
snapshot_identifier=self.snapshot_identifier,
cluster_identifier=self.cluster_identifier,
)


class RedshiftResumeClusterOperator(BaseOperator):
"""
Resume a paused AWS Redshift Cluster
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,20 @@ To create Amazon Redshift cluster snapshot you can use
:start-after: [START howto_operator_redshift_create_cluster_snapshot]
:end-before: [END howto_operator_redshift_create_cluster_snapshot]

.. _howto/operator:RedshiftDeleteClusterSnapshotOperator:

Delete an Amazon Redshift cluster snapshot
==========================================

To delete Amazon Redshift cluster snapshot you can use
:class:`RedshiftDeleteClusterSnapshotOperator <airflow.providers.amazon.aws.operators.redshift_cluster>`

.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_redshift_cluster.py
:language: python
:dedent: 4
:start-after: [START howto_operator_redshift_delete_cluster_snapshot]
:end-before: [END howto_operator_redshift_delete_cluster_snapshot]

.. _howto/operator:RedshiftDeleteClusterOperator:

Delete an Amazon Redshift cluster
Expand Down
44 changes: 44 additions & 0 deletions tests/providers/amazon/aws/operators/test_redshift_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
RedshiftCreateClusterOperator,
RedshiftCreateClusterSnapshotOperator,
RedshiftDeleteClusterOperator,
RedshiftDeleteClusterSnapshotOperator,
RedshiftPauseClusterOperator,
RedshiftResumeClusterOperator,
)
Expand Down Expand Up @@ -152,6 +153,49 @@ def test_create_cluster_snapshot_with_wait(self, mock_get_conn, mock_cluster_sta
)


class TestRedshiftDeleteClusterSnapshotOperator:
@mock.patch(
"airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.get_cluster_snapshot_status"
)
@mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.get_conn")
def test_delete_cluster_snapshot_wait(self, mock_get_conn, mock_get_cluster_snapshot_status):
mock_get_cluster_snapshot_status.return_value = None
delete_snapshot = RedshiftDeleteClusterSnapshotOperator(
task_id="test_snapshot",
cluster_identifier="test_cluster",
snapshot_identifier="test_snapshot",
)
delete_snapshot.execute(None)
mock_get_conn.return_value.delete_cluster_snapshot.assert_called_once_with(
SnapshotClusterIdentifier='test_cluster',
SnapshotIdentifier="test_snapshot",
)

mock_get_cluster_snapshot_status.assert_called_once_with(
cluster_identifier="test_cluster",
snapshot_identifier="test_snapshot",
)

@mock.patch(
"airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.get_cluster_snapshot_status"
)
@mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.get_conn")
def test_delete_cluster_snapshot(self, mock_get_conn, mock_get_cluster_snapshot_status):
delete_snapshot = RedshiftDeleteClusterSnapshotOperator(
task_id="test_snapshot",
cluster_identifier="test_cluster",
snapshot_identifier="test_snapshot",
wait_for_completion=False,
)
delete_snapshot.execute(None)
mock_get_conn.return_value.delete_cluster_snapshot.assert_called_once_with(
SnapshotClusterIdentifier='test_cluster',
SnapshotIdentifier="test_snapshot",
)

mock_get_cluster_snapshot_status.assert_not_called()


class TestResumeClusterOperator:
def test_init(self):
redshift_operator = RedshiftResumeClusterOperator(
Expand Down
12 changes: 11 additions & 1 deletion tests/system/providers/amazon/aws/example_redshift_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
RedshiftCreateClusterOperator,
RedshiftCreateClusterSnapshotOperator,
RedshiftDeleteClusterOperator,
RedshiftDeleteClusterSnapshotOperator,
RedshiftPauseClusterOperator,
RedshiftResumeClusterOperator,
)
Expand Down Expand Up @@ -95,10 +96,18 @@
cluster_identifier=REDSHIFT_CLUSTER_IDENTIFIER,
snapshot_identifier=REDSHIFT_CLUSTER_SNAPSHOT_IDENTIFIER,
retention_period=1,
poll_interval=5,
wait_for_completion=True,
)
# [END howto_operator_redshift_create_cluster_snapshot]

# [START howto_operator_redshift_delete_cluster_snapshot]
task_delete_cluster_snapshot = RedshiftDeleteClusterSnapshotOperator(
task_id='delete_cluster_snapshot',
cluster_identifier=REDSHIFT_CLUSTER_IDENTIFIER,
snapshot_identifier=REDSHIFT_CLUSTER_SNAPSHOT_IDENTIFIER,
)
# [END howto_operator_redshift_delete_cluster_snapshot]

# [START howto_operator_redshift_delete_cluster]
task_delete_cluster = RedshiftDeleteClusterOperator(
task_id="delete_cluster",
Expand All @@ -114,6 +123,7 @@
task_wait_cluster_paused,
task_resume_cluster,
task_create_cluster_snapshot,
task_delete_cluster_snapshot,
task_delete_cluster,
)

Expand Down

0 comments on commit c9c89e5

Please sign in to comment.