From e028c38361e75b0c9f19bf96b20be8fcb1699886 Mon Sep 17 00:00:00 2001
From: Tim Swast
Date: Wed, 27 Sep 2017 16:29:59 -0700
Subject: [PATCH] BigQuery: add client.query_rows(), remove
 client.run_sync_query(). (#4065)

* BigQuery: add client.query_rows(), remove client.run_sync_query().

The query_rows() method is the new way to run a query synchronously: it
starts a query job, waits for the job to complete, and returns the rows
of the result set as an iterator.
---
 bigquery/google/cloud/bigquery/__init__.py |   2 +
 bigquery/google/cloud/bigquery/client.py   |  50 ++++--
 bigquery/google/cloud/bigquery/job.py      | 163 ++++++++++++------
 bigquery/google/cloud/bigquery/query.py    |   1 +
 bigquery/tests/system.py                   |  67 ++++----
 bigquery/tests/unit/test_client.py         | 184 ++++++++++++++++-----
 6 files changed, 337 insertions(+), 130 deletions(-)

diff --git a/bigquery/google/cloud/bigquery/__init__.py b/bigquery/google/cloud/bigquery/__init__.py
index ec92e7c40128..7bbcc7782ee2 100644
--- a/bigquery/google/cloud/bigquery/__init__.py
+++ b/bigquery/google/cloud/bigquery/__init__.py
@@ -34,6 +34,7 @@
 from google.cloud.bigquery.dataset import Dataset
 from google.cloud.bigquery.job import CopyJobConfig
 from google.cloud.bigquery.job import ExtractJobConfig
+from google.cloud.bigquery.job import QueryJobConfig
 from google.cloud.bigquery.schema import SchemaField
 from google.cloud.bigquery.table import Table
@@ -45,6 +46,7 @@
     'Dataset',
     'CopyJobConfig',
     'ExtractJobConfig',
+    'QueryJobConfig',
     'ScalarQueryParameter',
     'SchemaField',
     'StructQueryParameter',
diff --git a/bigquery/google/cloud/bigquery/client.py b/bigquery/google/cloud/bigquery/client.py
index bae5613b629a..7ceed4fc1e41 100644
--- a/bigquery/google/cloud/bigquery/client.py
+++ b/bigquery/google/cloud/bigquery/client.py
@@ -30,6 +30,7 @@
 from google.cloud.bigquery.job import ExtractJob
 from google.cloud.bigquery.job import LoadJob
 from google.cloud.bigquery.job import QueryJob
+from google.cloud.bigquery.job import QueryJobConfig
 from google.cloud.bigquery.query import QueryResults
@@ -612,29 +613,46 @@ def run_async_query(self, job_id, query,
                             udf_resources=udf_resources,
                             query_parameters=query_parameters)

-    def run_sync_query(self, query, udf_resources=(), query_parameters=()):
-        """Run a SQL query synchronously.
+    def query_rows(self, query, job_config=None, job_id=None, timeout=None):
+        """Start a query job and wait for the results.
+
+        See
+        https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query

         :type query: str
         :param query: SQL query to be executed

-        :type udf_resources: tuple
-        :param udf_resources: An iterable of
-                              :class:`google.cloud.bigquery._helpers.UDFResource`
-                              (empty by default)
+        :type job_config: :class:`~google.cloud.bigquery.job.QueryJobConfig`
+        :param job_config:
+            (Optional) Extra configuration options for the query job.
+
+        :type job_id: str
+        :param job_id: (Optional) ID to use for the query job.

-        :type query_parameters: tuple
-        :param query_parameters:
-            An iterable of
-            :class:`google.cloud.bigquery._helpers.AbstractQueryParameter`
-            (empty by default)
+        :type timeout: int
+        :param timeout:
+            (Optional) How long to wait for the job to complete before raising
+            a :class:`TimeoutError`.

-        :rtype: :class:`google.cloud.bigquery.query.QueryResults`
-        :returns: a new ``QueryResults`` instance
+        :rtype: :class:`~google.api.core.page_iterator.Iterator`
+        :returns:
+            Iterator of row data :class:`tuple` objects. During each page, the
+            iterator will have the ``total_rows`` attribute set, which counts
+            the total number of rows **in the result set** (this is distinct
+            from the total number of rows in the current page:
+            ``iterator.page.num_items``).
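+
+        Example: run a query and iterate over the rows (a minimal sketch;
+        assumes ``client`` is an already-constructed
+        :class:`~google.cloud.bigquery.client.Client`, and the table name
+        is a placeholder)::
+
+            # 'my_dataset.my_table' is a placeholder; substitute a real table.
+            rows = client.query_rows(
+                'SELECT name FROM `my_dataset.my_table` LIMIT 10')
+            for row in rows:
+                print(row[0])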
+
+        :raises: :class:`~google.cloud.exceptions.GoogleCloudError` if the job
+            failed or :class:`TimeoutError` if the job did not complete in the
+            given timeout.
         """
-        return QueryResults(query, client=self,
-                            udf_resources=udf_resources,
-                            query_parameters=query_parameters)
+        job_id = _make_job_id(job_id)
+
+        # TODO(swast): move standard SQL default to QueryJobConfig class.
+        if job_config is None:
+            job_config = QueryJobConfig()
+        if job_config.use_legacy_sql is None:
+            job_config.use_legacy_sql = False
+
+        job = QueryJob(job_id, query, client=self, job_config=job_config)
+        job.begin()
+        return job.result(timeout=timeout)

     # pylint: disable=unused-argument
diff --git a/bigquery/google/cloud/bigquery/job.py b/bigquery/google/cloud/bigquery/job.py
index 11b8beee2b7b..812dde4b32a3 100644
--- a/bigquery/google/cloud/bigquery/job.py
+++ b/bigquery/google/cloud/bigquery/job.py
@@ -1218,11 +1218,52 @@ def from_api_repr(cls, resource, client):
         return job

-class _AsyncQueryConfiguration(object):
-    """User-settable configuration options for asynchronous query jobs.
+class QueryJobConfig(object):
+    """Configuration options for query jobs.

-    Values which are ``None`` -> server defaults.
+    All properties in this class are optional. Values which are ``None``
+    use the server defaults.
     """
+
+    def __init__(self):
+        self._properties = {}
+
+    def to_api_repr(self):
+        """Build an API representation of the query job config.
+
+        :rtype: dict
+        :returns: A dictionary in the format used by the BigQuery API.
+        """
+        return copy.deepcopy(self._properties)
+
+    @classmethod
+    def from_api_repr(cls, resource):
+        """Factory: construct a job configuration given its API representation
+
+        :type resource: dict
+        :param resource:
+            A query job configuration in the same representation as is
+            returned from the API.
+
+        :rtype: :class:`google.cloud.bigquery.job.QueryJobConfig`
+        :returns: Configuration parsed from ``resource``.
+        """
+        config = cls()
+        config._properties = copy.deepcopy(resource)
+        return config
+
+    use_legacy_sql = _TypedApiResourceProperty(
+        'use_legacy_sql', 'useLegacySql', bool)
+    """See
+    https://cloud.google.com/bigquery/docs/\
+    reference/v2/jobs#configuration.query.useLegacySql
+    """
+
+    dry_run = _TypedApiResourceProperty('dry_run', 'dryRun', bool)
+    """See
+    https://g.co/cloud/bigquery/docs/reference/v2/jobs#configuration.dryRun
+    """
+
     _allow_large_results = None
     _create_disposition = None
     _default_dataset = None
@@ -1231,7 +1272,6 @@
     _priority = None
     _use_query_cache = None
     _use_legacy_sql = None
-    _dry_run = None
     _write_disposition = None
     _maximum_billing_tier = None
     _maximum_bytes_billed = None
@@ -1260,20 +1300,60 @@ class QueryJob(_AsyncJob):
         An iterable of
         :class:`google.cloud.bigquery._helpers.AbstractQueryParameter`
         (empty by default)
+
+    :type job_config: :class:`~google.cloud.bigquery.job.QueryJobConfig`
+    :param job_config:
+        (Optional) Extra configuration options for the query job.
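+
+    Example (a minimal sketch; assumes ``client`` is an existing
+    :class:`~google.cloud.bigquery.client.Client`, and the job ID and
+    query are placeholders)::
+
+        config = QueryJobConfig()
+        config.use_legacy_sql = False
+        job = QueryJob('my-job-id', 'SELECT 1', client, job_config=config)
+        job.begin()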
""" _JOB_TYPE = 'query' _UDF_KEY = 'userDefinedFunctionResources' _QUERY_PARAMETERS_KEY = 'queryParameters' def __init__(self, job_id, query, client, - udf_resources=(), query_parameters=()): + udf_resources=(), query_parameters=(), job_config=None): super(QueryJob, self).__init__(job_id, client) + + if job_config is None: + job_config = QueryJobConfig() + self.query = query self.udf_resources = udf_resources self.query_parameters = query_parameters - self._configuration = _AsyncQueryConfiguration() + self._configuration = job_config self._query_results = None + @property + def use_legacy_sql(self): + """See + :class:`~google.cloud.bigquery.job.QueryJobConfig.use_legacy_sql`. + """ + return self._configuration.use_legacy_sql + + @use_legacy_sql.setter + def use_legacy_sql(self, value): + """See + :class:`~google.cloud.bigquery.job.QueryJobConfig.use_legacy_sql`. + """ + # TODO(swast): remove this method and only allow setting use_legacy_sql + # on QueryJobConfig objects. + self._configuration.use_legacy_sql = value + + @property + def dry_run(self): + """See + :class:`~google.cloud.bigquery.job.QueryJobConfig.dry_run`. + """ + return self._configuration.dry_run + + @dry_run.setter + def dry_run(self, value): + """See + :class:`~google.cloud.bigquery.job.QueryJobConfig.dry_run`. + """ + # TODO(swast): remove this method and only allow setting dry_run + # on QueryJobConfig objects. + self._configuration.dry_run = value + allow_large_results = _TypedProperty('allow_large_results', bool) """See https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.allowLargeResults @@ -1314,20 +1394,8 @@ def __init__(self, job_id, query, client, https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.useQueryCache """ - use_legacy_sql = _TypedProperty('use_legacy_sql', bool) - """See - https://cloud.google.com/bigquery/docs/\ - reference/v2/jobs#configuration.query.useLegacySql - """ - - dry_run = _TypedProperty('dry_run', bool) - """See - https://cloud.google.com/bigquery/docs/\ - reference/rest/v2/jobs#configuration.dryRun - """ - - write_disposition = WriteDisposition('write_disposition', - 'writeDisposition') + write_disposition = WriteDisposition( + 'write_disposition', 'writeDisposition') """See https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.writeDisposition """ @@ -1363,8 +1431,6 @@ def _populate_config_resource_booleans(self, configuration): configuration['flattenResults'] = self.flatten_results if self.use_query_cache is not None: configuration['useQueryCache'] = self.use_query_cache - if self.use_legacy_sql is not None: - configuration['useLegacySql'] = self.use_legacy_sql def _populate_config_resource(self, configuration): """Helper for _build_resource: copy config properties to resource""" @@ -1377,8 +1443,8 @@ def _populate_config_resource(self, configuration): 'projectId': self.default_dataset.project, 'datasetId': self.default_dataset.dataset_id, } - if self.destination is not None: - table_res = self._destination_table_resource() + table_res = self._destination_table_resource() + if table_res is not None: configuration['destinationTable'] = table_res if self.priority is not None: configuration['priority'] = self.priority @@ -1406,6 +1472,7 @@ def _populate_config_resource(self, configuration): def _build_resource(self): """Generate a resource for :meth:`begin`.""" + configuration = self._configuration.to_api_repr() resource = { 'jobReference': { @@ -1413,16 +1480,18 @@ def _build_resource(self): 'jobId': 
self.job_id, }, 'configuration': { - self._JOB_TYPE: { - 'query': self.query, - }, + self._JOB_TYPE: configuration, }, } - if self.dry_run is not None: - resource['configuration']['dryRun'] = self.dry_run + # The dryRun property only applies to query jobs, but it is defined at + # a level higher up. We need to remove it from the query config. + if 'dryRun' in configuration: + dry_run = configuration['dryRun'] + del configuration['dryRun'] + resource['configuration']['dryRun'] = dry_run - configuration = resource['configuration'][self._JOB_TYPE] + configuration['query'] = self.query self._populate_config_resource(configuration) return resource @@ -1436,19 +1505,28 @@ def _scrub_local_properties(self, cleaned): the client's project. """ configuration = cleaned['configuration']['query'] - self.query = configuration['query'] + # The dryRun property only applies to query jobs, but it is defined at + # a level higher up. We need to copy it to the query config. + self._configuration.dry_run = cleaned['configuration'].get('dryRun') + def _copy_configuration_properties(self, configuration): """Helper: assign subclass configuration properties in cleaned.""" + # The dryRun property only applies to query jobs, but it is defined at + # a level higher up. We need to copy it to the query config. + # It should already be correctly set by the _scrub_local_properties() + # method. + dry_run = self.dry_run + self._configuration = QueryJobConfig.from_api_repr(configuration) + self._configuration.dry_run = dry_run + self.allow_large_results = _bool_or_none( configuration.get('allowLargeResults')) self.flatten_results = _bool_or_none( configuration.get('flattenResults')) self.use_query_cache = _bool_or_none( configuration.get('useQueryCache')) - self.use_legacy_sql = _bool_or_none( - configuration.get('useLegacySql')) self.create_disposition = configuration.get('createDisposition') self.priority = configuration.get('priority') @@ -1459,22 +1537,13 @@ def _copy_configuration_properties(self, configuration): dest_remote = configuration.get('destinationTable') - if dest_remote is None: - if self.destination is not None: - del self.destination - else: - dest_local = self._destination_table_resource() - if dest_remote != dest_local: - project = dest_remote['projectId'] - dataset = Dataset(DatasetReference(project, - dest_remote['datasetId'])) - self.destination = dataset.table(dest_remote['tableId']) + if dest_remote is not None: + dataset = DatasetReference( + dest_remote['projectId'], dest_remote['datasetId']) + self.destination = dataset.table(dest_remote['tableId']) def_ds = configuration.get('defaultDataset') - if def_ds is None: - if self.default_dataset is not None: - del self.default_dataset - else: + if def_ds is not None: self.default_dataset = DatasetReference( def_ds['projectId'], def_ds['datasetId']) udf_resources = [] diff --git a/bigquery/google/cloud/bigquery/query.py b/bigquery/google/cloud/bigquery/query.py index 38400659bdaf..57199556ed84 100644 --- a/bigquery/google/cloud/bigquery/query.py +++ b/bigquery/google/cloud/bigquery/query.py @@ -455,6 +455,7 @@ def fetch_data(self, max_results=None, page_token=None, start_index=None, next_token='pageToken', extra_params=params) iterator.query_result = self + iterator.job = self.job return iterator diff --git a/bigquery/tests/system.py b/bigquery/tests/system.py index 2785f21cb2a3..2fd43f7951c4 100644 --- a/bigquery/tests/system.py +++ b/bigquery/tests/system.py @@ -699,7 +699,7 @@ def test_job_cancel(self): # raise an error, and that the job completed 
(in the `retry()` # above). - def test_sync_query_w_legacy_sql_types(self): + def test_query_rows_w_legacy_sql_types(self): naive = datetime.datetime(2016, 12, 5, 12, 41, 9) stamp = '%s %s' % (naive.date().isoformat(), naive.time().isoformat()) zoned = naive.replace(tzinfo=UTC) @@ -730,12 +730,13 @@ def test_sync_query_w_legacy_sql_types(self): }, ] for example in examples: - query = Config.CLIENT.run_sync_query(example['sql']) - query.use_legacy_sql = True - query.run() - self.assertEqual(len(query.rows), 1) - self.assertEqual(len(query.rows[0]), 1) - self.assertEqual(query.rows[0][0], example['expected']) + job_config = bigquery.QueryJobConfig() + job_config.use_legacy_sql = True + rows = list(Config.CLIENT.query_rows( + example['sql'], job_config=job_config)) + self.assertEqual(len(rows), 1) + self.assertEqual(len(rows[0]), 1) + self.assertEqual(rows[0][0], example['expected']) def _generate_standard_sql_types_examples(self): naive = datetime.datetime(2016, 12, 5, 12, 41, 9) @@ -831,15 +832,20 @@ def _generate_standard_sql_types_examples(self): }, ] - def test_sync_query_w_standard_sql_types(self): + def test_query_rows_w_standard_sql_types(self): examples = self._generate_standard_sql_types_examples() for example in examples: - query = Config.CLIENT.run_sync_query(example['sql']) - query.use_legacy_sql = False - query.run() - self.assertEqual(len(query.rows), 1) - self.assertEqual(len(query.rows[0]), 1) - self.assertEqual(query.rows[0][0], example['expected']) + rows = list(Config.CLIENT.query_rows(example['sql'])) + self.assertEqual(len(rows), 1) + self.assertEqual(len(rows[0]), 1) + self.assertEqual(rows[0][0], example['expected']) + + def test_query_rows_w_failed_query(self): + from google.api.core.exceptions import BadRequest + + with self.assertRaises(BadRequest): + Config.CLIENT.query_rows('invalid syntax;') + # TODO(swast): Ensure that job ID is surfaced in the exception. 

     def test_dbapi_w_standard_sql_types(self):
         examples = self._generate_standard_sql_types_examples()
@@ -892,7 +898,7 @@ def _load_table_for_dml(self, rows, dataset_id, table_id):
         job.result(timeout=JOB_TIMEOUT)
         self._fetch_single_page(table)

-    def test_sync_query_w_dml(self):
+    def test_query_w_dml(self):
         dataset_name = _make_dataset_id('dml_tests')
         table_name = 'test_table'
         self._load_table_for_dml([('Hello World',)], dataset_name, table_name)
@@ -901,12 +907,14 @@
         WHERE greeting = 'Hello World'
         """

-        query = Config.CLIENT.run_sync_query(
+        query_job = Config.CLIENT.run_async_query(
+            'test_query_w_dml_{}'.format(unique_resource_id()),
             query_template.format(dataset_name, table_name))
-        query.use_legacy_sql = False
-        query.run()
+        query_job.use_legacy_sql = False
+        query_job.begin()
+        query_job.result()

-        self.assertEqual(query.num_dml_affected_rows, 1)
+        self.assertEqual(query_job.num_dml_affected_rows, 1)

     def test_dbapi_w_dml(self):
         dataset_name = _make_dataset_id('dml_tests')
@@ -923,7 +931,7 @@
         self.assertEqual(Config.CURSOR.rowcount, 1)
         self.assertIsNone(Config.CURSOR.fetchone())

-    def test_sync_query_w_query_params(self):
+    def test_query_w_query_params(self):
         from google.cloud.bigquery._helpers import ArrayQueryParameter
         from google.cloud.bigquery._helpers import ScalarQueryParameter
         from google.cloud.bigquery._helpers import StructQueryParameter
@@ -1084,14 +1092,16 @@
             },
         ]
         for example in examples:
-            query = Config.CLIENT.run_sync_query(
+            query_job = Config.CLIENT.run_async_query(
+                'test_query_w_query_params{}'.format(unique_resource_id()),
                 example['sql'],
                 query_parameters=example['query_parameters'])
-            query.use_legacy_sql = False
-            query.run()
-            self.assertEqual(len(query.rows), 1)
-            self.assertEqual(len(query.rows[0]), 1)
-            self.assertEqual(query.rows[0][0], example['expected'])
+            query_job.use_legacy_sql = False
+            query_job.begin()
+            rows = [row for row in query_job.result()]
+            self.assertEqual(len(rows), 1)
+            self.assertEqual(len(rows[0]), 1)
+            self.assertEqual(rows[0][0], example['expected'])

     def test_dbapi_w_query_parameters(self):
         examples = [
@@ -1217,11 +1227,8 @@ def test_large_query_w_public_data(self):
         SQL = 'SELECT * from `{}.{}.{}` LIMIT {}'.format(
             PUBLIC, DATASET_ID, TABLE_NAME, LIMIT)

-        query = Config.CLIENT.run_sync_query(SQL)
-        query.use_legacy_sql = False
-        query.run()
+        iterator = Config.CLIENT.query_rows(SQL)

-        iterator = query.fetch_data(max_results=100)
         rows = list(iterator)
         self.assertEqual(len(rows), LIMIT)
diff --git a/bigquery/tests/unit/test_client.py b/bigquery/tests/unit/test_client.py
index bdab1d36c2cb..d1a6d1218ae8 100644
--- a/bigquery/tests/unit/test_client.py
+++ b/bigquery/tests/unit/test_client.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

+import copy
 import unittest

 import mock
@@ -1348,58 +1349,167 @@ def test_run_async_w_query_parameters(self):
         self.assertEqual(job.udf_resources, [])
         self.assertEqual(job.query_parameters, query_parameters)

-    def test_run_sync_query_defaults(self):
-        from google.cloud.bigquery.query import QueryResults
+    def test_query_rows_defaults(self):
+        from google.api.core.page_iterator import HTTPIterator
+
+        JOB = 'job-id'
         PROJECT = 'PROJECT'
-        QUERY = 'select count(*) from persons'
+        QUERY = 'SELECT COUNT(*) FROM persons'
+        RESOURCE = {
+            'jobReference': {
+                'projectId': PROJECT,
+                'jobId': JOB,
+            },
+            'configuration': {
+                'query': {
+                    'query': QUERY,
+                },
+            },
+            'status': {
+                'state': 'DONE',
+            },
+        }
+        RESULTS_RESOURCE = {
+            'jobReference': RESOURCE['jobReference'],
+            'jobComplete': True,
+            'schema': {
+                'fields': [
+                    {'name': 'field0', 'type': 'INTEGER', 'mode': 'NULLABLE'},
+                ]
+            },
+            'totalRows': '3',
+            'pageToken': 'next-page',
+        }
+        FIRST_PAGE = copy.deepcopy(RESULTS_RESOURCE)
+        FIRST_PAGE['rows'] = [
+            {'f': [{'v': '1'}]},
+            {'f': [{'v': '2'}]},
+        ]
+        LAST_PAGE = copy.deepcopy(RESULTS_RESOURCE)
+        LAST_PAGE['rows'] = [
+            {'f': [{'v': '3'}]},
+        ]
+        del LAST_PAGE['pageToken']
         creds = _make_credentials()
         http = object()
         client = self._make_one(project=PROJECT, credentials=creds,
                                 _http=http)
-        query = client.run_sync_query(QUERY)
-        self.assertIsInstance(query, QueryResults)
-        self.assertIs(query._client, client)
-        self.assertIsNone(query.name)
-        self.assertEqual(query.query, QUERY)
-        self.assertEqual(query.udf_resources, [])
-        self.assertEqual(query.query_parameters, [])
-
-    def test_run_sync_query_w_udf_resources(self):
-        from google.cloud.bigquery._helpers import UDFResource
-        from google.cloud.bigquery.query import QueryResults
+        conn = client._connection = _Connection(
+            RESOURCE, RESULTS_RESOURCE, FIRST_PAGE, LAST_PAGE)

-        RESOURCE_URI = 'gs://some-bucket/js/lib.js'
+        rows_iter = client.query_rows(QUERY)
+        rows = list(rows_iter)
+
+        self.assertEqual(rows, [(1,), (2,), (3,)])
+        self.assertIs(rows_iter.client, client)
+        self.assertIsInstance(rows_iter, HTTPIterator)
+        self.assertEqual(len(conn._requested), 4)
+        req = conn._requested[0]
+        self.assertEqual(req['method'], 'POST')
+        self.assertEqual(req['path'], '/projects/PROJECT/jobs')
+        self.assertIsInstance(
+            req['data']['jobReference']['jobId'], six.string_types)
+
+    def test_query_rows_w_job_id(self):
+        from google.api.core.page_iterator import HTTPIterator
+
+        JOB = 'job-id'
         PROJECT = 'PROJECT'
-        QUERY = 'select count(*) from persons'
+        QUERY = 'SELECT COUNT(*) FROM persons'
+        RESOURCE = {
+            'jobReference': {
+                'projectId': PROJECT,
+                'jobId': JOB,
+            },
+            'configuration': {
+                'query': {
+                    'query': QUERY,
+                },
+            },
+            'status': {
+                'state': 'DONE',
+            },
+        }
+        RESULTS_RESOURCE = {
+            'jobReference': RESOURCE['jobReference'],
+            'jobComplete': True,
+            'schema': {
+                'fields': [
+                    {'name': 'field0', 'type': 'INTEGER', 'mode': 'NULLABLE'},
+                ]
+            },
+            'totalRows': '0',
+        }
         creds = _make_credentials()
         http = object()
         client = self._make_one(project=PROJECT, credentials=creds,
                                 _http=http)
-        udf_resources = [UDFResource("resourceUri", RESOURCE_URI)]
-        query = client.run_sync_query(QUERY, udf_resources=udf_resources)
-        self.assertIsInstance(query, QueryResults)
-        self.assertIs(query._client, client)
-        self.assertIsNone(query.name)
-        self.assertEqual(query.query, QUERY)
-        self.assertEqual(query.udf_resources, udf_resources)
-        self.assertEqual(query.query_parameters, [])
-
-    def test_run_sync_query_w_query_parameters(self):
-        from google.cloud.bigquery._helpers import ScalarQueryParameter
-        from google.cloud.bigquery.query import QueryResults
+        conn = client._connection = _Connection(
+            RESOURCE, RESULTS_RESOURCE, RESULTS_RESOURCE)
+
+        rows_iter = client.query_rows(QUERY, job_id=JOB)
+        rows = [row for row in rows_iter]
+        self.assertEqual(rows, [])
+        self.assertIs(rows_iter.client, client)
+        self.assertIsInstance(rows_iter, HTTPIterator)
+        self.assertEqual(len(conn._requested), 3)
+        req = conn._requested[0]
+        self.assertEqual(req['method'], 'POST')
+        self.assertEqual(req['path'], '/projects/PROJECT/jobs')
+        self.assertEqual(req['data']['jobReference']['jobId'], JOB)
+
+    def test_query_rows_w_job_config(self):
+        from google.cloud.bigquery.job import QueryJobConfig
+        from google.api.core.page_iterator import HTTPIterator
+
+        JOB = 'job-id'
         PROJECT = 'PROJECT'
-        QUERY = 'select count(*) from persons'
+        QUERY = 'SELECT COUNT(*) FROM persons'
+        RESOURCE = {
+            'jobReference': {
+                'projectId': PROJECT,
+                'jobId': JOB,
+            },
+            'configuration': {
+                'query': {
+                    'query': QUERY,
+                    'useLegacySql': True,
+                },
+                'dryRun': True,
+            },
+            'status': {
+                'state': 'DONE',
+            },
+        }
+        RESULTS_RESOURCE = {
+            'jobReference': RESOURCE['jobReference'],
+            'jobComplete': True,
+            'schema': {
+                'fields': [
+                    {'name': 'field0', 'type': 'INTEGER', 'mode': 'NULLABLE'},
+                ]
+            },
+            'totalRows': '0',
+        }
         creds = _make_credentials()
         http = object()
         client = self._make_one(project=PROJECT, credentials=creds,
                                 _http=http)
-        query_parameters = [ScalarQueryParameter('foo', 'INT64', 123)]
-        query = client.run_sync_query(QUERY, query_parameters=query_parameters)
-        self.assertIsInstance(query, QueryResults)
-        self.assertIs(query._client, client)
-        self.assertIsNone(query.name)
-        self.assertEqual(query.query, QUERY)
-        self.assertEqual(query.udf_resources, [])
-        self.assertEqual(query.query_parameters, query_parameters)
+        conn = client._connection = _Connection(
+            RESOURCE, RESULTS_RESOURCE, RESULTS_RESOURCE)
+
+        job_config = QueryJobConfig()
+        job_config.use_legacy_sql = True
+        job_config.dry_run = True
+        rows_iter = client.query_rows(QUERY, job_id=JOB, job_config=job_config)
+
+        self.assertIsInstance(rows_iter, HTTPIterator)
+        self.assertEqual(len(conn._requested), 2)
+        req = conn._requested[0]
+        configuration = req['data']['configuration']
+        self.assertEqual(req['method'], 'POST')
+        self.assertEqual(req['path'], '/projects/PROJECT/jobs')
+        self.assertEqual(req['data']['jobReference']['jobId'], JOB)
+        self.assertEqual(configuration['query']['useLegacySql'], True)
+        self.assertEqual(configuration['dryRun'], True)

 class _Connection(object):