Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Future interface to BigQuery jobs #3626

Merged
merged 8 commits into from
Jul 21, 2017
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 47 additions & 6 deletions bigquery/google/cloud/bigquery/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@

"""Define API Jobs."""

import collections
import threading
import warnings

import six
from six.moves import http_client

from google.cloud import exceptions
from google.cloud.exceptions import NotFound
Expand All @@ -33,6 +35,43 @@
from google.cloud.bigquery._helpers import _TypedProperty
import google.cloud.future.base

This comment was marked as spam.


_DONE_STATE = 'DONE'


_ERROR_REASON_TO_EXCEPTION = {
'accessDenied': http_client.FORBIDDEN,
'backendError': http_client.INTERNAL_SERVER_ERROR,
'billingNotEnabled': http_client.FORBIDDEN,
'billingTierLimitExceeded': http_client.BAD_REQUEST,
'blocked': http_client.FORBIDDEN,
'duplicate': http_client.CONFLICT,
'internalError': http_client.INTERNAL_SERVER_ERROR,
'invalid': http_client.BAD_REQUEST,
'invalidQuery': http_client.BAD_REQUEST,
'notFound': http_client.NOT_FOUND,
'notImplemented': http_client.NOT_IMPLEMENTED,
'quotaExceeded': http_client.FORBIDDEN,
'rateLimitExceeded': http_client.FORBIDDEN,
'resourceInUse': http_client.BAD_REQUEST,
'resourcesExceeded': http_client.BAD_REQUEST,
'responseTooLarge': http_client.FORBIDDEN,
'stopped': http_client.OK,
'tableUnavailable': http_client.BAD_REQUEST,
}

_FakeResponse = collections.namedtuple('_FakeResponse', ['status'])

This comment was marked as spam.

This comment was marked as spam.



def _error_result_to_exception(error_result):
""""""

This comment was marked as spam.

This comment was marked as spam.

reason = error_result.get('reason')
status_code = _ERROR_REASON_TO_EXCEPTION.get(
reason, http_client.INTERNAL_SERVER_ERROR)
# make_exception expects an httplib2 response object.
fake_response = _FakeResponse(status=status_code)
return exceptions.make_exception(
fake_response, b'', error_info=error_result, use_json=False)

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.



class Compression(_EnumProperty):
"""Pseudo-enum for ``compression`` properties."""
Expand Down Expand Up @@ -380,6 +419,8 @@ def cancel(self, client=None):
api_response = client._connection.api_request(
method='POST', path='%s/cancel' % (self.path,))
self._set_properties(api_response['job'])
# The Future interface requires that we return True if the *attempt*
# to cancel was successful.
return True

This comment was marked as spam.

This comment was marked as spam.


# The following methods implement the PollingFuture interface. Note that
Expand All @@ -397,12 +438,11 @@ def _set_future_result(self):
# set, do not call set_result/set_exception again.
# Note: self._result_set is set to True in set_result and
# set_exception, in case those methods are invoked directly.
if self.state != 'DONE' or self._result_set:
if self.state != _DONE_STATE or self._result_set:
return

if self.error_result is not None:
exception = exceptions.GoogleCloudError(
self.error_result, errors=self.errors)
exception = _error_result_to_exception(self.error_result)
self.set_exception(exception)
else:
self.set_result(self)
Expand All @@ -415,9 +455,9 @@ def done(self):
"""
# Do not refresh is the state is already done, as the job will not
# change once complete.
if self.state != 'DONE':
if self.state != _DONE_STATE:
self.reload()
return self.state == 'DONE'
return self.state == _DONE_STATE

def result(self, timeout=None):
"""Start the job and wait for it to complete and get the result.
Expand Down Expand Up @@ -447,7 +487,8 @@ def cancelled(self):
:rtype: bool
:returns: False
"""
return False
return (self.error_result is not None
and self.error_result.get('reason') == 'stopped')

This comment was marked as spam.

This comment was marked as spam.



class _LoadConfiguration(object):
Expand Down
37 changes: 35 additions & 2 deletions bigquery/tests/unit/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,30 @@
import copy
import warnings

from six.moves import http_client
import unittest


class TestErrorResultToException(unittest.TestCase):

This comment was marked as spam.

This comment was marked as spam.

def _call_fut(self, *args, **kwargs):
from google.cloud.bigquery import job
return job._error_result_to_exception(*args, **kwargs)

def test_simple(self):
error_result = {
'reason': 'invalid',
'meta': 'data'
}
exception = self._call_fut(error_result)
self.assertEqual(exception.code, http_client.BAD_REQUEST)
self.assertIn("'meta': 'data'", exception.message)

def test_missing_reason(self):
error_result = {}
exception = self._call_fut(error_result)
self.assertEqual(exception.code, http_client.INTERNAL_SERVER_ERROR)


class _Base(object):
PROJECT = 'project'
SOURCE1 = 'http://example.com/source1.csv'
Expand Down Expand Up @@ -1517,6 +1538,18 @@ def test_from_api_repr_w_properties(self):
self.assertIs(dataset._client, client)
self._verifyResourceProperties(dataset, RESOURCE)

def test_cancelled(self):
client = _Client(self.PROJECT)
job = self._make_one(self.JOB_NAME, self.QUERY, client)
job._properties['status'] = {
'state': 'DONE',
'errorResult': {
'reason': 'stopped'
}
}

self.assertTrue(job.cancelled())

def test_query_results(self):
from google.cloud.bigquery.query import QueryResults

Expand Down Expand Up @@ -1572,7 +1605,7 @@ def test_result_error(self):
'debugInfo': 'DEBUG',
'location': 'LOCATION',
'message': 'MESSAGE',
'reason': 'REASON'
'reason': 'invalid'
}
job._properties['status'] = {
'errorResult': error_result,
Expand All @@ -1585,7 +1618,7 @@ def test_result_error(self):
job.result()

self.assertIsInstance(exc_info.exception, exceptions.GoogleCloudError)
self.assertEqual(exc_info.exception.errors, [error_result])
self.assertEqual(exc_info.exception.code, http_client.BAD_REQUEST)

def test_begin_w_bound_client(self):
PATH = '/projects/%s/jobs' % (self.PROJECT,)
Expand Down