Skip to content

Commit

Permalink
BigQuery: replaces table.update() and table.patch() with client.update_table() (#4076)
Browse files Browse the repository at this point in the history

* adds client.update_table()

* removes table.update() and table.patch()

* adds coverage for _verifyResourceProperties()

* adds test for deleting property and refactors table resource creation

* fixes update_table tests

* Fixes logic in _build_resource()
  • Loading branch information
alixhami authored and tswast committed Oct 16, 2017
1 parent 3a99312 commit a3d7465
Show file tree
Hide file tree
Showing 5 changed files with 431 additions and 344 deletions.
28 changes: 27 additions & 1 deletion bigquery/google/cloud/bigquery/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,12 @@ def create_table(self, table):
"""
path = '/projects/%s/datasets/%s/tables' % (
table.project, table.dataset_id)
resource = table._build_resource(Table.all_fields)
doomed = [field for field in resource if resource[field] is None]
for field in doomed:
del resource[field]
api_response = self._connection.api_request(
method='POST', path=path, data=table._build_resource())
method='POST', path=path, data=resource)
return Table.from_api_repr(api_response, self)

def get_dataset(self, dataset_ref):
Expand Down Expand Up @@ -285,6 +289,28 @@ def update_dataset(self, dataset, fields):
method='PATCH', path=path, data=partial, headers=headers)
return Dataset.from_api_repr(api_response)

def update_table(self, table, properties):
    """API call: update table properties via a PATCH request

    See
    https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/update

    :type table: :class:`google.cloud.bigquery.table.Table`
    :param table: the table to update.

    :type properties: list of str
    :param properties: names of the ``Table`` properties to include in
                       the partial (PATCH) request body.

    :rtype: :class:`google.cloud.bigquery.table.Table`
    :returns: a ``Table`` instance, built from the server's response.
    """
    partial = table._build_resource(properties)
    # Send the stored ETag (when present) so a concurrent modification
    # makes the request fail with a precondition error rather than
    # silently overwriting the other change.
    if table.etag is not None:
        headers = {'If-Match': table.etag}
    else:
        headers = None
    api_response = self._connection.api_request(
        method='PATCH', path=table.path, data=partial, headers=headers)
    return Table.from_api_repr(api_response, client=self)

def list_dataset_tables(self, dataset, max_results=None, page_token=None):
"""List tables in the dataset.
Expand Down
177 changes: 56 additions & 121 deletions bigquery/google/cloud/bigquery/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,11 @@ class Table(object):

_schema = None

# Snake-case names of every ``Table`` property that may appear in a
# create/update request body; ``_build_resource`` maps each one to its
# API resource field (directly or via ``custom_resource_fields``).
all_fields = [
'description', 'friendly_name', 'expires', 'location',
'partitioning_type', 'view_use_legacy_sql', 'view_query', 'schema'
]

def __init__(self, table_ref, schema=(), client=None):
self._project = table_ref.project
self._table_id = table_ref.table_id
Expand Down Expand Up @@ -240,9 +245,12 @@ def schema(self, value):
:raises: TypeError if 'value' is not a sequence, or ValueError if
any item in the sequence is not a SchemaField
"""
if not all(isinstance(field, SchemaField) for field in value):
if value is None:
self._schema = ()
elif not all(isinstance(field, SchemaField) for field in value):
raise ValueError('Schema items must be fields')
self._schema = tuple(value)
else:
self._schema = tuple(value)

@property
def created(self):
Expand Down Expand Up @@ -613,41 +621,59 @@ def _set_properties(self, api_response):
cleaned['expirationTime'] = float(cleaned['expirationTime'])
self._properties.update(cleaned)

def _build_resource(self):
"""Generate a resource for ``create`` or ``update``."""
resource = {
'tableReference': {
'projectId': self._project,
'datasetId': self._dataset_id,
'tableId': self.table_id},
}
if self.description is not None:
resource['description'] = self.description

if self.expires is not None:
value = _millis_from_datetime(self.expires)
resource['expirationTime'] = value

if self.friendly_name is not None:
resource['friendlyName'] = self.friendly_name
def _populate_expires_resource(self, resource):
    """Copy ``expires`` into the resource as an epoch-milliseconds value."""
    millis = _millis_from_datetime(self.expires)
    resource['expirationTime'] = millis

if self.location is not None:
resource['location'] = self.location
def _populate_partitioning_type_resource(self, resource):
    """Mirror the stored ``timePartitioning`` sub-resource into ``resource``."""
    value = self._properties.get('timePartitioning')
    resource['timePartitioning'] = value

if self.partitioning_type is not None:
resource['timePartitioning'] = self._properties['timePartitioning']
def _populate_view_use_legacy_sql_resource(self, resource):
    """Set ``view.useLegacySql``, creating the ``view`` sub-object if absent."""
    resource.setdefault('view', {})['useLegacySql'] = self.view_use_legacy_sql

if self.view_query is not None:
view = resource['view'] = {}
view['query'] = self.view_query
if self.view_use_legacy_sql is not None:
view['useLegacySql'] = self.view_use_legacy_sql
def _populate_view_query_resource(self, resource):
    """Set ``view.query``; an unset query sends explicit None for ``view``."""
    query = self.view_query
    if query is None:
        # Explicit None asks the API to drop the view definition.
        resource['view'] = None
        return
    resource.setdefault('view', {})['query'] = query

if self._schema:
def _populate_schema_resource(self, resource):
    """Serialize the schema; an empty schema is sent as explicit None."""
    if self._schema:
        resource['schema'] = {
            'fields': _build_schema_resource(self._schema),
        }
    else:
        resource['schema'] = None

# Dispatch table for properties whose resource representation is not a
# plain snake-case -> camelCase attribute copy; ``_build_resource``
# invokes the mapped populate method for these instead.
custom_resource_fields = {
'expires': _populate_expires_resource,
'partitioning_type': _populate_partitioning_type_resource,
'view_query': _populate_view_query_resource,
'view_use_legacy_sql': _populate_view_use_legacy_sql_resource,
'schema': _populate_schema_resource
}

def _build_resource(self, filter_fields):
    """Generate a resource for ``create`` or ``update``."""
    resource = {
        'tableReference': {
            'projectId': self._project,
            'datasetId': self._dataset_id,
            'tableId': self.table_id},
    }
    for field_name in filter_fields:
        populate = self.custom_resource_fields.get(field_name)
        if populate is not None:
            # Fields needing non-trivial serialization go through
            # their dedicated populate method.
            populate(self, resource)
        else:
            # TODO(alixh) refactor to use in both Table and Dataset
            # snake case to camel case
            head, *rest = field_name.split('_')
            api_field = head + ''.join(word.capitalize() for word in rest)
            resource[api_field] = getattr(self, field_name)
    return resource

def exists(self, client=None):
Expand All @@ -674,97 +700,6 @@ def exists(self, client=None):
else:
return True

# NOTE(review): legacy per-field PATCH helper; the surrounding change
# appears to replace it with ``Client.update_table`` -- confirm before
# relying on it.  ``_MARKER`` is a sentinel distinguishing "argument not
# passed" from an explicit ``None`` (which clears the field server-side).
def patch(self,
client=None,
friendly_name=_MARKER,
description=_MARKER,
location=_MARKER,
expires=_MARKER,
view_query=_MARKER,
schema=_MARKER):
"""API call: update individual table properties via a PATCH request
See
https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/patch
:type client: :class:`~google.cloud.bigquery.client.Client` or
``NoneType``
:param client: the client to use. If not passed, falls back to the
``client`` stored on the current dataset.
:type friendly_name: str
:param friendly_name: (Optional) a descriptive name for this table.
:type description: str
:param description: (Optional) a description of this table.
:type location: str
:param location:
(Optional) the geographic location where the table resides.
:type expires: :class:`datetime.datetime`
:param expires: (Optional) point in time at which the table expires.
:type view_query: str
:param view_query: SQL query defining the table as a view
:type schema: list of :class:`SchemaField`
:param schema: fields describing the schema
:raises: ValueError for invalid value types.
"""
client = self._require_client(client)

# Build a partial resource containing only the explicitly-passed
# fields; untouched fields stay out of the PATCH body entirely.
partial = {}

if expires is not _MARKER:
# Only ``expires`` is type-checked here because it must be
# converted to epoch milliseconds before sending.
if (not isinstance(expires, datetime.datetime) and
expires is not None):
raise ValueError("Pass a datetime, or None")
partial['expirationTime'] = _millis_from_datetime(expires)

if description is not _MARKER:
partial['description'] = description

if friendly_name is not _MARKER:
partial['friendlyName'] = friendly_name

if location is not _MARKER:
partial['location'] = location

if view_query is not _MARKER:
# An explicit None view_query sends ``view: null`` to clear it.
if view_query is None:
partial['view'] = None
else:
partial['view'] = {'query': view_query}

if schema is not _MARKER:
# Likewise, ``schema=None`` clears the schema server-side.
if schema is None:
partial['schema'] = None
else:
partial['schema'] = {
'fields': _build_schema_resource(schema)}

# Refresh local state from the server's view of the table.
api_response = client._connection.api_request(
method='PATCH', path=self.path, data=partial)
self._set_properties(api_response)

# NOTE(review): legacy full-replace helper (PUT sends the entire
# resource).  It calls ``self._build_resource()`` with no arguments,
# while the ``_build_resource`` shown elsewhere in this diff requires a
# ``filter_fields`` argument -- confirm which signature this deleted
# code was written against.
def update(self, client=None):
"""API call: update table properties via a PUT request
See
https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/update
:type client: :class:`~google.cloud.bigquery.client.Client` or
``NoneType``
:param client: the client to use. If not passed, falls back to the
``client`` stored on the current dataset.
"""
client = self._require_client(client)
api_response = client._connection.api_request(
method='PUT', path=self.path, data=self._build_resource())
# Refresh local state from the server's response.
self._set_properties(api_response)

def row_from_mapping(self, mapping):
"""Convert a mapping to a row tuple using the schema.
Expand Down
53 changes: 36 additions & 17 deletions bigquery/tests/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,33 +250,50 @@ def test_list_dataset_tables(self):
table.dataset_id == DATASET_ID)]
self.assertEqual(len(created), len(tables_to_create))

def test_patch_table(self):
dataset = self.temp_dataset(_make_dataset_id('patch_table'))
def test_update_table(self):
dataset = self.temp_dataset(_make_dataset_id('update_table'))

TABLE_NAME = 'test_table'
full_name = bigquery.SchemaField('full_name', 'STRING',
mode='REQUIRED')
age = bigquery.SchemaField('age', 'INTEGER', mode='REQUIRED')
table_arg = Table(dataset.table(TABLE_NAME), schema=[full_name, age],
schema = [
bigquery.SchemaField('full_name', 'STRING', mode='REQUIRED'),
bigquery.SchemaField('age', 'INTEGER', mode='REQUIRED')
]
table_arg = Table(dataset.table(TABLE_NAME), schema=schema,
client=Config.CLIENT)
self.assertFalse(table_arg.exists())
table = retry_403(Config.CLIENT.create_table)(table_arg)
self.to_delete.insert(0, table)
self.assertTrue(table.exists())
self.assertIsNone(table.friendly_name)
self.assertIsNone(table.description)
table.patch(friendly_name='Friendly', description='Description')
self.assertEqual(table.friendly_name, 'Friendly')
self.assertEqual(table.description, 'Description')
table.friendly_name = 'Friendly'
table.description = 'Description'

def test_update_table(self):
table2 = Config.CLIENT.update_table(
table, ['friendly_name', 'description'])

self.assertEqual(table2.friendly_name, 'Friendly')
self.assertEqual(table2.description, 'Description')

table2.description = None
table3 = Config.CLIENT.update_table(table2, ['description'])
self.assertIsNone(table3.description)

# If we try to update using table2 again, it will fail because the
# previous update changed the ETag.
table2.description = 'no good'
with self.assertRaises(PreconditionFailed):
Config.CLIENT.update_table(table2, ['description'])

def test_update_table_schema(self):
dataset = self.temp_dataset(_make_dataset_id('update_table'))

TABLE_NAME = 'test_table'
full_name = bigquery.SchemaField('full_name', 'STRING',
mode='REQUIRED')
age = bigquery.SchemaField('age', 'INTEGER', mode='REQUIRED')
table_arg = Table(dataset.table(TABLE_NAME), schema=[full_name, age],
schema = [
bigquery.SchemaField('full_name', 'STRING', mode='REQUIRED'),
bigquery.SchemaField('age', 'INTEGER', mode='REQUIRED')
]
table_arg = Table(dataset.table(TABLE_NAME), schema=schema,
client=Config.CLIENT)
self.assertFalse(table_arg.exists())
table = retry_403(Config.CLIENT.create_table)(table_arg)
Expand All @@ -286,9 +303,11 @@ def test_update_table(self):
schema = table.schema
schema.append(voter)
table.schema = schema
table.update()
self.assertEqual(len(table.schema), len(schema))
for found, expected in zip(table.schema, schema):

updated_table = Config.CLIENT.update_table(table, ['schema'])

self.assertEqual(len(updated_table.schema), len(schema))
for found, expected in zip(updated_table.schema, schema):
self.assertEqual(found.name, expected.name)
self.assertEqual(found.field_type, expected.field_type)
self.assertEqual(found.mode, expected.mode)
Expand Down
Loading

0 comments on commit a3d7465

Please sign in to comment.