From ee2a67f5ec91356a833ae7a67af670da8224cb13 Mon Sep 17 00:00:00 2001
From: Peter Lamut
Date: Thu, 22 Aug 2019 22:01:49 +0200
Subject: [PATCH] BigQuery: Add load_table_from_json() method to BQ client
 (#9076)

* Add load_table_from_json() method to BQ client

* Manipulate a copy of the job config if provided

  load_table_from_json() should not directly change the job config passed
  in as an argument.

* Enable schema autodetect if no explicit schema

* Cover the path of schema provided in unit tests

* Improve test readability and harden assertions
---
 bigquery/google/cloud/bigquery/client.py | 82 ++++++++++++++++++++++
 bigquery/tests/system.py                 | 70 +++++++++++++++++++
 bigquery/tests/unit/test_client.py       | 88 ++++++++++++++++++++++++
 3 files changed, 240 insertions(+)

diff --git a/bigquery/google/cloud/bigquery/client.py b/bigquery/google/cloud/bigquery/client.py
index da169cb55bf2..1985a45057ba 100644
--- a/bigquery/google/cloud/bigquery/client.py
+++ b/bigquery/google/cloud/bigquery/client.py
@@ -1577,6 +1577,88 @@ def load_table_from_dataframe(
         finally:
             os.remove(tmppath)
 
+    def load_table_from_json(
+        self,
+        json_rows,
+        destination,
+        num_retries=_DEFAULT_NUM_RETRIES,
+        job_id=None,
+        job_id_prefix=None,
+        location=None,
+        project=None,
+        job_config=None,
+    ):
+        """Upload the contents of a table from an iterable of JSON-serializable dicts.
+
+        Arguments:
+            json_rows (Iterable[Dict[str, Any]]):
+                Row data to be inserted. Keys must match the table schema fields
+                and values must be JSON-compatible representations.
+            destination (Union[ \
+                :class:`~google.cloud.bigquery.table.Table`, \
+                :class:`~google.cloud.bigquery.table.TableReference`, \
+                str, \
+            ]):
+                Table into which data is to be loaded. If a string is passed
+                in, this method attempts to create a table reference from it
+                using
+                :func:`google.cloud.bigquery.table.TableReference.from_string`.
+
+        Keyword Arguments:
+            num_retries (int): (Optional) Number of upload retries.
+            job_id (str): (Optional) Name of the job.
+            job_id_prefix (str):
+                (Optional) The user-provided prefix for a randomly generated
+                job ID. This parameter will be ignored if a ``job_id`` is
+                also given.
+            location (str):
+                Location where to run the job. Must match the location of the
+                destination table.
+            project (str):
+                Project ID of the project where the job runs. Defaults to
+                the client's project.
+            job_config (google.cloud.bigquery.job.LoadJobConfig):
+                (Optional) Extra configuration options for the job. The
+                ``source_format`` setting is always set to
+                :attr:`~google.cloud.bigquery.job.SourceFormat.NEWLINE_DELIMITED_JSON`.
+
+        Returns:
+            google.cloud.bigquery.job.LoadJob: A new load job.
+        """
+        job_id = _make_job_id(job_id, job_id_prefix)
+
+        if job_config is None:
+            job_config = job.LoadJobConfig()
+        else:
+            # Make a copy so that the job config isn't modified in-place.
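+            # (A deep copy is needed because ``LoadJobConfig`` keeps its
+            # state in a nested ``_properties`` dict; a shallow copy would
+            # still share that dict with the caller's object.)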
+            job_config = copy.deepcopy(job_config)
+
+        job_config.source_format = job.SourceFormat.NEWLINE_DELIMITED_JSON
+
+        if job_config.schema is None:
+            job_config.autodetect = True
+
+        if project is None:
+            project = self.project
+
+        if location is None:
+            location = self.location
+
+        destination = _table_arg_to_table_ref(destination, default_project=self.project)
+
+        data_str = u"\n".join(json.dumps(item) for item in json_rows)
+        data_file = io.BytesIO(data_str.encode())
+
+        return self.load_table_from_file(
+            data_file,
+            destination,
+            num_retries=num_retries,
+            job_id=job_id,
+            job_id_prefix=job_id_prefix,
+            location=location,
+            project=project,
+            job_config=job_config,
+        )
+
     def _do_resumable_upload(self, stream, metadata, num_retries):
         """Perform a resumable upload.

diff --git a/bigquery/tests/system.py b/bigquery/tests/system.py
index 1422c3c7cb60..3593e1ecb609 100644
--- a/bigquery/tests/system.py
+++ b/bigquery/tests/system.py
@@ -917,6 +917,76 @@ def test_load_table_from_dataframe_w_explicit_schema(self):
         self.assertEqual(tuple(table.schema), table_schema)
         self.assertEqual(table.num_rows, 3)
 
+    def test_load_table_from_json_basic_use(self):
+        table_schema = (
+            bigquery.SchemaField("name", "STRING", mode="REQUIRED"),
+            bigquery.SchemaField("age", "INTEGER", mode="REQUIRED"),
+            bigquery.SchemaField("birthday", "DATE", mode="REQUIRED"),
+            bigquery.SchemaField("is_awesome", "BOOLEAN", mode="REQUIRED"),
+        )
+
+        json_rows = [
+            {"name": "John", "age": 18, "birthday": "2001-10-15", "is_awesome": False},
+            {"name": "Chuck", "age": 79, "birthday": "1940-03-10", "is_awesome": True},
+        ]
+
+        dataset_id = _make_dataset_id("bq_system_test")
+        self.temp_dataset(dataset_id)
+        table_id = "{}.{}.load_table_from_json_basic_use".format(
+            Config.CLIENT.project, dataset_id
+        )
+
+        # Create the table before loading so that schema mismatch errors are
+        # identified.
+        table = retry_403(Config.CLIENT.create_table)(
+            Table(table_id, schema=table_schema)
+        )
+        self.to_delete.insert(0, table)
+
+        job_config = bigquery.LoadJobConfig(schema=table_schema)
+        load_job = Config.CLIENT.load_table_from_json(
+            json_rows, table_id, job_config=job_config
+        )
+        load_job.result()
+
+        table = Config.CLIENT.get_table(table)
+        self.assertEqual(tuple(table.schema), table_schema)
+        self.assertEqual(table.num_rows, 2)
+
+    def test_load_table_from_json_schema_autodetect(self):
+        json_rows = [
+            {"name": "John", "age": 18, "birthday": "2001-10-15", "is_awesome": False},
+            {"name": "Chuck", "age": 79, "birthday": "1940-03-10", "is_awesome": True},
+        ]
+
+        dataset_id = _make_dataset_id("bq_system_test")
+        self.temp_dataset(dataset_id)
+        table_id = "{}.{}.load_table_from_json_schema_autodetect".format(
+            Config.CLIENT.project, dataset_id
+        )
+
+        # Use a schema with NULLABLE fields, because schema autodetection
+        # defaults to field mode NULLABLE.
+        table_schema = (
+            bigquery.SchemaField("name", "STRING", mode="NULLABLE"),
+            bigquery.SchemaField("age", "INTEGER", mode="NULLABLE"),
+            bigquery.SchemaField("birthday", "DATE", mode="NULLABLE"),
+            bigquery.SchemaField("is_awesome", "BOOLEAN", mode="NULLABLE"),
+        )
+        # Create the table before loading so that the column order is predictable.
+        table = retry_403(Config.CLIENT.create_table)(
+            Table(table_id, schema=table_schema)
+        )
+        self.to_delete.insert(0, table)
+
+        # Do not pass an explicit job config to trigger automatic schema detection.
+        load_job = Config.CLIENT.load_table_from_json(json_rows, table_id)
+        load_job.result()
+
+        table = Config.CLIENT.get_table(table)
+        self.assertEqual(tuple(table.schema), table_schema)
+        self.assertEqual(table.num_rows, 2)
+
     def test_load_avro_from_uri_then_dump_table(self):
         from google.cloud.bigquery.job import CreateDisposition
         from google.cloud.bigquery.job import SourceFormat

diff --git a/bigquery/tests/unit/test_client.py b/bigquery/tests/unit/test_client.py
index 1fd6d87487ae..ce03ffbf7f35 100644
--- a/bigquery/tests/unit/test_client.py
+++ b/bigquery/tests/unit/test_client.py
@@ -5768,6 +5768,94 @@ def test_load_table_from_dataframe_w_nulls(self):
         assert sent_config.schema == schema
         assert sent_config.source_format == job.SourceFormat.PARQUET
 
+    def test_load_table_from_json_basic_use(self):
+        from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES
+        from google.cloud.bigquery import job
+
+        client = self._make_client()
+
+        json_rows = [
+            {"name": "One", "age": 11, "birthday": "2008-09-10", "adult": False},
+            {"name": "Two", "age": 22, "birthday": "1997-08-09", "adult": True},
+        ]
+
+        load_patch = mock.patch(
+            "google.cloud.bigquery.client.Client.load_table_from_file", autospec=True
+        )
+
+        with load_patch as load_table_from_file:
+            client.load_table_from_json(json_rows, self.TABLE_REF)
+
+        load_table_from_file.assert_called_once_with(
+            client,
+            mock.ANY,
+            self.TABLE_REF,
+            num_retries=_DEFAULT_NUM_RETRIES,
+            job_id=mock.ANY,
+            job_id_prefix=None,
+            location=client.location,
+            project=client.project,
+            job_config=mock.ANY,
+        )
+
+        sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
+        assert sent_config.source_format == job.SourceFormat.NEWLINE_DELIMITED_JSON
+        assert sent_config.schema is None
+        assert sent_config.autodetect
+
+    def test_load_table_from_json_non_default_args(self):
+        from google.cloud.bigquery import job
+        from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES
+        from google.cloud.bigquery.schema import SchemaField
+
+        client = self._make_client()
+
+        json_rows = [
+            {"name": "One", "age": 11, "birthday": "2008-09-10", "adult": False},
+            {"name": "Two", "age": 22, "birthday": "1997-08-09", "adult": True},
+        ]
+
+        schema = [
+            SchemaField("name", "STRING"),
+            SchemaField("age", "INTEGER"),
+            SchemaField("adult", "BOOLEAN"),
+        ]
+        job_config = job.LoadJobConfig(schema=schema)
+        job_config._properties["load"]["unknown_field"] = "foobar"
+
+        load_patch = mock.patch(
+            "google.cloud.bigquery.client.Client.load_table_from_file", autospec=True
+        )
+
+        with load_patch as load_table_from_file:
+            client.load_table_from_json(
+                json_rows,
+                self.TABLE_REF,
+                job_config=job_config,
+                project="project-x",
+                location="EU",
+            )
+
+        load_table_from_file.assert_called_once_with(
+            client,
+            mock.ANY,
+            self.TABLE_REF,
+            num_retries=_DEFAULT_NUM_RETRIES,
+            job_id=mock.ANY,
+            job_id_prefix=None,
+            location="EU",
+            project="project-x",
+            job_config=mock.ANY,
+        )
+
+        sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
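+        # (Each entry in ``mock_calls`` is a (name, args, kwargs) tuple,
+        # so ``[0][2]`` is the keyword-argument dict of the first call.)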
+        assert job_config.source_format is None  # the original was not modified
+        assert sent_config.source_format == job.SourceFormat.NEWLINE_DELIMITED_JSON
+        assert sent_config.schema == schema
+        assert not sent_config.autodetect
+        # All properties should have been cloned and sent to the backend.
+        assert sent_config._properties.get("load", {}).get("unknown_field") == "foobar"
+
     # Low-level tests
 
     @classmethod
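For reference, a minimal usage sketch of the method added by this patch (the
table ID and sample rows below are illustrative placeholders, not part of the
patch):

    from google.cloud import bigquery

    client = bigquery.Client()

    rows = [
        {"name": "Ada", "age": 36},
        {"name": "Grace", "age": 45},
    ]

    # With no job config, the rows are serialized to newline-delimited JSON
    # and the schema is autodetected (the ``autodetect = True`` branch above).
    load_job = client.load_table_from_json(rows, "my-project.my_dataset.people")
    load_job.result()  # Wait for the load job to complete.

    # With an explicit config, autodetection is skipped; the passed-in object
    # is deep-copied, so the caller's config is left unmodified.
    job_config = bigquery.LoadJobConfig(
        schema=[
            bigquery.SchemaField("name", "STRING"),
            bigquery.SchemaField("age", "INTEGER"),
        ]
    )
    load_job = client.load_table_from_json(
        rows, "my-project.my_dataset.people", job_config=job_config
    )
    load_job.result()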