From 9b90a312734dc925e015cd921842f0dff8ba47e3 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Mon, 19 Aug 2019 16:54:55 +0200 Subject: [PATCH 1/5] Add load_table_from_json() method to BQ client --- bigquery/google/cloud/bigquery/client.py | 76 ++++++++++++++++++++++++ bigquery/tests/system.py | 37 ++++++++++++ bigquery/tests/unit/test_client.py | 74 +++++++++++++++++++++++ 3 files changed, 187 insertions(+) diff --git a/bigquery/google/cloud/bigquery/client.py b/bigquery/google/cloud/bigquery/client.py index 1b13ee126a5d..5ef519cfdadd 100644 --- a/bigquery/google/cloud/bigquery/client.py +++ b/bigquery/google/cloud/bigquery/client.py @@ -1592,6 +1592,82 @@ def load_table_from_dataframe( finally: os.remove(tmppath) + def load_table_from_json( + self, + json_rows, + destination, + num_retries=_DEFAULT_NUM_RETRIES, + job_id=None, + job_id_prefix=None, + location=None, + project=None, + job_config=None, + ): + """Upload the contents of a table from a JSON string or dict. + + Arguments: + json_rows (Iterable[Dict[str, Any]]): + Row data to be inserted. Keys must match the table schema fields + and values must be JSON-compatible representations. + destination (Union[ \ + :class:`~google.cloud.bigquery.table.Table`, \ + :class:`~google.cloud.bigquery.table.TableReference`, \ + str, \ + ]): + Table into which data is to be loaded. If a string is passed + in, this method attempts to create a table reference from a + string using + :func:`google.cloud.bigquery.table.TableReference.from_string`. + + Keyword Arguments: + num_retries (int, optional): Number of upload retries. + job_id (str): (Optional) Name of the job. + job_id_prefix (str): + (Optional) the user-provided prefix for a randomly generated + job ID. This parameter will be ignored if a ``job_id`` is + also given. + location (str): + Location where to run the job. Must match the location of the + destination table. + project (str): + Project ID of the project of where to run the job. Defaults + to the client's project. + job_config (google.cloud.bigquery.job.LoadJobConfig): + (Optional) Extra configuration options for the job. The + ``source_format`` setting is always set to + :attr:`~google.cloud.bigquery.job.SourceFormat.NEWLINE_DELIMITED_JSON`. + + Returns: + google.cloud.bigquery.job.LoadJob: A new load job. + """ + job_id = _make_job_id(job_id, job_id_prefix) + + if job_config is None: + job_config = job.LoadJobConfig() + job_config.source_format = job.SourceFormat.NEWLINE_DELIMITED_JSON + + if project is None: + project = self.project + + if location is None: + location = self.location + + destination = _table_arg_to_table_ref(destination, default_project=self.project) + + data_str = u"\n".join(json.dumps(item) for item in json_rows) + data_file = io.BytesIO(data_str.encode()) + + return self.load_table_from_file( + data_file, + destination, + num_retries=num_retries, + job_id=job_id, + job_id_prefix=job_id_prefix, + location=location, + project=project, + job_config=job_config, + ) + def _do_resumable_upload(self, stream, metadata, num_retries): """Perform a resumable upload. diff --git a/bigquery/tests/system.py b/bigquery/tests/system.py index 59a72297ed87..ab660280a1d8 100644 --- a/bigquery/tests/system.py +++ b/bigquery/tests/system.py @@ -902,6 +902,43 @@ def test_load_table_from_dataframe_w_explicit_schema(self): self.assertEqual(tuple(table.schema), table_schema) self.assertEqual(table.num_rows, 3) + def test_load_table_from_json_basic_use(self): + table_schema = ( + bigquery.SchemaField("name", "STRING", mode="REQUIRED"), + bigquery.SchemaField("age", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("birthday", "DATE", mode="REQUIRED"), + bigquery.SchemaField("is_awesome", "BOOLEAN", mode="REQUIRED"), + ) + + json_rows = [ + {"name": "John", "age": 18, "birthday": "2001-10-15", "is_awesome": False}, + {"name": "Chuck", "age": 79, "birthday": "1940-03-10", "is_awesome": True}, + ] + + job_config = bigquery.LoadJobConfig(schema=table_schema) + dataset_id = _make_dataset_id("bq_system_test") + self.temp_dataset(dataset_id) + table_id = "{}.{}.load_table_from_json_basic_use".format( + Config.CLIENT.project, dataset_id + ) + + # Create the table before loading so that schema mismatch errors are + # identified. + table = retry_403(Config.CLIENT.create_table)( + Table(table_id, schema=table_schema) + ) + self.to_delete.insert(0, table) + + job_config = bigquery.LoadJobConfig(schema=table_schema) + load_job = Config.CLIENT.load_table_from_json( + json_rows, table_id, job_config=job_config + ) + load_job.result() + + table = Config.CLIENT.get_table(table) + self.assertEqual(tuple(table.schema), table_schema) + self.assertEqual(table.num_rows, 2) + def test_load_avro_from_uri_then_dump_table(self): from google.cloud.bigquery.job import CreateDisposition from google.cloud.bigquery.job import SourceFormat diff --git a/bigquery/tests/unit/test_client.py b/bigquery/tests/unit/test_client.py index 8a2a1228cd65..2fe2b9cb6fdd 100644 --- a/bigquery/tests/unit/test_client.py +++ b/bigquery/tests/unit/test_client.py @@ -5582,6 +5582,80 @@ def test_load_table_from_dataframe_w_nulls(self): assert sent_config.schema == schema assert sent_config.source_format == job.SourceFormat.PARQUET + def test_load_table_from_json_basic_use(self): + from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES + from google.cloud.bigquery import job + + client = self._make_client() + + json_rows = [ + {"name": "One", "age": 11, "birthday": "2008-09-10", "adult": False}, + {"name": "Two", "age": 22, "birthday": "1997-08-09", "adult": True}, + ] + + load_patch = mock.patch( + "google.cloud.bigquery.client.Client.load_table_from_file", autospec=True + ) + + with load_patch as load_table_from_file: + client.load_table_from_json(json_rows, self.TABLE_REF) + + load_table_from_file.assert_called_once_with( + client, + mock.ANY, + self.TABLE_REF, + num_retries=_DEFAULT_NUM_RETRIES, + job_id=mock.ANY, + job_id_prefix=None, + location=client.location, + project=client.project, + job_config=mock.ANY, + ) + + sent_config = load_table_from_file.mock_calls[0][2]["job_config"] + assert sent_config.source_format == job.SourceFormat.NEWLINE_DELIMITED_JSON + assert sent_config.schema is None + + def test_load_table_from_json_non_default_args(self): + from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES + from google.cloud.bigquery import job + + client = self._make_client() + + json_rows = [ + {"name": "One", "age": 11, "birthday": "2008-09-10", "adult": False}, + {"name": "Two", "age": 22, "birthday": "1997-08-09", "adult": True}, + ] + + load_patch = mock.patch( + "google.cloud.bigquery.client.Client.load_table_from_file", autospec=True + ) + + with load_patch as load_table_from_file: + client.load_table_from_json( + json_rows, + self.TABLE_REF, + job_config=job.LoadJobConfig(), # TODO: add options? + project="project-x", + location="EU", + ) + + load_table_from_file.assert_called_once_with( + client, + mock.ANY, + self.TABLE_REF, + num_retries=_DEFAULT_NUM_RETRIES, + job_id=mock.ANY, + job_id_prefix=None, + location="EU", + project="project-x", + job_config=mock.ANY, + ) + + sent_config = load_table_from_file.mock_calls[0][2]["job_config"] + assert sent_config.source_format == job.SourceFormat.NEWLINE_DELIMITED_JSON + assert sent_config.schema is None + # Low-level tests @classmethod From 197a0be0aec9b2cfacbae317f4129877e32f8ee6 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Thu, 22 Aug 2019 11:17:48 +0200 Subject: [PATCH 2/5] Manipulate a copy of the job config if provided The load_table_from_json() should not directly change the job config passed in as an argument. --- bigquery/google/cloud/bigquery/client.py | 3 +++ bigquery/tests/unit/test_client.py | 5 ++++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/bigquery/google/cloud/bigquery/client.py b/bigquery/google/cloud/bigquery/client.py index 5ef519cfdadd..86061b0b23c8 100644 --- a/bigquery/google/cloud/bigquery/client.py +++ b/bigquery/google/cloud/bigquery/client.py @@ -1644,6 +1644,9 @@ def load_table_from_json( if job_config is None: job_config = job.LoadJobConfig() + else: + # Make a copy so that the job config isn't modified in-place. + job_config = copy.deepcopy(job_config) job_config.source_format = job.SourceFormat.NEWLINE_DELIMITED_JSON if project is None: diff --git a/bigquery/tests/unit/test_client.py b/bigquery/tests/unit/test_client.py index 2fe2b9cb6fdd..cb86561c67b4 100644 --- a/bigquery/tests/unit/test_client.py +++ b/bigquery/tests/unit/test_client.py @@ -5627,6 +5627,8 @@ def test_load_table_from_json_non_default_args(self): {"name": "Two", "age": 22, "birthday": "1997-08-09", "adult": True}, ] + job_config = job.LoadJobConfig() + load_patch = mock.patch( "google.cloud.bigquery.client.Client.load_table_from_file", autospec=True ) @@ -5635,7 +5637,7 @@ def test_load_table_from_json_non_default_args(self): client.load_table_from_json( json_rows, self.TABLE_REF, - job_config=job.LoadJobConfig(), # TODO: add options? + job_config=job_config, project="project-x", location="EU", ) @@ -5653,6 +5655,7 @@ def test_load_table_from_json_non_default_args(self): ) sent_config = load_table_from_file.mock_calls[0][2]["job_config"] + assert job_config.source_format is None # the original was not modified assert sent_config.source_format == job.SourceFormat.NEWLINE_DELIMITED_JSON assert sent_config.schema is None From 919d30ed6414de22540046e23eb72e0836631806 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Thu, 22 Aug 2019 12:01:25 +0200 Subject: [PATCH 3/5] Enable schema autodetect if no explicit schema --- bigquery/google/cloud/bigquery/client.py | 3 ++ bigquery/tests/system.py | 36 +++++++++++++++++++++++- bigquery/tests/unit/test_client.py | 2 ++ 3 files changed, 40 insertions(+), 1 deletion(-) diff --git a/bigquery/google/cloud/bigquery/client.py b/bigquery/google/cloud/bigquery/client.py index 86061b0b23c8..f39a375833c4 100644 --- a/bigquery/google/cloud/bigquery/client.py +++ b/bigquery/google/cloud/bigquery/client.py @@ -1649,6 +1649,9 @@ def load_table_from_json( job_config = copy.deepcopy(job_config) job_config.source_format = job.SourceFormat.NEWLINE_DELIMITED_JSON + if job_config.schema is None: + job_config.autodetect = True + if project is None: project = self.project diff --git a/bigquery/tests/system.py b/bigquery/tests/system.py index ab660280a1d8..84d38ac09e25 100644 --- a/bigquery/tests/system.py +++ b/bigquery/tests/system.py @@ -915,7 +915,6 @@ def test_load_table_from_json_basic_use(self): {"name": "Chuck", "age": 79, "birthday": "1940-03-10", "is_awesome": True}, ] - job_config = bigquery.LoadJobConfig(schema=table_schema) dataset_id = _make_dataset_id("bq_system_test") self.temp_dataset(dataset_id) table_id = "{}.{}.load_table_from_json_basic_use".format( @@ -939,6 +938,41 @@ def test_load_table_from_json_basic_use(self): self.assertEqual(tuple(table.schema), table_schema) self.assertEqual(table.num_rows, 2) + def test_load_table_from_json_schema_autodetect(self): + # Use schema with NULLABLE fields, because schema autodetection + # defaults to field mode NULLABLE. + table_schema = ( + bigquery.SchemaField("name", "STRING", mode="NULLABLE"), + bigquery.SchemaField("age", "INTEGER", mode="NULLABLE"), + bigquery.SchemaField("birthday", "DATE", mode="NULLABLE"), + bigquery.SchemaField("is_awesome", "BOOLEAN", mode="NULLABLE"), + ) + + json_rows = [ + {"name": "John", "age": 18, "birthday": "2001-10-15", "is_awesome": False}, + {"name": "Chuck", "age": 79, "birthday": "1940-03-10", "is_awesome": True}, + ] + + dataset_id = _make_dataset_id("bq_system_test") + self.temp_dataset(dataset_id) + table_id = "{}.{}.load_table_from_json_basic_use".format( + Config.CLIENT.project, dataset_id + ) + + # Create the table before loading so that schema mismatch errors are + # identified. + table = retry_403(Config.CLIENT.create_table)( + Table(table_id, schema=table_schema) + ) + self.to_delete.insert(0, table) + + load_job = Config.CLIENT.load_table_from_json(json_rows, table_id) + load_job.result() + + table = Config.CLIENT.get_table(table) + self.assertEqual(tuple(table.schema), table_schema) + self.assertEqual(table.num_rows, 2) + def test_load_avro_from_uri_then_dump_table(self): from google.cloud.bigquery.job import CreateDisposition from google.cloud.bigquery.job import SourceFormat diff --git a/bigquery/tests/unit/test_client.py b/bigquery/tests/unit/test_client.py index cb86561c67b4..87081d10ad34 100644 --- a/bigquery/tests/unit/test_client.py +++ b/bigquery/tests/unit/test_client.py @@ -5615,6 +5615,7 @@ def test_load_table_from_json_basic_use(self): sent_config = load_table_from_file.mock_calls[0][2]["job_config"] assert sent_config.source_format == job.SourceFormat.NEWLINE_DELIMITED_JSON assert sent_config.schema is None + assert sent_config.autodetect def test_load_table_from_json_non_default_args(self): from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES @@ -5658,6 +5659,7 @@ def test_load_table_from_json_non_default_args(self): assert job_config.source_format is None # the original was not modified assert sent_config.source_format == job.SourceFormat.NEWLINE_DELIMITED_JSON assert sent_config.schema is None + assert sent_config.autodetect # Low-level tests From 6cc7f5824de6ba7cf0315fdac9193b09561c84f7 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Thu, 22 Aug 2019 14:10:12 +0200 Subject: [PATCH 4/5] Cover the path of schema provided in unit tests --- bigquery/tests/unit/test_client.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/bigquery/tests/unit/test_client.py b/bigquery/tests/unit/test_client.py index 87081d10ad34..0c1a33bf971f 100644 --- a/bigquery/tests/unit/test_client.py +++ b/bigquery/tests/unit/test_client.py @@ -5618,8 +5618,9 @@ def test_load_table_from_json_basic_use(self): assert sent_config.autodetect def test_load_table_from_json_non_default_args(self): - from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES from google.cloud.bigquery import job + from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES + from google.cloud.bigquery.schema import SchemaField client = self._make_client() @@ -5628,7 +5629,12 @@ def test_load_table_from_json_non_default_args(self): {"name": "Two", "age": 22, "birthday": "1997-08-09", "adult": True}, ] - job_config = job.LoadJobConfig() + schema = [ + SchemaField("name", "STRING"), + SchemaField("age", "INTEGER"), + SchemaField("adult", "BOOLEAN"), + ] + job_config = job.LoadJobConfig(schema=schema) load_patch = mock.patch( "google.cloud.bigquery.client.Client.load_table_from_file", autospec=True @@ -5658,8 +5664,8 @@ def test_load_table_from_json_non_default_args(self): sent_config = load_table_from_file.mock_calls[0][2]["job_config"] assert job_config.source_format is None # the original was not modified assert sent_config.source_format == job.SourceFormat.NEWLINE_DELIMITED_JSON - assert sent_config.schema is None - assert sent_config.autodetect + assert sent_config.schema == schema + assert not sent_config.autodetect # Low-level tests From 4ec2fe1834f939af7bf4c845179b015ae979a705 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Thu, 22 Aug 2019 20:58:08 +0200 Subject: [PATCH 5/5] Improve tests readability and harden assertions --- bigquery/tests/system.py | 21 ++++++++++----------- bigquery/tests/unit/test_client.py | 3 +++ 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/bigquery/tests/system.py b/bigquery/tests/system.py index 84d38ac09e25..8749707b62da 100644 --- a/bigquery/tests/system.py +++ b/bigquery/tests/system.py @@ -939,15 +939,6 @@ def test_load_table_from_json_basic_use(self): self.assertEqual(table.num_rows, 2) def test_load_table_from_json_schema_autodetect(self): - # Use schema with NULLABLE fields, because schema autodetection - # defaults to field mode NULLABLE. - table_schema = ( - bigquery.SchemaField("name", "STRING", mode="NULLABLE"), - bigquery.SchemaField("age", "INTEGER", mode="NULLABLE"), - bigquery.SchemaField("birthday", "DATE", mode="NULLABLE"), - bigquery.SchemaField("is_awesome", "BOOLEAN", mode="NULLABLE"), - ) - json_rows = [ {"name": "John", "age": 18, "birthday": "2001-10-15", "is_awesome": False}, {"name": "Chuck", "age": 79, "birthday": "1940-03-10", "is_awesome": True}, @@ -959,13 +950,21 @@ def test_load_table_from_json_schema_autodetect(self): Config.CLIENT.project, dataset_id ) - # Create the table before loading so that schema mismatch errors are - # identified. + # Use schema with NULLABLE fields, because schema autodetection + # defaults to field mode NULLABLE. + table_schema = ( + bigquery.SchemaField("name", "STRING", mode="NULLABLE"), + bigquery.SchemaField("age", "INTEGER", mode="NULLABLE"), + bigquery.SchemaField("birthday", "DATE", mode="NULLABLE"), + bigquery.SchemaField("is_awesome", "BOOLEAN", mode="NULLABLE"), + ) + # create the table before loading so that the column order is predictable table = retry_403(Config.CLIENT.create_table)( Table(table_id, schema=table_schema) ) self.to_delete.insert(0, table) + # do not pass an explicit job config to trigger automatic schema detection load_job = Config.CLIENT.load_table_from_json(json_rows, table_id) load_job.result() diff --git a/bigquery/tests/unit/test_client.py b/bigquery/tests/unit/test_client.py index 0c1a33bf971f..71886354c8ba 100644 --- a/bigquery/tests/unit/test_client.py +++ b/bigquery/tests/unit/test_client.py @@ -5635,6 +5635,7 @@ def test_load_table_from_json_non_default_args(self): SchemaField("adult", "BOOLEAN"), ] job_config = job.LoadJobConfig(schema=schema) + job_config._properties["load"]["unknown_field"] = "foobar" load_patch = mock.patch( "google.cloud.bigquery.client.Client.load_table_from_file", autospec=True @@ -5666,6 +5667,8 @@ def test_load_table_from_json_non_default_args(self): assert sent_config.source_format == job.SourceFormat.NEWLINE_DELIMITED_JSON assert sent_config.schema == schema assert not sent_config.autodetect + # all properties should have been cloned and sent to the backend + assert sent_config._properties.get("load", {}).get("unknown_field") == "foobar" # Low-level tests