Skip to content

Commit

Permalink
Added timeout to w.clusters.ensure_cluster_running() (#227)
Browse files Browse the repository at this point in the history
## Changes

This picks up where #118 left off.

<!-- Summary of your changes that are easy to understand -->

## Tests
<!-- 
How is this tested? Please see the checklist below and also describe any
other relevant tests
-->

- [x] `make test` run locally
- [x] `make fmt` applied
- [x] relevant integration tests applied

---------

Co-authored-by: Serge Smertin <259697+nfx@users.noreply.github.com>
  • Loading branch information
judahrand and nfx authored Aug 2, 2023
1 parent 3213b7b commit c1c0886
Showing 1 changed file with 30 additions and 12 deletions.
42 changes: 30 additions & 12 deletions databricks/sdk/mixins/compute.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
import datetime
import logging
import re
import time
from dataclasses import dataclass
from typing import Optional

from databricks.sdk.errors import OperationFailed
from databricks.sdk.service import compute

_LOG = logging.getLogger('databricks.sdk')


@dataclass
class SemVer:
Expand Down Expand Up @@ -203,16 +209,28 @@ def select_node_type(self,
return nt.node_type_id
raise ValueError("cannot determine smallest node type")

def ensure_cluster_is_running(self, cluster_id: str):
def ensure_cluster_is_running(self, cluster_id: str) -> None:
"""Ensures that given cluster is running, regardless of the current state"""
state = compute.State
info = self.get(cluster_id)
if info.state == state.TERMINATED:
self.start(cluster_id).result()
elif info.state == state.TERMINATING:
self.wait_get_cluster_terminated(cluster_id)
self.start(cluster_id).result()
elif info.state in (state.PENDING, state.RESIZING, state.RESTARTING):
self.wait_get_cluster_running(cluster_id)
elif info.state in (state.ERROR, state.UNKNOWN):
raise RuntimeError(f'Cluster {info.cluster_name} is {info.state}: {info.state_message}')
timeout = datetime.timedelta(minutes=20)
deadline = time.time() + timeout.total_seconds()
while time.time() < deadline:
try:
state = compute.State
info = self.get(cluster_id)
if info.state == state.RUNNING:
return
elif info.state == state.TERMINATED:
self.start(cluster_id).result()
return
elif info.state == state.TERMINATING:
self.wait_get_cluster_terminated(cluster_id)
self.start(cluster_id).result()
return
elif info.state in (state.PENDING, state.RESIZING, state.RESTARTING):
self.wait_get_cluster_running(cluster_id)
return
elif info.state in (state.ERROR, state.UNKNOWN):
raise RuntimeError(f'Cluster {info.cluster_name} is {info.state}: {info.state_message}')
except OperationFailed as e:
_LOG.debug('Operation failed, retrying', exc_info=e)
raise TimeoutError(f'timed out after {timeout}')

0 comments on commit c1c0886

Please sign in to comment.