Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not terminate listing for token-based pagination resources on empty response #530

Merged
merged 6 commits into from
Feb 8, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 17 additions & 14 deletions .codegen/service.py.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -270,29 +270,32 @@ class {{.Name}}API:{{if .Description}}
{{- end}}
while True:
json = {{template "method-do" .}}
if '{{.Pagination.Results.Name}}' not in json or not json['{{.Pagination.Results.Name}}']:
if '{{.Pagination.Results.Name}}' in json:
for v in json['{{.Pagination.Results.Name}}']:
{{if .NeedsOffsetDedupe -}}
i = v['{{.IdentifierField.Name}}']
if i in seen:
continue
seen.add(i)
{{end -}}
yield {{.Pagination.Entity.PascalName}}.from_dict(v)
{{ if .Pagination.Token -}}
if '{{.Pagination.Token.Bind.Name}}' not in json or not json['{{.Pagination.Token.Bind.Name}}']:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is slightly reordered but designed to match the Go SDK pagination logic exactly.

return
for v in json['{{.Pagination.Results.Name}}']:
{{if .NeedsOffsetDedupe -}}
i = v['{{.IdentifierField.Name}}']
if i in seen:
continue
seen.add(i)
{{end -}}
yield {{.Pagination.Entity.PascalName}}.from_dict(v)
{{if eq .Path "/api/2.0/clusters/events" -}}
{{if eq "GET" .Verb}}query{{else}}body{{end}}['{{.Pagination.Token.PollField.Name}}'] = json['{{.Pagination.Token.Bind.Name}}']
{{- else if eq .Path "/api/2.0/clusters/events" -}}
if 'next_page' not in json or not json['next_page']:
return
body = json['next_page']
{{- else if .Pagination.Token -}}
if '{{.Pagination.Token.Bind.Name}}' not in json or not json['{{.Pagination.Token.Bind.Name}}']:
{{- else -}}
if '{{.Pagination.Results.Name}}' not in json or not json['{{.Pagination.Results.Name}}']:
return
{{if eq "GET" .Verb}}query{{else}}body{{end}}['{{.Pagination.Token.PollField.Name}}'] = json['{{.Pagination.Token.Bind.Name}}']
{{- else if eq .Pagination.Increment 1 -}}
{{ if eq .Pagination.Increment 1 -}}
query['{{.Pagination.Offset.Name}}'] += 1
{{- else -}}
query['{{.Pagination.Offset.Name}}'] += len(json['{{.Pagination.Results.Name}}'])
{{- end}}
{{- end}}
{{else -}}
json = {{template "method-do" .}}
parsed = {{.Response.PascalName}}.from_dict(json).{{template "safe-snake-name" .Pagination.Results}}
Expand Down
2 changes: 1 addition & 1 deletion .gitattributes
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ examples/queries/create_queries.py linguist-generated=true
examples/queries/get_queries.py linguist-generated=true
examples/queries/update_queries.py linguist-generated=true
examples/query_history/list_sql_query_history.py linguist-generated=true
examples/r/wait_catalog_workspace_bindings.py linguist-generated=true
examples/recipients/create_recipients.py linguist-generated=true
examples/recipients/get_recipients.py linguist-generated=true
examples/recipients/list_recipients.py linguist-generated=true
Expand Down Expand Up @@ -285,7 +286,6 @@ examples/workspace/list_workspace_integration.py linguist-generated=true
examples/workspace_assignment/list_workspace_assignment_on_aws.py linguist-generated=true
examples/workspace_assignment/update_workspace_assignment_on_aws.py linguist-generated=true
examples/workspace_bindings/get_catalog_workspace_bindings.py linguist-generated=true
examples/workspace_bindings/update_catalog_workspace_bindings.py linguist-generated=true
examples/workspace_conf/get_status_repos.py linguist-generated=true
examples/workspaces/create_workspaces.py linguist-generated=true
examples/workspaces/get_workspaces.py linguist-generated=true
Expand Down
50 changes: 50 additions & 0 deletions databricks/sdk/clock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import abc
import time


class Clock(metaclass=abc.ABCMeta):
@abc.abstractmethod
def time(self) -> float:
"""
Return the current time in seconds since the Epoch.
Fractions of a second may be present if the system clock provides them.

:return: The current time in seconds since the Epoch.
"""
pass

@abc.abstractmethod
def sleep(self, seconds: float) -> None:
"""
Return the current time in seconds since the Epoch.
Fractions of a second may be present if the system clock provides them.

:param seconds: The duration to sleep in seconds.
:return:
"""
pass


class RealClock(Clock):
"""
A real clock that uses the ``time`` module to get the current time and sleep.
"""

def time(self) -> float:
"""
Return the current time in seconds since the Epoch.
Fractions of a second may be present if the system clock provides them.

:return: The current time in seconds since the Epoch.
"""
return time.time()

def sleep(self, seconds: float) -> None:
"""
Return the current time in seconds since the Epoch.
Fractions of a second may be present if the system clock provides them.

:param seconds: The duration to sleep in seconds.
:return:
"""
time.sleep(seconds)
10 changes: 8 additions & 2 deletions databricks/sdk/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from .credentials_provider import *
from .errors import DatabricksError, error_mapper
from .retries import retried
from .clock import Clock, RealClock

__all__ = ['Config', 'DatabricksError']

Expand All @@ -22,12 +23,16 @@ class ApiClient:
_cfg: Config
_RETRY_AFTER_DEFAULT: int = 1

def __init__(self, cfg: Config = None):
def __init__(self, cfg: Config = None, clock: Clock=None):

if cfg is None:
cfg = Config()

if clock is None:
clock = RealClock()

self._cfg = cfg
self._clock = clock
# See https://github.com/databricks/databricks-sdk-go/blob/main/client/client.go#L34-L35
self._debug_truncate_bytes = cfg.debug_truncate_bytes if cfg.debug_truncate_bytes else 96
self._retry_timeout_seconds = cfg.retry_timeout_seconds if cfg.retry_timeout_seconds else 300
Expand Down Expand Up @@ -123,7 +128,8 @@ def do(self,
headers = {}
headers['User-Agent'] = self._user_agent_base
retryable = retried(timeout=timedelta(seconds=self._retry_timeout_seconds),
is_retryable=self._is_retryable)
is_retryable=self._is_retryable,
clock=self._clock)
return retryable(self._perform)(method,
path,
query=query,
Expand Down
14 changes: 9 additions & 5 deletions databricks/sdk/retries.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,34 @@
import functools
import logging
import time
from datetime import timedelta
from random import random
from typing import Callable, Optional, Sequence, Type

from .clock import Clock, RealClock

logger = logging.getLogger(__name__)


def retried(*,
on: Sequence[Type[BaseException]] = None,
is_retryable: Callable[[BaseException], Optional[str]] = None,
timeout=timedelta(minutes=20)):
timeout=timedelta(minutes=20),
clock: Clock=None):
has_allowlist = on is not None
has_callback = is_retryable is not None
if not (has_allowlist or has_callback) or (has_allowlist and has_callback):
raise SyntaxError('either on=[Exception] or callback=lambda x: .. is required')
if clock is None:
clock = RealClock()

def decorator(func):

@functools.wraps(func)
def wrapper(*args, **kwargs):
deadline = time.time() + timeout.total_seconds()
deadline = clock.time() + timeout.total_seconds()
attempt = 1
last_err = None
while time.time() < deadline:
while clock.time() < deadline:
try:
return func(*args, **kwargs)
except Exception as err:
Expand All @@ -50,7 +54,7 @@ def wrapper(*args, **kwargs):
raise err

logger.debug(f'Retrying: {retry_reason} (sleeping ~{sleep}s)')
time.sleep(sleep + random())
clock.sleep(sleep + random())
attempt += 1
raise TimeoutError(f'Timed out after {timeout}') from last_err

Expand Down
56 changes: 24 additions & 32 deletions databricks/sdk/service/catalog.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 6 additions & 8 deletions databricks/sdk/service/compute.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading