Skip to content

Commit

Permalink
BigQuery: Add load_table_from_json() method to BQ client (googleapis#…
Browse files Browse the repository at this point in the history
…9076)

* Add load_table_from_json() method to BQ client

* Manipulate a copy of the job config if provided

The load_table_from_json() should not directly change the job config
passed in as an argument.

* Enable schema autodetect if no explicit schema

* Cover the path of schema provided in unit tests

* Improve tests readability and harden assertions
  • Loading branch information
plamut authored and emar-kar committed Sep 18, 2019
1 parent e3dcf4a commit 7b9600a
Show file tree
Hide file tree
Showing 3 changed files with 240 additions and 0 deletions.
82 changes: 82 additions & 0 deletions bigquery/google/cloud/bigquery/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1577,6 +1577,88 @@ def load_table_from_dataframe(
finally:
os.remove(tmppath)

def load_table_from_json(
self,
json_rows,
destination,
num_retries=_DEFAULT_NUM_RETRIES,
job_id=None,
job_id_prefix=None,
location=None,
project=None,
job_config=None,
):
"""Upload the contents of a table from a JSON string or dict.
Arguments:
json_rows (Iterable[Dict[str, Any]]):
Row data to be inserted. Keys must match the table schema fields
and values must be JSON-compatible representations.
destination (Union[ \
:class:`~google.cloud.bigquery.table.Table`, \
:class:`~google.cloud.bigquery.table.TableReference`, \
str, \
]):
Table into which data is to be loaded. If a string is passed
in, this method attempts to create a table reference from a
string using
:func:`google.cloud.bigquery.table.TableReference.from_string`.
Keyword Arguments:
num_retries (int, optional): Number of upload retries.
job_id (str): (Optional) Name of the job.
job_id_prefix (str):
(Optional) the user-provided prefix for a randomly generated
job ID. This parameter will be ignored if a ``job_id`` is
also given.
location (str):
Location where to run the job. Must match the location of the
destination table.
project (str):
Project ID of the project of where to run the job. Defaults
to the client's project.
job_config (google.cloud.bigquery.job.LoadJobConfig):
(Optional) Extra configuration options for the job. The
``source_format`` setting is always set to
:attr:`~google.cloud.bigquery.job.SourceFormat.NEWLINE_DELIMITED_JSON`.
Returns:
google.cloud.bigquery.job.LoadJob: A new load job.
"""
job_id = _make_job_id(job_id, job_id_prefix)

if job_config is None:
job_config = job.LoadJobConfig()
else:
# Make a copy so that the job config isn't modified in-place.
job_config = copy.deepcopy(job_config)
job_config.source_format = job.SourceFormat.NEWLINE_DELIMITED_JSON

if job_config.schema is None:
job_config.autodetect = True

if project is None:
project = self.project

if location is None:
location = self.location

destination = _table_arg_to_table_ref(destination, default_project=self.project)

data_str = u"\n".join(json.dumps(item) for item in json_rows)
data_file = io.BytesIO(data_str.encode())

return self.load_table_from_file(
data_file,
destination,
num_retries=num_retries,
job_id=job_id,
job_id_prefix=job_id_prefix,
location=location,
project=project,
job_config=job_config,
)

def _do_resumable_upload(self, stream, metadata, num_retries):
"""Perform a resumable upload.
Expand Down
70 changes: 70 additions & 0 deletions bigquery/tests/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -917,6 +917,76 @@ def test_load_table_from_dataframe_w_explicit_schema(self):
self.assertEqual(tuple(table.schema), table_schema)
self.assertEqual(table.num_rows, 3)

def test_load_table_from_json_basic_use(self):
table_schema = (
bigquery.SchemaField("name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("age", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("birthday", "DATE", mode="REQUIRED"),
bigquery.SchemaField("is_awesome", "BOOLEAN", mode="REQUIRED"),
)

json_rows = [
{"name": "John", "age": 18, "birthday": "2001-10-15", "is_awesome": False},
{"name": "Chuck", "age": 79, "birthday": "1940-03-10", "is_awesome": True},
]

dataset_id = _make_dataset_id("bq_system_test")
self.temp_dataset(dataset_id)
table_id = "{}.{}.load_table_from_json_basic_use".format(
Config.CLIENT.project, dataset_id
)

# Create the table before loading so that schema mismatch errors are
# identified.
table = retry_403(Config.CLIENT.create_table)(
Table(table_id, schema=table_schema)
)
self.to_delete.insert(0, table)

job_config = bigquery.LoadJobConfig(schema=table_schema)
load_job = Config.CLIENT.load_table_from_json(
json_rows, table_id, job_config=job_config
)
load_job.result()

table = Config.CLIENT.get_table(table)
self.assertEqual(tuple(table.schema), table_schema)
self.assertEqual(table.num_rows, 2)

def test_load_table_from_json_schema_autodetect(self):
json_rows = [
{"name": "John", "age": 18, "birthday": "2001-10-15", "is_awesome": False},
{"name": "Chuck", "age": 79, "birthday": "1940-03-10", "is_awesome": True},
]

dataset_id = _make_dataset_id("bq_system_test")
self.temp_dataset(dataset_id)
table_id = "{}.{}.load_table_from_json_basic_use".format(
Config.CLIENT.project, dataset_id
)

# Use schema with NULLABLE fields, because schema autodetection
# defaults to field mode NULLABLE.
table_schema = (
bigquery.SchemaField("name", "STRING", mode="NULLABLE"),
bigquery.SchemaField("age", "INTEGER", mode="NULLABLE"),
bigquery.SchemaField("birthday", "DATE", mode="NULLABLE"),
bigquery.SchemaField("is_awesome", "BOOLEAN", mode="NULLABLE"),
)
# create the table before loading so that the column order is predictable
table = retry_403(Config.CLIENT.create_table)(
Table(table_id, schema=table_schema)
)
self.to_delete.insert(0, table)

# do not pass an explicit job config to trigger automatic schema detection
load_job = Config.CLIENT.load_table_from_json(json_rows, table_id)
load_job.result()

table = Config.CLIENT.get_table(table)
self.assertEqual(tuple(table.schema), table_schema)
self.assertEqual(table.num_rows, 2)

def test_load_avro_from_uri_then_dump_table(self):
from google.cloud.bigquery.job import CreateDisposition
from google.cloud.bigquery.job import SourceFormat
Expand Down
88 changes: 88 additions & 0 deletions bigquery/tests/unit/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5768,6 +5768,94 @@ def test_load_table_from_dataframe_w_nulls(self):
assert sent_config.schema == schema
assert sent_config.source_format == job.SourceFormat.PARQUET

def test_load_table_from_json_basic_use(self):
from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES
from google.cloud.bigquery import job

client = self._make_client()

json_rows = [
{"name": "One", "age": 11, "birthday": "2008-09-10", "adult": False},
{"name": "Two", "age": 22, "birthday": "1997-08-09", "adult": True},
]

load_patch = mock.patch(
"google.cloud.bigquery.client.Client.load_table_from_file", autospec=True
)

with load_patch as load_table_from_file:
client.load_table_from_json(json_rows, self.TABLE_REF)

load_table_from_file.assert_called_once_with(
client,
mock.ANY,
self.TABLE_REF,
num_retries=_DEFAULT_NUM_RETRIES,
job_id=mock.ANY,
job_id_prefix=None,
location=client.location,
project=client.project,
job_config=mock.ANY,
)

sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
assert sent_config.source_format == job.SourceFormat.NEWLINE_DELIMITED_JSON
assert sent_config.schema is None
assert sent_config.autodetect

def test_load_table_from_json_non_default_args(self):
from google.cloud.bigquery import job
from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES
from google.cloud.bigquery.schema import SchemaField

client = self._make_client()

json_rows = [
{"name": "One", "age": 11, "birthday": "2008-09-10", "adult": False},
{"name": "Two", "age": 22, "birthday": "1997-08-09", "adult": True},
]

schema = [
SchemaField("name", "STRING"),
SchemaField("age", "INTEGER"),
SchemaField("adult", "BOOLEAN"),
]
job_config = job.LoadJobConfig(schema=schema)
job_config._properties["load"]["unknown_field"] = "foobar"

load_patch = mock.patch(
"google.cloud.bigquery.client.Client.load_table_from_file", autospec=True
)

with load_patch as load_table_from_file:
client.load_table_from_json(
json_rows,
self.TABLE_REF,
job_config=job_config,
project="project-x",
location="EU",
)

load_table_from_file.assert_called_once_with(
client,
mock.ANY,
self.TABLE_REF,
num_retries=_DEFAULT_NUM_RETRIES,
job_id=mock.ANY,
job_id_prefix=None,
location="EU",
project="project-x",
job_config=mock.ANY,
)

sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
assert job_config.source_format is None # the original was not modified
assert sent_config.source_format == job.SourceFormat.NEWLINE_DELIMITED_JSON
assert sent_config.schema == schema
assert not sent_config.autodetect
# all properties should have been cloned and sent to the backend
assert sent_config._properties.get("load", {}).get("unknown_field") == "foobar"

# Low-level tests

@classmethod
Expand Down

0 comments on commit 7b9600a

Please sign in to comment.