Skip to content

Commit

Permalink
Eventhubs Python preview2 merge to master (#6660)
Browse files Browse the repository at this point in the history
* Eventhubs conn (#6394)

* Shared connection (sync) draft

* Shared connection (sync) draft 2

* Shared connection (sync) test update

* Shared connection

* Fix an issue

* add retry exponential delay and timeout to exception handling

* put module method before class def

* fixed Client.get_properties error

* Improve send timeout (#6481)

* Add timeout information to the link prop during link creation process (#6485)

* Update optional parameters in public api into kwargs and update comments (#6510)

* Update optional parameters in public api into kwargs and update some comments

* Update more optional parameters into kwargs paramter

* create_batch feature implementation (#6256) (#6324)

* create_batch feature implementation (#6256)

* Init create batch event

* create_batch implementation

* Revert _set_partition_key method and update comment

* Refacor EventDataBatch class

* Revert logic when setting partition_key of event_data

* Misc fixes from code review

* Rename max_message_size to max_size
Other small fixes

* Warn if event_data is None when call try_add

* Create batch event update (#6509)

* Update according to the review

* Update comment

* Revert some kwargs backto optional parameters as it may cause breaking changes

* Small fixes (#6520)

* Change back to normal number writings as not supported by python under 3.6

* small fix

* Add missing return (#6522)

* Fix livetest (#6523)

* Eventhubs new EventProcessor (#6550)

* Shared connection (sync) draft

* Shared connection (sync) draft 2

* Shared connection (sync) test update

* Shared connection

* Fix an issue

* add retry exponential delay and timeout to exception handling

* put module method before class def

* fixed Client.get_properties error

* new eph (draft)

* new eph (draft2)

* remove in memory partition manager

* EventProcessor draft 3

* small format change

* Fix logging

* Add EventProcessor example

* use decorator to implement retry logic and update some tests (#6544)

* Update livetest (#6547)

* Remove legacy code and update livetest (#6549)

* Update livetest

* Remove legacy code and update livetest

* make sync longrunning multi-threaded

* small changes on async long running test

* reset retry_count for iterator

* Don't return early when open a ReceiveClient or SendClient

* type annotation change

* Update kwargs and remove unused import

* Misc changes from EventProcessor PR review

* raise asyncio.CancelledError out instead of supressing it.

* Update livetest and small fixed (#6594)

* Add missing close in livetest

* Update livetest to wait longer

* Close handler each time before retry

* Fix feedback from PR (1)

* Revert "Merge branch 'eventhubs_dev' into eventhubs_eph"

This reverts commit 19a5539, reversing
changes made to 9d18dd9.

* Fix feedback from PR (2)

* Update code according to the review (#6623)

* Wait longer for reconnect op

* Raise authentication error when open timeout

* Optimize retry decorator

* Update code according to review

* Small fix

* Fix feedback from PR (3)

* small bug fixing

* Remove old EPH

*  Update decorator implementation (#6642)

* Update decorator implementation

* Remove old EPH pytest

* Revert "Revert "Merge branch 'eventhubs_dev' into eventhubs_eph""

This reverts commit d688090.

* Update sample codes and docstring (#6643)

* Check tablename to prevent sql injection

* PR review update

* Removed old EPH stuffs.
Added new EPH stuffs.

* Small fix (#6650)

* Draft for changelog

* Improve syntax for kwargs

* keep partition manager open for call restart() again

* Example to process async operations

* Update version to 5.0.0b2

* fix mypy problem

* fix small issue on max_retries

* compatible with python < 3.7

* Update docstring of event processor

* small changes on max_retries

* small changes on max_retries

* small changes

* new EventProcessor long-running live test

* change offset to text

* Updating docstings, docs, samples (#6673)

* Draft for updating documentations

* Small improvement

* Updating docstrings, docs and sample

* support 3.5 type hint

* fix 3.5 compatibility

* Update docs (#6678)

* Add a run_awhile example function

* small fix on example

* small fix on example

* Update documentations (#6694)

* Small update (#6696)
  • Loading branch information
YijunXieMS authored Aug 6, 2019
1 parent 53c84be commit 8de236a
Show file tree
Hide file tree
Showing 101 changed files with 2,243 additions and 17,410 deletions.
30 changes: 29 additions & 1 deletion sdk/eventhub/azure-eventhubs/HISTORY.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,31 @@
# Release History

## 5.0.0b2 (2019-08-06)

**New features**

- Added method `create_batch` on the `EventHubProducer` to create an `EventDataBatch` that can then be used to add events until the maximum size is reached.
- This batch object can then be used in the `send()` method to send all the added events to Event Hubs.
- This allows publishers to build batches without the possibility of encountering the error around the message size exceeding the supported limit when sending events.
- It also allows publishers with bandwidth concerns to control the size of each batch published.
- Added new configuration parameters for exponential delay between retry operations.
- `retry_total`: The total number of attempts to redo the failed operation.
- `backoff_factor`: The delay time factor.
- `backoff_max`: The maximum delay time in total.
- Added support for context manager on `EventHubClient`.
- Added new error type `OperationTimeoutError` for send operation.
- Introduced a new class `EventProcessor` which replaces the older concept of [Event Processor Host](https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-event-processor-host). This early preview is intended to allow users to test the new design using a single instance of `EventProcessor`. The ability to checkpoints to a durable store will be added in future updates.
- `EventProcessor`: EventProcessor creates and runs consumers for all partitions of the eventhub.
- `PartitionManager`: PartitionManager defines the interface for getting/claiming ownerships of partitions and updating checkpoints.
- `PartitionProcessor`: PartitionProcessor defines the interface for processing events.
- `CheckpointManager`: CheckpointManager takes responsibility for updating checkpoints during events processing.

**Breaking changes**

- `EventProcessorHost` was replaced by `EventProcessor`, please read the new features for details.
- Replaced `max_retries` configuration parameter of the EventHubClient with `retry_total`.


## 5.0.0b1 (2019-06-25)

Version 5.0.0b1 is a preview of our efforts to create a client library that is user friendly and idiomatic to the Python ecosystem. The reasons for most of the changes in this update can be found in the [Azure SDK Design Guidelines for Python](https://azuresdkspecs.z5.web.core.windows.net/PythonSpec.html). For more information, please visit https://aka.ms/azure-sdk-preview1-python.
Expand Down Expand Up @@ -154,4 +180,6 @@ Version 5.0.0b1 is a preview of our efforts to create a client library that is u

## 0.2.0a1 (unreleased)

- Swapped out Proton dependency for uAMQP.
- Swapped out Proton dependency for uAMQP.

![Impressions](https://azure-sdk-impressions.azurewebsites.net/api/impressions/azure-sdk-for-python/sdk/eventhub/azure-eventhubs/HISTORY.png)
57 changes: 55 additions & 2 deletions sdk/eventhub/azure-eventhubs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ The following sections provide several code snippets covering some of the most c
- [Consume events from an Event Hub](#consume-events-from-an-event-hub)
- [Async publish events to an Event Hub](#async-publish-events-to-an-event-hub)
- [Async consume events from an Event Hub](#async-consume-events-from-an-event-hub)
- [Consume events using an Event Processor](#consume-events-using-an-event-processor)

### Inspect an Event Hub

Expand Down Expand Up @@ -206,6 +207,56 @@ finally:
pass
```

### Consume events using an Event Processor

Using an `EventHubConsumer` to consume events like in the previous examples puts the responsibility of storing the checkpoints (the last processed event) on the user. Checkpoints are important for restarting the task of processing events from the right position in a partition. Ideally, you would also want to run multiple programs targeting different partitions with some load balancing. This is where an `EventProcessor` can help.

The `EventProcessor` will delegate the processing of events to a `PartitionProcessor` that you provide, allowing you to focus on business logic while the processor holds responsibility for managing the underlying consumer operations including checkpointing and load balancing.

While load balancing is a feature we will be adding in the next update, you can see how to use the `EventProcessor` in the below example, where we use an in memory `PartitionManager` that does checkpointing in memory.

```python
import asyncio

from azure.eventhub.aio import EventHubClient
from azure.eventhub.eventprocessor import EventProcessor, PartitionProcessor, Sqlite3PartitionManager

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'

async def do_operation(event):
# do some sync or async operations. If the operation is i/o intensive, async will have better performance
print(event)

class MyPartitionProcessor(PartitionProcessor):
def __init__(self, checkpoint_manager):
super(MyPartitionProcessor, self).__init__(checkpoint_manager)

async def process_events(self, events):
if events:
await asyncio.gather(*[do_operation(event) for event in events])
await self._checkpoint_manager.update_checkpoint(events[-1].offset, events[-1].sequence_number)

def partition_processor_factory(checkpoint_manager):
return MyPartitionProcessor(checkpoint_manager)

async def main():
client = EventHubClient.from_connection_string(connection_str, receive_timeout=5, retry_total=3)
partition_manager = Sqlite3PartitionManager() # in-memory PartitionManager
try:
event_processor = EventProcessor(client, "$default", MyPartitionProcessor, partition_manager)
# You can also define a callable object for creating PartitionProcessor like below:
# event_processor = EventProcessor(client, "$default", partition_processor_factory, partition_manager)
asyncio.ensure_future(event_processor.start())
await asyncio.sleep(60)
await event_processor.stop()
finally:
await partition_manager.close()

if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
```

## Troubleshooting

### General
Expand All @@ -230,7 +281,7 @@ These are the samples in our repo demonstraing the usage of the library.
- [./examples/recv.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs/examples/recv.py) - use consumer to consume events
- [./examples/async_examples/send_async.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs/examples/async_examples/send_async.py) - async/await support of a producer
- [./examples/async_examples/recv_async.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs/examples/async_examples/recv_async.py) - async/await support of a consumer
- [./examples/eph.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs/examples/eph.py) - event processor host
- [./examples/eventprocessor/event_processor_example.py](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhubs/examples/eventprocessor/event_processor_example.py) - event processor

### Documentation

Expand All @@ -253,4 +304,6 @@ This project welcomes contributions and suggestions. Most contributions require
When you submit a pull request, a CLA-bot will automatically determine whether you need to provide a CLA and decorate the PR appropriately (e.g., label, comment). Simply follow the instructions provided by the bot. You will only need to do this once across all repos using our CLA.

This project has adopted the [Microsoft Open Source Code of Conduct](https://opensource.microsoft.com/codeofconduct/).
For more information see the [Code of Conduct FAQ](https://opensource.microsoft.com/codeofconduct/faq/) or contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with any additional questions or comments.
For more information see the [Code of Conduct FAQ](https://opensource.microsoft.com/codeofconduct/faq/) or contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with any additional questions or comments.

![Impressions](https://azure-sdk-impressions.azurewebsites.net/api/impressions/azure-sdk-for-python/sdk/eventhub/azure-eventhubs/README.png)
5 changes: 3 additions & 2 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

__version__ = "5.0.0b1"
__version__ = "5.0.0b2"

from azure.eventhub.common import EventData, EventPosition
from azure.eventhub.common import EventData, EventDataBatch, EventPosition
from azure.eventhub.error import EventHubError, EventDataError, ConnectError, \
AuthenticationError, EventDataSendError, ConnectionLostError
from azure.eventhub.client import EventHubClient
Expand All @@ -18,6 +18,7 @@

__all__ = [
"EventData",
"EventDataBatch",
"EventHubError",
"ConnectError",
"ConnectionLostError",
Expand Down
77 changes: 77 additions & 0 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/_connection_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

from threading import RLock
from uamqp import Connection, TransportType, c_uamqp


class _SharedConnectionManager(object):
def __init__(self, **kwargs):
self._lock = RLock()
self._conn = None # type: Connection

self._container_id = kwargs.get("container_id")
self._debug = kwargs.get("debug")
self._error_policy = kwargs.get("error_policy")
self._properties = kwargs.get("properties")
self._encoding = kwargs.get("encoding") or "UTF-8"
self._transport_type = kwargs.get('transport_type') or TransportType.Amqp
self._http_proxy = kwargs.get('http_proxy')
self._max_frame_size = kwargs.get("max_frame_size")
self._channel_max = kwargs.get("channel_max")
self._idle_timeout = kwargs.get("idle_timeout")
self._remote_idle_timeout_empty_frame_send_ratio = kwargs.get("remote_idle_timeout_empty_frame_send_ratio")

def get_connection(self, host, auth):
# type: (...) -> Connection
with self._lock:
if self._conn is None:
self._conn = Connection(
host,
auth,
container_id=self._container_id,
max_frame_size=self._max_frame_size,
channel_max=self._channel_max,
idle_timeout=self._idle_timeout,
properties=self._properties,
remote_idle_timeout_empty_frame_send_ratio=self._remote_idle_timeout_empty_frame_send_ratio,
error_policy=self._error_policy,
debug=self._debug,
encoding=self._encoding)
return self._conn

def close_connection(self):
with self._lock:
if self._conn:
self._conn.destroy()
self._conn = None

def reset_connection_if_broken(self):
with self._lock:
if self._conn and self._conn._state in (
c_uamqp.ConnectionState.CLOSE_RCVD,
c_uamqp.ConnectionState.CLOSE_SENT,
c_uamqp.ConnectionState.DISCARDING,
c_uamqp.ConnectionState.END,
):
self._conn = None


class _SeparateConnectionManager(object):
def __init__(self, **kwargs):
pass

def get_connection(self, host, auth):
return None

def close_connection(self):
pass

def reset_connection_if_broken(self):
pass


def get_connection_manager(**kwargs):
return _SeparateConnectionManager(**kwargs)
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from __future__ import unicode_literals

import logging
import time

from uamqp import errors, constants, compat
from azure.eventhub.error import EventHubError, _handle_exception

log = logging.getLogger(__name__)


def _retry_decorator(to_be_wrapped_func):
def wrapped_func(self, *args, **kwargs):
timeout = kwargs.pop("timeout", 100000)
if not timeout:
timeout = 100000 # timeout equals to 0 means no timeout, set the value to be a large number.
timeout_time = time.time() + timeout
max_retries = self.client.config.max_retries
retry_count = 0
last_exception = None
while True:
try:
return to_be_wrapped_func(self, timeout_time=timeout_time, last_exception=last_exception, **kwargs)
except Exception as exception:
last_exception = self._handle_exception(exception, retry_count, max_retries, timeout_time)
retry_count += 1
return wrapped_func


class ConsumerProducerMixin(object):
def __init__(self):
self.client = None
self._handler = None
self.name = None

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.close(exc_val)

def _check_closed(self):
if self.error:
raise EventHubError("{} has been closed. Please create a new one to handle event data.".format(self.name))

def _create_handler(self):
pass

def _redirect(self, redirect):
self.redirected = redirect
self.running = False
self._close_connection()

def _open(self, timeout_time=None):
"""
Open the EventHubConsumer using the supplied connection.
If the handler has previously been redirected, the redirect
context will be used to create a new handler before opening it.
"""
# pylint: disable=protected-access
if not self.running:
if self._handler:
self._handler.close()
if self.redirected:
alt_creds = {
"username": self.client._auth_config.get("iot_username"),
"password": self.client._auth_config.get("iot_password")}
else:
alt_creds = {}
self._create_handler()
self._handler.open(connection=self.client._conn_manager.get_connection(
self.client.address.hostname,
self.client.get_auth(**alt_creds)
))
while not self._handler.client_ready():
time.sleep(0.05)
self._max_message_size_on_link = self._handler.message_handler._link.peer_max_message_size \
or constants.MAX_MESSAGE_LENGTH_BYTES # pylint: disable=protected-access
self.running = True

def _close_handler(self):
self._handler.close() # close the link (sharing connection) or connection (not sharing)
self.running = False

def _close_connection(self):
self._close_handler()
self.client._conn_manager.reset_connection_if_broken()

def _handle_exception(self, exception, retry_count, max_retries, timeout_time):
if not self.running and isinstance(exception, compat.TimeoutException):
exception = errors.AuthenticationException("Authorization timeout.")
return _handle_exception(exception, retry_count, max_retries, self, timeout_time)

return _handle_exception(exception, retry_count, max_retries, self, timeout_time)

def close(self, exception=None):
# type:(Exception) -> None
"""
Close down the handler. If the handler has already closed,
this will be a no op. An optional exception can be passed in to
indicate that the handler was shutdown due to error.
:param exception: An optional exception if the handler is closing
due to an error.
:type exception: Exception
Example:
.. literalinclude:: ../examples/test_examples_eventhub.py
:start-after: [START eventhub_client_receiver_close]
:end-before: [END eventhub_client_receiver_close]
:language: python
:dedent: 4
:caption: Close down the handler.
"""
self.running = False
if self.error:
return
if isinstance(exception, errors.LinkRedirect):
self.redirected = exception
elif isinstance(exception, EventHubError):
self.error = exception
elif exception:
self.error = EventHubError(str(exception))
else:
self.error = EventHubError("{} handler is closed.".format(self.name))
if self._handler:
self._handler.close() # this will close link if sharing connection. Otherwise close connection
Loading

0 comments on commit 8de236a

Please sign in to comment.