Skip to content

Commit

Permalink
[Event Hubs] EventHubProducerClient.send_batch accepts a list of Ev…
Browse files Browse the repository at this point in the history
…entData (Azure#11079)
  • Loading branch information
YijunXieMS authored Apr 30, 2020
1 parent 67b483f commit 96ec2d7
Show file tree
Hide file tree
Showing 8 changed files with 229 additions and 21 deletions.
3 changes: 3 additions & 0 deletions sdk/eventhub/azure-eventhub/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## 5.1.0b2 (Unreleased)

**New Features**

- `EventHubProducerClient.send_batch` accepts either an `EventDataBatch` or a finite list of `EventData`. #9181

## 5.1.0b1 (2020-04-06)

Expand Down
9 changes: 9 additions & 0 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,15 @@ def _from_batch(cls, batch_data, partition_key=None):
)
return batch_data_instance

def _load_events(self, events):
for event_data in events:
try:
self.add(event_data)
except ValueError:
raise ValueError("The combined size of EventData collection exceeds the Event Hub frame size limit. "
"Please send a smaller collection of EventData, or use EventDataBatch, "
"which is guaranteed to be under the frame size limit")

@property
def size_in_bytes(self):
# type: () -> int
Expand Down
42 changes: 35 additions & 7 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@
import threading

from typing import Any, Union, TYPE_CHECKING, Dict, List, Optional, cast

from uamqp import constants

from .exceptions import ConnectError, EventHubError
from ._client_base import ClientBase
from ._producer import EventHubProducer
from ._constants import ALL_PARTITIONS
from ._common import EventDataBatch
from ._common import EventDataBatch, EventData

if TYPE_CHECKING:
from azure.core.credentials import TokenCredential
Expand Down Expand Up @@ -183,20 +184,37 @@ def from_connection_string(cls, conn_str, **kwargs):
return cls(**constructor_args)

def send_batch(self, event_data_batch, **kwargs):
# type: (EventDataBatch, Any) -> None
# type: (Union[EventDataBatch, List[EventData]], Any) -> None
"""Sends event data and blocks until acknowledgement is received or operation times out.
:param event_data_batch: The EventDataBatch object to be sent.
:type event_data_batch: ~azure.eventhub.EventDataBatch
If you're sending a finite list of `EventData` and you know it's within the size limit of the event hub
frame size limit, you can send them with a `send_batch` call. Otherwise, use :meth:`create_batch`
to create `EventDataBatch` and add `EventData` into the batch one by one until the size limit,
and then call this method to send out the batch.
:param event_data_batch: The `EventDataBatch` object to be sent or a list of `EventData` to be sent
in a batch.
:type event_data_batch: Union[~azure.eventhub.EventDataBatch, List[~azure.eventhub.EventData]]
:keyword float timeout: The maximum wait time to send the event data.
If not specified, the default wait time specified when the producer was created will be used.
:keyword str partition_id: The specific partition ID to send to. Default is None, in which case the service
will assign to all partitions using round-robin.
A `TypeError` will be raised if partition_id is specified and event_data_batch is an `EventDataBatch` because
`EventDataBatch` itself has partition_id.
:keyword str partition_key: With the given partition_key, event data will be sent to
a particular partition of the Event Hub decided by the service.
A `TypeError` will be raised if partition_key is specified and event_data_batch is an `EventDataBatch` because
`EventDataBatch` itself has partition_key.
If both partition_id and partition_key is provided, the partition_id will take precedence.
:rtype: None
:raises: :class:`AuthenticationError<azure.eventhub.exceptions.AuthenticationError>`
:class:`ConnectError<azure.eventhub.exceptions.ConnectError>`
:class:`ConnectionLostError<azure.eventhub.exceptions.ConnectionLostError>`
:class:`EventDataError<azure.eventhub.exceptions.EventDataError>`
:class:`EventDataSendError<azure.eventhub.exceptions.EventDataSendError>`
:class:`EventHubError<azure.eventhub.exceptions.EventHubError>`
:class:`ValueError`
:class:`TypeError`
.. admonition:: Example:
Expand All @@ -208,18 +226,28 @@ def send_batch(self, event_data_batch, **kwargs):
:caption: Sends event data
"""
partition_id = kwargs.get("partition_id")
partition_key = kwargs.get("partition_key")
if isinstance(event_data_batch, EventDataBatch):
if partition_id or partition_key:
raise TypeError("partition_id and partition_key should be None when sending an EventDataBatch "
"because type EventDataBatch itself may have partition_id or partition_key")
to_send_batch = event_data_batch
else:
to_send_batch = self.create_batch(partition_id=partition_id, partition_key=partition_key)
to_send_batch._load_events(event_data_batch) # pylint:disable=protected-access
partition_id = (
event_data_batch._partition_id or ALL_PARTITIONS # pylint:disable=protected-access
to_send_batch._partition_id or ALL_PARTITIONS # pylint:disable=protected-access
)
send_timeout = kwargs.pop("timeout", None)
try:
cast(EventHubProducer, self._producers[partition_id]).send(
event_data_batch, timeout=send_timeout
to_send_batch, timeout=send_timeout
)
except (KeyError, AttributeError, EventHubError):
self._start_producer(partition_id, send_timeout)
cast(EventHubProducer, self._producers[partition_id]).send(
event_data_batch, timeout=send_timeout
to_send_batch, timeout=send_timeout
)

def create_batch(self, **kwargs):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from ._client_base_async import ClientBaseAsync
from ._producer_async import EventHubProducer
from .._constants import ALL_PARTITIONS
from .._common import EventDataBatch
from .._common import EventDataBatch, EventData

if TYPE_CHECKING:
from uamqp.constants import TransportType
Expand Down Expand Up @@ -211,23 +211,41 @@ def from_connection_string(

async def send_batch(
self,
event_data_batch: EventDataBatch,
event_data_batch: Union[EventDataBatch, List[EventData]],
*,
timeout: Optional[Union[int, float]] = None
timeout: Optional[Union[int, float]] = None,
**kwargs
) -> None:
"""Sends event data and blocks until acknowledgement is received or operation times out.
:param event_data_batch: The EventDataBatch object to be sent.
:type event_data_batch: ~azure.eventhub.EventDataBatch
:param float timeout: The maximum wait time to send the event data.
If you're sending a finite list of `EventData` and you know it's within the size limit of the event hub
frame size limit, you can send them with a `send_batch` call. Otherwise, use :meth:`create_batch`
to create `EventDataBatch` and add `EventData` into the batch one by one until the size limit,
and then call this method to send out the batch.
:param event_data_batch: The `EventDataBatch` object to be sent or a list of `EventData` to be sent
in a batch.
:type event_data_batch: Union[~azure.eventhub.EventDataBatch, List[~azure.eventhub.EventData]]
:keyword float timeout: The maximum wait time to send the event data.
If not specified, the default wait time specified when the producer was created will be used.
:keyword str partition_id: The specific partition ID to send to. Default is None, in which case the service
will assign to all partitions using round-robin.
A `TypeError` will be raised if partition_id is specified and event_data_batch is an `EventDataBatch` because
`EventDataBatch` itself has partition_id.
:keyword str partition_key: With the given partition_key, event data will be sent to
a particular partition of the Event Hub decided by the service.
A `TypeError` will be raised if partition_key is specified and event_data_batch is an `EventDataBatch` because
`EventDataBatch` itself has partition_key.
If both partition_id and partition_key is provided, the partition_id will take precedence.
:rtype: None
:raises: :class:`AuthenticationError<azure.eventhub.exceptions.AuthenticationError>`
:class:`ConnectError<azure.eventhub.exceptions.ConnectError>`
:class:`ConnectionLostError<azure.eventhub.exceptions.ConnectionLostError>`
:class:`EventDataError<azure.eventhub.exceptions.EventDataError>`
:class:`EventDataSendError<azure.eventhub.exceptions.EventDataSendError>`
:class:`EventHubError<azure.eventhub.exceptions.EventHubError>`
:class:`ValueError`
:class:`TypeError`
.. admonition:: Example:
Expand All @@ -239,17 +257,28 @@ async def send_batch(
:caption: Asynchronously sends event data
"""
partition_id = kwargs.get("partition_id")
partition_key = kwargs.get("partition_key")
if isinstance(event_data_batch, EventDataBatch):
if partition_id or partition_key:
raise TypeError("partition_id and partition_key should be None when sending an EventDataBatch "
"because type EventDataBatch itself may have partition_id or partition_key")
to_send_batch = event_data_batch
else:
to_send_batch = await self.create_batch(partition_id=partition_id, partition_key=partition_key)
to_send_batch._load_events(event_data_batch) # pylint:disable=protected-access

partition_id = (
event_data_batch._partition_id or ALL_PARTITIONS # pylint:disable=protected-access
to_send_batch._partition_id or ALL_PARTITIONS # pylint:disable=protected-access
)
try:
await cast(EventHubProducer, self._producers[partition_id]).send(
event_data_batch, timeout=timeout
to_send_batch, timeout=timeout
)
except (KeyError, AttributeError, EventHubError):
await self._start_producer(partition_id, timeout)
await cast(EventHubProducer, self._producers[partition_id]).send(
event_data_batch, timeout=timeout
to_send_batch, timeout=timeout
)

async def create_batch(
Expand Down
18 changes: 18 additions & 0 deletions sdk/eventhub/azure-eventhub/samples/async_samples/send_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import os

from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub.exceptions import EventHubError
from azure.eventhub import EventData

CONNECTION_STR = os.environ['EVENT_HUB_CONN_STR']
Expand Down Expand Up @@ -70,6 +71,22 @@ async def send_event_data_batch_with_properties(producer):
await producer.send_batch(event_data_batch)


async def send_event_data_list(producer):
# If you know beforehand that the list of events you have will not exceed the
# size limits, you can use the `send_batch()` api directly without creating an EventDataBatch

# Without specifying partition_id or partition_key
# the events will be distributed to available partitions via round-robin.

event_data_list = [EventData('Event Data {}'.format(i)) for i in range(10)]
try:
await producer.send_batch(event_data_list)
except ValueError: # Size exceeds limit. This shouldn't happen if you make sure before hand.
print("Size of the event data list exceeds the size limit of a single send")
except EventHubError as eh_err:
print("Sending error: ", eh_err)


async def run():

producer = EventHubProducerClient.from_connection_string(
Expand All @@ -82,6 +99,7 @@ async def run():
await send_event_data_batch_with_partition_key(producer)
await send_event_data_batch_with_partition_id(producer)
await send_event_data_batch_with_properties(producer)
await send_event_data_list(producer)


loop = asyncio.get_event_loop()
Expand Down
19 changes: 18 additions & 1 deletion sdk/eventhub/azure-eventhub/samples/sync_samples/send.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import time
import os
from azure.eventhub import EventHubProducerClient, EventData

from azure.eventhub.exceptions import EventHubError

CONNECTION_STR = os.environ['EVENT_HUB_CONN_STR']
EVENTHUB_NAME = os.environ['EVENT_HUB_NAME']
Expand Down Expand Up @@ -68,6 +68,22 @@ def send_event_data_batch_with_properties(producer):
producer.send_batch(event_data_batch)


def send_event_data_list(producer):
# If you know beforehand that the list of events you have will not exceed the
# size limits, you can use the `send_batch()` api directly without creating an EventDataBatch

# Without specifying partition_id or partition_key
# the events will be distributed to available partitions via round-robin.

event_data_list = [EventData('Event Data {}'.format(i)) for i in range(10)]
try:
producer.send_batch(event_data_list)
except ValueError: # Size exceeds limit. This shouldn't happen if you make sure before hand.
print("Size of the event data list exceeds the size limit of a single send")
except EventHubError as eh_err:
print("Sending error: ", eh_err)


producer = EventHubProducerClient.from_connection_string(
conn_str=CONNECTION_STR,
eventhub_name=EVENTHUB_NAME
Expand All @@ -80,5 +96,6 @@ def send_event_data_batch_with_properties(producer):
send_event_data_batch_with_partition_key(producer)
send_event_data_batch_with_partition_id(producer)
send_event_data_batch_with_properties(producer)
send_event_data_list(producer)

print("Send messages in {} seconds.".format(time.time() - start_time))
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
import time
import json

from azure.eventhub import EventData, TransportType
from azure.eventhub import EventData, TransportType, EventDataBatch
from azure.eventhub.aio import EventHubProducerClient

from azure.eventhub.exceptions import EventDataSendError

@pytest.mark.liveTest
@pytest.mark.asyncio
Expand Down Expand Up @@ -169,3 +169,57 @@ async def test_send_with_create_event_batch_async(connstr_receivers):
received.extend(r.receive_message_batch(timeout=10000))
assert len(received) >= 1
assert EventData._from_message(received[0]).properties[b"raw_prop"] == b"raw_value"



@pytest.mark.liveTest
@pytest.mark.asyncio
async def test_send_list_async(connstr_receivers):
connection_str, receivers = connstr_receivers
client = EventHubProducerClient.from_connection_string(connection_str)
payload = "A1"
async with client:
await client.send_batch([EventData(payload)])
received = []
for r in receivers:
received.extend([EventData._from_message(x) for x in r.receive_message_batch(timeout=10000)])

assert len(received) == 1
assert received[0].body_as_str() == payload


@pytest.mark.liveTest
@pytest.mark.asyncio
async def test_send_list_partition_async(connstr_receivers):
connection_str, receivers = connstr_receivers
client = EventHubProducerClient.from_connection_string(connection_str)
payload = "A1"
async with client:
await client.send_batch([EventData(payload)], partition_id="0")
message = receivers[0].receive_message_batch(timeout=10000)[0]
received = EventData._from_message(message)
assert received.body_as_str() == payload


@pytest.mark.parametrize("to_send, exception_type",
[([], EventDataSendError),
([EventData("A"*1024)]*1100, ValueError),
("any str", AttributeError)
])
@pytest.mark.liveTest
@pytest.mark.asyncio
async def test_send_list_wrong_data_async(connection_str, to_send, exception_type):
client = EventHubProducerClient.from_connection_string(connection_str)
async with client:
with pytest.raises(exception_type):
await client.send_batch(to_send)


@pytest.mark.parametrize("partition_id, partition_key", [("0", None), (None, "pk")])
async def test_send_batch_pid_pk_async(invalid_hostname, partition_id, partition_key):
# Use invalid_hostname because this is not a live test.
client = EventHubProducerClient.from_connection_string(invalid_hostname)
batch = EventDataBatch(partition_id=partition_id, partition_key=partition_key)
async with client:
with pytest.raises(TypeError):
await client.send_batch(batch, partition_id=partition_id, partition_key=partition_key)
Loading

0 comments on commit 96ec2d7

Please sign in to comment.