Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: avoid unnecessary API call in QueryJob.result() when job is already finished #1900

Merged
merged 14 commits into from
Apr 18, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions google/cloud/bigquery/_job_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,15 +258,16 @@ def _to_query_job(
errors = query_response["errors"]
query_job._properties["status"]["errors"] = errors

# Transform job state so that QueryJob doesn't try to restart the query.
# Avoid an extra call to `getQueryResults` if the query has finished.
job_complete = query_response.get("jobComplete")
if job_complete:
query_job._properties["status"]["state"] = "DONE"
query_job._query_results = google.cloud.bigquery.query._QueryResults(
query_response
)
else:
query_job._properties["status"]["state"] = "PENDING"

# We want job.result() to refresh the job state, so the conversion is
# always "PENDING", even if the job is finished.
query_job._properties["status"]["state"] = "PENDING"

return query_job

Expand Down
172 changes: 111 additions & 61 deletions google/cloud/bigquery/job/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@
import concurrent.futures
import copy
import re
import time
import typing
from typing import Any, Dict, Iterable, List, Optional, Union

from google.api_core import exceptions
from google.api_core.future import polling as polling_future
from google.api_core import retry as retries
import requests

Expand Down Expand Up @@ -1383,7 +1383,7 @@ def _begin(self, client=None, retry=DEFAULT_RETRY, timeout=None):
def _reload_query_results(
self, retry: "retries.Retry" = DEFAULT_RETRY, timeout: Optional[float] = None
):
"""Refresh the cached query results.
"""Refresh the cached query results unless already cached and complete.

Args:
retry (Optional[google.api_core.retry.Retry]):
Expand All @@ -1392,6 +1392,8 @@ def _reload_query_results(
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
"""
# Optimization: avoid a call to jobs.getQueryResults if it's already
# been fetched, e.g. from jobs.query first page of results.
if self._query_results and self._query_results.complete:
return

Expand Down Expand Up @@ -1430,40 +1432,6 @@ def _reload_query_results(
timeout=transport_timeout,
)

def _done_or_raise(self, retry=DEFAULT_RETRY, timeout=None):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was overridden because we wanted result() from the superclass to call jobs.getQueryResults, not just jobs.get (i.e. job.reload() in Python). Now that we aren't using the superclass for result(), this method is no longer necessary.

"""Check if the query has finished running and raise if it's not.

If the query has finished, also reload the job itself.
"""
# If an explicit timeout is not given, fall back to the transport timeout
# stored in _blocking_poll() in the process of polling for job completion.
transport_timeout = timeout if timeout is not None else self._transport_timeout

try:
self._reload_query_results(retry=retry, timeout=transport_timeout)
except exceptions.GoogleAPIError as exc:
# Reloading also updates error details on self, thus no need for an
# explicit self.set_exception() call if reloading succeeds.
try:
self.reload(retry=retry, timeout=transport_timeout)
except exceptions.GoogleAPIError:
# Use the query results reload exception, as it generally contains
# much more useful error information.
self.set_exception(exc)
finally:
return

# Only reload the job once we know the query is complete.
# This will ensure that fields such as the destination table are
# correctly populated.
if not self._query_results.complete:
raise polling_future._OperationNotComplete()
else:
try:
self.reload(retry=retry, timeout=transport_timeout)
except exceptions.GoogleAPIError as exc:
self.set_exception(exc)
Copy link
Contributor Author

@tswast tswast Apr 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thought: We probably should have been calling set_exception based on the job status. Need to look into this further.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. We are. 😅

self.set_exception(exception)

Which we call from _set_properties

self._set_future_result()

Which we call from reload

self._set_properties(api_response)


def result( # type: ignore # (incompatible with supertype)
self,
page_size: Optional[int] = None,
Expand Down Expand Up @@ -1528,6 +1496,10 @@ def result( # type: ignore # (incompatible with supertype)
If Non-``None`` and non-default ``job_retry`` is
provided and the job is not retryable.
"""
# Note: Since waiting for a query job to finish is more complex than
# refreshing the job state in a loop, we avoid calling the superclass
# in this method.

if self.dry_run:
return _EmptyRowIterator(
project=self.project,
Expand All @@ -1548,46 +1520,124 @@ def result( # type: ignore # (incompatible with supertype)
" provided to the query that created this job."
)

first = True
restart_query_job = False

def is_job_done():
nonlocal restart_query_job

def do_get_result():
nonlocal first
if restart_query_job:
restart_query_job = False

if first:
first = False
else:
# The original job has failed. Create a new one.
#
# Note that we won't get here if retry_do_query is
# None, because we won't use a retry.

# The orinal job is failed. Create a new one.
job = retry_do_query()

# If it's already failed, we might as well stop:
if job.done() and job.exception() is not None:
raise job.exception()

# Become the new job:
self.__dict__.clear()
self.__dict__.update(job.__dict__)

# This shouldn't be necessary, because once we have a good
# job, it should stay good,and we shouldn't have to retry.
# But let's be paranoid. :)
# It's possible the job fails again and we'll have to
# retry that too.
self._retry_do_query = retry_do_query
self._job_retry = job_retry

super(QueryJob, self).result(retry=retry, timeout=timeout)

# Since the job could already be "done" (e.g. got a finished job
# via client.get_job), the superclass call to done() might not
# set the self._query_results cache.
if self._query_results is None or not self._query_results.complete:
self._reload_query_results(retry=retry, timeout=timeout)
# Refresh the job status with jobs.get because some of the
# exceptions thrown by jobs.getQueryResults like timeout and
# rateLimitExceeded errors are ambiguous. We want to know if
# the query job failed and not just the call to
# jobs.getQueryResults.
if self.done(retry=retry, timeout=timeout):
# If it's already failed, we might as well stop.
job_failed_exception = self.exception()
if job_failed_exception is not None:
# Only try to restart the query job if the job failed for
# a retriable reason. For example, don't restart the query
# if the call to reload the job metadata within self.done()
# timed out.
#
# The `restart_query_job` must only be called after a
# successful call to the `jobs.get` REST API and we
# determine that the job has failed.
#
# The `jobs.get` REST API
# (https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get)
# is called via `self.done()` which calls
# `self.reload()`.
#
# To determine if the job failed, the `self.exception()`
# is set from `self.reload()` via
# `self._set_properties()`, which translates the
# `Job.status.errorResult` field
# (https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatus.FIELDS.error_result)
# into an exception that can be processed by the
# `job_retry` predicate.
restart_query_job = True
raise job_failed_exception
else:
# Make sure that the _query_results are cached so we
# can return a complete RowIterator.
#
# Note: As an optimization, _reload_query_results
# doesn't make any API calls if the query results are
# already cached and have jobComplete=True in the
# response from the REST API. This ensures we aren't
# making any extra API calls if the previous loop
# iteration fetched the finished job.
self._reload_query_results(retry=retry, timeout=timeout)
return True

# Call jobs.getQueryResults with max results set to 0 just to
# wait for the query to finish. Unlike most methods,
# jobs.getQueryResults hangs as long as it can to ensure we
# know when the query has finished as soon as possible.
self._reload_query_results(retry=retry, timeout=timeout)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Uh oh, if jobs.getQueryResults fails because the job failed it can throw an exception but restart_query_job will still be False.

But we don't want restart_query_job = True because sometimes this can raise an ambiguous exception such as quota exceeded, where we don't know if it's the job quota and it's a failed job or at a higher level (Google Frontend - GFE) where the job might actually still be running and/or succeeded.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't the worst way to fail, but it'd be nice to do the jobs.get call above in case of an exception to get a chance at retrying this job if the job failed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Filed #1903 to track improvements to ambiguous errors. 12fa9fb fixes an issue where we weren't actually retrying after an ambiguous failure even though we thought we were.


# Even if the query is finished now according to
# jobs.getQueryResults, we'll want to reload the job status if
# it's not already DONE.
return False

if retry_do_query is not None and job_retry is not None:
do_get_result = job_retry(do_get_result)

do_get_result()
is_job_done = job_retry(is_job_done)

# timeout can be a number of seconds, `None`, or a
# `google.api_core.future.polling.PollingFuture._DEFAULT_VALUE`
# sentinel object indicating a default timeout if we choose to add
# one some day. This value can come from our PollingFuture
# superclass and was introduced in
# https://github.com/googleapis/python-api-core/pull/462.
if isinstance(timeout, (float, int)):
remaining_timeout = timeout
else:
# Note: we may need to handle _DEFAULT_VALUE as a separate
# case someday, but even then the best we can do for queries
# is 72+ hours for hyperpareter tuning jobs:
chalmerlowe marked this conversation as resolved.
Show resolved Hide resolved
# https://cloud.google.com/bigquery/quotas#query_jobs
#
# The timeout for a multi-statement query is 24+ hours. See:
# https://cloud.google.com/bigquery/quotas#multi_statement_query_limits
remaining_timeout = None

if remaining_timeout is None:
# Since is_job_done() calls jobs.getQueryResults, which is a
# long-running API, don't delay the next request at all.
while not is_job_done():
pass
else:
# Use a monotonic clock since we don't actually care about
# daylight savings or similar, just the elapsed time.
previous_time = time.monotonic()

while not is_job_done():
current_time = time.monotonic()
elapsed_time = current_time - previous_time
remaining_timeout = remaining_timeout - elapsed_time
previous_time = current_time

if remaining_timeout < 0:
raise concurrent.futures.TimeoutError()

except exceptions.GoogleAPICallError as exc:
exc.message = _EXCEPTION_FOOTER_TEMPLATE.format(
Expand Down
52 changes: 47 additions & 5 deletions google/cloud/bigquery/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,25 @@

_DEFAULT_RETRY_DEADLINE = 10.0 * 60.0 # 10 minutes

# Allow for a few retries after the API request times out. This relevant for
# rateLimitExceeded errors, which can be raised either by the Google load
# balancer or the BigQuery job server.
_DEFAULT_JOB_DEADLINE = 3.0 * _DEFAULT_RETRY_DEADLINE
# Ambiguous errors (e.g. internalError, backendError, rateLimitExceeded) retry
# until the full `_DEFAULT_RETRY_DEADLINE`. This is because the
# `jobs.getQueryResults` REST API translates a job failure into an HTTP error.
#
# TODO(https://github.com/googleapis/python-bigquery/issues/1903): Investigate
# if we can fail early for ambiguous errors in `QueryJob.result()`'s call to
# the `jobs.getQueryResult` API.
#
# We need `_DEFAULT_JOB_DEADLINE` to be some multiple of
# `_DEFAULT_RETRY_DEADLINE` to allow for a few retries after the retry
# timeout is reached.
#
# Note: This multiple should actually be a multiple of
# (2 * _DEFAULT_RETRY_DEADLINE). After an ambiguous exception, the first
# call from `job_retry()` refreshes the job state without actually restarting
# the query. The second `job_retry()` actually restarts the query. For a more
# detailed explanation, see the comments where we set `restart_query_job = True`
# in `QueryJob.result()`'s inner `is_job_done()` function.
_DEFAULT_JOB_DEADLINE = 2.0 * (2.0 * _DEFAULT_RETRY_DEADLINE)


def _should_retry(exc):
Expand All @@ -66,17 +81,44 @@ def _should_retry(exc):
pass ``retry=bigquery.DEFAULT_RETRY.with_deadline(30)``.
"""

# Note: Take care when updating DEFAULT_TIMEOUT to anything but None. We
# briefly had a default timeout, but even setting it at more than twice the
# theoretical server-side default timeout of 2 minutes was not enough for
# complex queries. See:
# https://github.com/googleapis/python-bigquery/issues/970#issuecomment-921934647
DEFAULT_TIMEOUT = None
"""The default API timeout.

This is the time to wait per request. To adjust the total wait time, set a
deadline on the retry object.
"""

job_retry_reasons = "rateLimitExceeded", "backendError", "jobRateLimitExceeded"
job_retry_reasons = (
"rateLimitExceeded",
"backendError",
"internalError",
"jobRateLimitExceeded",
)


def _job_should_retry(exc):
# Sometimes we have ambiguous errors, such as 'backendError' which could
# be due to an API problem or a job problem. For these, make sure we retry
# our is_job_done() function.
#
# Note: This won't restart the job unless we know for sure it's because of
# the job status and set restart_query_job = True in that loop. This means
# that we might end up calling this predicate twice for the same job
# but from different paths: (1) from jobs.getQueryResults RetryError and
# (2) from translating the job error from the body of a jobs.get response.
#
# Note: If we start retrying job types other than queries where we don't
# call the problematic getQueryResults API to check the status, we need
# to provide a different predicate, as there shouldn't be ambiguous
# errors in those cases.
if isinstance(exc, exceptions.RetryError):
exc = exc.cause

if not hasattr(exc, "errors") or len(exc.errors) == 0:
return False

Expand Down
Loading