Skip to content

Commit

Permalink
Review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
pankajastro committed Aug 27, 2022
1 parent d0d3c59 commit 62174a1
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 7 deletions.
13 changes: 8 additions & 5 deletions airflow/providers/amazon/aws/hooks/redshift_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,12 @@ def create_cluster_snapshot(
return response['Snapshot'] if response['Snapshot'] else None

def get_cluster_snapshot_status(self, snapshot_identifier: str, cluster_identifier: str):
"""
Return Redshift cluster snapshot status. If cluster snapshot not found return ``None``
:param snapshot_identifier: A unique identifier for the snapshot that you are requesting
:param cluster_identifier: The unique identifier of the cluster the snapshot was created from
"""
try:
response = self.get_conn().describe_cluster_snapshots(
ClusterIdentifier=cluster_identifier,
Expand All @@ -163,8 +169,5 @@ def get_cluster_snapshot_status(self, snapshot_identifier: str, cluster_identifi
snapshot = response.get("Snapshots")[0]
snapshot_status: str = snapshot.get("Status")
return snapshot_status
except ClientError as exception:
if exception.response.get("Error", {}).get("Code", "") == "ClusterSnapshotNotFound":
return "cluster_snapshot_not_found"
else:
raise exception
except self.get_conn().exceptions.ClusterSnapshotNotFoundFault:
return None
2 changes: 1 addition & 1 deletion airflow/providers/amazon/aws/operators/redshift_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ def execute(self, context: "Context") -> Any:
)

if self.wait_for_completion:
while self.get_status() != "cluster_snapshot_not_found":
while self.get_status() is not None:
time.sleep(self.poll_interval)

def get_status(self) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ class TestRedshiftDeleteClusterSnapshotOperator:
)
@mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.get_conn")
def test_delete_cluster_snapshot_wait(self, mock_get_conn, mock_get_cluster_snapshot_status):
mock_get_cluster_snapshot_status.return_value = 'cluster_snapshot_not_found'
mock_get_cluster_snapshot_status.return_value = None
delete_snapshot = RedshiftDeleteClusterSnapshotOperator(
task_id="test_snapshot",
cluster_identifier="test_cluster",
Expand Down

0 comments on commit 62174a1

Please sign in to comment.