diff --git a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/tests/test_storage_blob_partition_manager.py b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/tests/test_storage_blob_partition_manager.py index 09c46db4feb..524bc75cdfd 100644 --- a/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/tests/test_storage_blob_partition_manager.py +++ b/sdk/eventhub/azure-eventhubs-checkpointstoreblob-aio/tests/test_storage_blob_partition_manager.py @@ -9,6 +9,7 @@ import os import uuid import warnings +import asyncio from azure.eventhub.extensions.checkpointstoreblobaio import BlobPartitionManager @@ -41,90 +42,99 @@ def remove_live_storage_blob_client(container_str): warnings.warn(UserWarning("storage container teardown failed")) -@pytest.mark.liveTest -@pytest.mark.asyncio -async def test_claim_and_list_ownership(): - container_str, live_storage_blob_client = get_live_storage_blob_client() - if not live_storage_blob_client: - pytest.skip("Storage blob client can't be created") - +async def _claim_and_list_ownership(live_storage_blob_client): eventhub_name = 'eventhub' consumer_group_name = '$default' ownership_cnt = 8 + async with live_storage_blob_client: + partition_manager = BlobPartitionManager(container_client=live_storage_blob_client) - try: - async with live_storage_blob_client: - - partition_manager = BlobPartitionManager(container_client=live_storage_blob_client) + ownership_list = await partition_manager.list_ownership(eventhub_name=eventhub_name, + consumer_group_name=consumer_group_name) + assert len(ownership_list) == 0 - ownership_list = await partition_manager.list_ownership(eventhub_name=eventhub_name, consumer_group_name=consumer_group_name) - assert len(ownership_list) == 0 + ownership_list = [] - ownership_list = [] + for i in range(ownership_cnt): + ownership = {} + ownership['eventhub_name'] = eventhub_name + ownership['consumer_group_name'] = consumer_group_name + ownership['owner_id'] = 'ownerid' + ownership['partition_id'] = str(i) + ownership['last_modified_time'] = time.time() + ownership["offset"] = "1" + ownership["sequence_number"] = "1" + ownership_list.append(ownership) - for i in range(ownership_cnt): - ownership = {} - ownership['eventhub_name'] = eventhub_name - ownership['consumer_group_name'] = consumer_group_name - ownership['owner_id'] = 'ownerid' - ownership['partition_id'] = str(i) - ownership['last_modified_time'] = time.time() - ownership["offset"] = "1" - ownership["sequence_number"] = "1" - ownership_list.append(ownership) + await partition_manager.claim_ownership(ownership_list) - await partition_manager.claim_ownership(ownership_list) - - ownership_list = await partition_manager.list_ownership(eventhub_name=eventhub_name, consumer_group_name=consumer_group_name) - assert len(ownership_list) == ownership_cnt - finally: - remove_live_storage_blob_client(container_str) + ownership_list = await partition_manager.list_ownership(eventhub_name=eventhub_name, + consumer_group_name=consumer_group_name) + assert len(ownership_list) == ownership_cnt @pytest.mark.liveTest -@pytest.mark.asyncio -async def test_update_checkpoint(): +def test_claim_and_list_ownership(): container_str, live_storage_blob_client = get_live_storage_blob_client() if not live_storage_blob_client: pytest.skip("Storage blob client can't be created") + try: + loop = asyncio.get_event_loop() + loop.run_until_complete(_claim_and_list_ownership(live_storage_blob_client)) + finally: + remove_live_storage_blob_client(container_str) + +async def _update_checkpoint(live_storage_blob_client): eventhub_name = 'eventhub' consumer_group_name = '$default' owner_id = 'owner' partition_cnt = 8 + async with live_storage_blob_client: + partition_manager = BlobPartitionManager(container_client=live_storage_blob_client) + + ownership_list = await partition_manager.list_ownership(eventhub_name=eventhub_name, + consumer_group_name=consumer_group_name) + assert len(ownership_list) == 0 + + ownership_list = [] + + for i in range(partition_cnt): + ownership = {} + ownership['eventhub_name'] = eventhub_name + ownership['consumer_group_name'] = consumer_group_name + ownership['owner_id'] = owner_id + ownership['partition_id'] = str(i) + ownership['last_modified_time'] = time.time() + ownership['offset'] = '1' + ownership['sequence_number'] = '10' + ownership_list.append(ownership) + + await partition_manager.claim_ownership(ownership_list) + + ownership_list = await partition_manager.list_ownership(eventhub_name=eventhub_name, + consumer_group_name=consumer_group_name) + assert len(ownership_list) == partition_cnt + + for i in range(partition_cnt): + await partition_manager.update_checkpoint(eventhub_name, consumer_group_name, str(i), + owner_id, '2', '20') + + ownership_list = await partition_manager.list_ownership(eventhub_name=eventhub_name, + consumer_group_name=consumer_group_name) + for ownership in ownership_list: + assert ownership['offset'] == '2' + assert ownership['sequence_number'] == '20' + + +@pytest.mark.liveTest +def test_update_checkpoint(): + container_str, live_storage_blob_client = get_live_storage_blob_client() + if not live_storage_blob_client: + pytest.skip("Storage blob client can't be created") try: - async with live_storage_blob_client: - partition_manager = BlobPartitionManager(container_client=live_storage_blob_client) - - ownership_list = await partition_manager.list_ownership(eventhub_name=eventhub_name, consumer_group_name=consumer_group_name) - assert len(ownership_list) == 0 - - ownership_list = [] - - for i in range(partition_cnt): - ownership = {} - ownership['eventhub_name'] = eventhub_name - ownership['consumer_group_name'] = consumer_group_name - ownership['owner_id'] = owner_id - ownership['partition_id'] = str(i) - ownership['last_modified_time'] = time.time() - ownership['offset'] = '1' - ownership['sequence_number'] = '10' - ownership_list.append(ownership) - - await partition_manager.claim_ownership(ownership_list) - - ownership_list = await partition_manager.list_ownership(eventhub_name=eventhub_name, consumer_group_name=consumer_group_name) - assert len(ownership_list) == partition_cnt - - for i in range(partition_cnt): - await partition_manager.update_checkpoint(eventhub_name, consumer_group_name, str(i), - owner_id, '2', '20') - - ownership_list = await partition_manager.list_ownership(eventhub_name=eventhub_name, consumer_group_name=consumer_group_name) - for ownership in ownership_list: - assert ownership['offset'] == '2' - assert ownership['sequence_number'] == '20' + loop = asyncio.get_event_loop() + loop.run_until_complete(_update_checkpoint(live_storage_blob_client)) finally: remove_live_storage_blob_client(container_str)