Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Future interface to BigQuery jobs #3626

Merged
merged 8 commits into from
Jul 21, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
201 changes: 181 additions & 20 deletions bigquery/google/cloud/bigquery/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,14 @@

"""Define API Jobs."""

import collections
import threading
import warnings

import six
from six.moves import http_client

from google.cloud import exceptions
from google.cloud.exceptions import NotFound
from google.cloud._helpers import _datetime_from_microseconds
from google.cloud.bigquery.dataset import Dataset
Expand All @@ -27,6 +33,60 @@
from google.cloud.bigquery._helpers import UDFResourcesProperty
from google.cloud.bigquery._helpers import _EnumProperty
from google.cloud.bigquery._helpers import _TypedProperty
import google.cloud.future.base

This comment was marked as spam.


# Job state string that ``done()`` and ``_set_future_result()`` compare
# against to decide that a job has finished.
_DONE_STATE = 'DONE'
# Error ``reason`` that ``cancelled()`` treats as "the job was cancelled".
_STOPPED_REASON = 'stopped'

# Maps the ``reason`` field of a BigQuery error result to the HTTP status
# code used by ``_error_result_to_exception`` to build an exception.
# Reasons that are not listed here fall back to INTERNAL_SERVER_ERROR in
# that function.
_ERROR_REASON_TO_EXCEPTION = {
    'accessDenied': http_client.FORBIDDEN,
    'backendError': http_client.INTERNAL_SERVER_ERROR,
    'billingNotEnabled': http_client.FORBIDDEN,
    'billingTierLimitExceeded': http_client.BAD_REQUEST,
    'blocked': http_client.FORBIDDEN,
    'duplicate': http_client.CONFLICT,
    'internalError': http_client.INTERNAL_SERVER_ERROR,
    'invalid': http_client.BAD_REQUEST,
    'invalidQuery': http_client.BAD_REQUEST,
    'notFound': http_client.NOT_FOUND,
    'notImplemented': http_client.NOT_IMPLEMENTED,
    'quotaExceeded': http_client.FORBIDDEN,
    'rateLimitExceeded': http_client.FORBIDDEN,
    'resourceInUse': http_client.BAD_REQUEST,
    'resourcesExceeded': http_client.BAD_REQUEST,
    'responseTooLarge': http_client.FORBIDDEN,
    # NOTE(review): 'stopped' maps to a success status (200 OK); confirm
    # that ``exceptions.make_exception`` produces a sensible exception for
    # a non-error status code.
    'stopped': http_client.OK,
    'tableUnavailable': http_client.BAD_REQUEST,
}

# Minimal stand-in carrying only a ``status`` attribute; it is handed to
# ``exceptions.make_exception`` in place of a real HTTP response object.
_FakeResponse = collections.namedtuple('_FakeResponse', ['status'])

This comment was marked as spam.

This comment was marked as spam.



def _error_result_to_exception(error_result):
    """Maps BigQuery error reasons to an exception.

    The reasons and their matching HTTP status codes are documented on
    the `troubleshooting errors`_ page.

    .. _troubleshooting errors:
        https://cloud.google.com/bigquery/troubleshooting-errors

    :type error_result: Mapping[str, str]
    :param error_result: The error result from BigQuery. Its ``reason``
                         entry selects the status code and its ``message``
                         entry (default ``''``) becomes the exception text.

    :rtype: google.cloud.exceptions.GoogleCloudError
    :returns: The mapped exception.
    """
    reason = error_result.get('reason')
    # Unknown or missing reasons are reported as an internal server error.
    status_code = _ERROR_REASON_TO_EXCEPTION.get(
        reason, http_client.INTERNAL_SERVER_ERROR)
    # make_exception expects an httplib2 response object, so wrap the
    # status code in a stand-in that exposes only ``status``.
    fake_response = _FakeResponse(status=status_code)
    return exceptions.make_exception(
        fake_response,
        error_result.get('message', ''),
        error_info=error_result,
        use_json=False)


class Compression(_EnumProperty):
Expand Down Expand Up @@ -82,16 +142,23 @@ class WriteDisposition(_EnumProperty):
ALLOWED = (WRITE_APPEND, WRITE_TRUNCATE, WRITE_EMPTY)


class _BaseJob(object):
"""Base class for jobs.
class _AsyncJob(google.cloud.future.base.PollingFuture):
"""Base class for asynchronous jobs.

:type name: str
:param name: the name of the job

:type client: :class:`google.cloud.bigquery.client.Client`
:param client: A client which holds credentials and project configuration
for the dataset (which requires a project).
"""
def __init__(self, client):
def __init__(self, name, client):
    # Initialise the PollingFuture base class before setting job state.
    super(_AsyncJob, self).__init__()
    self.name = name
    self._client = client
    # Job resource properties; starts empty and is refilled by
    # ``_set_properties``.
    self._properties = {}
    # Checked by ``_set_future_result`` so that the future's outcome is
    # only set once.
    self._result_set = False
    # Held by ``_set_future_result`` while it runs the completion logic.
    self._completion_lock = threading.Lock()

@property
def project(self):
Expand All @@ -117,21 +184,6 @@ def _require_client(self, client):
client = self._client
return client


class _AsyncJob(_BaseJob):
"""Base class for asynchronous jobs.

:type name: str
:param name: the name of the job

:type client: :class:`google.cloud.bigquery.client.Client`
:param client: A client which holds credentials and project configuration
for the dataset (which requires a project).
"""
def __init__(self, name, client):
super(_AsyncJob, self).__init__(client)
self.name = name

@property
def job_type(self):
"""Type of job
Expand Down Expand Up @@ -273,6 +325,9 @@ def _set_properties(self, api_response):
self._properties.clear()
self._properties.update(cleaned)

# For Future interface
self._set_future_result()

@classmethod
def _get_resource_config(cls, resource):
"""Helper for :meth:`from_api_repr`
Expand Down Expand Up @@ -345,7 +400,7 @@ def exists(self, client=None):
return True

def reload(self, client=None):
"""API call: refresh job properties via a GET request
"""API call: refresh job properties via a GET request.

See
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get
Expand All @@ -371,12 +426,85 @@ def cancel(self, client=None):
``NoneType``
:param client: the client to use. If not passed, falls back to the
``client`` stored on the current dataset.

:rtype: bool
:returns: Boolean indicating that the cancel request was sent.
"""
client = self._require_client(client)

api_response = client._connection.api_request(
method='POST', path='%s/cancel' % (self.path,))
self._set_properties(api_response['job'])
# The Future interface requires that we return True if the *attempt*
# to cancel was successful.
return True

# The following methods implement the PollingFuture interface. Note that
# the methods above are from the pre-Future interface and are left for
# compatibility. The only "overloaded" method is :meth:`cancel`, which
# satisfies both interfaces.

def _set_future_result(self):
    """Resolve the future once the job has finished.

    Nothing happens while the job is not in the done state or when an
    outcome was already recorded. Otherwise the future is completed with
    the exception mapped from the job's error result, or with the job
    itself when there is no error result.
    """
    # Completion runs under a lock so the polling thread and the main
    # thread cannot both execute it at the same time.
    with self._completion_lock:
        finished = self.state == _DONE_STATE
        # ``_result_set`` is set to True in set_result and set_exception,
        # in case those methods are invoked directly; never set the
        # outcome a second time.
        if not finished or self._result_set:
            return

        if self.error_result is None:
            self.set_result(self)
            return

        self.set_exception(_error_result_to_exception(self.error_result))

def done(self):
    """Refresh the job if needed and check whether it is complete.

    :rtype: bool
    :returns: True if the job is complete, False otherwise.
    """
    already_complete = self.state == _DONE_STATE
    # A completed job no longer changes, so only reload while it is
    # still pending.
    if not already_complete:
        self.reload()
    return self.state == _DONE_STATE

def result(self, timeout=None):
    """Begin the job if necessary, then block until it finishes.

    :type timeout: int
    :param timeout: How long to wait for job to complete before raising
                    a :class:`TimeoutError`.

    :rtype: _AsyncJob
    :returns: This instance.

    :raises: :class:`~google.cloud.exceptions.GoogleCloudError` if the job
             failed or :class:`TimeoutError` if the job did not complete
             in the given timeout.
    """
    # A job without any state has not been started yet.
    not_started = self.state is None
    if not_started:
        self.begin()
    return super(_AsyncJob, self).result(timeout=timeout)

def cancelled(self):
    """Check if the job has been cancelled.

    A job is reported as cancelled when it has an error result whose
    ``reason`` is ``'stopped'``. This method is part of the interface
    for :class:`google.cloud.future.Future`.

    :rtype: bool
    :returns: True if the job's error result has the reason
              ``'stopped'``; False otherwise, including when the job has
              no error result.
    """
    error_result = self.error_result
    return (error_result is not None
            and error_result.get('reason') == _STOPPED_REASON)


class _LoadConfiguration(object):
Expand Down Expand Up @@ -1127,11 +1255,44 @@ def from_api_repr(cls, resource, client):
job._set_properties(resource)
return job

def results(self):
def query_results(self):
    """Build the QueryResults object associated with this job.

    :rtype: :class:`~google.cloud.bigquery.query.QueryResults`
    :returns: results instance
    """
    # Imported inside the method rather than at module level.
    from google.cloud.bigquery.query import QueryResults

    bound_results = QueryResults.from_query_job(self)
    return bound_results

def results(self):
    """DEPRECATED.

    This method is deprecated. Use :meth:`query_results` or :meth:`result`.

    Construct a QueryResults instance, bound to this job.

    :rtype: :class:`~google.cloud.bigquery.query.QueryResults`
    :returns: The query results.
    """
    # stacklevel=2 attributes the warning to the code that called
    # ``results()`` rather than to this line, so users can locate the
    # call they need to migrate.
    warnings.warn(
        'QueryJob.results() is deprecated. Please use query_results() or '
        'result().', DeprecationWarning, stacklevel=2)
    return self.query_results()

def result(self, timeout=None):
    """Run the query to completion and hand back its results.

    :type timeout: int
    :param timeout: How long to wait for job to complete before raising
                    a :class:`TimeoutError`.

    :rtype: :class:`~google.cloud.bigquery.query.QueryResults`
    :returns: The query results.

    :raises: :class:`~google.cloud.exceptions.GoogleCloudError` if the job
             failed or :class:`TimeoutError` if the job did not complete
             in the given timeout.
    """
    # Wait via the parent implementation; its return value is discarded
    # because callers of a query job get a QueryResults instance instead.
    super(QueryJob, self).result(timeout=timeout)
    query_output = self.query_results()
    return query_output
10 changes: 10 additions & 0 deletions bigquery/tests/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import os
import time
import unittest
import uuid

from google.cloud import bigquery
from google.cloud._helpers import UTC
Expand Down Expand Up @@ -1013,6 +1014,15 @@ def test_large_query_w_public_data(self):
rows = list(iterator)
self.assertEqual(len(rows), LIMIT)

def test_async_query_future(self):
    # Run a trivial standard-SQL query through the Future-style
    # ``result()`` call and check the single row that comes back.
    job_name = str(uuid.uuid4())
    job = Config.CLIENT.run_async_query(job_name, 'SELECT 1')
    job.use_legacy_sql = False

    fetched_rows = list(job.result().fetch_data())
    self.assertEqual(fetched_rows, [(1,)])

def test_insert_nested_nested(self):
# See #2951
SF = bigquery.SchemaField
Expand Down
Loading