diff --git a/google/cloud/bigquery/_job_helpers.py b/google/cloud/bigquery/_job_helpers.py new file mode 100644 index 000000000..33fc72261 --- /dev/null +++ b/google/cloud/bigquery/_job_helpers.py @@ -0,0 +1,259 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Helpers for interacting with the job REST APIs from the client.""" + +import copy +import uuid +from typing import Any, Dict, TYPE_CHECKING, Optional + +import google.api_core.exceptions as core_exceptions +from google.api_core import retry as retries + +from google.cloud.bigquery import job + +# Avoid circular imports +if TYPE_CHECKING: # pragma: NO COVER + from google.cloud.bigquery.client import Client + + +# The purpose of _TIMEOUT_BUFFER_MILLIS is to allow the server-side timeout to +# happen before the client-side timeout. This is not strictly necessary, as the +# client retries client-side timeouts, but the hope is that by making the +# server-side timeout slightly shorter, we can save the server from some +# unnecessary processing time. +# +# 250 milliseconds is chosen arbitrarily, though it should be about the right +# order of magnitude for network latency and switching delays. It is about the +# amount of time for light to circumnavigate the world twice. +_TIMEOUT_BUFFER_MILLIS = 250 + + +def make_job_id(job_id: Optional[str] = None, prefix: Optional[str] = None) -> str: + """Construct an ID for a new job. + + Args: + job_id: the user-provided job ID. + prefix: the user-provided prefix for a job ID. + + Returns: + str: A job ID + """ + if job_id is not None: + return job_id + elif prefix is not None: + return str(prefix) + str(uuid.uuid4()) + else: + return str(uuid.uuid4()) + + +def query_jobs_insert( + client: "Client", + query: str, + job_config: Optional[job.QueryJobConfig], + job_id: Optional[str], + job_id_prefix: Optional[str], + location: str, + project: str, + retry: retries.Retry, + timeout: Optional[float], + job_retry: retries.Retry, +) -> job.QueryJob: + """Initiate a query using jobs.insert. + + See: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/insert + """ + job_id_given = job_id is not None + job_id_save = job_id + job_config_save = job_config + + def do_query(): + # Make a copy now, so that the original doesn't get changed by the process + # below and to facilitate retry + job_config = copy.deepcopy(job_config_save) + + job_id = make_job_id(job_id_save, job_id_prefix) + job_ref = job._JobReference(job_id, project=project, location=location) + query_job = job.QueryJob(job_ref, query, client=client, job_config=job_config) + + try: + query_job._begin(retry=retry, timeout=timeout) + except core_exceptions.Conflict as create_exc: + # The thought is that if someone is providing their own job IDs and gets + # the job ID generation wrong, this could end up returning results for + # the wrong query. We thus only try to recover if the job ID was not given.
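+            # With a client-generated (random UUID) job ID, a Conflict most + # likely means the backend already created the job on an earlier attempt + # of this same request, so it is safe to look the job up below.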
+ if job_id_given: + raise create_exc + + try: + query_job = client.get_job( + job_id, + project=project, + location=location, + retry=retry, + timeout=timeout, + ) + except core_exceptions.GoogleAPIError: # (includes RetryError) + raise create_exc + else: + return query_job + else: + return query_job + + future = do_query() + # The future might be in a failed state now, but if it's + # unrecoverable, we'll find out when we ask for its result, at which + # point, we may retry. + if not job_id_given: + future._retry_do_query = do_query # in case we have to retry later + future._job_retry = job_retry + + return future + + +def _to_query_request(job_config: Optional[job.QueryJobConfig]) -> Dict[str, Any]: + """Transform from Job resource to QueryRequest resource. + + Most of the keys in job.configuration.query are in common with + QueryRequest. If any configuration property is set that is not available in + jobs.query, it will result in a server-side error. + """ + request_body = {} + job_config_resource = job_config.to_api_repr() if job_config else {} + query_config_resource = job_config_resource.get("query", {}) + + request_body.update(query_config_resource) + + # These keys are top level in job resource and query resource. + if "labels" in job_config_resource: + request_body["labels"] = job_config_resource["labels"] + if "dryRun" in job_config_resource: + request_body["dryRun"] = job_config_resource["dryRun"] + + # Default to standard SQL. + request_body.setdefault("useLegacySql", False) + + # Since jobs.query can return results, ensure we use the lossless timestamp + # format. See: https://github.com/googleapis/python-bigquery/issues/395 + request_body.setdefault("formatOptions", {}) + request_body["formatOptions"]["useInt64Timestamp"] = True # type: ignore + + return request_body + + +def _to_query_job( + client: "Client", + query: str, + request_config: Optional[job.QueryJobConfig], + query_response: Dict[str, Any], +) -> job.QueryJob: + job_ref_resource = query_response["jobReference"] + job_ref = job._JobReference._from_api_repr(job_ref_resource) + query_job = job.QueryJob(job_ref, query, client=client) + query_job._properties.setdefault("configuration", {}) + + # Not all relevant properties are in the jobs.query response. Populate some + # expected properties based on the job configuration. + if request_config is not None: + query_job._properties["configuration"].update(request_config.to_api_repr()) + + query_job._properties["configuration"].setdefault("query", {}) + query_job._properties["configuration"]["query"]["query"] = query + query_job._properties["configuration"]["query"].setdefault("useLegacySql", False) + + query_job._properties.setdefault("statistics", {}) + query_job._properties["statistics"].setdefault("query", {}) + query_job._properties["statistics"]["query"]["cacheHit"] = query_response.get( + "cacheHit" + ) + query_job._properties["statistics"]["query"]["schema"] = query_response.get( + "schema" + ) + query_job._properties["statistics"]["query"][ + "totalBytesProcessed" + ] = query_response.get("totalBytesProcessed") + + # Set errors if any were encountered. + query_job._properties.setdefault("status", {}) + if "errors" in query_response: + # Set errors but not errorResult. If there was an error that failed + # the job, jobs.query behaves like jobs.getQueryResults and returns a + # non-success HTTP status code.
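+        # Leaving "errorResult" unset means QueryJob.error_result stays None, + # matching the semantics of a job that returned a response rather than + # failing outright.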
+ errors = query_response["errors"] + query_job._properties["status"]["errors"] = errors + + # Transform job state so that QueryJob doesn't try to restart the query. + job_complete = query_response.get("jobComplete") + if job_complete: + query_job._properties["status"]["state"] = "DONE" + # TODO: https://github.com/googleapis/python-bigquery/issues/589 + # Set the first page of results if job is "complete" and there is + # only 1 page of results. Otherwise, use the existing logic that + # refreshes the job stats. + # + # This also requires updates to `to_dataframe` and the DB API connector + # so that they don't try to read from a destination table if all the + # results are present. + else: + query_job._properties["status"]["state"] = "PENDING" + + return query_job + + +def query_jobs_query( + client: "Client", + query: str, + job_config: Optional[job.QueryJobConfig], + location: str, + project: str, + retry: retries.Retry, + timeout: Optional[float], + job_retry: retries.Retry, +) -> job.QueryJob: + """Initiate a query using jobs.query. + + See: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query + """ + path = f"/projects/{project}/queries" + request_body = _to_query_request(job_config) + + if timeout is not None: + # Subtract a buffer for context switching, network latency, etc. + request_body["timeoutMs"] = max(0, int(1000 * timeout) - _TIMEOUT_BUFFER_MILLIS) + request_body["location"] = location + request_body["query"] = query + + def do_query(): + request_body["requestId"] = make_job_id() + span_attributes = {"path": path} + api_response = client._call_api( + retry, + span_name="BigQuery.query", + span_attributes=span_attributes, + method="POST", + path=path, + data=request_body, + timeout=timeout, + ) + return _to_query_job(client, query, job_config, api_response) + + future = do_query() + + # The future might be in a failed state now, but if it's + # unrecoverable, we'll find out when we ask for its result, at which + # point, we may retry. + future._retry_do_query = do_query # in case we have to retry later + future._job_retry = job_retry + + return future diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py index 6d94f5bbc..76ccafaf4 100644 --- a/google/cloud/bigquery/client.py +++ b/google/cloud/bigquery/client.py @@ -60,6 +60,8 @@ DEFAULT_CLIENT_INFO as DEFAULT_BQSTORAGE_CLIENT_INFO, ) +from google.cloud.bigquery import _job_helpers +from google.cloud.bigquery._job_helpers import make_job_id as _make_job_id from google.cloud.bigquery._helpers import _get_sub_prop from google.cloud.bigquery._helpers import _record_field_to_json from google.cloud.bigquery._helpers import _str_or_none @@ -69,6 +71,7 @@ from google.cloud.bigquery.dataset import Dataset from google.cloud.bigquery.dataset import DatasetListItem from google.cloud.bigquery.dataset import DatasetReference +from google.cloud.bigquery import enums from google.cloud.bigquery.enums import AutoRowIDs from google.cloud.bigquery.opentelemetry_tracing import create_span from google.cloud.bigquery import job @@ -3164,6 +3167,7 @@ def query( retry: retries.Retry = DEFAULT_RETRY, timeout: TimeoutType = DEFAULT_TIMEOUT, job_retry: retries.Retry = DEFAULT_JOB_RETRY, + api_method: Union[str, enums.QueryApiMethod] = enums.QueryApiMethod.INSERT, ) -> job.QueryJob: """Run a SQL query. @@ -3215,6 +3219,11 @@ def query( called on the job returned. The ``job_retry`` specified here becomes the default ``job_retry`` for ``result()``, where it can also be specified.
+ api_method (Union[str, enums.QueryApiMethod]): + Method with which to start the query job. + + See :class:`google.cloud.bigquery.enums.QueryApiMethod` for + details on the difference between the query start methods. Returns: google.cloud.bigquery.job.QueryJob: A new query job instance. @@ -3238,7 +3247,10 @@ def query( " provided." ) - job_id_save = job_id + if job_id_given and api_method == enums.QueryApiMethod.QUERY: + raise TypeError( + "`job_id` was provided, but the 'QUERY' `api_method` was requested." + ) if project is None: project = self.project @@ -3269,50 +3281,25 @@ def query( # Note that we haven't modified the original job_config (or # _default_query_job_config) up to this point. - job_config_save = job_config - - def do_query(): - # Make a copy now, so that original doesn't get changed by the process - # below and to facilitate retry - job_config = copy.deepcopy(job_config_save) - - job_id = _make_job_id(job_id_save, job_id_prefix) - job_ref = job._JobReference(job_id, project=project, location=location) - query_job = job.QueryJob(job_ref, query, client=self, job_config=job_config) - - try: - query_job._begin(retry=retry, timeout=timeout) - except core_exceptions.Conflict as create_exc: - # The thought is if someone is providing their own job IDs and they get - # their job ID generation wrong, this could end up returning results for - # the wrong query. We thus only try to recover if job ID was not given. - if job_id_given: - raise create_exc - - try: - query_job = self.get_job( - job_id, - project=project, - location=location, - retry=retry, - timeout=timeout, - ) - except core_exceptions.GoogleAPIError: # (includes RetryError) - raise create_exc - else: - return query_job - else: - return query_job - - future = do_query() - # The future might be in a failed state now, but if it's - # unrecoverable, we'll find out when we ask for it's result, at which - # point, we may retry. - if not job_id_given: - future._retry_do_query = do_query # in case we have to retry later - future._job_retry = job_retry - - return future + if api_method == enums.QueryApiMethod.QUERY: + return _job_helpers.query_jobs_query( + self, query, job_config, location, project, retry, timeout, job_retry, + ) + elif api_method == enums.QueryApiMethod.INSERT: + return _job_helpers.query_jobs_insert( + self, + query, + job_config, + job_id, + job_id_prefix, + location, + project, + retry, + timeout, + job_retry, + ) + else: + raise ValueError(f"Got unexpected value for api_method: {repr(api_method)}") def insert_rows( self, @@ -3985,24 +3972,6 @@ def _extract_job_reference(job, project=None, location=None): return (project, location, job_id) -def _make_job_id(job_id: Optional[str], prefix: Optional[str] = None) -> str: - """Construct an ID for a new job. - - Args: - job_id: the user-provided job ID. - prefix: the user-provided prefix for a job ID. - - Returns: - str: A job ID - """ - if job_id is not None: - return job_id - elif prefix is not None: - return str(prefix) + str(uuid.uuid4()) - else: - return str(uuid.uuid4()) - - def _check_mode(stream): """Check that a stream was opened in read-binary mode. diff --git a/google/cloud/bigquery/enums.py b/google/cloud/bigquery/enums.py index 8c24f71e7..c4a43126a 100644 --- a/google/cloud/bigquery/enums.py +++ b/google/cloud/bigquery/enums.py @@ -122,6 +122,45 @@ class QueryPriority(object): """Specifies batch priority.""" +class QueryApiMethod(str, enum.Enum): + """API method used to start the query. The default value is + :attr:`INSERT`. 
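+ + For example (a minimal usage sketch; assumes ``client`` is an existing + :class:`~google.cloud.bigquery.Client` instance): + + .. code-block:: python + + from google.cloud.bigquery import enums + + # Start the query via jobs.query instead of the default jobs.insert. + job = client.query( + "SELECT 1", api_method=enums.QueryApiMethod.QUERY + ) + rows = job.result()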
+ """ + + INSERT = "INSERT" + """Submit a query job by using the `jobs.insert REST API method + <https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/insert>`_. + + This supports all job configuration options. + """ + + QUERY = "QUERY" + """Submit a query job by using the `jobs.query REST API method + <https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query>`_. + + Differences from ``INSERT``: + + * Many parameters and job configuration options, including job ID and + destination table, cannot be used + with this API method. See the `jobs.query REST API documentation + <https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query>`_ for + the complete list of supported configuration options. + + * The API blocks up to a specified timeout, waiting for the query to + finish. + + * The full job resource (including job statistics) may not be available. + Call :meth:`~google.cloud.bigquery.job.QueryJob.reload` or + :meth:`~google.cloud.bigquery.client.Client.get_job` to get full job + statistics and configuration. + + * :meth:`~google.cloud.bigquery.Client.query` can raise API exceptions if + the query fails, whereas the same errors don't appear until calling + :meth:`~google.cloud.bigquery.job.QueryJob.result` when the ``INSERT`` + API method is used. + """ + + class SchemaUpdateOption(object): """Specifies an update to the destination table schema as a side effect of a load job. diff --git a/tests/system/conftest.py b/tests/system/conftest.py index 7eec76a32..784a1dd5c 100644 --- a/tests/system/conftest.py +++ b/tests/system/conftest.py @@ -13,7 +13,9 @@ # limitations under the License. import pathlib +import random import re +from typing import Tuple import pytest import test_utils.prefixer @@ -26,6 +28,7 @@ prefixer = test_utils.prefixer.Prefixer("python-bigquery", "tests/system") DATA_DIR = pathlib.Path(__file__).parent.parent / "data" +TOKYO_LOCATION = "asia-northeast1" @pytest.fixture(scope="session", autouse=True) @@ -62,6 +65,16 @@ def dataset_id(bigquery_client): bigquery_client.delete_dataset(dataset_id, delete_contents=True, not_found_ok=True) +@pytest.fixture(scope="session") +def dataset_id_tokyo(bigquery_client: bigquery.Client, project_id: str): + dataset_id = prefixer.create_prefix() + "_tokyo" + dataset = bigquery.Dataset(f"{project_id}.{dataset_id}") + dataset.location = TOKYO_LOCATION + bigquery_client.create_dataset(dataset) + yield dataset_id + bigquery_client.delete_dataset(dataset_id, delete_contents=True, not_found_ok=True) + + @pytest.fixture() def dataset_client(bigquery_client, dataset_id): import google.cloud.bigquery.job @@ -78,38 +91,64 @@ def table_id(dataset_id): return f"{dataset_id}.table_{helpers.temp_suffix()}" -@pytest.fixture(scope="session") -def scalars_table(bigquery_client: bigquery.Client, project_id: str, dataset_id: str): +def load_scalars_table( + bigquery_client: bigquery.Client, + project_id: str, + dataset_id: str, + data_path: str = "scalars.jsonl", +) -> str: schema = bigquery_client.schema_from_json(DATA_DIR / "scalars_schema.json") + table_id = data_path.replace(".", "_") + hex(random.randrange(1000000)) job_config = bigquery.LoadJobConfig() job_config.schema = schema job_config.source_format = enums.SourceFormat.NEWLINE_DELIMITED_JSON - full_table_id = f"{project_id}.{dataset_id}.scalars" - with open(DATA_DIR / "scalars.jsonl", "rb") as data_file: + full_table_id = f"{project_id}.{dataset_id}.{table_id}" + with open(DATA_DIR / data_path, "rb") as data_file: job = bigquery_client.load_table_from_file( data_file, full_table_id, job_config=job_config ) job.result() + return full_table_id + + +@pytest.fixture(scope="session") +def scalars_table(bigquery_client: bigquery.Client, project_id: str, dataset_id: str): +
full_table_id = load_scalars_table(bigquery_client, project_id, dataset_id) yield full_table_id - bigquery_client.delete_table(full_table_id) + bigquery_client.delete_table(full_table_id, not_found_ok=True) + + +@pytest.fixture(scope="session") +def scalars_table_tokyo( + bigquery_client: bigquery.Client, project_id: str, dataset_id_tokyo: str +): + full_table_id = load_scalars_table(bigquery_client, project_id, dataset_id_tokyo) + yield full_table_id + bigquery_client.delete_table(full_table_id, not_found_ok=True) @pytest.fixture(scope="session") def scalars_extreme_table( bigquery_client: bigquery.Client, project_id: str, dataset_id: str ): - schema = bigquery_client.schema_from_json(DATA_DIR / "scalars_schema.json") - job_config = bigquery.LoadJobConfig() - job_config.schema = schema - job_config.source_format = enums.SourceFormat.NEWLINE_DELIMITED_JSON - full_table_id = f"{project_id}.{dataset_id}.scalars_extreme" - with open(DATA_DIR / "scalars_extreme.jsonl", "rb") as data_file: - job = bigquery_client.load_table_from_file( - data_file, full_table_id, job_config=job_config - ) - job.result() + full_table_id = load_scalars_table( + bigquery_client, project_id, dataset_id, data_path="scalars_extreme.jsonl" + ) yield full_table_id - bigquery_client.delete_table(full_table_id) + bigquery_client.delete_table(full_table_id, not_found_ok=True) + + +@pytest.fixture(scope="session", params=["US", TOKYO_LOCATION]) +def scalars_table_multi_location( + request, scalars_table: str, scalars_table_tokyo: str +) -> Tuple[str, str]: + if request.param == "US": + full_table_id = scalars_table + elif request.param == TOKYO_LOCATION: + full_table_id = scalars_table_tokyo + else: + raise ValueError(f"got unexpected location: {request.param}") + return request.param, full_table_id @pytest.fixture diff --git a/tests/system/test_client.py b/tests/system/test_client.py index 8059f21db..8f28d5a8b 100644 --- a/tests/system/test_client.py +++ b/tests/system/test_client.py @@ -13,7 +13,6 @@ # limitations under the License. import base64 -import concurrent.futures import csv import datetime import decimal @@ -692,64 +691,6 @@ def _fetch_single_page(table, selected_fields=None): page = next(iterator.pages) return list(page) - def _create_table_many_columns(self, rowcount): - # Generate a table of maximum width via CREATE TABLE AS SELECT. - # first column is named 'rowval', and has a value from 1..rowcount - # Subsequent column is named col_ and contains the value N*rowval, - # where N is between 1 and 9999 inclusive. - dsname = _make_dataset_id("wide_schema") - dataset = self.temp_dataset(dsname) - table_id = "many_columns" - table_ref = dataset.table(table_id) - self.to_delete.insert(0, table_ref) - colprojections = ",".join( - ["r * {} as col_{}".format(n, n) for n in range(1, 10000)] - ) - sql = """ - CREATE TABLE {}.{} - AS - SELECT - r as rowval, - {} - FROM - UNNEST(GENERATE_ARRAY(1,{},1)) as r - """.format( - dsname, table_id, colprojections, rowcount - ) - query_job = Config.CLIENT.query(sql) - query_job.result() - self.assertEqual(query_job.statement_type, "CREATE_TABLE_AS_SELECT") - self.assertEqual(query_job.ddl_operation_performed, "CREATE") - self.assertEqual(query_job.ddl_target_table, table_ref) - - return table_ref - - def test_query_many_columns(self): - # Test working with the widest schema BigQuery supports, 10k columns. 
- row_count = 2 - table_ref = self._create_table_many_columns(row_count) - rows = list( - Config.CLIENT.query( - "SELECT * FROM `{}.{}`".format(table_ref.dataset_id, table_ref.table_id) - ) - ) - - self.assertEqual(len(rows), row_count) - - # check field representations adhere to expected values. - correctwidth = 0 - badvals = 0 - for r in rows: - vals = r._xxx_values - rowval = vals[0] - if len(vals) == 10000: - correctwidth = correctwidth + 1 - for n in range(1, 10000): - if vals[n] != rowval * (n): - badvals = badvals + 1 - self.assertEqual(correctwidth, row_count) - self.assertEqual(badvals, 0) - def test_insert_rows_then_dump_table(self): NOW_SECONDS = 1448911495.484366 NOW = datetime.datetime.utcfromtimestamp(NOW_SECONDS).replace(tzinfo=UTC) @@ -1368,25 +1309,6 @@ def test_query_w_wrong_config(self): with self.assertRaises(Exception): Config.CLIENT.query(good_query, job_config=bad_config).result() - def test_query_w_timeout(self): - job_config = bigquery.QueryJobConfig() - job_config.use_query_cache = False - - query_job = Config.CLIENT.query( - "SELECT * FROM `bigquery-public-data.github_repos.commits`;", - job_id_prefix="test_query_w_timeout_", - location="US", - job_config=job_config, - ) - - with self.assertRaises(concurrent.futures.TimeoutError): - query_job.result(timeout=1) - - # Even though the query takes >1 second, the call to getQueryResults - # should succeed. - self.assertFalse(query_job.done(timeout=1)) - self.assertIsNotNone(Config.CLIENT.cancel_job(query_job)) - def test_query_w_page_size(self): page_size = 45 query_job = Config.CLIENT.query( @@ -1408,83 +1330,6 @@ def test_query_w_start_index(self): self.assertEqual(result1.extra_params["startIndex"], start_index) self.assertEqual(len(list(result1)), total_rows - start_index) - def test_query_statistics(self): - """ - A system test to exercise some of the extended query statistics. - - Note: We construct a query that should need at least three stages by - specifying a JOIN query. Exact plan and stats are effectively - non-deterministic, so we're largely interested in confirming values - are present. - """ - - job_config = bigquery.QueryJobConfig() - job_config.use_query_cache = False - - query_job = Config.CLIENT.query( - """ - SELECT - COUNT(1) - FROM - ( - SELECT - year, - wban_number - FROM `bigquery-public-data.samples.gsod` - LIMIT 1000 - ) lside - INNER JOIN - ( - SELECT - year, - state - FROM `bigquery-public-data.samples.natality` - LIMIT 1000 - ) rside - ON - lside.year = rside.year - """, - location="US", - job_config=job_config, - ) - - # run the job to completion - query_job.result() - - # Assert top-level stats - self.assertFalse(query_job.cache_hit) - self.assertIsNotNone(query_job.destination) - self.assertTrue(query_job.done) - self.assertFalse(query_job.dry_run) - self.assertIsNone(query_job.num_dml_affected_rows) - self.assertEqual(query_job.priority, "INTERACTIVE") - self.assertGreater(query_job.total_bytes_billed, 1) - self.assertGreater(query_job.total_bytes_processed, 1) - self.assertEqual(query_job.statement_type, "SELECT") - self.assertGreater(query_job.slot_millis, 1) - - # Make assertions on the shape of the query plan. 
- plan = query_job.query_plan - self.assertGreaterEqual(len(plan), 3) - first_stage = plan[0] - self.assertIsNotNone(first_stage.start) - self.assertIsNotNone(first_stage.end) - self.assertIsNotNone(first_stage.entry_id) - self.assertIsNotNone(first_stage.name) - self.assertGreater(first_stage.parallel_inputs, 0) - self.assertGreater(first_stage.completed_parallel_inputs, 0) - self.assertGreater(first_stage.shuffle_output_bytes, 0) - self.assertEqual(first_stage.status, "COMPLETE") - - # Query plan is a digraph. Ensure it has inter-stage links, - # but not every stage has inputs. - stages_with_inputs = 0 - for entry in plan: - if len(entry.input_stages) > 0: - stages_with_inputs = stages_with_inputs + 1 - self.assertGreater(stages_with_inputs, 0) - self.assertGreater(len(plan), stages_with_inputs) - def test_dml_statistics(self): table_schema = ( bigquery.SchemaField("foo", "STRING"), @@ -1774,212 +1619,6 @@ def test_dbapi_w_dml(self): ) self.assertEqual(Config.CURSOR.rowcount, 1) - def test_query_w_query_params(self): - from google.cloud.bigquery.job import QueryJobConfig - from google.cloud.bigquery.query import ArrayQueryParameter - from google.cloud.bigquery.query import ScalarQueryParameter - from google.cloud.bigquery.query import ScalarQueryParameterType - from google.cloud.bigquery.query import StructQueryParameter - from google.cloud.bigquery.query import StructQueryParameterType - - question = "What is the answer to life, the universe, and everything?" - question_param = ScalarQueryParameter( - name="question", type_="STRING", value=question - ) - answer = 42 - answer_param = ScalarQueryParameter(name="answer", type_="INT64", value=answer) - pi = 3.1415926 - pi_param = ScalarQueryParameter(name="pi", type_="FLOAT64", value=pi) - pi_numeric = decimal.Decimal("3.141592654") - pi_numeric_param = ScalarQueryParameter( - name="pi_numeric_param", type_="NUMERIC", value=pi_numeric - ) - bignum = decimal.Decimal("-{d38}.{d38}".format(d38="9" * 38)) - bignum_param = ScalarQueryParameter( - name="bignum_param", type_="BIGNUMERIC", value=bignum - ) - truthy = True - truthy_param = ScalarQueryParameter(name="truthy", type_="BOOL", value=truthy) - beef = b"DEADBEEF" - beef_param = ScalarQueryParameter(name="beef", type_="BYTES", value=beef) - naive = datetime.datetime(2016, 12, 5, 12, 41, 9) - naive_param = ScalarQueryParameter(name="naive", type_="DATETIME", value=naive) - naive_date_param = ScalarQueryParameter( - name="naive_date", type_="DATE", value=naive.date() - ) - naive_time_param = ScalarQueryParameter( - name="naive_time", type_="TIME", value=naive.time() - ) - zoned = naive.replace(tzinfo=UTC) - zoned_param = ScalarQueryParameter(name="zoned", type_="TIMESTAMP", value=zoned) - array_param = ArrayQueryParameter( - name="array_param", array_type="INT64", values=[1, 2] - ) - struct_param = StructQueryParameter("hitchhiker", question_param, answer_param) - phred_name = "Phred Phlyntstone" - phred_name_param = ScalarQueryParameter( - name="name", type_="STRING", value=phred_name - ) - phred_age = 32 - phred_age_param = ScalarQueryParameter( - name="age", type_="INT64", value=phred_age - ) - phred_param = StructQueryParameter(None, phred_name_param, phred_age_param) - bharney_name = "Bharney Rhubbyl" - bharney_name_param = ScalarQueryParameter( - name="name", type_="STRING", value=bharney_name - ) - bharney_age = 31 - bharney_age_param = ScalarQueryParameter( - name="age", type_="INT64", value=bharney_age - ) - bharney_param = StructQueryParameter( - None, bharney_name_param, 
bharney_age_param - ) - characters_param = ArrayQueryParameter( - name=None, array_type="RECORD", values=[phred_param, bharney_param] - ) - empty_struct_array_param = ArrayQueryParameter( - name="empty_array_param", - values=[], - array_type=StructQueryParameterType( - ScalarQueryParameterType(name="foo", type_="INT64"), - ScalarQueryParameterType(name="bar", type_="STRING"), - ), - ) - hero_param = StructQueryParameter("hero", phred_name_param, phred_age_param) - sidekick_param = StructQueryParameter( - "sidekick", bharney_name_param, bharney_age_param - ) - roles_param = StructQueryParameter("roles", hero_param, sidekick_param) - friends_param = ArrayQueryParameter( - name="friends", array_type="STRING", values=[phred_name, bharney_name] - ) - with_friends_param = StructQueryParameter(None, friends_param) - top_left_param = StructQueryParameter( - "top_left", - ScalarQueryParameter("x", "INT64", 12), - ScalarQueryParameter("y", "INT64", 102), - ) - bottom_right_param = StructQueryParameter( - "bottom_right", - ScalarQueryParameter("x", "INT64", 22), - ScalarQueryParameter("y", "INT64", 92), - ) - rectangle_param = StructQueryParameter( - "rectangle", top_left_param, bottom_right_param - ) - examples = [ - { - "sql": "SELECT @question", - "expected": question, - "query_parameters": [question_param], - }, - { - "sql": "SELECT @answer", - "expected": answer, - "query_parameters": [answer_param], - }, - {"sql": "SELECT @pi", "expected": pi, "query_parameters": [pi_param]}, - { - "sql": "SELECT @pi_numeric_param", - "expected": pi_numeric, - "query_parameters": [pi_numeric_param], - }, - { - "sql": "SELECT @bignum_param", - "expected": bignum, - "query_parameters": [bignum_param], - }, - { - "sql": "SELECT @truthy", - "expected": truthy, - "query_parameters": [truthy_param], - }, - {"sql": "SELECT @beef", "expected": beef, "query_parameters": [beef_param]}, - { - "sql": "SELECT @naive", - "expected": naive, - "query_parameters": [naive_param], - }, - { - "sql": "SELECT @naive_date", - "expected": naive.date(), - "query_parameters": [naive_date_param], - }, - { - "sql": "SELECT @naive_time", - "expected": naive.time(), - "query_parameters": [naive_time_param], - }, - { - "sql": "SELECT @zoned", - "expected": zoned, - "query_parameters": [zoned_param], - }, - { - "sql": "SELECT @array_param", - "expected": [1, 2], - "query_parameters": [array_param], - }, - { - "sql": "SELECT (@hitchhiker.question, @hitchhiker.answer)", - "expected": ({"_field_1": question, "_field_2": answer}), - "query_parameters": [struct_param], - }, - { - "sql": "SELECT " - "((@rectangle.bottom_right.x - @rectangle.top_left.x) " - "* (@rectangle.top_left.y - @rectangle.bottom_right.y))", - "expected": 100, - "query_parameters": [rectangle_param], - }, - { - "sql": "SELECT ?", - "expected": [ - {"name": phred_name, "age": phred_age}, - {"name": bharney_name, "age": bharney_age}, - ], - "query_parameters": [characters_param], - }, - { - "sql": "SELECT @empty_array_param", - "expected": [], - "query_parameters": [empty_struct_array_param], - }, - { - "sql": "SELECT @roles", - "expected": { - "hero": {"name": phred_name, "age": phred_age}, - "sidekick": {"name": bharney_name, "age": bharney_age}, - }, - "query_parameters": [roles_param], - }, - { - "sql": "SELECT ?", - "expected": {"friends": [phred_name, bharney_name]}, - "query_parameters": [with_friends_param], - }, - { - "sql": "SELECT @bignum_param", - "expected": bignum, - "query_parameters": [bignum_param], - }, - ] - - for example in examples: - jconfig = 
QueryJobConfig() - jconfig.query_parameters = example["query_parameters"] - query_job = Config.CLIENT.query( - example["sql"], - job_config=jconfig, - job_id_prefix="test_query_w_query_params", - ) - rows = list(query_job.result()) - self.assertEqual(len(rows), 1) - self.assertEqual(len(rows[0]), 1) - self.assertEqual(rows[0][0], example["expected"]) - def test_dbapi_w_query_parameters(self): examples = [ { diff --git a/tests/system/test_query.py b/tests/system/test_query.py index 649120a7e..f76b1e6ca 100644 --- a/tests/system/test_query.py +++ b/tests/system/test_query.py @@ -12,15 +12,430 @@ # See the License for the specific language governing permissions and # limitations under the License. +import concurrent.futures +import datetime +import decimal +from typing import Tuple + +from google.api_core import exceptions +import pytest + from google.cloud import bigquery +from google.cloud.bigquery.query import ArrayQueryParameter +from google.cloud.bigquery.query import ScalarQueryParameter +from google.cloud.bigquery.query import ScalarQueryParameterType +from google.cloud.bigquery.query import StructQueryParameter +from google.cloud.bigquery.query import StructQueryParameterType + + +@pytest.fixture(params=["INSERT", "QUERY"]) +def query_api_method(request): + return request.param -def test_dry_run(bigquery_client: bigquery.Client, scalars_table: str): +@pytest.fixture(scope="session") +def table_with_9999_columns_10_rows(bigquery_client, project_id, dataset_id): + """Generate a table of maximum width via CREATE TABLE AS SELECT. + + The first column is named 'rowval', and has a value from 1..rowcount + Subsequent columns are named col_ and contain the value N*rowval, where + N is between 1 and 9999 inclusive. + """ + table_id = "many_columns" + row_count = 10 + col_projections = ",".join(f"r * {n} as col_{n}" for n in range(1, 10000)) + sql = f""" + CREATE TABLE `{project_id}.{dataset_id}.{table_id}` + AS + SELECT + r as rowval, + {col_projections} + FROM + UNNEST(GENERATE_ARRAY(1,{row_count},1)) as r + """ + query_job = bigquery_client.query(sql) + query_job.result() + + return f"{project_id}.{dataset_id}.{table_id}" + + +def test_query_many_columns( + bigquery_client, table_with_9999_columns_10_rows, query_api_method +): + # Test working with the widest schema BigQuery supports, 10k columns. + query_job = bigquery_client.query( + f"SELECT * FROM `{table_with_9999_columns_10_rows}`", + api_method=query_api_method, + ) + rows = list(query_job) + assert len(rows) == 10 + + # check field representations adhere to expected values. + for row in rows: + rowval = row["rowval"] + for column in range(1, 10000): + assert row[f"col_{column}"] == rowval * column + + +def test_query_w_timeout(bigquery_client, query_api_method): + job_config = bigquery.QueryJobConfig() + job_config.use_query_cache = False + + query_job = bigquery_client.query( + "SELECT * FROM `bigquery-public-data.github_repos.commits`;", + location="US", + job_config=job_config, + api_method=query_api_method, + ) + + with pytest.raises(concurrent.futures.TimeoutError): + query_job.result(timeout=1) + + # Even though the query takes >1 second, the call to getQueryResults + # should succeed. + assert not query_job.done(timeout=1) + assert bigquery_client.cancel_job(query_job) is not None + + +def test_query_statistics(bigquery_client, query_api_method): + """ + A system test to exercise some of the extended query statistics. + + Note: We construct a query that should need at least three stages by + specifying a JOIN query. 
Exact plan and stats are effectively + non-deterministic, so we're largely interested in confirming values + are present. + """ + + job_config = bigquery.QueryJobConfig() + job_config.use_query_cache = False + + query_job = bigquery_client.query( + """ + SELECT + COUNT(1) + FROM + ( + SELECT + year, + wban_number + FROM `bigquery-public-data.samples.gsod` + LIMIT 1000 + ) lside + INNER JOIN + ( + SELECT + year, + state + FROM `bigquery-public-data.samples.natality` + LIMIT 1000 + ) rside + ON + lside.year = rside.year + """, + location="US", + job_config=job_config, + api_method=query_api_method, + ) + + # run the job to completion + query_job.result() + + # Must reload job to get stats if jobs.query was used. + if query_api_method == "QUERY": + query_job.reload() + + # Assert top-level stats + assert not query_job.cache_hit + assert query_job.destination is not None + assert query_job.done + assert not query_job.dry_run + assert query_job.num_dml_affected_rows is None + assert query_job.priority == "INTERACTIVE" + assert query_job.total_bytes_billed > 1 + assert query_job.total_bytes_processed > 1 + assert query_job.statement_type == "SELECT" + assert query_job.slot_millis > 1 + + # Make assertions on the shape of the query plan. + plan = query_job.query_plan + assert len(plan) >= 3 + first_stage = plan[0] + assert first_stage.start is not None + assert first_stage.end is not None + assert first_stage.entry_id is not None + assert first_stage.name is not None + assert first_stage.parallel_inputs > 0 + assert first_stage.completed_parallel_inputs > 0 + assert first_stage.shuffle_output_bytes > 0 + assert first_stage.status == "COMPLETE" + + # Query plan is a digraph. Ensure it has inter-stage links, + # but not every stage has inputs. + stages_with_inputs = 0 + for entry in plan: + if len(entry.input_stages) > 0: + stages_with_inputs = stages_with_inputs + 1 + assert stages_with_inputs > 0 + assert len(plan) > stages_with_inputs + + +@pytest.mark.parametrize( + ("sql", "expected", "query_parameters"), + ( + ( + "SELECT @question", + "What is the answer to life, the universe, and everything?", + [ + ScalarQueryParameter( + name="question", + type_="STRING", + value="What is the answer to life, the universe, and everything?", + ) + ], + ), + ( + "SELECT @answer", + 42, + [ScalarQueryParameter(name="answer", type_="INT64", value=42)], + ), + ( + "SELECT @pi", + 3.1415926, + [ScalarQueryParameter(name="pi", type_="FLOAT64", value=3.1415926)], + ), + ( + "SELECT @pi_numeric_param", + decimal.Decimal("3.141592654"), + [ + ScalarQueryParameter( + name="pi_numeric_param", + type_="NUMERIC", + value=decimal.Decimal("3.141592654"), + ) + ], + ), + ( + "SELECT @bignum_param", + decimal.Decimal("-{d38}.{d38}".format(d38="9" * 38)), + [ + ScalarQueryParameter( + name="bignum_param", + type_="BIGNUMERIC", + value=decimal.Decimal("-{d38}.{d38}".format(d38="9" * 38)), + ) + ], + ), + ( + "SELECT @truthy", + True, + [ScalarQueryParameter(name="truthy", type_="BOOL", value=True)], + ), + ( + "SELECT @beef", + b"DEADBEEF", + [ScalarQueryParameter(name="beef", type_="BYTES", value=b"DEADBEEF")], + ), + ( + "SELECT @naive", + datetime.datetime(2016, 12, 5, 12, 41, 9), + [ + ScalarQueryParameter( + name="naive", + type_="DATETIME", + value=datetime.datetime(2016, 12, 5, 12, 41, 9), + ) + ], + ), + ( + "SELECT @naive_date", + datetime.date(2016, 12, 5), + [ + ScalarQueryParameter( + name="naive_date", type_="DATE", value=datetime.date(2016, 12, 5) + ) + ], + ), + ( + "SELECT @naive_time", + datetime.time(12, 41, 
9, 62500), + [ + ScalarQueryParameter( + name="naive_time", + type_="TIME", + value=datetime.time(12, 41, 9, 62500), + ) + ], + ), + ( + "SELECT @zoned", + datetime.datetime(2016, 12, 5, 12, 41, 9, tzinfo=datetime.timezone.utc), + [ + ScalarQueryParameter( + name="zoned", + type_="TIMESTAMP", + value=datetime.datetime( + 2016, 12, 5, 12, 41, 9, tzinfo=datetime.timezone.utc + ), + ) + ], + ), + ( + "SELECT @array_param", + [1, 2], + [ + ArrayQueryParameter( + name="array_param", array_type="INT64", values=[1, 2] + ) + ], + ), + ( + "SELECT (@hitchhiker.question, @hitchhiker.answer)", + ({"_field_1": "What is the answer?", "_field_2": 42}), + [ + StructQueryParameter( + "hitchhiker", + ScalarQueryParameter( + name="question", type_="STRING", value="What is the answer?", + ), + ScalarQueryParameter(name="answer", type_="INT64", value=42,), + ), + ], + ), + ( + "SELECT " + "((@rectangle.bottom_right.x - @rectangle.top_left.x) " + "* (@rectangle.top_left.y - @rectangle.bottom_right.y))", + 100, + [ + StructQueryParameter( + "rectangle", + StructQueryParameter( + "top_left", + ScalarQueryParameter("x", "INT64", 12), + ScalarQueryParameter("y", "INT64", 102), + ), + StructQueryParameter( + "bottom_right", + ScalarQueryParameter("x", "INT64", 22), + ScalarQueryParameter("y", "INT64", 92), + ), + ) + ], + ), + ( + "SELECT ?", + [ + {"name": "Phred Phlyntstone", "age": 32}, + {"name": "Bharney Rhubbyl", "age": 31}, + ], + [ + ArrayQueryParameter( + name=None, + array_type="RECORD", + values=[ + StructQueryParameter( + None, + ScalarQueryParameter( + name="name", type_="STRING", value="Phred Phlyntstone" + ), + ScalarQueryParameter(name="age", type_="INT64", value=32), + ), + StructQueryParameter( + None, + ScalarQueryParameter( + name="name", type_="STRING", value="Bharney Rhubbyl" + ), + ScalarQueryParameter(name="age", type_="INT64", value=31), + ), + ], + ) + ], + ), + ( + "SELECT @empty_array_param", + [], + [ + ArrayQueryParameter( + name="empty_array_param", + values=[], + array_type=StructQueryParameterType( + ScalarQueryParameterType(name="foo", type_="INT64"), + ScalarQueryParameterType(name="bar", type_="STRING"), + ), + ) + ], + ), + ( + "SELECT @roles", + { + "hero": {"name": "Phred Phlyntstone", "age": 32}, + "sidekick": {"name": "Bharney Rhubbyl", "age": 31}, + }, + [ + StructQueryParameter( + "roles", + StructQueryParameter( + "hero", + ScalarQueryParameter( + name="name", type_="STRING", value="Phred Phlyntstone" + ), + ScalarQueryParameter(name="age", type_="INT64", value=32), + ), + StructQueryParameter( + "sidekick", + ScalarQueryParameter( + name="name", type_="STRING", value="Bharney Rhubbyl" + ), + ScalarQueryParameter(name="age", type_="INT64", value=31), + ), + ), + ], + ), + ( + "SELECT ?", + {"friends": ["Jack", "Jill"]}, + [ + StructQueryParameter( + None, + ArrayQueryParameter( + name="friends", array_type="STRING", values=["Jack", "Jill"] + ), + ) + ], + ), + ), +) +def test_query_parameters( + bigquery_client, query_api_method, sql, expected, query_parameters +): + jconfig = bigquery.QueryJobConfig() + jconfig.query_parameters = query_parameters + query_job = bigquery_client.query( + sql, job_config=jconfig, api_method=query_api_method, + ) + rows = list(query_job.result()) + assert len(rows) == 1 + assert len(rows[0]) == 1 + assert rows[0][0] == expected + + +def test_dry_run( + bigquery_client: bigquery.Client, + query_api_method: str, + scalars_table_multi_location: Tuple[str, str], +): + location, full_table_id = scalars_table_multi_location query_config = 
bigquery.QueryJobConfig() query_config.dry_run = True - query_string = f"SELECT * FROM {scalars_table}" - query_job = bigquery_client.query(query_string, job_config=query_config,) + query_string = f"SELECT * FROM {full_table_id}" + query_job = bigquery_client.query( + query_string, + location=location, + job_config=query_config, + api_method=query_api_method, + ) # Note: `query_job.result()` is not necessary on a dry run query. All # necessary information is returned in the initial response. @@ -29,7 +444,30 @@ def test_dry_run(bigquery_client: bigquery.Client, scalars_table: str): assert len(query_job.schema) > 0 -def test_session(bigquery_client: bigquery.Client): +def test_query_error_w_api_method_query(bigquery_client: bigquery.Client): + """No job is returned from jobs.query if the query fails.""" + + with pytest.raises(exceptions.NotFound, match="not_a_real_dataset"): + bigquery_client.query( + "SELECT * FROM not_a_real_dataset.doesnt_exist", api_method="QUERY" + ) + + +def test_query_error_w_api_method_default(bigquery_client: bigquery.Client): + """Test that an exception is not thrown until fetching the results. + + For backwards compatibility, jobs.insert is the default API method. With + jobs.insert, a failed query job is "successfully" created. An exception is + thrown when fetching the results. + """ + + query_job = bigquery_client.query("SELECT * FROM not_a_real_dataset.doesnt_exist") + + with pytest.raises(exceptions.NotFound, match="not_a_real_dataset"): + query_job.result() + + +def test_session(bigquery_client: bigquery.Client, query_api_method: str): initial_config = bigquery.QueryJobConfig() initial_config.create_session = True initial_query = """ @@ -37,7 +475,9 @@ def test_session(bigquery_client: bigquery.Client): AS SELECT * FROM UNNEST([1, 2, 3, 4, 5]) AS id; """ - initial_job = bigquery_client.query(initial_query, job_config=initial_config) + initial_job = bigquery_client.query( + initial_query, job_config=initial_config, api_method=query_api_method + ) initial_job.result() session_id = initial_job.session_info.session_id assert session_id is not None diff --git a/tests/unit/test__job_helpers.py b/tests/unit/test__job_helpers.py new file mode 100644 index 000000000..63dde75e7 --- /dev/null +++ b/tests/unit/test__job_helpers.py @@ -0,0 +1,329 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
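+ +"""Unit tests for the jobs.query and jobs.insert helpers in _job_helpers.py."""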
+ +from typing import Any, Dict, Optional +from unittest import mock + +from google.api_core import retry as retries +import pytest + +from google.cloud.bigquery.client import Client +from google.cloud.bigquery import _job_helpers +from google.cloud.bigquery.job.query import QueryJob, QueryJobConfig +from google.cloud.bigquery.query import ConnectionProperty, ScalarQueryParameter + + +def make_query_request(additional_properties: Optional[Dict[str, Any]] = None): + request = {"useLegacySql": False, "formatOptions": {"useInt64Timestamp": True}} + if additional_properties is not None: + request.update(additional_properties) + return request + + +def make_query_response( + completed: bool = False, + job_id: str = "abcd-efg-hijk-lmnop", + location="US", + project_id="test-project", + errors=None, +) -> Dict[str, Any]: + response = { + "jobReference": { + "projectId": project_id, + "jobId": job_id, + "location": location, + }, + "jobComplete": completed, + } + if errors is not None: + response["errors"] = errors + return response + + +@pytest.mark.parametrize( + ("job_config", "expected"), + ( + (None, make_query_request()), + (QueryJobConfig(), make_query_request()), + ( + QueryJobConfig(default_dataset="my-project.my_dataset"), + make_query_request( + { + "defaultDataset": { + "projectId": "my-project", + "datasetId": "my_dataset", + } + } + ), + ), + (QueryJobConfig(dry_run=True), make_query_request({"dryRun": True})), + ( + QueryJobConfig(use_query_cache=False), + make_query_request({"useQueryCache": False}), + ), + ( + QueryJobConfig(use_legacy_sql=True), + make_query_request({"useLegacySql": True}), + ), + ( + QueryJobConfig( + query_parameters=[ + ScalarQueryParameter("named_param1", "STRING", "param-value"), + ScalarQueryParameter("named_param2", "INT64", 123), + ] + ), + make_query_request( + { + "parameterMode": "NAMED", + "queryParameters": [ + { + "name": "named_param1", + "parameterType": {"type": "STRING"}, + "parameterValue": {"value": "param-value"}, + }, + { + "name": "named_param2", + "parameterType": {"type": "INT64"}, + "parameterValue": {"value": "123"}, + }, + ], + } + ), + ), + ( + QueryJobConfig( + query_parameters=[ + ScalarQueryParameter(None, "STRING", "param-value"), + ScalarQueryParameter(None, "INT64", 123), + ] + ), + make_query_request( + { + "parameterMode": "POSITIONAL", + "queryParameters": [ + { + "parameterType": {"type": "STRING"}, + "parameterValue": {"value": "param-value"}, + }, + { + "parameterType": {"type": "INT64"}, + "parameterValue": {"value": "123"}, + }, + ], + } + ), + ), + ( + QueryJobConfig( + connection_properties=[ + ConnectionProperty(key="time_zone", value="America/Chicago"), + ConnectionProperty(key="session_id", value="abcd-efgh-ijkl-mnop"), + ] + ), + make_query_request( + { + "connectionProperties": [ + {"key": "time_zone", "value": "America/Chicago"}, + {"key": "session_id", "value": "abcd-efgh-ijkl-mnop"}, + ] + } + ), + ), + ( + QueryJobConfig(labels={"abc": "def"}), + make_query_request({"labels": {"abc": "def"}}), + ), + ( + QueryJobConfig(maximum_bytes_billed=987654), + make_query_request({"maximumBytesBilled": "987654"}), + ), + ), +) +def test__to_query_request(job_config, expected): + result = _job_helpers._to_query_request(job_config) + assert result == expected + + +def test__to_query_job_defaults(): + mock_client = mock.create_autospec(Client) + response = make_query_response( + job_id="test-job", project_id="some-project", location="asia-northeast1" + ) + job: QueryJob = _job_helpers._to_query_job(mock_client, "query-str", 
None, response) + assert job.query == "query-str" + assert job._client is mock_client + assert job.job_id == "test-job" + assert job.project == "some-project" + assert job.location == "asia-northeast1" + assert job.error_result is None + assert job.errors is None + + +def test__to_query_job_dry_run(): + mock_client = mock.create_autospec(Client) + response = make_query_response( + job_id="test-job", project_id="some-project", location="asia-northeast1" + ) + job_config: QueryJobConfig = QueryJobConfig() + job_config.dry_run = True + job: QueryJob = _job_helpers._to_query_job( + mock_client, "query-str", job_config, response + ) + assert job.dry_run is True + + +@pytest.mark.parametrize( + ("completed", "expected_state"), ((True, "DONE"), (False, "PENDING"),), +) +def test__to_query_job_sets_state(completed, expected_state): + mock_client = mock.create_autospec(Client) + response = make_query_response(completed=completed) + job: QueryJob = _job_helpers._to_query_job(mock_client, "query-str", None, response) + assert job.state == expected_state + + +def test__to_query_job_sets_errors(): + mock_client = mock.create_autospec(Client) + response = make_query_response( + errors=[ + # https://cloud.google.com/bigquery/docs/reference/rest/v2/ErrorProto + {"reason": "backendError", "message": "something went wrong"}, + {"message": "something else went wrong"}, + ] + ) + job: QueryJob = _job_helpers._to_query_job(mock_client, "query-str", None, response) + assert len(job.errors) == 2 + # If we got back a response instead of an HTTP error status code, most + # likely the job didn't completely fail. + assert job.error_result is None + + +def test_query_jobs_query_defaults(): + mock_client = mock.create_autospec(Client) + mock_retry = mock.create_autospec(retries.Retry) + mock_job_retry = mock.create_autospec(retries.Retry) + mock_client._call_api.return_value = { + "jobReference": { + "projectId": "test-project", + "jobId": "abc", + "location": "asia-northeast1", + } + } + _job_helpers.query_jobs_query( + mock_client, + "SELECT * FROM test", + None, + "asia-northeast1", + "test-project", + mock_retry, + None, + mock_job_retry, + ) + + assert mock_client._call_api.call_count == 1 + call_args, call_kwargs = mock_client._call_api.call_args + assert call_args[0] is mock_retry + # See: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query + assert call_kwargs["path"] == "/projects/test-project/queries" + assert call_kwargs["method"] == "POST" + # See: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query#QueryRequest + request = call_kwargs["data"] + assert request["requestId"] is not None + assert request["query"] == "SELECT * FROM test" + assert request["location"] == "asia-northeast1" + assert request["formatOptions"]["useInt64Timestamp"] is True + assert "timeoutMs" not in request + + +def test_query_jobs_query_sets_format_options(): + """Since jobs.query can return results, ensure we use the lossless + timestamp format. 
+ + See: https://github.com/googleapis/python-bigquery/issues/395 + """ + mock_client = mock.create_autospec(Client) + mock_retry = mock.create_autospec(retries.Retry) + mock_job_retry = mock.create_autospec(retries.Retry) + mock_client._call_api.return_value = { + "jobReference": {"projectId": "test-project", "jobId": "abc", "location": "US"} + } + _job_helpers.query_jobs_query( + mock_client, + "SELECT * FROM test", + None, + "US", + "test-project", + mock_retry, + None, + mock_job_retry, + ) + + assert mock_client._call_api.call_count == 1 + _, call_kwargs = mock_client._call_api.call_args + # See: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query#QueryRequest + request = call_kwargs["data"] + assert request["formatOptions"]["useInt64Timestamp"] is True + + +@pytest.mark.parametrize( + ("timeout", "expected_timeout"), + ((-1, 0), (0, 0), (1, 1000 - _job_helpers._TIMEOUT_BUFFER_MILLIS),), +) +def test_query_jobs_query_sets_timeout(timeout, expected_timeout): + mock_client = mock.create_autospec(Client) + mock_retry = mock.create_autospec(retries.Retry) + mock_job_retry = mock.create_autospec(retries.Retry) + mock_client._call_api.return_value = { + "jobReference": {"projectId": "test-project", "jobId": "abc", "location": "US"} + } + _job_helpers.query_jobs_query( + mock_client, + "SELECT * FROM test", + None, + "US", + "test-project", + mock_retry, + timeout, + mock_job_retry, + ) + + assert mock_client._call_api.call_count == 1 + _, call_kwargs = mock_client._call_api.call_args + # See: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query#QueryRequest + request = call_kwargs["data"] + assert request["timeoutMs"] == expected_timeout + + +def test_make_job_id_wo_suffix(): + job_id = _job_helpers.make_job_id("job_id") + assert job_id == "job_id" + + +def test_make_job_id_w_suffix(): + with mock.patch("uuid.uuid4", side_effect=["212345"]): + job_id = _job_helpers.make_job_id(None, prefix="job_id") + + assert job_id == "job_id212345" + + +def test_make_job_id_random(): + with mock.patch("uuid.uuid4", side_effect=["212345"]): + job_id = _job_helpers.make_job_id(None) + + assert job_id == "212345" + + +def test_make_job_id_w_job_id_overrides_prefix(): + job_id = _job_helpers.make_job_id("job_id", prefix="unused_prefix") + assert job_id == "job_id" diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index 0adb004fd..8ebf5137e 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -4016,6 +4016,160 @@ def test_query_defaults(self): self.assertEqual(sent_config["query"], QUERY) self.assertFalse(sent_config["useLegacySql"]) + def test_query_w_api_method_query(self): + query = "select count(*) from persons" + response = { + "jobReference": { + "projectId": self.PROJECT, + "location": "EU", + "jobId": "abcd", + }, + } + creds = _make_credentials() + http = object() + client = self._make_one(project=self.PROJECT, credentials=creds, _http=http) + conn = client._connection = make_connection(response) + + job = client.query(query, location="EU", api_method="QUERY") + + self.assertEqual(job.query, query) + self.assertEqual(job.job_id, "abcd") + self.assertEqual(job.location, "EU") + + # Check that query actually starts the job. 
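+        # The requestId is a freshly generated UUID for each request, so only + # assert that it is present by matching it with mock.ANY.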
+ expected_resource = { + "query": query, + "useLegacySql": False, + "location": "EU", + "formatOptions": {"useInt64Timestamp": True}, + "requestId": mock.ANY, + } + conn.api_request.assert_called_once_with( + method="POST", + path=f"/projects/{self.PROJECT}/queries", + data=expected_resource, + timeout=None, + ) + + def test_query_w_api_method_query_legacy_sql(self): + from google.cloud.bigquery import QueryJobConfig + + query = "select count(*) from persons" + response = { + "jobReference": { + "projectId": self.PROJECT, + "location": "EU", + "jobId": "abcd", + }, + } + job_config = QueryJobConfig() + job_config.use_legacy_sql = True + job_config.maximum_bytes_billed = 100 + creds = _make_credentials() + http = object() + client = self._make_one(project=self.PROJECT, credentials=creds, _http=http) + conn = client._connection = make_connection(response) + + job = client.query( + query, location="EU", job_config=job_config, api_method="QUERY" + ) + + self.assertEqual(job.query, query) + self.assertEqual(job.job_id, "abcd") + self.assertEqual(job.location, "EU") + + # Check that query actually starts the job. + expected_resource = { + "query": query, + "useLegacySql": True, + "location": "EU", + "formatOptions": {"useInt64Timestamp": True}, + "requestId": mock.ANY, + "maximumBytesBilled": "100", + } + conn.api_request.assert_called_once_with( + method="POST", + path=f"/projects/{self.PROJECT}/queries", + data=expected_resource, + timeout=None, + ) + + def test_query_w_api_method_query_parameters(self): + from google.cloud.bigquery import QueryJobConfig, ScalarQueryParameter + + query = "select count(*) from persons" + response = { + "jobReference": { + "projectId": self.PROJECT, + "location": "EU", + "jobId": "abcd", + }, + } + job_config = QueryJobConfig() + job_config.dry_run = True + job_config.query_parameters = [ScalarQueryParameter("param1", "INTEGER", 123)] + creds = _make_credentials() + http = object() + client = self._make_one(project=self.PROJECT, credentials=creds, _http=http) + conn = client._connection = make_connection(response) + + job = client.query( + query, location="EU", job_config=job_config, api_method="QUERY" + ) + + self.assertEqual(job.query, query) + self.assertEqual(job.job_id, "abcd") + self.assertEqual(job.location, "EU") + + # Check that query actually starts the job. 
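+        # Note that dryRun (like labels) is a top-level field in the + # QueryRequest body, not nested under a query configuration as it is + # in the jobs.insert job resource.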
+ expected_resource = { + "query": query, + "dryRun": True, + "useLegacySql": False, + "location": "EU", + "formatOptions": {"useInt64Timestamp": True}, + "requestId": mock.ANY, + "parameterMode": "NAMED", + "queryParameters": [ + { + "name": "param1", + "parameterType": {"type": "INTEGER"}, + "parameterValue": {"value": "123"}, + }, + ], + } + conn.api_request.assert_called_once_with( + method="POST", + path=f"/projects/{self.PROJECT}/queries", + data=expected_resource, + timeout=None, + ) + + def test_query_w_api_method_query_and_job_id_fails(self): + query = "select count(*) from persons" + creds = _make_credentials() + http = object() + client = self._make_one(project=self.PROJECT, credentials=creds, _http=http) + client._connection = make_connection({}) + + with self.assertRaises(TypeError) as exc: + client.query(query, job_id="abcd", api_method="QUERY") + self.assertIn( + "`job_id` was provided, but the 'QUERY' `api_method` was requested", + exc.exception.args[0], + ) + + def test_query_w_api_method_unknown(self): + query = "select count(*) from persons" + creds = _make_credentials() + http = object() + client = self._make_one(project=self.PROJECT, credentials=creds, _http=http) + client._connection = make_connection({}) + + with self.assertRaises(ValueError) as exc: + client.query(query, api_method="UNKNOWN") + self.assertIn("Got unexpected value for api_method: ", exc.exception.args[0]) + def test_query_w_explicit_timeout(self): query = "select count(*) from persons" resource = { @@ -6213,35 +6367,6 @@ def test_context_manager_exit_closes_client(self): fake_close.assert_called_once() -class Test_make_job_id(unittest.TestCase): - def _call_fut(self, job_id, prefix=None): - from google.cloud.bigquery.client import _make_job_id - - return _make_job_id(job_id, prefix=prefix) - - def test__make_job_id_wo_suffix(self): - job_id = self._call_fut("job_id") - - self.assertEqual(job_id, "job_id") - - def test__make_job_id_w_suffix(self): - with mock.patch("uuid.uuid4", side_effect=["212345"]): - job_id = self._call_fut(None, prefix="job_id") - - self.assertEqual(job_id, "job_id212345") - - def test__make_random_job_id(self): - with mock.patch("uuid.uuid4", side_effect=["212345"]): - job_id = self._call_fut(None) - - self.assertEqual(job_id, "212345") - - def test__make_job_id_w_job_id_overrides_prefix(self): - job_id = self._call_fut("job_id", prefix="unused_prefix") - - self.assertEqual(job_id, "job_id") - - class TestClientUpload(object): # NOTE: This is a "partner" to `TestClient` meant to test some of the # "load_table_from_file" portions of `Client`. It also uses