Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for more detailed DML stats #758

Merged
merged 2 commits into from
Jul 15, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ Job-Related Types
job.Compression
job.CreateDisposition
job.DestinationFormat
job.DmlStats
job.Encoding
job.OperationType
job.QueryPlanEntry
Expand Down
2 changes: 2 additions & 0 deletions google/cloud/bigquery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
from google.cloud.bigquery.job import CopyJobConfig
from google.cloud.bigquery.job import CreateDisposition
from google.cloud.bigquery.job import DestinationFormat
from google.cloud.bigquery.job import DmlStats
from google.cloud.bigquery.job import Encoding
from google.cloud.bigquery.job import ExtractJob
from google.cloud.bigquery.job import ExtractJobConfig
Expand Down Expand Up @@ -142,6 +143,7 @@
"BigtableOptions",
"BigtableColumnFamily",
"BigtableColumn",
"DmlStats",
"CSVOptions",
"GoogleSheetsOptions",
"ParquetOptions",
Expand Down
2 changes: 2 additions & 0 deletions google/cloud/bigquery/job/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from google.cloud.bigquery.job.load import LoadJob
from google.cloud.bigquery.job.load import LoadJobConfig
from google.cloud.bigquery.job.query import _contains_order_by
from google.cloud.bigquery.job.query import DmlStats
from google.cloud.bigquery.job.query import QueryJob
from google.cloud.bigquery.job.query import QueryJobConfig
from google.cloud.bigquery.job.query import QueryPlanEntry
Expand Down Expand Up @@ -66,6 +67,7 @@
"LoadJob",
"LoadJobConfig",
"_contains_order_by",
"DmlStats",
"QueryJob",
"QueryJobConfig",
"QueryPlanEntry",
Expand Down
37 changes: 37 additions & 0 deletions google/cloud/bigquery/job/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,38 @@ def _to_api_repr_table_defs(value):
return {k: ExternalConfig.to_api_repr(v) for k, v in value.items()}


class DmlStats(typing.NamedTuple):
"""Detailed statistics for DML statements.

https://cloud.google.com/bigquery/docs/reference/rest/v2/DmlStats
"""

inserted_row_count: int = 0
"""Number of inserted rows. Populated by DML INSERT and MERGE statements."""

deleted_row_count: int = 0
"""Number of deleted rows. populated by DML DELETE, MERGE and TRUNCATE statements.
"""

updated_row_count: int = 0
"""Number of updated rows. Populated by DML UPDATE and MERGE statements."""

@classmethod
def from_api_repr(cls, stats: Optional[Dict[str, str]]) -> Optional["DmlStats"]:
if stats is None:
return None
plamut marked this conversation as resolved.
Show resolved Hide resolved

# NOTE: The field order here must match the order of fields set at the
# class level.
api_fields = ("insertedRowCount", "deletedRowCount", "updatedRowCount")

args = (
int(stats.get(api_field, default_val))
for api_field, default_val in zip(api_fields, cls.__new__.__defaults__)
)
return cls(*args)


class ScriptOptions:
"""Options controlling the execution of scripts.

Expand Down Expand Up @@ -1079,6 +1111,11 @@ def estimated_bytes_processed(self):
result = int(result)
return result

@property
def dml_stats(self) -> Optional[DmlStats]:
stats = self._job_statistics().get("dmlStats")
return DmlStats.from_api_repr(stats)

def _blocking_poll(self, timeout=None, **kwargs):
self._done_timeout = timeout
self._transport_timeout = timeout
Expand Down
56 changes: 56 additions & 0 deletions tests/system/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1521,6 +1521,62 @@ def test_query_statistics(self):
self.assertGreater(stages_with_inputs, 0)
self.assertGreater(len(plan), stages_with_inputs)

def test_dml_statistics(self):
table_schema = (
bigquery.SchemaField("foo", "STRING"),
bigquery.SchemaField("bar", "INTEGER"),
)

dataset_id = _make_dataset_id("bq_system_test")
self.temp_dataset(dataset_id)
table_id = "{}.{}.test_dml_statistics".format(Config.CLIENT.project, dataset_id)

# Create the table before loading so that the column order is deterministic.
table = helpers.retry_403(Config.CLIENT.create_table)(
Table(table_id, schema=table_schema)
)
self.to_delete.insert(0, table)

# Insert a few rows and check the stats.
sql = f"""
INSERT INTO `{table_id}`
VALUES ("one", 1), ("two", 2), ("three", 3), ("four", 4);
"""
query_job = Config.CLIENT.query(sql)
query_job.result()

assert query_job.dml_stats is not None
assert query_job.dml_stats.inserted_row_count == 4
assert query_job.dml_stats.updated_row_count == 0
assert query_job.dml_stats.deleted_row_count == 0

# Update some of the rows.
sql = f"""
UPDATE `{table_id}`
SET bar = bar + 1
WHERE bar > 2;
"""
query_job = Config.CLIENT.query(sql)
query_job.result()

assert query_job.dml_stats is not None
assert query_job.dml_stats.inserted_row_count == 0
assert query_job.dml_stats.updated_row_count == 2
assert query_job.dml_stats.deleted_row_count == 0

# Now delete a few rows and check the stats.
sql = f"""
DELETE FROM `{table_id}`
WHERE foo != "two";
"""
query_job = Config.CLIENT.query(sql)
query_job.result()

assert query_job.dml_stats is not None
assert query_job.dml_stats.inserted_row_count == 0
assert query_job.dml_stats.updated_row_count == 0
assert query_job.dml_stats.deleted_row_count == 3

def test_dbapi_w_standard_sql_types(self):
for sql, expected in helpers.STANDARD_SQL_EXAMPLES:
Config.CURSOR.execute(sql)
Expand Down
64 changes: 64 additions & 0 deletions tests/unit/job/test_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,24 @@ def _verify_table_definitions(self, job, config):
self.assertIsNotNone(expected_ec)
self.assertEqual(found_ec.to_api_repr(), expected_ec)

def _verify_dml_stats_resource_properties(self, job, resource):
query_stats = resource.get("statistics", {}).get("query", {})

if "dmlStats" in query_stats:
resource_dml_stats = query_stats["dmlStats"]
job_dml_stats = job.dml_stats
assert str(job_dml_stats.inserted_row_count) == resource_dml_stats.get(
"insertedRowCount", "0"
)
assert str(job_dml_stats.updated_row_count) == resource_dml_stats.get(
"updatedRowCount", "0"
)
assert str(job_dml_stats.deleted_row_count) == resource_dml_stats.get(
"deletedRowCount", "0"
)
else:
assert job.dml_stats is None

def _verify_configuration_properties(self, job, configuration):
if "dryRun" in configuration:
self.assertEqual(job.dry_run, configuration["dryRun"])
Expand All @@ -118,6 +136,7 @@ def _verify_configuration_properties(self, job, configuration):

def _verifyResourceProperties(self, job, resource):
self._verifyReadonlyResourceProperties(job, resource)
self._verify_dml_stats_resource_properties(job, resource)

configuration = resource.get("configuration", {})
self._verify_configuration_properties(job, configuration)
Expand All @@ -130,16 +149,19 @@ def _verifyResourceProperties(self, job, resource):
self._verify_table_definitions(job, query_config)

self.assertEqual(job.query, query_config["query"])

if "createDisposition" in query_config:
self.assertEqual(job.create_disposition, query_config["createDisposition"])
else:
self.assertIsNone(job.create_disposition)

if "defaultDataset" in query_config:
ds_ref = job.default_dataset
ds_ref = {"projectId": ds_ref.project, "datasetId": ds_ref.dataset_id}
self.assertEqual(ds_ref, query_config["defaultDataset"])
else:
self.assertIsNone(job.default_dataset)

if "destinationTable" in query_config:
table = job.destination
tb_ref = {
Expand All @@ -150,14 +172,17 @@ def _verifyResourceProperties(self, job, resource):
self.assertEqual(tb_ref, query_config["destinationTable"])
else:
self.assertIsNone(job.destination)

if "priority" in query_config:
self.assertEqual(job.priority, query_config["priority"])
else:
self.assertIsNone(job.priority)

if "writeDisposition" in query_config:
self.assertEqual(job.write_disposition, query_config["writeDisposition"])
else:
self.assertIsNone(job.write_disposition)

if "destinationEncryptionConfiguration" in query_config:
self.assertIsNotNone(job.destination_encryption_configuration)
self.assertEqual(
Expand All @@ -166,6 +191,7 @@ def _verifyResourceProperties(self, job, resource):
)
else:
self.assertIsNone(job.destination_encryption_configuration)

if "schemaUpdateOptions" in query_config:
self.assertEqual(
job.schema_update_options, query_config["schemaUpdateOptions"]
Expand All @@ -190,6 +216,7 @@ def test_ctor_defaults(self):
self.assertIsNone(job.create_disposition)
self.assertIsNone(job.default_dataset)
self.assertIsNone(job.destination)
self.assertIsNone(job.dml_stats)
self.assertIsNone(job.flatten_results)
self.assertIsNone(job.priority)
self.assertIsNone(job.use_query_cache)
Expand Down Expand Up @@ -278,6 +305,26 @@ def test_from_api_repr_with_encryption(self):
self.assertIs(job._client, client)
self._verifyResourceProperties(job, RESOURCE)

def test_from_api_repr_with_dml_stats(self):
self._setUpConstants()
client = _make_client(project=self.PROJECT)
RESOURCE = {
"id": self.JOB_ID,
"jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
"configuration": {"query": {"query": self.QUERY}},
"statistics": {
"query": {
"dmlStats": {"insertedRowCount": "15", "updatedRowCount": "2"},
},
},
}
klass = self._get_target_class()

job = klass.from_api_repr(RESOURCE, client=client)

self.assertIs(job._client, client)
self._verifyResourceProperties(job, RESOURCE)

def test_from_api_repr_w_properties(self):
from google.cloud.bigquery.job import CreateDisposition
from google.cloud.bigquery.job import SchemaUpdateOption
Expand Down Expand Up @@ -815,6 +862,23 @@ def test_estimated_bytes_processed(self):
query_stats["estimatedBytesProcessed"] = str(est_bytes)
self.assertEqual(job.estimated_bytes_processed, est_bytes)

def test_dml_stats(self):
from google.cloud.bigquery.job.query import DmlStats

client = _make_client(project=self.PROJECT)
job = self._make_one(self.JOB_ID, self.QUERY, client)
assert job.dml_stats is None

statistics = job._properties["statistics"] = {}
assert job.dml_stats is None

query_stats = statistics["query"] = {}
assert job.dml_stats is None

query_stats["dmlStats"] = {"insertedRowCount": "35"}
assert isinstance(job.dml_stats, DmlStats)
assert job.dml_stats.inserted_row_count == 35

def test_result(self):
from google.cloud.bigquery.table import RowIterator

Expand Down
42 changes: 42 additions & 0 deletions tests/unit/job/test_query_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,48 @@
from .helpers import _Base


class TestDmlStats:
@staticmethod
def _get_target_class():
from google.cloud.bigquery.job import DmlStats

return DmlStats

def _make_one(self, *args, **kw):
return self._get_target_class()(*args, **kw)

def test_ctor_defaults(self):
dml_stats = self._make_one()
assert dml_stats.inserted_row_count == 0
assert dml_stats.deleted_row_count == 0
assert dml_stats.updated_row_count == 0

def test_from_api_repr_none(self):
klass = self._get_target_class()
result = klass.from_api_repr(None)
assert result is None

def test_from_api_repr_partial_stats(self):
klass = self._get_target_class()
result = klass.from_api_repr({"deletedRowCount": "12"})

assert isinstance(result, klass)
assert result.inserted_row_count == 0
assert result.deleted_row_count == 12
assert result.updated_row_count == 0

def test_from_api_repr_full_stats(self):
klass = self._get_target_class()
result = klass.from_api_repr(
{"updatedRowCount": "4", "insertedRowCount": "7", "deletedRowCount": "25"}
)

assert isinstance(result, klass)
assert result.inserted_row_count == 7
assert result.deleted_row_count == 25
assert result.updated_row_count == 4


class TestQueryPlanEntryStep(_Base):
KIND = "KIND"
SUBSTEPS = ("SUB1", "SUB2")
Expand Down