Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query pipeline changes and Offset/Limit/Distinct support #6770

Merged
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
2e27f01
intiial commit for query piepline changes
Aug 12, 2019
3d90acb
initial commit for offset and limit
Aug 13, 2019
bab7e2c
modified aggregate tests to check for top as well after bugfix
Aug 13, 2019
1bc1213
Added support for distinct
Aug 15, 2019
34729b7
modified aggregate tests to run in mono repo
Aug 15, 2019
2a25ad8
fixed failing tests and bugs
Aug 16, 2019
a88ff22
updated tests
Aug 16, 2019
57c250d
fixed hashing problem for distinct
Aug 26, 2019
2b0d90f
fixed bug in distinct queries
Aug 27, 2019
d72b8ff
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-pytho…
Aug 28, 2019
875e55b
replaced single quotes with double quotes
Aug 28, 2019
fb40ea9
re introduced hashing via sha1
Aug 29, 2019
0db6371
fixed bug in distinct for py3
Aug 29, 2019
593e863
Merge remote-tracking branch 'github-azure-sdk-master/master' into sr…
Aug 29, 2019
9e33c16
dummy commit
Aug 29, 2019
f0e356a
dummy commit
Aug 29, 2019
73af86a
[Cosmos] Core pipeline integration (#6961)
annatisch Aug 29, 2019
3bed8a0
Merge remote-tracking branch 'origin/master' into feature/cosmos-prev…
annatisch Sep 4, 2019
b25b64c
Merge remote-tracking branch 'origin/master' into feature/cosmos-prev…
annatisch Sep 4, 2019
9b05a6b
Merge remote-tracking branch 'origin/master' into feature/cosmos-prev…
annatisch Sep 5, 2019
dc7d9e8
[Cosmos] Applying track 2 SDK guidelines (#7021)
annatisch Sep 9, 2019
dc0836e
Merge remote-tracking branch 'origin/master' into feature/cosmos-prev…
annatisch Sep 9, 2019
577c122
Added support for Urllib3 Connection retries
Sep 9, 2019
7bc395b
Merge remote-tracking branch 'origin/master' into feature/cosmos-prev…
annatisch Sep 10, 2019
b5b4f8b
[Cosmos] Bumped dependency (#7147)
annatisch Sep 10, 2019
700f5db
Misc fixes for Cosmos SDK (#7157)
Sep 10, 2019
efd178d
resolved megre conflicts
Sep 10, 2019
4cd7f37
Merge branch 'feature/cosmos-preview4' into srnara/queryPipeline
annatisch Sep 26, 2019
962391c
[Cosmos] Reconfigure retry policy (#7544)
annatisch Oct 4, 2019
6b70f06
Merge remote-tracking branch 'origin/master' into feature/cosmos-prev…
annatisch Oct 4, 2019
dc9ca57
[Cosmos] Docs updates (#7626)
annatisch Oct 4, 2019
96ad56a
Merge remote-tracking branch 'origin/master' into feature/cosmos-prev…
annatisch Oct 7, 2019
ad54c4f
add sdk tools repo (#7656)
danieljurek Oct 7, 2019
c0d0967
Merge remote-tracking branch 'origin/master' into feature/cosmos-prev…
annatisch Oct 7, 2019
81d96ce
[Cosmos] More docs cleanup (#7661)
annatisch Oct 8, 2019
dcbbacf
Merge remote-tracking branch 'origin/master' into feature/cosmos-prev…
annatisch Oct 8, 2019
dcd9276
dummy commit
Oct 9, 2019
12474a1
reverted dummy commit
Oct 9, 2019
1a8230f
Merge branch 'feature/cosmos-preview4' into srnara/queryPipeline
Oct 9, 2019
e258bc8
Merge remote-tracking branch 'azure-sdk-main/master' into srnara/quer…
Oct 9, 2019
02da658
fixed failing test
Oct 10, 2019
59fa49b
fixed failing tests
Oct 10, 2019
dcc1b5c
updated comment
Oct 10, 2019
83664b1
added **kwargs to _GetQueryPlanThroughGateway
Oct 11, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,18 @@ def GetHeaders( # pylint: disable=too-many-statements,too-many-branches
if options.get("offerThroughput"):
headers[http_constants.HttpHeaders.OfferThroughput] = options["offerThroughput"]

if options.get("contentType"):
headers[http_constants.HttpHeaders.ContentType] = options['contentType']

if options.get("isQueryPlanRequest"):
headers[http_constants.HttpHeaders.IsQueryPlanRequest] = options['isQueryPlanRequest']

if options.get("supportedQueryFeatures"):
headers[http_constants.HttpHeaders.SupportedQueryFeatures] = options['supportedQueryFeatures']

if options.get("queryVersion"):
headers[http_constants.HttpHeaders.QueryVersion] = options['queryVersion']

if "partitionKey" in options:
# if partitionKey value is Undefined, serialize it as [{}] to be consistent with other SDKs.
if options.get("partitionKey") is partition_key._Undefined:
Expand Down
65 changes: 50 additions & 15 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ def fetch_fn(options):
self.last_response_headers,
)

return query_iterable.QueryIterable(self, query, options, fetch_fn)
return query_iterable.QueryIterable(self, query, options, fetch_fn, "dbs")

def ReadContainers(self, database_link, options=None):
"""Reads all collections in a database.
Expand Down Expand Up @@ -336,7 +336,7 @@ def fetch_fn(options):
self.last_response_headers,
)

return query_iterable.QueryIterable(self, query, options, fetch_fn)
return query_iterable.QueryIterable(self, query, options, fetch_fn, "colls")

def CreateContainer(self, database_link, collection, options=None):
"""Creates a collection in a database.
Expand Down Expand Up @@ -519,7 +519,7 @@ def fetch_fn(options):
self.last_response_headers,
)

return query_iterable.QueryIterable(self, query, options, fetch_fn)
return query_iterable.QueryIterable(self, query, options, fetch_fn, "users")

def DeleteDatabase(self, database_link, options=None):
"""Deletes a database.
Expand Down Expand Up @@ -661,7 +661,7 @@ def fetch_fn(options):
self.last_response_headers,
)

return query_iterable.QueryIterable(self, query, options, fetch_fn)
return query_iterable.QueryIterable(self, query, options, fetch_fn, "permissions")

def ReplaceUser(self, user_link, user, options=None):
"""Replaces a user and return it.
Expand Down Expand Up @@ -818,7 +818,7 @@ def fetch_fn(options):
self.last_response_headers,
)

return query_iterable.QueryIterable(self, query, options, fetch_fn, database_or_Container_link)
return query_iterable.QueryIterable(self, query, options, fetch_fn, "docs", database_or_Container_link)

def QueryItemsChangeFeed(self, collection_link, options=None, response_hook=None):
"""Queries documents change feed in a collection.
Expand Down Expand Up @@ -898,7 +898,7 @@ def fetch_fn(options):
self.last_response_headers,
)

return query_iterable.QueryIterable(self, None, options, fetch_fn, collection_link)
return query_iterable.QueryIterable(self, None, options, fetch_fn, resource_key, collection_link)

def _ReadPartitionKeyRanges(self, collection_link, feed_options=None):
"""Reads Partition Key Ranges.
Expand Down Expand Up @@ -947,7 +947,7 @@ def fetch_fn(options):
self.last_response_headers,
)

return query_iterable.QueryIterable(self, query, options, fetch_fn)
return query_iterable.QueryIterable(self, query, options, fetch_fn, "pkranges")

def CreateItem(self, database_or_Container_link, document, options=None):
"""Creates a document in a collection.
Expand Down Expand Up @@ -1131,7 +1131,7 @@ def fetch_fn(options):
self.last_response_headers,
)

return query_iterable.QueryIterable(self, query, options, fetch_fn)
return query_iterable.QueryIterable(self, query, options, fetch_fn, "triggers")

def CreateTrigger(self, collection_link, trigger, options=None):
"""Creates a trigger in a collection.
Expand Down Expand Up @@ -1256,7 +1256,7 @@ def fetch_fn(options):
self.last_response_headers,
)

return query_iterable.QueryIterable(self, query, options, fetch_fn)
return query_iterable.QueryIterable(self, query, options, fetch_fn, "udfs")

def CreateUserDefinedFunction(self, collection_link, udf, options=None):
"""Creates a user defined function in a collection.
Expand Down Expand Up @@ -1381,7 +1381,7 @@ def fetch_fn(options):
self.last_response_headers,
)

return query_iterable.QueryIterable(self, query, options, fetch_fn)
return query_iterable.QueryIterable(self, query, options, fetch_fn, "sprocs")

def CreateStoredProcedure(self, collection_link, sproc, options=None):
"""Creates a stored procedure in a collection.
Expand Down Expand Up @@ -1504,7 +1504,7 @@ def fetch_fn(options):
self.last_response_headers,
)

return query_iterable.QueryIterable(self, query, options, fetch_fn)
return query_iterable.QueryIterable(self, query, options, fetch_fn, "conflicts")

def ReadConflict(self, conflict_link, options=None):
"""Reads a conflict.
Expand Down Expand Up @@ -1780,7 +1780,7 @@ def fetch_fn(options):
self.last_response_headers,
)

return query_iterable.QueryIterable(self, query, options, fetch_fn)
return query_iterable.QueryIterable(self, query, options, fetch_fn, "attachments")

def ReadMedia(self, media_link):
"""Reads a media.
Expand Down Expand Up @@ -2173,7 +2173,7 @@ def fetch_fn(options):
self.last_response_headers,
)

return query_iterable.QueryIterable(self, query, options, fetch_fn)
return query_iterable.QueryIterable(self, query, options, fetch_fn, "offers")

def GetDatabaseAccount(self, url_connection=None):
"""Gets database account info.
Expand Down Expand Up @@ -2506,7 +2506,7 @@ def QueryFeed(self, path, collection_id, query, options, partition_key_range_id=
)

def __QueryFeed(
self, path, typ, id_, result_fn, create_fn, query, options=None, partition_key_range_id=None, response_hook=None
self, path, typ, id_, result_fn, create_fn, query, options=None, partition_key_range_id=None, response_hook=None, is_query_plan=False
):
"""Query for more than one Azure Cosmos resources.

Expand All @@ -2520,6 +2520,9 @@ def __QueryFeed(
The request options for the request.
:param str partition_key_range_id:
Specifies partition key range id.
:param function response_hook:
:param bool is_query_plan:
Specififes if the call is to fetch query plan

:rtype:
list
Expand All @@ -2545,7 +2548,7 @@ def __GetBodiesFromQueryResult(result):
# Copy to make sure that default_headers won't be changed.
if query is None:
# Query operations will use ReadEndpoint even though it uses GET(for feed requests)
request = _request_object.RequestObject(typ, documents._OperationType.ReadFeed)
request = _request_object.RequestObject(typ, documents._OperationType.QueryPlan if is_query_plan else documents._OperationType.ReadFeed)
headers = base.GetHeaders(self, initial_headers, "get", path, id_, typ, options, partition_key_range_id)
result, self.last_response_headers = self.__Get(path, request, headers)
if response_hook:
Expand All @@ -2555,6 +2558,9 @@ def __GetBodiesFromQueryResult(result):
query = self.__CheckAndUnifyQueryFormat(query)

initial_headers[http_constants.HttpHeaders.IsQuery] = "true"
if not is_query_plan:
initial_headers[http_constants.HttpHeaders.IsQuery] = "true"

if (
self._query_compatibility_mode == CosmosClientConnection._QueryCompatibilityMode.Default
or self._query_compatibility_mode == CosmosClientConnection._QueryCompatibilityMode.Query
Expand All @@ -2575,6 +2581,35 @@ def __GetBodiesFromQueryResult(result):

return __GetBodiesFromQueryResult(result)

def _GetQueryPlanThroughGateway(self, query, resource_link):
supported_query_features = (documents._QueryFeature.Aggregate + "," +
documents._QueryFeature.CompositeAggregate + "," +
documents._QueryFeature.Distinct + "," +
documents._QueryFeature.MultipleOrderBy + "," +
documents._QueryFeature.OffsetAndLimit + "," +
documents._QueryFeature.OrderBy + "," +
documents._QueryFeature.Top)

options = {
"contentType": runtime_constants.MediaTypes.Json,
"isQueryPlanRequest": True,
"supportedQueryFeatures": supported_query_features,
"queryVersion": http_constants.Versions.QueryVersion
}

resource_link = base.TrimBeginningAndEndingSlashes(resource_link)
path = base.GetPathFromLink(resource_link, "docs")
resource_id = base.GetResourceIdOrFullNameFromLink(resource_link)

return self.__QueryFeed(path,
"docs",
resource_id,
lambda r: r,
None,
query,
options,
is_query_plan=True)

def __CheckAndUnifyQueryFormat(self, query_body):
"""Checks and unifies the format of the query body.

Expand Down
5 changes: 3 additions & 2 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_default_retry_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ def __init__(self, *args):

def needsRetry(self, error_code):
if error_code in DefaultRetryPolicy.CONNECTION_ERROR_CODES:
if self.args:
if (self.args[4]["method"] == "GET") or (http_constants.HttpHeaders.IsQuery in self.args[4]["headers"]):
if (len(self.args) > 0):
if (self.args[4]["method"] == "GET") or (http_constants.HttpHeaders.IsQuery in self.args[4]["headers"])\
or (http_constants.HttpHeaders.IsQueryPlanRequest in self.args[4]["headers"]):
return True
return False
return True
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,100 +171,3 @@ def __init__(self, client, options, fetch_function):
def _fetch_next_block(self):
while super(_DefaultQueryExecutionContext, self)._has_more_pages() and not self._buffer:
return self._fetch_items_helper_with_retries(self._fetch_function)


class _MultiCollectionQueryExecutionContext(_QueryExecutionContextBase):
"""
This class is used if it is client side partitioning
"""

def __init__(self, client, options, database_link, query, partition_key):
"""
Constructor
:param CosmosClient client:
:param dict options:
The request options for the request.
:param str database_link: database self link or ID based link
:param (str or dict) query:
Partition_key (str): partition key for the query

"""
super(_MultiCollectionQueryExecutionContext, self).__init__(client, options)

self._current_collection_index = 0
self._collection_links = []
self._collection_links_length = 0

self._query = query
self._client = client

partition_resolver = client.GetPartitionResolver(database_link)

if partition_resolver is None:
raise ValueError(client.PartitionResolverErrorMessage)

self._collection_links = partition_resolver.ResolveForRead(partition_key)

self._collection_links_length = len(self._collection_links)

if self._collection_links is None:
raise ValueError("_collection_links is None.")

if self._collection_links_length <= 0:
raise ValueError("_collection_links_length is not greater than 0.")

# Creating the QueryFeed for the first collection
path = _base.GetPathFromLink(self._collection_links[self._current_collection_index], "docs")
collection_id = _base.GetResourceIdOrFullNameFromLink(self._collection_links[self._current_collection_index])

self._current_collection_index += 1

def fetch_fn(options):
return client.QueryFeed(path, collection_id, query, options)

self._fetch_function = fetch_fn

def _has_more_pages(self):
return (
not self._has_started
or self._continuation
or (self._collection_links and self._current_collection_index < self._collection_links_length)
)

def _fetch_next_block(self):
"""Fetches the next block of query results.

This iterates fetches the next block of results from the current collection link.
Once the current collection results were exhausted. It moves to the next collection link.

:return:
List of fetched items.
:rtype: list
"""
# Fetch next block of results by executing the query against the current document collection
fetched_items = self._fetch_items_helper_with_retries(self._fetch_function)

# If there are multiple document collections to query for(in case of partitioning),
# keep looping through each one of them, creating separate feed queries for each
# collection and fetching the items
while not fetched_items:
if self._collection_links and self._current_collection_index < self._collection_links_length:
path = _base.GetPathFromLink(self._collection_links[self._current_collection_index], "docs")
collection_id = _base.GetResourceIdOrFullNameFromLink(
self._collection_links[self._current_collection_index]
)

self._continuation = None
self._has_started = False

def fetch_fn(options):
return self._client.QueryFeed(path, collection_id, self._query, options)

self._fetch_function = fetch_fn

fetched_items = self._fetch_items_helper_with_retries(self._fetch_function)
self._current_collection_index += 1
else:
break

return fetched_items
Loading