Skip to content

Commit

Permalink
Add Future interface to BigQuery jobs (googleapis#3626)
Browse files Browse the repository at this point in the history
* Add future interface to bigquery Jobs.
* Make QueryJob return QueryResults from result()
* Deprecate QueryJob.results()
  • Loading branch information
Jon Wayne Parrott authored and landrito committed Aug 22, 2017
1 parent 30f98f7 commit a76bb1d
Show file tree
Hide file tree
Showing 3 changed files with 291 additions and 22 deletions.
201 changes: 181 additions & 20 deletions bigquery/google/cloud/bigquery/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,14 @@

"""Define API Jobs."""

import collections
import threading
import warnings

import six
from six.moves import http_client

from google.cloud import exceptions
from google.cloud.exceptions import NotFound
from google.cloud._helpers import _datetime_from_microseconds
from google.cloud.bigquery.dataset import Dataset
Expand All @@ -27,6 +33,60 @@
from google.cloud.bigquery._helpers import UDFResourcesProperty
from google.cloud.bigquery._helpers import _EnumProperty
from google.cloud.bigquery._helpers import _TypedProperty
import google.cloud.future.base

# Value of a job's ``state`` once it has finished; ``done()`` and
# ``_set_future_result()`` compare the job state against it.
_DONE_STATE = 'DONE'
# ``reason`` of an ``error_result`` that ``cancelled()`` checks for.
_STOPPED_REASON = 'stopped'

# Maps the ``reason`` of a BigQuery error result to an HTTP status code.
# ``_error_result_to_exception`` looks reasons up here and falls back to
# ``http_client.INTERNAL_SERVER_ERROR`` for any reason that is not listed.
_ERROR_REASON_TO_EXCEPTION = {
'accessDenied': http_client.FORBIDDEN,
'backendError': http_client.INTERNAL_SERVER_ERROR,
'billingNotEnabled': http_client.FORBIDDEN,
'billingTierLimitExceeded': http_client.BAD_REQUEST,
'blocked': http_client.FORBIDDEN,
'duplicate': http_client.CONFLICT,
'internalError': http_client.INTERNAL_SERVER_ERROR,
'invalid': http_client.BAD_REQUEST,
'invalidQuery': http_client.BAD_REQUEST,
'notFound': http_client.NOT_FOUND,
'notImplemented': http_client.NOT_IMPLEMENTED,
'quotaExceeded': http_client.FORBIDDEN,
'rateLimitExceeded': http_client.FORBIDDEN,
'resourceInUse': http_client.BAD_REQUEST,
'resourcesExceeded': http_client.BAD_REQUEST,
'responseTooLarge': http_client.FORBIDDEN,
'stopped': http_client.OK,
'tableUnavailable': http_client.BAD_REQUEST,
}

# Minimal stand-in for a response object: it carries only ``status``, which
# is all ``_error_result_to_exception`` populates before handing it to
# ``exceptions.make_exception``.
_FakeResponse = collections.namedtuple('_FakeResponse', ['status'])


def _error_result_to_exception(error_result):
    """Build the exception that corresponds to a BigQuery error result.

    The ``reason`` of the error result is translated into an HTTP status
    code through ``_ERROR_REASON_TO_EXCEPTION``; a missing or unlisted
    reason is treated as an internal server error. The reasons and their
    matching HTTP status codes are documented on the
    `troubleshooting errors`_ page.

    .. _troubleshooting errors:
        https://cloud.google.com/bigquery/troubleshooting-errors

    :type error_result: Mapping[str, str]
    :param error_result: The error result from BigQuery.

    :rtype: google.cloud.exceptions.GoogleCloudError
    :returns: The mapped exception.
    """
    http_status = _ERROR_REASON_TO_EXCEPTION.get(
        error_result.get('reason'), http_client.INTERNAL_SERVER_ERROR)
    # make_exception expects an httplib2 response object, so wrap the
    # status code in a lightweight stand-in.
    response = _FakeResponse(status=http_status)
    message = error_result.get('message', '')
    return exceptions.make_exception(
        response, message, error_info=error_result, use_json=False)


class Compression(_EnumProperty):
Expand Down Expand Up @@ -82,16 +142,23 @@ class WriteDisposition(_EnumProperty):
ALLOWED = (WRITE_APPEND, WRITE_TRUNCATE, WRITE_EMPTY)


class _BaseJob(object):
"""Base class for jobs.
class _AsyncJob(google.cloud.future.base.PollingFuture):
"""Base class for asynchronous jobs.
:type name: str
:param name: the name of the job
:type client: :class:`google.cloud.bigquery.client.Client`
:param client: A client which holds credentials and project configuration
for the dataset (which requires a project).
"""
def __init__(self, client):
def __init__(self, name, client):
    """Create the job and the state backing its future interface.

    :type name: str
    :param name: the name of the job

    :type client: :class:`google.cloud.bigquery.client.Client`
    :param client: the client stored on the job.
    """
    super(_AsyncJob, self).__init__()
    self.name = name
    self._client = client
    # Filled in later from API responses.
    self._properties = {}
    # Records whether a result or exception has already been set, so that
    # ``_set_future_result`` completes the future only once.
    self._result_set = False
    # Serializes the completion logic in ``_set_future_result``.
    self._completion_lock = threading.Lock()

@property
def project(self):
Expand All @@ -117,21 +184,6 @@ def _require_client(self, client):
client = self._client
return client


class _AsyncJob(_BaseJob):
"""Base class for asynchronous jobs.
:type name: str
:param name: the name of the job
:type client: :class:`google.cloud.bigquery.client.Client`
:param client: A client which holds credentials and project configuration
for the dataset (which requires a project).
"""
def __init__(self, name, client):
super(_AsyncJob, self).__init__(client)
self.name = name

@property
def job_type(self):
"""Type of job
Expand Down Expand Up @@ -273,6 +325,9 @@ def _set_properties(self, api_response):
self._properties.clear()
self._properties.update(cleaned)

# For Future interface
self._set_future_result()

@classmethod
def _get_resource_config(cls, resource):
"""Helper for :meth:`from_api_repr`
Expand Down Expand Up @@ -345,7 +400,7 @@ def exists(self, client=None):
return True

def reload(self, client=None):
"""API call: refresh job properties via a GET request
"""API call: refresh job properties via a GET request.
See
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get
Expand All @@ -371,12 +426,85 @@ def cancel(self, client=None):
``NoneType``
:param client: the client to use. If not passed, falls back to the
``client`` stored on the current job.
:rtype: bool
:returns: Boolean indicating that the cancel request was sent.
"""
client = self._require_client(client)

api_response = client._connection.api_request(
method='POST', path='%s/cancel' % (self.path,))
self._set_properties(api_response['job'])
# The Future interface requires that we return True if the *attempt*
# to cancel was successful.
return True

# The following methods implement the PollingFuture interface. Note that
# the methods above are from the pre-Future interface and are left for
# compatibility. The only "overloaded" method is :meth:`cancel`, which
# satisfies both interfaces.

def _set_future_result(self):
    """Complete the future from the job's state, at most once.

    Does nothing while the job is not in the done state or when a result
    or exception has already been set. Otherwise the job's error result,
    if any, is converted to an exception and set on the future; a job
    without an error result becomes the future's result.
    """
    # The polling thread and the main thread may both reach this point;
    # the lock keeps them from running the completion logic concurrently.
    with self._completion_lock:
        still_running = self.state != _DONE_STATE
        # ``_result_set`` is also flipped to True by set_result and
        # set_exception themselves, which covers direct invocations.
        if still_running or self._result_set:
            return

        if self.error_result is None:
            self.set_result(self)
        else:
            self.set_exception(
                _error_result_to_exception(self.error_result))

def done(self):
    """Refresh the job if needed and report whether it is complete.

    :rtype: bool
    :returns: True if the job is complete, False otherwise.
    """
    # A job does not change once it is complete, so skip the refresh if
    # the state is already done.
    if self.state == _DONE_STATE:
        return True
    self.reload()
    return self.state == _DONE_STATE

def result(self, timeout=None):
    """Begin the job if it has no state yet, then wait for its result.

    :type timeout: int
    :param timeout: How long to wait for job to complete before raising
                    a :class:`TimeoutError`.

    :rtype: _AsyncJob
    :returns: This instance.

    :raises: :class:`~google.cloud.exceptions.GoogleCloudError` if the job
             failed or :class:`TimeoutError` if the job did not complete
             in the given timeout.
    """
    not_started = self.state is None
    if not_started:
        self.begin()
    # Waiting and timeout handling are delegated to the parent future.
    return super(_AsyncJob, self).result(timeout=timeout)

def cancelled(self):
    """Check if the job has been cancelled.

    The job is reported as cancelled when it has an ``error_result`` whose
    ``reason`` is ``'stopped'``. This method is part of the interface for
    :class:`google.cloud.future.Future`.

    :rtype: bool
    :returns: True if the job's error result carries the ``stopped``
              reason; False otherwise, including when the job has no
              error result.
    """
    return (self.error_result is not None
            and self.error_result.get('reason') == _STOPPED_REASON)


class _LoadConfiguration(object):
Expand Down Expand Up @@ -1127,11 +1255,44 @@ def from_api_repr(cls, resource, client):
job._set_properties(resource)
return job

def results(self):
def query_results(self):
    """Build a QueryResults object bound to this job.

    :rtype: :class:`~google.cloud.bigquery.query.QueryResults`
    :returns: results instance
    """
    # NOTE(review): imported inside the method, presumably to avoid a
    # circular import with the query module -- confirm.
    from google.cloud.bigquery.query import QueryResults
    bound_results = QueryResults.from_query_job(self)
    return bound_results

def results(self):
    """DEPRECATED.

    This method is deprecated. Use :meth:`query_results` or :meth:`result`.

    Construct a QueryResults instance, bound to this job.

    :rtype: :class:`~google.cloud.bigquery.query.QueryResults`
    :returns: The query results.
    """
    # stacklevel=2 attributes the warning to the code calling results(),
    # which is the code that needs to migrate, rather than to this line.
    warnings.warn(
        'QueryJob.results() is deprecated. Please use query_results() or '
        'result().', DeprecationWarning, stacklevel=2)
    return self.query_results()

def result(self, timeout=None):
    """Wait for the query job to finish and return its query results.

    The waiting itself is delegated to the parent class.

    :type timeout: int
    :param timeout: How long to wait for job to complete before raising
                    a :class:`TimeoutError`.

    :rtype: :class:`~google.cloud.bigquery.query.QueryResults`
    :returns: The query results.

    :raises: :class:`~google.cloud.exceptions.GoogleCloudError` if the job
             failed or :class:`TimeoutError` if the job did not complete
             in the given timeout.
    """
    # The parent's return value is discarded on purpose: callers of a
    # query job get a QueryResults instance back.
    super(QueryJob, self).result(timeout=timeout)
    return self.query_results()
10 changes: 10 additions & 0 deletions bigquery/tests/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import os
import time
import unittest
import uuid

from google.cloud import bigquery
from google.cloud._helpers import UTC
Expand Down Expand Up @@ -1013,6 +1014,15 @@ def test_large_query_w_public_data(self):
rows = list(iterator)
self.assertEqual(len(rows), LIMIT)

def test_async_query_future(self):
    # Exercise the future interface end to end: result() is called on a
    # freshly created query job and its rows are fetched.
    job_name = str(uuid.uuid4())
    query_job = Config.CLIENT.run_async_query(job_name, 'SELECT 1')
    query_job.use_legacy_sql = False

    rows = list(query_job.result().fetch_data())
    self.assertEqual(rows, [(1,)])

def test_insert_nested_nested(self):
# See #2951
SF = bigquery.SchemaField
Expand Down
Loading

0 comments on commit a76bb1d

Please sign in to comment.