Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Future interface to BigQuery jobs #3626

Merged
merged 8 commits into from
Jul 21, 2017
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 165 additions & 20 deletions bigquery/google/cloud/bigquery/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,14 @@

"""Define API Jobs."""

import collections
import threading
import warnings

import six
from six.moves import http_client

from google.cloud import exceptions
from google.cloud.exceptions import NotFound
from google.cloud._helpers import _datetime_from_microseconds
from google.cloud.bigquery.dataset import Dataset
Expand All @@ -27,6 +33,44 @@
from google.cloud.bigquery._helpers import UDFResourcesProperty
from google.cloud.bigquery._helpers import _EnumProperty
from google.cloud.bigquery._helpers import _TypedProperty
import google.cloud.future.base

This comment was marked as spam.


_DONE_STATE = 'DONE'


# Maps the ``reason`` string of a job ``errorResult`` to the HTTP status code
# that ``_error_result_to_exception`` uses to build the matching exception.
# Reasons missing from this table fall back to INTERNAL_SERVER_ERROR there.
# NOTE(review): 'stopped' maps to a 2xx status (OK) -- confirm that
# ``exceptions.make_exception`` yields a sensible exception type for it.
_ERROR_REASON_TO_EXCEPTION = {
    'accessDenied': http_client.FORBIDDEN,
    'backendError': http_client.INTERNAL_SERVER_ERROR,
    'billingNotEnabled': http_client.FORBIDDEN,
    'billingTierLimitExceeded': http_client.BAD_REQUEST,
    'blocked': http_client.FORBIDDEN,
    'duplicate': http_client.CONFLICT,
    'internalError': http_client.INTERNAL_SERVER_ERROR,
    'invalid': http_client.BAD_REQUEST,
    'invalidQuery': http_client.BAD_REQUEST,
    'notFound': http_client.NOT_FOUND,
    'notImplemented': http_client.NOT_IMPLEMENTED,
    'quotaExceeded': http_client.FORBIDDEN,
    'rateLimitExceeded': http_client.FORBIDDEN,
    'resourceInUse': http_client.BAD_REQUEST,
    'resourcesExceeded': http_client.BAD_REQUEST,
    'responseTooLarge': http_client.FORBIDDEN,
    'stopped': http_client.OK,
    'tableUnavailable': http_client.BAD_REQUEST,
}

_FakeResponse = collections.namedtuple('_FakeResponse', ['status'])

This comment was marked as spam.

This comment was marked as spam.



def _error_result_to_exception(error_result):
    """Build an exception from a job's ``errorResult`` mapping.

    :type error_result: dict
    :param error_result: The error result of a job. Its ``reason`` entry,
                         when present, selects an HTTP status code from
                         ``_ERROR_REASON_TO_EXCEPTION``; a missing or
                         unknown reason maps to INTERNAL_SERVER_ERROR.

    :returns: The object returned by ``exceptions.make_exception`` for the
              selected status code, with ``error_result`` passed along as
              ``error_info``.
    """
    reason = error_result.get('reason')
    status_code = _ERROR_REASON_TO_EXCEPTION.get(
        reason, http_client.INTERNAL_SERVER_ERROR)
    # make_exception expects an httplib2 response object; only its
    # ``status`` attribute is supplied here via the namedtuple stand-in.
    fake_response = _FakeResponse(status=status_code)
    return exceptions.make_exception(
        fake_response, b'', error_info=error_result, use_json=False)

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.



class Compression(_EnumProperty):
Expand Down Expand Up @@ -82,16 +126,23 @@ class WriteDisposition(_EnumProperty):
ALLOWED = (WRITE_APPEND, WRITE_TRUNCATE, WRITE_EMPTY)


class _BaseJob(object):
"""Base class for jobs.
class _AsyncJob(google.cloud.future.base.PollingFuture):
"""Base class for asynchronous jobs.

:type name: str
:param name: the name of the job

:type client: :class:`google.cloud.bigquery.client.Client`
:param client: A client which holds credentials and project configuration
for the dataset (which requires a project).
"""
def __init__(self, client):
def __init__(self, name, client):
super(_AsyncJob, self).__init__()
self.name = name
self._client = client
self._properties = {}
self._result_set = False
self._completion_lock = threading.Lock()

@property
def project(self):
Expand All @@ -117,21 +168,6 @@ def _require_client(self, client):
client = self._client
return client


class _AsyncJob(_BaseJob):
"""Base class for asynchronous jobs.

:type name: str
:param name: the name of the job

:type client: :class:`google.cloud.bigquery.client.Client`
:param client: A client which holds credentials and project configuration
for the dataset (which requires a project).
"""
def __init__(self, name, client):
super(_AsyncJob, self).__init__(client)
self.name = name

@property
def job_type(self):
"""Type of job
Expand Down Expand Up @@ -273,6 +309,9 @@ def _set_properties(self, api_response):
self._properties.clear()
self._properties.update(cleaned)

# For Future interface
self._set_future_result()

@classmethod
def _get_resource_config(cls, resource):
"""Helper for :meth:`from_api_repr`
Expand Down Expand Up @@ -345,7 +384,7 @@ def exists(self, client=None):
return True

def reload(self, client=None):
"""API call: refresh job properties via a GET request
"""API call: refresh job properties via a GET request.

See
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get
Expand All @@ -371,12 +410,85 @@ def cancel(self, client=None):
``NoneType``
:param client: the client to use. If not passed, falls back to the
``client`` stored on the current dataset.

:rtype: bool
:returns: Boolean indicating that the cancel request was sent.
"""
client = self._require_client(client)

api_response = client._connection.api_request(
method='POST', path='%s/cancel' % (self.path,))
self._set_properties(api_response['job'])
# The Future interface requires that we return True if the *attempt*
# to cancel was successful.
return True

This comment was marked as spam.

This comment was marked as spam.


# The following methods implement the PollingFuture interface. Note that
# the methods above are from the pre-Future interface and are left for
# compatibility. The only "overloaded" method is :meth:`cancel`, which
# satisfies both interfaces.

def _set_future_result(self):
    """Resolve the future once the job has reached its final state.

    Nothing happens while the job is not done, or when a result or
    exception has already been recorded.
    """
    # Hold the lock so that the polling thread and the main thread cannot
    # both run the completion logic at the same time.
    with self._completion_lock:
        # Proceed only for a finished job whose outcome is still unset.
        # Note: ``_result_set`` becomes True in set_result and
        # set_exception, which also covers direct calls to those methods.
        if not (self.state == _DONE_STATE and not self._result_set):
            return

        if self.error_result is None:
            self.set_result(self)
        else:
            self.set_exception(
                _error_result_to_exception(self.error_result))

def done(self):
    """Refresh the job if needed and check whether it is complete.

    :rtype: bool
    :returns: True if the job is complete, False otherwise.
    """
    # A job that is already done will not change again, so skip the
    # refresh in that case.
    if self.state == _DONE_STATE:
        return True

    self.reload()
    return self.state == _DONE_STATE

def result(self, timeout=None):
    """Start the job and wait for it to complete and get the result.

    The job is begun first when its ``state`` is ``None``; waiting is then
    delegated to the parent future's ``result``.

    :type timeout: int
    :param timeout: How long to wait for job to complete before raising
                    a :class:`TimeoutError`.

    :rtype: _AsyncJob
    :returns: This instance.

    :raises: :class:`~google.cloud.exceptions.GoogleCloudError` if the job
             failed or :class:`TimeoutError` if the job did not complete
             in the given timeout.
    """
    # A ``None`` state is treated as "not started yet", so begin the job
    # before handing off to the parent class to wait for completion.
    if self.state is None:
        self.begin()
    return super(_AsyncJob, self).result(timeout=timeout)

def cancelled(self):
    """Check if the job has been cancelled.

    A job is reported as cancelled only when its ``error_result`` is set
    and that error's ``reason`` is ``'stopped'``. The check uses the
    currently loaded job properties; it does not refresh the job.

    This method is here to satisfy the interface for
    :class:`google.cloud.future.Future`.

    :rtype: bool
    :returns: True if the job's error result has reason ``'stopped'``,
              False otherwise (including when there is no error result).
    """
    return (self.error_result is not None
            and self.error_result.get('reason') == 'stopped')

This comment was marked as spam.

This comment was marked as spam.



class _LoadConfiguration(object):
Expand Down Expand Up @@ -1127,11 +1239,44 @@ def from_api_repr(cls, resource, client):
job._set_properties(resource)
return job

def results(self):
def query_results(self):
    """Construct a QueryResults instance, bound to this job.

    :rtype: :class:`~google.cloud.bigquery.query.QueryResults`
    :returns: results instance
    """
    # Imported at call time rather than at module level.
    # NOTE(review): presumably this avoids a circular import between the
    # job and query modules -- confirm.
    from google.cloud.bigquery.query import QueryResults
    return QueryResults.from_query_job(self)

def results(self):
    """DEPRECATED.

    This method is deprecated. Use :meth:`query_results` or :meth:`result`.

    Construct a QueryResults instance, bound to this job.

    :rtype: :class:`~google.cloud.bigquery.query.QueryResults`
    :returns: The query results.
    """
    # stacklevel=2 attributes the warning to the caller of ``results()``
    # instead of this line, so users can see which of their call sites
    # needs updating.
    warnings.warn(
        'QueryJob.results() is deprecated. Please use query_results() or '
        'result().', DeprecationWarning, stacklevel=2)
    return self.query_results()

def result(self, timeout=None):
    """Start the job and wait for it to complete and get the result.

    Waiting is delegated to the parent class's ``result``; its return
    value is discarded and a results object bound to this job is returned
    in its place.

    :type timeout: int
    :param timeout: How long to wait for job to complete before raising
                    a :class:`TimeoutError`.

    :rtype: :class:`~google.cloud.bigquery.query.QueryResults`
    :returns: The query results.

    :raises: :class:`~google.cloud.exceptions.GoogleCloudError` if the job
             failed or :class:`TimeoutError` if the job did not complete
             in the given timeout.
    """
    super(QueryJob, self).result(timeout=timeout)
    # Return a QueryResults instance instead of returning the job.
    return self.query_results()
101 changes: 99 additions & 2 deletions bigquery/tests/unit/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,33 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import copy
import warnings

from six.moves import http_client
import unittest


class TestErrorResultToException(unittest.TestCase):
    """Unit tests for ``job._error_result_to_exception``."""

    def _call_fut(self, *args, **kwargs):
        """Invoke the function under test, importing it lazily."""
        from google.cloud.bigquery import job
        function_under_test = job._error_result_to_exception
        return function_under_test(*args, **kwargs)

    def test_simple(self):
        """A known reason maps to its status code and keeps the payload."""
        payload = {
            'reason': 'invalid',
            'meta': 'data'
        }
        raised = self._call_fut(payload)
        self.assertEqual(raised.code, http_client.BAD_REQUEST)
        self.assertIn("'meta': 'data'", raised.message)

    def test_missing_reason(self):
        """An error result without a reason maps to a 500 status code."""
        raised = self._call_fut({})
        self.assertEqual(raised.code, http_client.INTERNAL_SERVER_ERROR)

class _Base(object):
PROJECT = 'project'
SOURCE1 = 'http://example.com/source1.csv'
Expand Down Expand Up @@ -1514,15 +1538,88 @@ def test_from_api_repr_w_properties(self):
self.assertIs(dataset._client, client)
self._verifyResourceProperties(dataset, RESOURCE)

def test_results(self):
def test_cancelled(self):
    """A finished job whose error reason is 'stopped' reports cancelled."""
    stopped_status = {
        'state': 'DONE',
        'errorResult': {
            'reason': 'stopped'
        }
    }
    client = _Client(self.PROJECT)
    job = self._make_one(self.JOB_NAME, self.QUERY, client)
    job._properties['status'] = stopped_status

    self.assertTrue(job.cancelled())

def test_query_results(self):
from google.cloud.bigquery.query import QueryResults

client = _Client(self.PROJECT)
job = self._make_one(self.JOB_NAME, self.QUERY, client)
results = job.results()
results = job.query_results()
self.assertIsInstance(results, QueryResults)
self.assertIs(results._job, job)

def test_results_is_deprecated(self):
    """Calling ``results()`` emits exactly one deprecation warning."""
    job = self._make_one(
        self.JOB_NAME, self.QUERY, _Client(self.PROJECT))

    # Record every warning, regardless of any filters already installed.
    with warnings.catch_warnings(record=True) as captured:
        warnings.simplefilter('always')
        job.results()
        self.assertEqual(len(captured), 1)
        first_warning = captured[0]
        self.assertIn('deprecated', str(first_warning))

def test_result(self):
    """``result()`` on a finished job returns bound QueryResults."""
    from google.cloud.bigquery.query import QueryResults

    job = self._make_one(
        self.JOB_NAME, self.QUERY, _Client(self.PROJECT))
    # Mark the job as already complete before asking for its result.
    job._properties['status'] = {'state': 'DONE'}

    query_results = job.result()

    self.assertIsInstance(query_results, QueryResults)
    self.assertIs(query_results._job, job)

def test_result_invokes_begins(self):
    """``result()`` on an unstarted job issues a POST and then a GET."""
    pending_resource = self._makeResource()
    finished_resource = copy.deepcopy(pending_resource)
    finished_resource['status'] = {'state': 'DONE'}
    # The fake connection replies with the pending resource first and the
    # finished one second.
    connection = _Connection(pending_resource, finished_resource)
    client = _Client(self.PROJECT, connection=connection)
    job = self._make_one(self.JOB_NAME, self.QUERY, client)

    job.result()

    requests = connection._requested
    self.assertEqual(len(requests), 2)
    first_request, second_request = requests
    self.assertEqual(first_request['method'], 'POST')
    self.assertEqual(second_request['method'], 'GET')

def test_result_error(self):
    """``result()`` raises the mapped exception for a failed job."""
    from google.cloud import exceptions

    failure = {
        'debugInfo': 'DEBUG',
        'location': 'LOCATION',
        'message': 'MESSAGE',
        'reason': 'invalid'
    }
    job = self._make_one(
        self.JOB_NAME, self.QUERY, _Client(self.PROJECT))
    job._properties['status'] = {
        'errorResult': failure,
        'errors': [failure],
        'state': 'DONE'
    }
    # Push the failed state into the future before requesting the result.
    job._set_future_result()

    with self.assertRaises(exceptions.GoogleCloudError) as exc_info:
        job.result()

    caught = exc_info.exception
    self.assertIsInstance(caught, exceptions.GoogleCloudError)
    self.assertEqual(caught.code, http_client.BAD_REQUEST)

def test_begin_w_bound_client(self):
PATH = '/projects/%s/jobs' % (self.PROJECT,)
RESOURCE = self._makeResource()
Expand Down