From 12ff1b283df45668c424443fb13773f58c3ba75e Mon Sep 17 00:00:00 2001 From: Srinath Narayanan Date: Tue, 22 Sep 2020 14:32:14 -0700 Subject: [PATCH] Added partition key param for querying change feed (#13857) * initia; changes for partitionkey for query changefeed * Added test * updated changelog * moved partition_key to kwargs --- sdk/cosmos/azure-cosmos/CHANGELOG.md | 2 ++ .../azure-cosmos/azure/cosmos/container.py | 4 +++ sdk/cosmos/azure-cosmos/test/test_query.py | 36 +++++++++++-------- 3 files changed, 28 insertions(+), 14 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md index 90015159104c..319c383bf5a9 100644 --- a/sdk/cosmos/azure-cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md @@ -3,6 +3,8 @@ **Bug fixes** - Fixed bug where continuation token is not honored when query_iterable is used to get results by page. Issue #13265. +**New features** +- Added support for passing partitionKey while querying changefeed. Issue #11689. ## 4.1.0 (2020-08-10) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/container.py b/sdk/cosmos/azure-cosmos/azure/cosmos/container.py index 9e1d86f24dca..4211937d0dc2 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/container.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/container.py @@ -249,6 +249,7 @@ def query_items_change_feed( :param partition_key_range_id: ChangeFeed requests can be executed against specific partition key ranges. This is used to process the change feed in parallel across multiple consumers. + :param partition_key: partition key at which ChangeFeed requests are targetted. :param is_start_from_beginning: Get whether change feed should start from beginning (true) or from current (false). By default it's start from current (false). :param continuation: e_tag value to be used as continuation for reading change feed. @@ -261,6 +262,9 @@ def query_items_change_feed( response_hook = kwargs.pop('response_hook', None) if partition_key_range_id is not None: feed_options["partitionKeyRangeId"] = partition_key_range_id + partition_key = kwargs.pop("partitionKey", None) + if partition_key is not None: + feed_options["partitionKey"] = partition_key if is_start_from_beginning is not None: feed_options["isStartFromBeginning"] = is_start_from_beginning if max_item_count is not None: diff --git a/sdk/cosmos/azure-cosmos/test/test_query.py b/sdk/cosmos/azure-cosmos/test/test_query.py index 44d59975c46c..97ca042aa485 100644 --- a/sdk/cosmos/azure-cosmos/test/test_query.py +++ b/sdk/cosmos/azure-cosmos/test/test_query.py @@ -47,10 +47,18 @@ def test_first_and_last_slashes_trimmed_for_query_string (self): iter_list = list(query_iterable) self.assertEqual(iter_list[0]['id'], 'myId') - def test_query_change_feed(self): + def test_query_change_feed_with_pk(self): + self.query_change_feed(True) + + def test_query_change_feed_with_pk_range_id(self): + self.query_change_feed(False) + + def query_change_feed(self, use_partition_key): created_collection = self.config.create_multi_partition_collection_with_custom_pk_if_not_exist(self.client) # The test targets partition #3 - pkRangeId = "2" + partition_key = "pk" + partition_key_range_id = 2 + partitionParam = {"partition_key": partition_key} if use_partition_key else {"partition_key_range_id": partition_key_range_id} # Read change feed without passing any options query_iterable = created_collection.query_items_change_feed() @@ -58,7 +66,7 @@ def test_query_change_feed(self): self.assertEqual(len(iter_list), 0) # Read change feed from current should return an empty list - query_iterable = created_collection.query_items_change_feed(partition_key_range_id=pkRangeId) + query_iterable = created_collection.query_items_change_feed(**partitionParam) iter_list = list(query_iterable) self.assertEqual(len(iter_list), 0) self.assertTrue('etag' in created_collection.client_connection.last_response_headers) @@ -66,8 +74,8 @@ def test_query_change_feed(self): # Read change feed from beginning should return an empty list query_iterable = created_collection.query_items_change_feed( - partition_key_range_id=pkRangeId, - is_start_from_beginning=True + is_start_from_beginning=True, + **partitionParam ) iter_list = list(query_iterable) self.assertEqual(len(iter_list), 0) @@ -79,8 +87,8 @@ def test_query_change_feed(self): document_definition = {'pk': 'pk', 'id':'doc1'} created_collection.create_item(body=document_definition) query_iterable = created_collection.query_items_change_feed( - partition_key_range_id=pkRangeId, is_start_from_beginning=True, + **partitionParam ) iter_list = list(query_iterable) self.assertEqual(len(iter_list), 1) @@ -100,9 +108,9 @@ def test_query_change_feed(self): for pageSize in [1, 100]: # verify iterator query_iterable = created_collection.query_items_change_feed( - partition_key_range_id=pkRangeId, continuation=continuation2, - max_item_count=pageSize + max_item_count=pageSize, + **partitionParam ) it = query_iterable.__iter__() expected_ids = 'doc2.doc3.' @@ -114,9 +122,9 @@ def test_query_change_feed(self): # verify by_page # the options is not copied, therefore it need to be restored query_iterable = created_collection.query_items_change_feed( - partition_key_range_id=pkRangeId, continuation=continuation2, - max_item_count=pageSize + max_item_count=pageSize, + **partitionParam ) count = 0 expected_count = 2 @@ -134,8 +142,8 @@ def test_query_change_feed(self): # verify reading change feed from the beginning query_iterable = created_collection.query_items_change_feed( - partition_key_range_id=pkRangeId, - is_start_from_beginning=True + is_start_from_beginning=True, + **partitionParam ) expected_ids = ['doc1', 'doc2', 'doc3'] it = query_iterable.__iter__() @@ -147,9 +155,9 @@ def test_query_change_feed(self): # verify reading empty change feed query_iterable = created_collection.query_items_change_feed( - partition_key_range_id=pkRangeId, continuation=continuation3, - is_start_from_beginning=True + is_start_from_beginning=True, + **partitionParam ) iter_list = list(query_iterable) self.assertEqual(len(iter_list), 0)