
Commit 3b0c7ae
Preserve order in to_dataframe with BQ Storage and queries containing ORDER BY

This fixes an issue where, because rows are read from multiple streams in
parallel, their order was not preserved. Normally this isn't a problem, but it
is when the rows are the results of an ORDER BY query.
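
In short, the change detects an ORDER BY clause in the job's SQL and, when one is present, asks the BigQuery Storage API for exactly one read stream so rows arrive in order. A minimal sketch of the idea, using the same regular expression as the new helper (names simplified; this is not the exact library code):

    import re

    def contains_order_by(query):
        # Case-insensitive check for an ORDER BY clause anywhere in the SQL text.
        return query is not None and re.search(r"ORDER\s+BY", query, re.IGNORECASE) is not None

    # 0 lets the backend choose the stream count; 1 forces a single stream,
    # trading parallelism for a stable row order.
    requested_streams = 1 if contains_order_by("SELECT * FROM t ORDER BY x") else 0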
tswast committed Apr 23, 2019
1 parent 4dc8c36 commit 3b0c7ae
Showing 3 changed files with 173 additions and 29 deletions.
bigquery/google/cloud/bigquery/job.py: 15 changes (13 additions, 2 deletions)
@@ -15,6 +15,7 @@
"""Define API Jobs."""

import copy
import re
import threading

import six
@@ -92,6 +93,14 @@ def _error_result_to_exception(error_result):
)


def _contains_order_by(query):
"""Do we need to preserve the order of the query results?"""
if query is None:
return False

return re.search(r"ORDER\s+BY", query, re.IGNORECASE) is not None


class Compression(object):
"""The compression type to use for exported files. The default value is
:attr:`NONE`.
@@ -2546,7 +2555,7 @@ def from_api_repr(cls, resource, client):
:returns: Job parsed from ``resource``.
"""
job_id, config = cls._get_resource_config(resource)
query = config["query"]["query"]
query = _helpers._get_sub_prop(config, ["query", "query"])
job = cls(job_id, query, client=client)
job._set_properties(resource)
return job
@@ -2849,7 +2858,9 @@ def result(self, timeout=None, retry=DEFAULT_RETRY):
dest_table_ref = self.destination
dest_table = Table(dest_table_ref, schema=schema)
dest_table._properties["numRows"] = self._query_results.total_rows
return self._client.list_rows(dest_table, retry=retry)
rows = self._client.list_rows(dest_table, retry=retry)
rows._preserve_order = _contains_order_by(self.query)
return rows

def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=None):
"""Return a pandas DataFrame from a QueryJob
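
For reference, the new _contains_order_by helper is a plain regular-expression check on the query text. Its expected behavior, mirroring the parametrized cases added to test_job.py below, looks roughly like this:

    from google.cloud.bigquery import job

    job._contains_order_by(None)                                     # False
    job._contains_order_by("select name, age from table LIMIT 10;")  # False
    job._contains_order_by("SELECT name FROM t ORDER BY age")        # True
    job._contains_order_by("select name from t order\nby age")       # True (any case, any whitespace)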
bigquery/google/cloud/bigquery/table.py: 10 changes (9 additions, 1 deletion)
@@ -1348,6 +1348,7 @@ def __init__(
)
self._field_to_index = _helpers._field_to_index_mapping(schema)
self._page_size = page_size
self._preserve_order = False
self._project = client.project
self._schema = schema
self._selected_fields = selected_fields
@@ -1496,10 +1497,15 @@ def _to_dataframe_bqstorage(self, bqstorage_client, dtypes, progress_bar=None):
for field in self._selected_fields:
read_options.selected_fields.append(field.name)

requested_streams = 0
if self._preserve_order:
requested_streams = 1

session = bqstorage_client.create_read_session(
self._table.to_bqstorage(),
"projects/{}".format(self._project),
read_options=read_options,
requested_streams=requested_streams,
)

# We need to parse the schema manually so that we can rearrange the
@@ -1512,6 +1518,8 @@ def _to_dataframe_bqstorage(self, bqstorage_client, dtypes, progress_bar=None):
if not session.streams:
return pandas.DataFrame(columns=columns)

total_streams = len(session.streams)

# Use _to_dataframe_finished to notify worker threads when to quit.
# See: https://stackoverflow.com/a/29237343/101923
self._to_dataframe_finished = False
@@ -1560,7 +1568,7 @@ def get_frames(pool):

return frames

with concurrent.futures.ThreadPoolExecutor() as pool:
with concurrent.futures.ThreadPoolExecutor(max_workers=total_streams) as pool:
try:
frames = get_frames(pool)
finally:
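
Taken together, the user-visible effect is that a DataFrame built over the BigQuery Storage API from an ORDER BY query keeps its row order, because only one stream is requested. A hedged usage sketch (the project, dataset, and table names are illustrative, not from this change):

    from google.cloud import bigquery
    from google.cloud import bigquery_storage_v1beta1

    bq_client = bigquery.Client()
    bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient()

    query_job = bq_client.query(
        "SELECT name, age FROM `my-project.my_dataset.people` ORDER BY age"
    )
    # The ORDER BY clause flips _preserve_order on the row iterator, so the read
    # session is created with requested_streams=1 and the DataFrame rows come
    # back in query order.
    df = query_job.to_dataframe(bqstorage_client=bqstorage_client)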
bigquery/tests/unit/test_job.py: 177 changes (151 additions, 26 deletions)
@@ -17,6 +17,7 @@
import unittest

import mock
import pytest
from six.moves import http_client

try:
@@ -59,6 +60,47 @@ def _make_connection(*responses):
return mock_conn


def _make_job_resource(
creation_time_ms=1437767599006,
started_time_ms=1437767600007,
ended_time_ms=1437767601008,
started=False,
ended=False,
etag="abc-def-hjk",
endpoint="https://www.googleapis.com",
job_type="load",
job_id="a-random-id",
project_id="some-project",
user_email="bq-user@example.com",
):
resource = {
"configuration": {job_type: {}},
"statistics": {"creationTime": creation_time_ms, job_type: {}},
"etag": etag,
"id": "{}:{}".format(project_id, job_id),
"jobReference": {"projectId": project_id, "jobId": job_id},
"selfLink": "{}/bigquery/v2/projects/{}/jobs/{}".format(
endpoint, project_id, job_id
),
"user_email": user_email,
}

if started or ended:
resource["statistics"]["startTime"] = started_time_ms

if ended:
resource["statistics"]["endTime"] = ended_time_ms

if job_type == "query":
resource["configuration"]["query"]["destinationTable"] = {
"projectId": project_id,
"datasetId": "_temp_dataset",
"tableId": "_temp_table",
}

return resource


class Test__error_result_to_exception(unittest.TestCase):
def _call_fut(self, *args, **kwargs):
from google.cloud.bigquery import job
@@ -974,6 +1016,7 @@ class _Base(object):
from google.cloud.bigquery.dataset import DatasetReference
from google.cloud.bigquery.table import TableReference

ENDPOINT = "https://www.googleapis.com"
PROJECT = "project"
SOURCE1 = "http://example.com/source1.csv"
DS_ID = "dataset_id"
@@ -994,7 +1037,9 @@ def _setUpConstants(self):
self.WHEN = datetime.datetime.utcfromtimestamp(self.WHEN_TS).replace(tzinfo=UTC)
self.ETAG = "ETAG"
self.FULL_JOB_ID = "%s:%s" % (self.PROJECT, self.JOB_ID)
self.RESOURCE_URL = "http://example.com/path/to/resource"
self.RESOURCE_URL = "{}/bigquery/v2/projects/{}/jobs/{}".format(
self.ENDPOINT, self.PROJECT, self.JOB_ID
)
self.USER_EMAIL = "phred@example.com"

def _table_ref(self, table_id):
@@ -1004,30 +1049,19 @@ def _table_ref(self, table_id):

def _make_resource(self, started=False, ended=False):
self._setUpConstants()
resource = {
"configuration": {self.JOB_TYPE: {}},
"statistics": {"creationTime": self.WHEN_TS * 1000, self.JOB_TYPE: {}},
"etag": self.ETAG,
"id": self.FULL_JOB_ID,
"jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
"selfLink": self.RESOURCE_URL,
"user_email": self.USER_EMAIL,
}

if started or ended:
resource["statistics"]["startTime"] = self.WHEN_TS * 1000

if ended:
resource["statistics"]["endTime"] = (self.WHEN_TS + 1000) * 1000

if self.JOB_TYPE == "query":
resource["configuration"]["query"]["destinationTable"] = {
"projectId": self.PROJECT,
"datasetId": "_temp_dataset",
"tableId": "_temp_table",
}

return resource
return _make_job_resource(
creation_time_ms=int(self.WHEN_TS * 1000),
started_time_ms=int(self.WHEN_TS * 1000),
ended_time_ms=int(self.WHEN_TS * 1000) + 1000000,
started=started,
ended=ended,
etag=self.ETAG,
endpoint=self.ENDPOINT,
job_type=self.JOB_TYPE,
job_id=self.JOB_ID,
project_id=self.PROJECT,
user_email=self.USER_EMAIL,
)

def _verifyInitialReadonlyProperties(self, job):
# root elements of resource
@@ -4684,7 +4718,11 @@ def test_to_dataframe_bqstorage(self):
job.to_dataframe(bqstorage_client=bqstorage_client)

bqstorage_client.create_read_session.assert_called_once_with(
mock.ANY, "projects/{}".format(self.PROJECT), read_options=mock.ANY
mock.ANY,
"projects/{}".format(self.PROJECT),
read_options=mock.ANY,
# Use default number of streams for best performance.
requested_streams=0,
)

@unittest.skipIf(pandas is None, "Requires `pandas`")
@@ -5039,3 +5077,90 @@ def test_from_api_repr_normal(self):
self.assertEqual(entry.pending_units, self.PENDING_UNITS)
self.assertEqual(entry.completed_units, self.COMPLETED_UNITS)
self.assertEqual(entry.slot_millis, self.SLOT_MILLIS)


@pytest.mark.parametrize(
"query,expected",
(
(None, False),
("", False),
("select name, age from table", False),
("select name, age from table LIMIT 10;", False),
("select name, age from table order by other_column;", True),
("Select name, age From table Order By other_column", True),
("SELECT name, age FROM table ORDER BY other_column;", True),
("select name, age from table order\nby other_column", True),
("Select name, age From table Order\nBy other_column;", True),
("SELECT name, age FROM table ORDER\nBY other_column", True),
("SelecT name, age froM table OrdeR \n\t BY other_column;", True),
),
)
def test__contains_order_by(query, expected):
from google.cloud.bigquery import job as mut

assert mut._contains_order_by(query) == expected


@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
@pytest.mark.skipif(
bigquery_storage_v1beta1 is None, reason="Requires `google-cloud-bigquery-storage`"
)
@pytest.mark.parametrize(
"query",
(
"select name, age from table order by other_column;",
"Select name, age From table Order By other_column;",
"SELECT name, age FROM table ORDER BY other_column;",
"select name, age from table order\nby other_column;",
"Select name, age From table Order\nBy other_column;",
"SELECT name, age FROM table ORDER\nBY other_column;",
"SelecT name, age froM table OrdeR \n\t BY other_column;",
),
)
def test_to_dataframe_bqstorage_preserve_order(query):
from google.cloud.bigquery.job import QueryJob as target_class

job_resource = _make_job_resource(
project_id="test-project", job_type="query", ended=True
)
job_resource["configuration"]["query"]["query"] = query
job_resource["status"] = {"state": "DONE"}
get_query_results_resource = {
"jobComplete": True,
"jobReference": {"projectId": "test-project", "jobId": "test-job"},
"schema": {
"fields": [
{"name": "name", "type": "STRING", "mode": "NULLABLE"},
{"name": "age", "type": "INTEGER", "mode": "NULLABLE"},
]
},
"totalRows": "4",
}
connection = _make_connection(get_query_results_resource, job_resource)
client = _make_client(connection=connection)
job = target_class.from_api_repr(job_resource, client)
bqstorage_client = mock.create_autospec(
bigquery_storage_v1beta1.BigQueryStorageClient
)
session = bigquery_storage_v1beta1.types.ReadSession()
session.avro_schema.schema = json.dumps(
{
"type": "record",
"name": "__root__",
"fields": [
{"name": "name", "type": ["null", "string"]},
{"name": "age", "type": ["null", "long"]},
],
}
)
bqstorage_client.create_read_session.return_value = session

job.to_dataframe(bqstorage_client=bqstorage_client)

bqstorage_client.create_read_session.assert_called_once_with(
mock.ANY,
"projects/test-project",
read_options=mock.ANY,
# Use a single stream to preserve row order.
requested_streams=1,
)
