Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add QueryJob.schema property for dry run queries #1014

Merged
merged 4 commits into from
Oct 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion google/cloud/bigquery/job/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1005,7 +1005,9 @@ def from_api_repr(cls, resource: dict, client) -> "UnknownJob":
Returns:
UnknownJob: Job corresponding to the resource.
"""
job_ref_properties = resource.get("jobReference", {"projectId": client.project})
job_ref_properties = resource.get(
"jobReference", {"projectId": client.project, "jobId": None}
)
job_ref = _JobReference._from_api_repr(job_ref_properties)
job = cls(job_ref, client)
# Populate the job reference with the project, even if it has been
Expand Down
24 changes: 20 additions & 4 deletions google/cloud/bigquery/job/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import copy
import re
import typing
from typing import Any, Dict, Optional, Union
from typing import Any, Dict, List, Optional, Union

from google.api_core import exceptions
from google.api_core.future import polling as polling_future
Expand All @@ -38,6 +38,7 @@
from google.cloud.bigquery.query import UDFResource
from google.cloud.bigquery.retry import DEFAULT_RETRY, DEFAULT_JOB_RETRY
from google.cloud.bigquery.routine import RoutineReference
from google.cloud.bigquery.schema import SchemaField
from google.cloud.bigquery.table import _EmptyRowIterator
from google.cloud.bigquery.table import RangePartitioning
from google.cloud.bigquery.table import _table_arg_to_table_ref
Expand All @@ -57,6 +58,7 @@
import pyarrow
from google.api_core import retry as retries
from google.cloud import bigquery_storage
from google.cloud.bigquery.client import Client
from google.cloud.bigquery.table import RowIterator


Expand Down Expand Up @@ -853,7 +855,7 @@ def to_api_repr(self):
}

@classmethod
def from_api_repr(cls, resource: dict, client) -> "QueryJob":
def from_api_repr(cls, resource: dict, client: "Client") -> "QueryJob":
"""Factory: construct a job given its API representation

Args:
Expand All @@ -866,8 +868,10 @@ def from_api_repr(cls, resource: dict, client) -> "QueryJob":
Returns:
google.cloud.bigquery.job.QueryJob: Job parsed from ``resource``.
"""
cls._check_resource_config(resource)
job_ref = _JobReference._from_api_repr(resource["jobReference"])
job_ref_properties = resource.setdefault(
"jobReference", {"projectId": client.project, "jobId": None}
)
job_ref = _JobReference._from_api_repr(job_ref_properties)
job = cls(job_ref, None, client=client)
job._set_properties(resource)
return job
Expand All @@ -887,6 +891,18 @@ def query_plan(self):
plan_entries = self._job_statistics().get("queryPlan", ())
return [QueryPlanEntry.from_api_repr(entry) for entry in plan_entries]

@property
def schema(self) -> Optional[List[SchemaField]]:
"""The schema of the results.

Present only for successful dry run of non-legacy SQL queries.
"""
resource = self._job_statistics().get("schema")
if resource is None:
return None
fields = resource.get("fields", [])
return [SchemaField.from_api_repr(field) for field in fields]

@property
def timeline(self):
"""List(TimelineEntry): Return the query execution timeline
Expand Down
29 changes: 29 additions & 0 deletions tests/system/test_query.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from google.cloud import bigquery


def test_dry_run(bigquery_client: bigquery.Client, scalars_table: str):
query_config = bigquery.QueryJobConfig()
query_config.dry_run = True

query_string = f"SELECT * FROM {scalars_table}"
query_job = bigquery_client.query(query_string, job_config=query_config,)

# Note: `query_job.result()` is not necessary on a dry run query. All
# necessary information is returned in the initial response.
assert query_job.dry_run is True
assert query_job.total_bytes_processed > 0
assert len(query_job.schema) > 0
56 changes: 37 additions & 19 deletions tests/unit/job/test_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,25 +269,6 @@ def test_ctor_w_query_parameters(self):
job = self._make_one(self.JOB_ID, self.QUERY, client, job_config=config)
self.assertEqual(job.query_parameters, query_parameters)

def test_from_api_repr_missing_identity(self):
self._setUpConstants()
client = _make_client(project=self.PROJECT)
RESOURCE = {}
klass = self._get_target_class()
with self.assertRaises(KeyError):
klass.from_api_repr(RESOURCE, client=client)

def test_from_api_repr_missing_config(self):
self._setUpConstants()
client = _make_client(project=self.PROJECT)
RESOURCE = {
"id": "%s:%s" % (self.PROJECT, self.DS_ID),
"jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
}
klass = self._get_target_class()
with self.assertRaises(KeyError):
klass.from_api_repr(RESOURCE, client=client)

def test_from_api_repr_bare(self):
self._setUpConstants()
client = _make_client(project=self.PROJECT)
Expand Down Expand Up @@ -1405,6 +1386,43 @@ def test_result_transport_timeout_error(self):
with call_api_patch, self.assertRaises(concurrent.futures.TimeoutError):
job.result(timeout=1)

def test_no_schema(self):
client = _make_client(project=self.PROJECT)
resource = {}
klass = self._get_target_class()
job = klass.from_api_repr(resource, client=client)
assert job.schema is None

def test_schema(self):
client = _make_client(project=self.PROJECT)
resource = {
"statistics": {
"query": {
"schema": {
"fields": [
{"mode": "NULLABLE", "name": "bool_col", "type": "BOOLEAN"},
{
"mode": "NULLABLE",
"name": "string_col",
"type": "STRING",
},
{
"mode": "NULLABLE",
"name": "timestamp_col",
"type": "TIMESTAMP",
},
]
},
},
},
}
klass = self._get_target_class()
job = klass.from_api_repr(resource, client=client)
assert len(job.schema) == 3
assert job.schema[0].field_type == "BOOLEAN"
assert job.schema[1].field_type == "STRING"
assert job.schema[2].field_type == "TIMESTAMP"

def test__begin_error(self):
from google.cloud import exceptions

Expand Down