From ee506c6e2e7e3571a3191310b23a87c552304bd0 Mon Sep 17 00:00:00 2001 From: Jon Wayne Parrott Date: Tue, 18 Jul 2017 13:57:12 -0700 Subject: [PATCH] Add basic future interface to bigquery --- bigquery/google/cloud/bigquery/job.py | 58 ++++++++++++++++++++++++++- 1 file changed, 56 insertions(+), 2 deletions(-) diff --git a/bigquery/google/cloud/bigquery/job.py b/bigquery/google/cloud/bigquery/job.py index 4f791bdbea0c9..19a616c1dd3f9 100644 --- a/bigquery/google/cloud/bigquery/job.py +++ b/bigquery/google/cloud/bigquery/job.py @@ -14,8 +14,11 @@ """Define API Jobs.""" +import threading + import six +from google.cloud import exceptions from google.cloud.exceptions import NotFound from google.cloud._helpers import _datetime_from_microseconds from google.cloud.bigquery.dataset import Dataset @@ -27,6 +30,7 @@ from google.cloud.bigquery._helpers import UDFResourcesProperty from google.cloud.bigquery._helpers import _EnumProperty from google.cloud.bigquery._helpers import _TypedProperty +import google.cloud.future.base class Compression(_EnumProperty): @@ -82,7 +86,7 @@ class WriteDisposition(_EnumProperty): ALLOWED = (WRITE_APPEND, WRITE_TRUNCATE, WRITE_EMPTY) -class _BaseJob(object): +class _BaseJob(google.cloud.future.base.PollingFuture): """Base class for jobs. :type client: :class:`google.cloud.bigquery.client.Client` @@ -90,6 +94,7 @@ class _BaseJob(object): for the dataset (which requires a project). """ def __init__(self, client): + super(_BaseJob, self).__init__() self._client = client self._properties = {} @@ -131,6 +136,8 @@ class _AsyncJob(_BaseJob): def __init__(self, name, client): super(_AsyncJob, self).__init__(client) self.name = name + self._result_set = False + self._completion_lock = threading.Lock() @property def job_type(self): @@ -273,6 +280,9 @@ def _set_properties(self, api_response): self._properties.clear() self._properties.update(cleaned) + # For Future interface + self._set_future_result() + @classmethod def _get_resource_config(cls, resource): """Helper for :meth:`from_api_repr` @@ -345,7 +355,7 @@ def exists(self, client=None): return True def reload(self, client=None): - """API call: refresh job properties via a GET request + """API call: refresh job properties via a GET request. See https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get @@ -371,12 +381,56 @@ def cancel(self, client=None): ``NoneType`` :param client: the client to use. If not passed, falls back to the ``client`` stored on the current dataset. + + :rtype: bool + :returns: Boolean indicating that the cancel request was sent. """ client = self._require_client(client) api_response = client._connection.api_request( method='POST', path='%s/cancel' % (self.path,)) self._set_properties(api_response['job']) + return True + + # The following methods implement the PollingFuture interface. Note that + # the methods above are from the pre-Future interface and are left for + # compatibility. The only "overloaded" method is :meth:`cancel`, which + # satisfies both interfaces. + + def _set_future_result(self): + """Set the result or exception from the job if it is complete.""" + # This must be done in a lock to prevent the polling thread + # and main thread from both executing the completion logic + # at the same time. + with self._completion_lock: + # If the operation isn't complete or if the result has already been + # set, do not call set_result/set_exception again. + # Note: self._result_set is set to True in set_result and + # set_exception, in case those methods are invoked directly. + if not self.state != 'DONE' or self._result_set: + return + + if self.error_result: + exception = exceptions.GoogleCloudError( + self.error_result, errors=self.errors) + self.set_exception(exception) + else: + self.set_result(self) + + def done(self): + # Do not refresh is the state is already done, as the job will not + # change once complete. + if self.state != 'DONE': + self.reload() + return self.state == 'DONE' + + def result(self, timeout=None): + if self.state is None: + self.begin() + return super(self, _AsyncJob).result(timeout=timeout) + + def cancelled(self): + return False class _LoadConfiguration(object):