From a1b943f11a3ef9545eba1cc5eda226690c67caf0 Mon Sep 17 00:00:00 2001 From: Serge Smertin Date: Wed, 17 May 2023 16:08:23 +0200 Subject: [PATCH 1/5] Harden `w.clusters.ensure_cluster_is_running()` - Retry on `OperationFailed` errors, same as in Java SDK --- databricks/sdk/mixins/compute.py | 35 ++++++++++++++++++++++---------- 1 file changed, 24 insertions(+), 11 deletions(-) diff --git a/databricks/sdk/mixins/compute.py b/databricks/sdk/mixins/compute.py index c840eb63..94fb5091 100644 --- a/databricks/sdk/mixins/compute.py +++ b/databricks/sdk/mixins/compute.py @@ -1,9 +1,15 @@ +import datetime +import logging import re +import time from dataclasses import dataclass from typing import Optional +from databricks.sdk.errors import OperationFailed from databricks.sdk.service import compute +_LOG = logging.getLogger('databricks.sdk') + @dataclass class SemVer: @@ -205,14 +211,21 @@ def select_node_type(self, def ensure_cluster_is_running(self, cluster_id: str): """Ensures that given cluster is running, regardless of the current state""" - state = compute.State - info = self.get(cluster_id) - if info.state == state.TERMINATED: - self.start(cluster_id).result() - elif info.state == state.TERMINATING: - self.wait_get_cluster_terminated(cluster_id) - self.start(cluster_id).result() - elif info.state in (state.PENDING, state.RESIZING, state.RESTARTING): - self.wait_get_cluster_running(cluster_id) - elif info.state in (state.ERROR, state.UNKNOWN): - raise RuntimeError(f'Cluster {info.cluster_name} is {info.state}: {info.state_message}') + timeout = datetime.timedelta(minutes=20) + deadline = time.time() + timeout.total_seconds() + while time.time() < deadline: + try: + state = compute.State + info = self.get(cluster_id) + if info.state == state.TERMINATED: + self.start(cluster_id).result() + elif info.state == state.TERMINATING: + self.wait_get_cluster_terminated(cluster_id) + self.start(cluster_id).result() + elif info.state in (state.PENDING, state.RESIZING, 
state.RESTARTING): + self.wait_get_cluster_running(cluster_id) + elif info.state in (state.ERROR, state.UNKNOWN): + raise RuntimeError(f'Cluster {info.cluster_name} is {info.state}: {info.state_message}') + except OperationFailed as e: + _LOG.debug(f'Operation failed, retrying', exc_info=e) + raise TimeoutError(f'timed out after {timeout}') From 2fe12932f697f0e68ee954c5e45280098d137f27 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Mon, 10 Jul 2023 19:50:47 +0100 Subject: [PATCH 2/5] Fix `ensure_cluster_is_running` --- databricks/sdk/mixins/compute.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/databricks/sdk/mixins/compute.py b/databricks/sdk/mixins/compute.py index 94fb5091..b3b3367d 100644 --- a/databricks/sdk/mixins/compute.py +++ b/databricks/sdk/mixins/compute.py @@ -213,19 +213,27 @@ def ensure_cluster_is_running(self, cluster_id: str): """Ensures that given cluster is running, regardless of the current state""" timeout = datetime.timedelta(minutes=20) deadline = time.time() + timeout.total_seconds() - while time.time() < deadline: + cluster_running = False + while time.time() < deadline and not cluster_running: try: state = compute.State info = self.get(cluster_id) - if info.state == state.TERMINATED: + if info.state == state.RUNNING: + cluster_running = True + elif info.state == state.TERMINATED: self.start(cluster_id).result() + cluster_running = True elif info.state == state.TERMINATING: self.wait_get_cluster_terminated(cluster_id) self.start(cluster_id).result() + cluster_running = True elif info.state in (state.PENDING, state.RESIZING, state.RESTARTING): self.wait_get_cluster_running(cluster_id) + cluster_running = True elif info.state in (state.ERROR, state.UNKNOWN): raise RuntimeError(f'Cluster {info.cluster_name} is {info.state}: {info.state_message}') except OperationFailed as e: - _LOG.debug(f'Operation failed, retrying', exc_info=e) - raise TimeoutError(f'timed 
out after {timeout}') + _LOG.debug('Operation failed, retrying', exc_info=e) + + if not cluster_running: + raise TimeoutError(f'timed out after {timeout}') From b4d298a9969e22bc2a00bcff434e8e6c66af3f68 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Tue, 11 Jul 2023 08:31:04 +0100 Subject: [PATCH 3/5] Remove `cluster_running` flag --- databricks/sdk/mixins/compute.py | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/databricks/sdk/mixins/compute.py b/databricks/sdk/mixins/compute.py index b3b3367d..61733a6f 100644 --- a/databricks/sdk/mixins/compute.py +++ b/databricks/sdk/mixins/compute.py @@ -209,31 +209,28 @@ def select_node_type(self, return nt.node_type_id raise ValueError("cannot determine smallest node type") - def ensure_cluster_is_running(self, cluster_id: str): + def ensure_cluster_is_running(self, cluster_id: str) -> None: """Ensures that given cluster is running, regardless of the current state""" timeout = datetime.timedelta(minutes=20) deadline = time.time() + timeout.total_seconds() - cluster_running = False - while time.time() < deadline and not cluster_running: + while time.time() < deadline: try: state = compute.State info = self.get(cluster_id) if info.state == state.RUNNING: - cluster_running = True + return elif info.state == state.TERMINATED: self.start(cluster_id).result() - cluster_running = True + return elif info.state == state.TERMINATING: self.wait_get_cluster_terminated(cluster_id) self.start(cluster_id).result() - cluster_running = True + return elif info.state in (state.PENDING, state.RESIZING, state.RESTARTING): self.wait_get_cluster_running(cluster_id) - cluster_running = True + return elif info.state in (state.ERROR, state.UNKNOWN): raise RuntimeError(f'Cluster {info.cluster_name} is {info.state}: {info.state_message}') except OperationFailed as e: _LOG.debug('Operation failed, retrying', exc_info=e) - - if not cluster_running: - raise 
TimeoutError(f'timed out after {timeout}') + raise TimeoutError(f'timed out after {timeout}') From 3f04d7c1b8db1e3e282b12285bc364b64dfab082 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Tue, 11 Jul 2023 08:32:33 +0100 Subject: [PATCH 4/5] Return `ClusterDetails` --- databricks/sdk/mixins/compute.py | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/databricks/sdk/mixins/compute.py b/databricks/sdk/mixins/compute.py index 61733a6f..46eae4f4 100644 --- a/databricks/sdk/mixins/compute.py +++ b/databricks/sdk/mixins/compute.py @@ -209,7 +209,7 @@ def select_node_type(self, return nt.node_type_id raise ValueError("cannot determine smallest node type") - def ensure_cluster_is_running(self, cluster_id: str) -> None: + def ensure_cluster_is_running(self, cluster_id: str) -> compute.ClusterDetails: """Ensures that given cluster is running, regardless of the current state""" timeout = datetime.timedelta(minutes=20) deadline = time.time() + timeout.total_seconds() @@ -218,17 +218,14 @@ def ensure_cluster_is_running(self, cluster_id: str) -> None: state = compute.State info = self.get(cluster_id) if info.state == state.RUNNING: - return + return info elif info.state == state.TERMINATED: - self.start(cluster_id).result() - return + return self.start(cluster_id).result() elif info.state == state.TERMINATING: self.wait_get_cluster_terminated(cluster_id) - self.start(cluster_id).result() - return + return self.start(cluster_id).result() elif info.state in (state.PENDING, state.RESIZING, state.RESTARTING): - self.wait_get_cluster_running(cluster_id) - return + return self.wait_get_cluster_running(cluster_id) elif info.state in (state.ERROR, state.UNKNOWN): raise RuntimeError(f'Cluster {info.cluster_name} is {info.state}: {info.state_message}') except OperationFailed as e: From 743252cb4c44c7ae39b106491e379d00f8d9b9db Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> 
Date: Thu, 27 Jul 2023 09:05:56 +0100 Subject: [PATCH 5/5] Don't return `ClusterDetails` This was requested in order to match up with the SDKs in other languages. This reverts commit 3f04d7c1b8db1e3e282b12285bc364b64dfab082. --- databricks/sdk/mixins/compute.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/databricks/sdk/mixins/compute.py b/databricks/sdk/mixins/compute.py index 46eae4f4..61733a6f 100644 --- a/databricks/sdk/mixins/compute.py +++ b/databricks/sdk/mixins/compute.py @@ -209,7 +209,7 @@ def select_node_type(self, return nt.node_type_id raise ValueError("cannot determine smallest node type") - def ensure_cluster_is_running(self, cluster_id: str) -> compute.ClusterDetails: + def ensure_cluster_is_running(self, cluster_id: str) -> None: """Ensures that given cluster is running, regardless of the current state""" timeout = datetime.timedelta(minutes=20) deadline = time.time() + timeout.total_seconds() @@ -218,14 +218,17 @@ def ensure_cluster_is_running(self, cluster_id: str) -> compute.ClusterDetails: state = compute.State info = self.get(cluster_id) if info.state == state.RUNNING: - return info + return elif info.state == state.TERMINATED: - return self.start(cluster_id).result() + self.start(cluster_id).result() + return elif info.state == state.TERMINATING: self.wait_get_cluster_terminated(cluster_id) - return self.start(cluster_id).result() + self.start(cluster_id).result() + return elif info.state in (state.PENDING, state.RESIZING, state.RESTARTING): - return self.wait_get_cluster_running(cluster_id) + self.wait_get_cluster_running(cluster_id) + return elif info.state in (state.ERROR, state.UNKNOWN): raise RuntimeError(f'Cluster {info.cluster_name} is {info.state}: {info.state_message}') except OperationFailed as e: