-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Future interface to BigQuery jobs #3626
Conversation
@tswast this is the usage Im going for: https://gist.github.com/jonparrott/cb0ec50b8c70dfb32693fae021659baa |
I don't like
I'd prefer if it was
Also, the query job needs to be calling getQueryResults and checking for |
core/google/cloud/future/base.py
Outdated
retry_on = tenacity.retry_if_result( | ||
functools.partial(operator.is_not, True)) | ||
# Use exponential backoff with jitter. | ||
wait_on = ( |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
That seems reasonable as long as we don't lose data, but wouldn't that require switching to core iterator (#2840)? Could we accept the non-breaking behavior of this PR as-is for now and do another PR to switch the return value of
Why is that preferable? Doing a long-lived HTTP request is quite different from any of the other behavior in this library. Also, would you be okay with addressing this in a follow-up PR rather than in this one? |
#2840 is already implemented.
|
I'm okay with doing the |
@tswast Cool, are you good with this PR as-is then? |
Could we make the change where |
If not, I'll propose we make that as a breaking change when changing the polling method. |
ee506c6
to
e80335f
Compare
Yep. Done. Can you try this out and let me know how broken it is? This should work: def async_query(query):
client = bigquery.Client()
query_job = client.run_async_query(str(uuid.uuid4()), query)
query_job.use_legacy_sql = False
rows = query_job.result().fetch_data(max_results=10)
for row in rows:
print(row) |
@tswast verified that this seems to work: >>> from google.cloud import bigquery
>>> import uuid
>>>
>>> client = bigquery.Client()
>>> query = 'SELECT 1'
>>> query_job = client.run_async_query(str(uuid.uuid4()), query)
>>> query_job.use_legacy_sql = False
>>>
>>> rows = query_job.result().fetch_data(max_results=10)
>>> for row in rows:
... print(row)
...
(1,) |
cf49707
to
45f5400
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have no real concerns here though I think someone else should have a look?
@@ -27,6 +31,7 @@ | |||
from google.cloud.bigquery._helpers import UDFResourcesProperty | |||
from google.cloud.bigquery._helpers import _EnumProperty | |||
from google.cloud.bigquery._helpers import _TypedProperty | |||
import google.cloud.future.base |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
""" | ||
client = self._require_client(client) | ||
|
||
api_response = client._connection.api_request( | ||
method='POST', path='%s/cancel' % (self.path,)) | ||
self._set_properties(api_response['job']) | ||
return True |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
# set, do not call set_result/set_exception again. | ||
# Note: self._result_set is set to True in set_result and | ||
# set_exception, in case those methods are invoked directly. | ||
if self.state != 'DONE' or self._result_set: |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
|
||
if self.error_result is not None: | ||
exception = exceptions.GoogleCloudError( | ||
self.error_result, errors=self.errors) |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
""" | ||
# Do not refresh is the state is already done, as the job will not | ||
# change once complete. | ||
if self.state != 'DONE': |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
"""Construct a QueryResults instance, bound to this job. | ||
|
||
:rtype: :class:`~google.cloud.bigquery.query.QueryResults` | ||
:returns: results instance | ||
""" | ||
from google.cloud.bigquery.query import QueryResults | ||
return QueryResults.from_query_job(self) | ||
|
||
def results(self): | ||
"""DEPRECATED. |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
'tableUnavailable': http_client.BAD_REQUEST, | ||
} | ||
|
||
_FakeResponse = collections.namedtuple('_FakeResponse', ['status']) |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
|
||
|
||
def _error_result_to_exception(error_result): | ||
"""""" |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
# make_exception expects an httplib2 response object. | ||
fake_response = _FakeResponse(status=status_code) | ||
return exceptions.make_exception( | ||
fake_response, b'', error_info=error_result, use_json=False) |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
@@ -447,7 +487,8 @@ def cancelled(self): | |||
:rtype: bool | |||
:returns: False | |||
""" | |||
return False | |||
return (self.error_result is not None | |||
and self.error_result.get('reason') == 'stopped') |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
bigquery/tests/unit/test_job.py
Outdated
import unittest | ||
|
||
|
||
class TestErrorResultToException(unittest.TestCase): |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
* Add future interface to bigquery Jobs. * Make QueryJob return QueryResults from result() * Deprecate QueryJob.results()
* Add future interface to bigquery Jobs. * Make QueryJob return QueryResults from result() * Deprecate QueryJob.results()
* Add future interface to bigquery Jobs. * Make QueryJob return QueryResults from result() * Deprecate QueryJob.results()
Resolves #3556
Towards #3617