Add default QueryJobConfig to Client (#6088)
* master

* working implementation of default QueryJobConfigs attached to Client

* removing comments and help texts

* fixing lints

* bringing coverage up to 100%

* making revisions

* missed some changes

* making code tweaks

* Make _JobConfig._fill_from_default semi-private.

Also, update the docstrings to Google/Napoleon-style.
blainehansen authored and tswast committed Sep 26, 2018
1 parent 8671f26 commit cf04f6d
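
For context, a minimal usage sketch of the feature this commit adds, assuming only the public ``google.cloud.bigquery`` API shown in the diff below (the project ID and query text are placeholders):

from google.cloud import bigquery

# Defaults applied to every query issued through this client.
default_config = bigquery.QueryJobConfig()
default_config.use_query_cache = True
default_config.maximum_bytes_billed = 1000

# ``default_query_job_config`` is the new constructor argument.
client = bigquery.Client(
    project='my-project',  # placeholder
    default_query_job_config=default_config,
)

# With no explicit job_config, the client's default config is used as-is.
query_job = client.query('SELECT COUNT(*) FROM `my-project.some_dataset.persons`')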
Showing 4 changed files with 250 additions and 3 deletions.
28 changes: 25 additions & 3 deletions bigquery/google/cloud/bigquery/client.py
@@ -108,8 +108,11 @@ class Client(ClientWithProject):
current object.
This parameter should be considered private, and could change in
the future.
location str:
location (str):
(Optional) Default location for jobs / datasets / tables.
default_query_job_config (google.cloud.bigquery.job.QueryJobConfig):
(Optional) Default ``QueryJobConfig``.
Will be merged into job configs passed into the ``query`` method.
Raises:
google.auth.exceptions.DefaultCredentialsError:
@@ -122,11 +125,13 @@ class Client(ClientWithProject):
"""The scopes required for authenticating as a BigQuery consumer."""

def __init__(
self, project=None, credentials=None, _http=None, location=None):
self, project=None, credentials=None, _http=None,
location=None, default_query_job_config=None):
super(Client, self).__init__(
project=project, credentials=credentials, _http=_http)
self._connection = Connection(self)
self._location = location
self._default_query_job_config = default_query_job_config

@property
def location(self):
@@ -1187,7 +1192,9 @@ def extract_table(
return extract_job

def query(
self, query, job_config=None, job_id=None, job_id_prefix=None,
self, query,
job_config=None,
job_id=None, job_id_prefix=None,
location=None, project=None, retry=DEFAULT_RETRY):
"""Run a SQL query.
@@ -1202,6 +1209,10 @@
Keyword Arguments:
job_config (google.cloud.bigquery.job.QueryJobConfig):
(Optional) Extra configuration options for the job.
To override any options that were previously set in
the ``default_query_job_config`` given to the
``Client`` constructor, manually set those options to ``None``,
or whatever value is preferred.
job_id (str): (Optional) ID to use for the query job.
job_id_prefix (str):
(Optional) The prefix to use for a randomly generated job ID.
@@ -1226,6 +1237,17 @@
if location is None:
location = self.location

if self._default_query_job_config:
if job_config:
# any option that is not set on the incoming config but is
# set on the default should be filled in from the default;
# the incoming config therefore takes precedence
job_config = job_config._fill_from_default(
self._default_query_job_config)
else:
job_config = self._default_query_job_config

job_ref = job._JobReference(job_id, project=project, location=location)
query_job = job.QueryJob(
job_ref, query, client=self, job_config=job_config)
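
A short sketch of the precedence rule implemented above: options set on the per-call ``job_config`` win, and anything left unset falls back to the client's default (the values and project ID are illustrative):

from google.cloud import bigquery

default_config = bigquery.QueryJobConfig()
default_config.use_query_cache = True
default_config.maximum_bytes_billed = 1000

client = bigquery.Client(
    project='my-project',  # placeholder
    default_query_job_config=default_config,
)

per_call_config = bigquery.QueryJobConfig()
per_call_config.maximum_bytes_billed = 2000

job = client.query('SELECT 1', job_config=per_call_config)
# Effective configuration sent to the API:
#   maximum_bytes_billed = 2000  (per-call config wins on conflict)
#   use_query_cache      = True  (inherited from the client default)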
33 changes: 33 additions & 0 deletions bigquery/google/cloud/bigquery/job.py
@@ -819,6 +819,39 @@ def to_api_repr(self):
"""
return copy.deepcopy(self._properties)

def _fill_from_default(self, default_job_config):
"""Merge this job config with a default job config.
The keys in this object take precedence over the keys in the default
config. The merge is done at the top-level as well as for keys one
level below the job type.
Arguments:
default_job_config (google.cloud.bigquery.job._JobConfig):
The default job config that will be used to fill in self.
Returns:
google.cloud.bigquery.job._JobConfig: A new (merged) job config.
"""
if self._job_type != default_job_config._job_type:
raise TypeError(
"attempted to merge two incompatible job types: "
+ repr(self._job_type) + ', '
+ repr(default_job_config._job_type))

new_job_config = self.__class__()

default_job_properties = copy.deepcopy(default_job_config._properties)
for key in self._properties:
if key != self._job_type:
default_job_properties[key] = self._properties[key]

default_job_properties[self._job_type] \
.update(self._properties[self._job_type])
new_job_config._properties = default_job_properties

return new_job_config

@classmethod
def from_api_repr(cls, resource):
"""Factory: construct a job configuration given its API representation
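
The same merge can be exercised directly on the semi-private helper; this sketch mirrors the unit tests further down and is only meant to illustrate the semantics of ``_fill_from_default``:

from google.cloud.bigquery import QueryJobConfig

config = QueryJobConfig()
config.dry_run = True
config.maximum_bytes_billed = 1000

default = QueryJobConfig()
default.use_query_cache = True
default.maximum_bytes_billed = 2000

merged = config._fill_from_default(default)
assert merged.dry_run is True                 # set only on ``config``
assert merged.use_query_cache is True         # filled in from ``default``
assert merged.maximum_bytes_billed == 1000    # ``config`` wins on conflict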
164 changes: 164 additions & 0 deletions bigquery/tests/unit/test_client.py
@@ -96,6 +96,27 @@ def test_ctor_w_location(self):
self.assertIs(client._connection.http, http)
self.assertEqual(client.location, location)

def test_ctor_w_query_job_config(self):
from google.cloud.bigquery._http import Connection
from google.cloud.bigquery import QueryJobConfig

creds = _make_credentials()
http = object()
location = 'us-central'
job_config = QueryJobConfig()
job_config.dry_run = True

client = self._make_one(project=self.PROJECT, credentials=creds,
_http=http, location=location,
default_query_job_config=job_config)
self.assertIsInstance(client._connection, Connection)
self.assertIs(client._connection.credentials, creds)
self.assertIs(client._connection.http, http)
self.assertEqual(client.location, location)

self.assertIsInstance(client._default_query_job_config, QueryJobConfig)
self.assertTrue(client._default_query_job_config.dry_run)

def test__get_query_results_miss_w_explicit_project_and_timeout(self):
from google.cloud.exceptions import NotFound

@@ -2707,6 +2728,149 @@ def test_query_w_explicit_project(self):
data=resource,
)

def test_query_w_explicit_job_config(self):
job_id = 'some-job-id'
query = 'select count(*) from persons'
resource = {
'jobReference': {
'jobId': job_id,
'projectId': self.PROJECT,
'location': self.LOCATION,
},
'configuration': {
'query': {
'query': query,
'defaultDataset': {
'projectId': self.PROJECT,
'datasetId': 'some-dataset',
},
'useLegacySql': False,
'useQueryCache': True,
'maximumBytesBilled': '2000',
},
},
}

creds = _make_credentials()
http = object()

from google.cloud.bigquery import QueryJobConfig, DatasetReference
default_job_config = QueryJobConfig()
default_job_config.default_dataset = DatasetReference(
self.PROJECT, 'some-dataset')
default_job_config.maximum_bytes_billed = 1000

client = self._make_one(
project=self.PROJECT, credentials=creds,
_http=http, default_query_job_config=default_job_config)
conn = client._connection = _make_connection(resource)

job_config = QueryJobConfig()
job_config.use_query_cache = True
job_config.maximum_bytes_billed = 2000

client.query(
query, job_id=job_id, location=self.LOCATION,
job_config=job_config)

# Check that query actually starts the job.
conn.api_request.assert_called_once_with(
method='POST',
path='/projects/PROJECT/jobs',
data=resource,
)

def test_query_w_explicit_job_config_override(self):
job_id = 'some-job-id'
query = 'select count(*) from persons'
resource = {
'jobReference': {
'jobId': job_id,
'projectId': self.PROJECT,
'location': self.LOCATION,
},
'configuration': {
'query': {
'query': query,
'defaultDataset': None,
'useLegacySql': False,
'useQueryCache': True,
'maximumBytesBilled': '2000',
},
},
}

creds = _make_credentials()
http = object()

from google.cloud.bigquery import QueryJobConfig, DatasetReference
default_job_config = QueryJobConfig()
default_job_config.default_dataset = DatasetReference(
self.PROJECT, 'some-dataset')
default_job_config.maximum_bytes_billed = 1000

client = self._make_one(
project=self.PROJECT, credentials=creds, _http=http,
default_query_job_config=default_job_config)
conn = client._connection = _make_connection(resource)

job_config = QueryJobConfig()
job_config.use_query_cache = True
job_config.maximum_bytes_billed = 2000
job_config.default_dataset = None

client.query(
query, job_id=job_id, location=self.LOCATION,
job_config=job_config,
)

# Check that query actually starts the job.
conn.api_request.assert_called_once_with(
method='POST',
path='/projects/PROJECT/jobs',
data=resource,
)

def test_query_w_client_default_config_no_incoming(self):
job_id = 'some-job-id'
query = 'select count(*) from persons'
resource = {
'jobReference': {
'jobId': job_id,
'projectId': self.PROJECT,
'location': self.LOCATION,
},
'configuration': {
'query': {
'query': query,
'useLegacySql': False,
'maximumBytesBilled': '1000',
},
},
}

creds = _make_credentials()
http = object()

from google.cloud.bigquery import QueryJobConfig
default_job_config = QueryJobConfig()
default_job_config.maximum_bytes_billed = 1000

client = self._make_one(
project=self.PROJECT, credentials=creds, _http=http,
default_query_job_config=default_job_config)
conn = client._connection = _make_connection(resource)

client.query(
query, job_id=job_id, location=self.LOCATION)

# Check that query actually starts the job.
conn.api_request.assert_called_once_with(
method='POST',
path='/projects/PROJECT/jobs',
data=resource,
)

def test_query_w_client_location(self):
job_id = 'some-job-id'
query = 'select count(*) from persons'
28 changes: 28 additions & 0 deletions bigquery/tests/unit/test_job.py
@@ -912,6 +912,34 @@ def test_ctor(self):
self.assertEqual(job_config._job_type, self.JOB_TYPE)
self.assertEqual(job_config._properties, {self.JOB_TYPE: {}})

def test_fill_from_default(self):
from google.cloud.bigquery import QueryJobConfig

job_config = QueryJobConfig()
job_config.dry_run = True
job_config.maximum_bytes_billed = 1000

default_job_config = QueryJobConfig()
default_job_config.use_query_cache = True
default_job_config.maximum_bytes_billed = 2000

final_job_config = job_config._fill_from_default(default_job_config)
self.assertTrue(final_job_config.dry_run)
self.assertTrue(final_job_config.use_query_cache)
self.assertEqual(final_job_config.maximum_bytes_billed, 1000)

def test_fill_from_default_conflict(self):
from google.cloud.bigquery import QueryJobConfig

basic_job_config = QueryJobConfig()
conflicting_job_config = self._make_one('conflicting_job_type')
self.assertNotEqual(
basic_job_config._job_type, conflicting_job_config._job_type)

with self.assertRaises(TypeError):
basic_job_config._fill_from_default(
conflicting_job_config)

@mock.patch('google.cloud.bigquery._helpers._get_sub_prop')
def test__get_sub_prop_wo_default(self, _get_sub_prop):
job_config = self._make_one()
