Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add RedshiftDeleteClusterSnapshotOperator #25975

Merged
merged 2 commits into from
Aug 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions airflow/providers/amazon/aws/hooks/redshift_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,3 +153,21 @@ def create_cluster_snapshot(
ManualSnapshotRetentionPeriod=retention_period,
)
return response['Snapshot'] if response['Snapshot'] else None

def get_cluster_snapshot_status(self, snapshot_identifier: str, cluster_identifier: str):
"""
Return Redshift cluster snapshot status. If cluster snapshot not found return ``None``

:param snapshot_identifier: A unique identifier for the snapshot that you are requesting
:param cluster_identifier: The unique identifier of the cluster the snapshot was created from
"""
try:
response = self.get_conn().describe_cluster_snapshots(
ClusterIdentifier=cluster_identifier,
SnapshotIdentifier=snapshot_identifier,
)
snapshot = response.get("Snapshots")[0]
snapshot_status: str = snapshot.get("Status")
return snapshot_status
except self.get_conn().exceptions.ClusterSnapshotNotFoundFault:
return None
51 changes: 51 additions & 0 deletions airflow/providers/amazon/aws/operators/redshift_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,57 @@ def execute(self, context: "Context") -> Any:
)


class RedshiftDeleteClusterSnapshotOperator(BaseOperator):
"""
Deletes the specified manual snapshot

.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:RedshiftDeleteClusterSnapshotOperator`

:param snapshot_identifier: A unique identifier for the snapshot that you are requesting
:param cluster_identifier: The unique identifier of the cluster the snapshot was created from
:param wait_for_completion: Whether wait for cluster deletion or not
The default value is ``True``
:param aws_conn_id: The Airflow connection used for AWS credentials.
The default connection id is ``aws_default``
:param poll_interval: Time (in seconds) to wait between two consecutive calls to check snapshot state
"""

def __init__(
self,
*,
snapshot_identifier: str,
cluster_identifier: str,
wait_for_completion: bool = True,
aws_conn_id: str = "aws_default",
poll_interval: int = 10,
**kwargs,
):
super().__init__(**kwargs)
self.snapshot_identifier = snapshot_identifier
self.cluster_identifier = cluster_identifier
self.wait_for_completion = wait_for_completion
self.poll_interval = poll_interval
self.redshift_hook = RedshiftHook(aws_conn_id=aws_conn_id)

def execute(self, context: "Context") -> Any:
self.redshift_hook.get_conn().delete_cluster_snapshot(
SnapshotClusterIdentifier=self.cluster_identifier,
SnapshotIdentifier=self.snapshot_identifier,
)

if self.wait_for_completion:
while self.get_status() is not None:
time.sleep(self.poll_interval)

def get_status(self) -> str:
return self.redshift_hook.get_cluster_snapshot_status(
snapshot_identifier=self.snapshot_identifier,
cluster_identifier=self.cluster_identifier,
)


class RedshiftResumeClusterOperator(BaseOperator):
"""
Resume a paused AWS Redshift Cluster
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,20 @@ To create Amazon Redshift cluster snapshot you can use
:start-after: [START howto_operator_redshift_create_cluster_snapshot]
:end-before: [END howto_operator_redshift_create_cluster_snapshot]

.. _howto/operator:RedshiftDeleteClusterSnapshotOperator:

Delete an Amazon Redshift cluster snapshot
==========================================

To delete Amazon Redshift cluster snapshot you can use
:class:`RedshiftDeleteClusterSnapshotOperator <airflow.providers.amazon.aws.operators.redshift_cluster>`

.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_redshift_cluster.py
:language: python
:dedent: 4
:start-after: [START howto_operator_redshift_delete_cluster_snapshot]
:end-before: [END howto_operator_redshift_delete_cluster_snapshot]

.. _howto/operator:RedshiftDeleteClusterOperator:

Delete an Amazon Redshift cluster
Expand Down
44 changes: 44 additions & 0 deletions tests/providers/amazon/aws/operators/test_redshift_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
RedshiftCreateClusterOperator,
RedshiftCreateClusterSnapshotOperator,
RedshiftDeleteClusterOperator,
RedshiftDeleteClusterSnapshotOperator,
RedshiftPauseClusterOperator,
RedshiftResumeClusterOperator,
)
Expand Down Expand Up @@ -152,6 +153,49 @@ def test_create_cluster_snapshot_with_wait(self, mock_get_conn, mock_cluster_sta
)


class TestRedshiftDeleteClusterSnapshotOperator:
@mock.patch(
"airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.get_cluster_snapshot_status"
)
@mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.get_conn")
def test_delete_cluster_snapshot_wait(self, mock_get_conn, mock_get_cluster_snapshot_status):
mock_get_cluster_snapshot_status.return_value = None
delete_snapshot = RedshiftDeleteClusterSnapshotOperator(
task_id="test_snapshot",
cluster_identifier="test_cluster",
snapshot_identifier="test_snapshot",
)
delete_snapshot.execute(None)
mock_get_conn.return_value.delete_cluster_snapshot.assert_called_once_with(
SnapshotClusterIdentifier='test_cluster',
SnapshotIdentifier="test_snapshot",
)

mock_get_cluster_snapshot_status.assert_called_once_with(
cluster_identifier="test_cluster",
snapshot_identifier="test_snapshot",
)

@mock.patch(
"airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.get_cluster_snapshot_status"
)
@mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.get_conn")
def test_delete_cluster_snapshot(self, mock_get_conn, mock_get_cluster_snapshot_status):
delete_snapshot = RedshiftDeleteClusterSnapshotOperator(
task_id="test_snapshot",
cluster_identifier="test_cluster",
snapshot_identifier="test_snapshot",
wait_for_completion=False,
)
delete_snapshot.execute(None)
mock_get_conn.return_value.delete_cluster_snapshot.assert_called_once_with(
SnapshotClusterIdentifier='test_cluster',
SnapshotIdentifier="test_snapshot",
)

mock_get_cluster_snapshot_status.assert_not_called()


class TestResumeClusterOperator:
def test_init(self):
redshift_operator = RedshiftResumeClusterOperator(
Expand Down
12 changes: 11 additions & 1 deletion tests/system/providers/amazon/aws/example_redshift_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
RedshiftCreateClusterOperator,
RedshiftCreateClusterSnapshotOperator,
RedshiftDeleteClusterOperator,
RedshiftDeleteClusterSnapshotOperator,
RedshiftPauseClusterOperator,
RedshiftResumeClusterOperator,
)
Expand Down Expand Up @@ -95,10 +96,18 @@
cluster_identifier=REDSHIFT_CLUSTER_IDENTIFIER,
snapshot_identifier=REDSHIFT_CLUSTER_SNAPSHOT_IDENTIFIER,
retention_period=1,
poll_interval=5,
wait_for_completion=True,
)
# [END howto_operator_redshift_create_cluster_snapshot]

# [START howto_operator_redshift_delete_cluster_snapshot]
task_delete_cluster_snapshot = RedshiftDeleteClusterSnapshotOperator(
task_id='delete_cluster_snapshot',
cluster_identifier=REDSHIFT_CLUSTER_IDENTIFIER,
snapshot_identifier=REDSHIFT_CLUSTER_SNAPSHOT_IDENTIFIER,
)
# [END howto_operator_redshift_delete_cluster_snapshot]

# [START howto_operator_redshift_delete_cluster]
task_delete_cluster = RedshiftDeleteClusterOperator(
task_id="delete_cluster",
Expand All @@ -114,6 +123,7 @@
task_wait_cluster_paused,
task_resume_cluster,
task_create_cluster_snapshot,
task_delete_cluster_snapshot,
task_delete_cluster,
)

Expand Down