Skip to content

Commit

Permalink
Storage operations within checkpoint manager did not utilize the blob…
Browse files Browse the repository at this point in the history
…_prefix. (#9086)

* Storage operations within checkpoint manager did not utilize the blob_prefix.
* Adds unit tests to validate this as well.
* Add new parameter, use_consumer_group_as_directory, to control consumer_group pathing in concert with storage_blob_prefix.
* Add docstring improvements for these lease path parameters.
* Add unit tests for new parameters
* Add release notes
  • Loading branch information
KieranBrantnerMagee authored Jan 27, 2020
1 parent 5868e83 commit 99b35c0
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 17 deletions.
14 changes: 13 additions & 1 deletion sdk/eventhub/azure-eventhubs/HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,20 @@
Release History
===============

1.3.4 (TBD)
-----------

**Features**

- Add new parameter to AzureStorageCheckpointLeaseManager, use_consumer_group_as_directory, to control consumer_group pathing in concert with storage_blob_prefix.

**BugFixes**

- Ensures storage_blob_prefix within AzureStorageCheckpointLeaseManager is actually applied.


1.3.3 (2019-12-4)
------------------
-----------------

**Features**

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ class AzureStorageCheckpointLeaseManager(AbstractCheckpointManager, AbstractLeas
will be used.
:param str lease_container_name: The name of the container that will be used to store
leases. If it does not already exist it will be created. Default value is 'eph-leases'.
Leases are named via internal partition_ids, locations can be modified via
storage_blob_prefix and use_consumer_group_as_directory.
:param str storage_blob_prefix: If populated, prepends a prefix when constructing
the location that leases are stored within the lease_container. Default None.
If use_consumer_group_as_directory is also set, the two are combined as <prefix><group>/<id>.
:param int lease_renew_interval: The interval in seconds at which EPH will attempt to
renew the lease of a particular partition. Default value is 10.
:param int lease_duration: The duration in seconds of a lease on a partition.
Expand All @@ -49,11 +54,16 @@ class AzureStorageCheckpointLeaseManager(AbstractCheckpointManager, AbstractLeas
:param str connection_string: If specified, this will override all other endpoint parameters.
See http://azure.microsoft.com/en-us/documentation/articles/storage-configure-connection-string/
for the connection string format.
:param bool use_consumer_group_as_directory: If true, includes the consumer group as part of the
location we use to store leases within the container, as such: <consumer_group>/<partition_id>,
otherwise leases are simply named by their partition_id. Default False.
If storage_blob_prefix is provided this prefix will be prepended in either case.
"""

def __init__(self, storage_account_name=None, storage_account_key=None, lease_container_name="eph-leases",
storage_blob_prefix=None, lease_renew_interval=10, lease_duration=30,
sas_token=None, endpoint_suffix="core.windows.net", connection_string=None):
sas_token=None, endpoint_suffix="core.windows.net", connection_string=None,
use_consumer_group_as_directory=False):
AbstractCheckpointManager.__init__(self)
AbstractLeaseManager.__init__(self, lease_renew_interval, lease_duration)
self.storage_account_name = storage_account_name
Expand All @@ -63,6 +73,7 @@ def __init__(self, storage_account_name=None, storage_account_key=None, lease_co
self.connection_string = connection_string
self.lease_container_name = lease_container_name
self.storage_blob_prefix = storage_blob_prefix
self.use_consumer_group_as_directory = use_consumer_group_as_directory
self.storage_client = None
self.consumer_group_directory = None
self.host = None
Expand Down Expand Up @@ -97,7 +108,7 @@ def initialize(self, host):
endpoint_suffix=self.endpoint_suffix,
connection_string=self.connection_string,
request_session=self.request_session)
self.consumer_group_directory = self.storage_blob_prefix + self.host.eh_config.consumer_group
self.consumer_group_directory = self.host.eh_config.consumer_group if self.use_consumer_group_as_directory else ""

# Checkpoint Management Methods

Expand Down Expand Up @@ -213,11 +224,13 @@ async def get_lease_async(self, partition_id):
:rtype: ~azure.eventprocessorhost.lease.Lease
"""
try:
blob_path = self._get_lease_blob_path(partition_id)
blob = await self.host.loop.run_in_executor(
self.executor,
functools.partial(
self.storage_client.get_blob_to_text,
self.lease_container_name, partition_id))
self.lease_container_name,
blob_path))
lease = AzureBlobLease()
lease.with_blob(blob)
async def state():
Expand All @@ -231,7 +244,7 @@ async def state():
functools.partial(
self.storage_client.get_blob_properties,
self.lease_container_name,
partition_id))
blob_path))
return res.properties.lease.state
except Exception as err: # pylint: disable=broad-except
_logger.error("Failed to get lease state %r %r", err, partition_id)
Expand Down Expand Up @@ -269,20 +282,21 @@ async def create_lease_if_not_exists_async(self, partition_id):
"""
return_lease = None
try:
blob_path = self._get_lease_blob_path(partition_id)
return_lease = AzureBlobLease()
return_lease.partition_id = partition_id
serializable_lease = return_lease.serializable()
json_lease = json.dumps(serializable_lease)
_logger.info("Creating Lease %r %r %r",
self.lease_container_name,
partition_id,
blob_path,
json.dumps({k:v for k, v in serializable_lease.items() if k != 'event_processor_context'}))
await self.host.loop.run_in_executor(
self.executor,
functools.partial(
self.storage_client.create_blob_from_text,
self.lease_container_name,
partition_id,
blob_path,
json_lease))
except Exception: # pylint: disable=broad-except
try:
Expand All @@ -300,12 +314,13 @@ async def delete_lease_async(self, lease):
:param lease: The stored lease to be deleted.
:type lease: ~azure.eventprocessorhost.lease.Lease
"""
blob_path = self._get_lease_blob_path(lease.partition_id)
await self.host.loop.run_in_executor(
self.executor,
functools.partial(
self.storage_client.delete_blob,
self.lease_container_name,
lease.partition_id,
blob_path,
lease_id=lease.token))

async def acquire_lease_async(self, lease):
Expand All @@ -323,6 +338,7 @@ async def acquire_lease_async(self, lease):
new_lease_id = str(uuid.uuid4())
partition_id = lease.partition_id
try:
blob_path = self._get_lease_blob_path(partition_id)
if asyncio.iscoroutinefunction(lease.state):
state = await lease.state()
else:
Expand All @@ -345,7 +361,7 @@ async def acquire_lease_async(self, lease):
functools.partial(
self.storage_client.change_blob_lease,
self.lease_container_name,
partition_id,
blob_path,
lease.token,
new_lease_id))
lease.token = new_lease_id
Expand All @@ -356,7 +372,7 @@ async def acquire_lease_async(self, lease):
functools.partial(
self.storage_client.acquire_blob_lease,
self.lease_container_name,
partition_id,
blob_path,
self.lease_duration,
new_lease_id))
lease.owner = self.host.host_name
Expand All @@ -381,12 +397,13 @@ async def renew_lease_async(self, lease):
:rtype: bool
"""
try:
blob_path = self._get_lease_blob_path(lease.partition_id)
await self.host.loop.run_in_executor(
self.executor,
functools.partial(
self.storage_client.renew_blob_lease,
self.lease_container_name,
lease.partition_id,
blob_path,
lease_id=lease.token,
timeout=self.lease_duration))
except Exception as err: # pylint: disable=broad-except
Expand All @@ -411,6 +428,7 @@ async def release_lease_async(self, lease):
lease_id = None
try:
_logger.info("Releasing lease %r %r", self.host.guid, lease.partition_id)
blob_path = self._get_lease_blob_path(lease.partition_id)
lease_id = lease.token
released_copy = AzureBlobLease()
released_copy.with_lease(lease)
Expand All @@ -422,15 +440,15 @@ async def release_lease_async(self, lease):
functools.partial(
self.storage_client.create_blob_from_text,
self.lease_container_name,
lease.partition_id,
blob_path,
json.dumps(released_copy.serializable()),
lease_id=lease_id))
await self.host.loop.run_in_executor(
self.executor,
functools.partial(
self.storage_client.release_blob_lease,
self.lease_container_name,
lease.partition_id,
blob_path,
lease_id))
except Exception as err: # pylint: disable=broad-except
_logger.error("Failed to release lease %r %r %r",
Expand Down Expand Up @@ -461,12 +479,13 @@ async def update_lease_async(self, lease):
# First, renew the lease to make sure the update will go through.
if await self.renew_lease_async(lease):
try:
blob_path = self._get_lease_blob_path(lease.partition_id)
await self.host.loop.run_in_executor(
self.executor,
functools.partial(
self.storage_client.create_blob_from_text,
self.lease_container_name,
lease.partition_id,
blob_path,
json.dumps(lease.serializable()),
lease_id=lease.token))

Expand All @@ -477,3 +496,18 @@ async def update_lease_async(self, lease):
else:
return False
return True

def _get_lease_blob_path(self, partition_id):
    """
    Build the blob name under which the lease for a partition is stored.

    The name starts as the partition_id. When consumer_group_directory is
    non-empty it is placed in front as "<consumer_group_directory>/". When
    storage_blob_prefix is non-empty it is then prepended verbatim, with no
    separator. If neither is set, partition_id is returned unchanged.

    :param str partition_id: The ID of the partition whose lease blob is addressed.
    :returns: The blob name to use within the lease container.
    :rtype: str
    """
    # Note: In a perfect world, without a prefix provided we'd prepend the
    # consumer group to the partition_id. However this would break
    # backcompat with a historical world in which we just had partition_id
    # within the container, and preclude any way for a user to generate
    # that behavior, so we will fix it in all cases in the Track2 library
    # and simply enable the proper full path here if the optional parameter
    # is present.
    path = partition_id
    if self.consumer_group_directory:
        path = "{}/{}".format(self.consumer_group_directory, path)
    if self.storage_blob_prefix:
        path = "{}{}".format(self.storage_blob_prefix, path)
    return path
45 changes: 42 additions & 3 deletions sdk/eventhub/azure-eventhubs/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,16 +219,55 @@ def connstr_senders(connection_str):
client.stop()


@pytest.fixture()
def storage_clm(eph):
def _storage_clm(eph, use_storage_blob_prefix=False, use_consumer_group_as_directory=False):
    """
    Build an AzureStorageCheckpointLeaseManager for the live storage tests.

    The manager is returned without initialize() having been called and the
    container is not created here; both are left to the calling fixture.
    The calling test is skipped when the storage account settings are not
    present in the environment.

    :param eph: Not used by this helper; accepted so fixtures can pass their host through.
    :param bool use_storage_blob_prefix: If true, configure a unique "testprefix<uuid>"
     storage_blob_prefix; otherwise no prefix is set.
    :param bool use_consumer_group_as_directory: Forwarded to the lease manager.
    :returns: A (lease manager, container name) tuple.
    """
    # Only the environment lookups are expected to raise KeyError, so keep the
    # try body limited to them.
    try:
        account_name = os.environ['AZURE_STORAGE_ACCOUNT']
        account_key = os.environ['AZURE_STORAGE_ACCESS_KEY']
    except KeyError:
        pytest.skip("Live Storage configuration not found.")

    container = str(uuid.uuid4())
    blob_prefix = ("testprefix" + str(uuid.uuid4())) if use_storage_blob_prefix else None
    storage_clm = AzureStorageCheckpointLeaseManager(
        account_name,
        account_key,
        container,
        storage_blob_prefix=blob_prefix,
        use_consumer_group_as_directory=use_consumer_group_as_directory)
    return (storage_clm, container)


@pytest.fixture()
def storage_clm_with_prefix(eph):
    """
    Yield a lease manager configured with a storage_blob_prefix only.

    The manager is initialized against the host and its container is created
    before the test runs; the container is deleted on teardown.
    """
    manager, container_name = _storage_clm(eph, use_storage_blob_prefix=True)
    try:
        manager.initialize(eph)
        manager.storage_client.create_container(container_name)
        yield manager
    finally:
        manager.storage_client.delete_container(container_name)

@pytest.fixture()
def storage_clm_with_prefix_and_consumer_dir(eph):
    """
    Yield a lease manager configured with both a storage_blob_prefix and
    use_consumer_group_as_directory.

    The manager is initialized against the host and its container is created
    before the test runs; the container is deleted on teardown.
    """
    manager, container_name = _storage_clm(
        eph,
        use_storage_blob_prefix=True,
        use_consumer_group_as_directory=True)
    try:
        manager.initialize(eph)
        manager.storage_client.create_container(container_name)
        yield manager
    finally:
        manager.storage_client.delete_container(container_name)


@pytest.fixture()
def storage_clm_with_consumer_dir(eph):
    """
    Yield a lease manager configured with use_consumer_group_as_directory
    and no storage_blob_prefix.

    The manager is initialized against the host and its container is created
    before the test runs; the container is deleted on teardown.
    """
    manager, container_name = _storage_clm(
        eph,
        use_storage_blob_prefix=False,
        use_consumer_group_as_directory=True)
    try:
        manager.initialize(eph)
        manager.storage_client.create_container(container_name)
        yield manager
    finally:
        manager.storage_client.delete_container(container_name)


@pytest.fixture()
def storage_clm(eph):
storage_clm, container = _storage_clm(eph)
try:
storage_clm.initialize(eph)
storage_clm.storage_client.create_container(container)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,70 @@ def test_delete_lease(storage_clm):
assert lease == None


@pytest.mark.liveTest
def test_lease_with_path_prefix(storage_clm_with_prefix):
    """
    Test that lease blob paths carry the blob prefix, and no consumer group
    directory, when only storage_blob_prefix is configured.
    """
    loop = asyncio.get_event_loop()
    local_checkpoint = loop.run_until_complete(
        storage_clm_with_prefix.create_checkpoint_if_not_exists_async("1"))
    assert local_checkpoint.partition_id == "1"
    assert local_checkpoint.offset == "-1"
    # The returned lease is not inspected; the call is made only to exercise
    # the lookup through the prefixed path.
    loop.run_until_complete(storage_clm_with_prefix.get_lease_async("1"))

    path_parts = storage_clm_with_prefix._get_lease_blob_path("0").split('/')
    assert "testprefix" in path_parts[0]
    assert "$default" not in path_parts[0]
    # No '/' separator is added for the prefix, so prefix and partition id
    # form a single path segment that ends with the partition id.
    assert len(path_parts) == 1
    assert path_parts[-1][-1] == "0"


@pytest.mark.liveTest
def test_lease_with_path_prefix_and_consumer_dir(storage_clm_with_prefix_and_consumer_dir):
    """
    Test that lease blob paths have the form <prefix><consumer_group>/<partition_id>
    when both storage_blob_prefix and use_consumer_group_as_directory are configured.
    """
    loop = asyncio.get_event_loop()
    local_checkpoint = loop.run_until_complete(
        storage_clm_with_prefix_and_consumer_dir.create_checkpoint_if_not_exists_async("1"))
    assert local_checkpoint.partition_id == "1"
    assert local_checkpoint.offset == "-1"
    # The returned lease is not inspected; the call is made only to exercise
    # the lookup through the prefixed, per-consumer-group path.
    loop.run_until_complete(storage_clm_with_prefix_and_consumer_dir.get_lease_async("1"))

    path_parts = storage_clm_with_prefix_and_consumer_dir._get_lease_blob_path("0").split('/')
    # Prefix and consumer group share the first segment; the partition id is the second.
    assert "testprefix" in path_parts[0]
    assert "$default" in path_parts[0]
    assert len(path_parts) == 2
    assert path_parts[-1] == "0"


@pytest.mark.liveTest
def test_lease_with_consumer_dir(storage_clm_with_consumer_dir):
    """
    Test that lease blob paths have the form <consumer_group>/<partition_id>, with no
    blob prefix, when only use_consumer_group_as_directory is configured.
    """
    loop = asyncio.get_event_loop()
    local_checkpoint = loop.run_until_complete(
        storage_clm_with_consumer_dir.create_checkpoint_if_not_exists_async("1"))
    assert local_checkpoint.partition_id == "1"
    assert local_checkpoint.offset == "-1"
    # The returned lease is not inspected; the call is made only to exercise
    # the lookup through the per-consumer-group path.
    loop.run_until_complete(storage_clm_with_consumer_dir.get_lease_async("1"))

    path_parts = storage_clm_with_consumer_dir._get_lease_blob_path("0").split('/')
    assert "testprefix" not in path_parts[0]
    assert "$default" in path_parts[0]
    assert len(path_parts) == 2
    assert path_parts[-1] == "0"


@pytest.mark.liveTest
def test_lease_without_path_prefix(storage_clm):
    """
    Test that, with neither a blob prefix nor a consumer group directory
    configured, the lease blob path is just the partition id.
    """
    blob_path = storage_clm._get_lease_blob_path("0")
    # A single segment equal to the partition id: no prefix, no directory.
    assert blob_path.split('/') == ["0"]


@pytest.mark.liveTest
def test_checkpointing(storage_clm):
"""
Expand Down

0 comments on commit 99b35c0

Please sign in to comment.