create_batch feature implementation (#6256) #6324
* Init create batch event
* create_batch implementation
* Revert _set_partition_key method and update comment
* Refactor EventDataBatch class
* Revert logic when setting partition_key of event_data
Can one of the admins verify this patch?
Added a couple of comments to the async clients. The same comments apply to the sync ones, but I haven't duplicated them ;)
@@ -110,6 +111,10 @@ def __init__( # pylint: disable=super-init-not-called
await self._connect()
self.running = True

self._max_message_size_on_link = self._handler.message_handler._link.peer_max_message_size if\
    self._handler.message_handler._link.peer_max_message_size\
    else constants.MAX_MESSAGE_LENGTH_BYTES
Minor: This statement is a bit of a mouthful. I think it would be cleaner to do it with an "or" expression:
self._max_message_size_on_link = self._handler.message_handler._link.peer_max_message_size or constants.MAX_MESSAGE_LENGTH_BYTES
Also, I think it will need a pylint disable for protected-access.
good suggestion. Fixed
You fixed it in the sync class but not the async one :)
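For reference, a minimal sketch of the two spellings discussed in this thread. `DEFAULT_MAX`, `size_conditional`, and `size_or` are hypothetical names used only for illustration; `DEFAULT_MAX` stands in for `constants.MAX_MESSAGE_LENGTH_BYTES` and its value here is an assumption. The point is that `a or b` and `a if a else b` pick the same value, including for falsy inputs such as `None` or `0`.

```python
DEFAULT_MAX = 256 * 1024  # hypothetical stand-in for constants.MAX_MESSAGE_LENGTH_BYTES


def size_conditional(peer_max):
    # Original spelling: a conditional expression that repeats the operand.
    return peer_max if peer_max else DEFAULT_MAX


def size_or(peer_max):
    # Suggested spelling: `or` evaluates to its first truthy operand.
    return peer_max or DEFAULT_MAX
```

Both forms treat `0` the same way as `None`, so switching between them does not change behaviour.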
await self._open()

if max_message_size and max_message_size > self._max_message_size_on_link:
    raise EventDataError('Max message size: {} is too large, acceptable max batch size is: {} bytes.'
I think this should just be a ValueError
changed to ValueError
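A minimal sketch of the size check after this change, assuming a free-standing helper rather than the real producer method. `resolve_max_size` and `link_max_size` are hypothetical names; only the error message and the comparison come from the diff.

```python
def resolve_max_size(max_size, link_max_size):
    """Return the batch size limit to use, validating the caller's request."""
    if max_size and max_size > link_max_size:
        # A bad argument value is a caller error, so the built-in ValueError fits.
        raise ValueError(
            "Max message size: {} is too large, acceptable max batch size is: {} bytes.".format(
                max_size, link_max_size
            )
        )
    # Fall back to the link limit when the caller gave no explicit size.
    return max_size or link_max_size
```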
raise EventDataError('Max message size: {} is too large, acceptable max batch size is: {} bytes.'
                     .format(max_message_size, self._max_message_size_on_link))

return EventDataBatch(max_message_size if max_message_size else self._max_message_size_on_link, partition_key)
This can also be done with an "or" statement:
EventDataBatch(max_message_size or self._max_message_size....)
changed to use "or"
.format(max_message_size, self._max_message_size_on_link))

return EventDataBatch(max_message_size if max_message_size else self._max_message_size_on_link, partition_key)

async def send(self, event_data, partition_key=None):
    # type:(Union[EventData, Union[List[EventData], Iterator[EventData], Generator[EventData]]], Union[str, bytes]) -> None
Type hint needs updating to include the EventDataBatch (assuming it can be passed in here)
Updated to
type:(Union[EventData, EventDataBatch, Iterable[EventData]], Union[str, bytes]) -> None
event_data_with_pk,
partition_key=partition_key) if partition_key else _BatchSendEventData(event_data)
if isinstance(event_data, EventDataBatch):
    wrapper_event_data = event_data
Does this mean that in the case of EventDataBatch we ignore the passed in partition key?
Yes, EventDataBatch wins. It also has partition_key.
In that case we should raise an error if a user tries to pass in the partition_key alongside a Batch object. Otherwise they might think the value they pass in is being used when it's actually being ignored.
self._set_partition_key(partition_key)
self._size = self.message.gather()[0].get_message_encoded_size()
Can you please double check the uAMQP code for .gather()? I have a suspicion that it's not a repeatable operation... though I could be wrong.
It works in our case
@@ -260,6 +279,35 @@ def _set_partition_key(self, value):
self.message.annotations = annotations
self.message.header = header

def try_add(self, event_data):
Where is this function used?
The try_add function is part of the API and supposed to be used by customers not by us.
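To illustrate how a customer might call `try_add`, here is a rough sketch using a toy batch. `SimpleBatch` and `fill_batches` are hypothetical, sizes are plain byte counts rather than encoded AMQP message sizes, and signalling a full batch with `ValueError` is an assumption of this sketch, not something confirmed in this PR.

```python
class SimpleBatch:
    """Hypothetical stand-in for EventDataBatch; sizes are plain byte counts."""

    def __init__(self, max_size):
        self.max_size = max_size
        self._size = 0
        self._events = []

    def try_add(self, event):
        # Assumption: a full batch is signalled by raising ValueError.
        if self._size + len(event) > self.max_size:
            raise ValueError("batch is full")
        self._events.append(event)
        self._size += len(event)

    def __len__(self):
        return len(self._events)


def fill_batches(events, max_size):
    """Pack events greedily, in order, starting a new batch when one fills up."""
    batches = [SimpleBatch(max_size)]
    for event in events:
        try:
            batches[-1].try_add(event)
        except ValueError:
            # Current batch is full: start a new one and retry once.
            # An event larger than max_size still raises from this retry.
            batches.append(SimpleBatch(max_size))
            batches[-1].try_add(event)
    return batches
```

The caller never computes sizes itself; it keeps adding until the batch refuses, then sends that batch and starts another.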
Improperly routed to batch. Concerns batching of event hub events. @annatisch if you wanna review.
@@ -301,8 +306,25 @@ def _set_partition_key(event_datas, partition_key):
ed._set_partition_key(partition_key)
yield ed

async def create_batch(self, max_size=None, partition_key=None):
I wonder if this function needs to be async.
I think it might be better to raise an error if this is called before the link is open, rather than have it open the connection.
Then the function doesn't need to be async and can simply return the Batch object.
As we're not exposing the producer.open API to users, if they want to call create_batch before they send, we need to open the connection for them internally to get the correct peer link message size.
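A rough sketch of the lazy-open behaviour described in this reply. `Producer`, `DEFAULT_MAX`, and the dict returned as a batch are all hypothetical; `_open()` here only simulates link setup and does no real network I/O.

```python
import asyncio

DEFAULT_MAX = 256 * 1024  # hypothetical fallback size


class Producer:
    """Hypothetical producer; _open() stands in for the real link setup."""

    def __init__(self, peer_max):
        self._peer_max = peer_max
        self._max_message_size_on_link = None

    async def _open(self):
        await asyncio.sleep(0)  # placeholder for network I/O
        self._max_message_size_on_link = self._peer_max or DEFAULT_MAX

    async def create_batch(self, max_size=None):
        # Open on first use so the peer's size limit is known before validating.
        if not self._max_message_size_on_link:
            await self._open()
        if max_size and max_size > self._max_message_size_on_link:
            raise ValueError("max_size exceeds the link limit")
        # A dict stands in for the batch object in this sketch.
        return {"max_size": max_size or self._max_message_size_on_link}
```

Because `create_batch` may have to await `_open()`, it stays a coroutine in this design; the alternative raised in review (raise if the link is not open) would let it be a plain function.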
:return:
"""
if not self._max_message_size_on_link:
    self._open()
If we change the behaviour for the async create_batch (to raise an error if the connection isn't open) we will need to do the same thing here.
Same as the async one above. We should open internally first to get the correct peer link message size.
@@ -298,16 +302,34 @@ def _error(outcome, condition):
if outcome != constants.MessageSendResult.Ok:
    raise condition

def create_batch(self, max_size=None, partition_key=None):
    """
    Create an EventDataBatch object with max message size being max_message_size.
The parameter is called max_size, not max_message_size.
Parameter name and description updated.
Create an EventDataBatch object with max message size being max_message_size.
The max_message_size should be no greater than the max allowed message size defined by the service side.
:param max_message_size:
:param partition_key:
Could we add types and descriptions of the parameters.
Types and descriptions added.
event_data_with_pk,
partition_key=partition_key) if partition_key else _BatchSendEventData(event_data)
if isinstance(event_data, EventDataBatch):  # The partition_key in the param will be omitted.
    wrapper_event_data = event_data
Same comment as above regarding raising an error if partition_key is going to be ignored.
Code updated, an EventDataError will be raised if the keys don't match.
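A minimal sketch of the mismatch check described in this reply. The `EventDataError` and `Batch` classes below are local stand-ins, and `effective_partition_key` is a hypothetical helper; the real check lives inside `send` and may differ in detail.

```python
class EventDataError(Exception):
    """Local stand-in for the SDK's EventDataError."""


class Batch:
    """Hypothetical stand-in for EventDataBatch."""

    def __init__(self, partition_key=None):
        self.partition_key = partition_key


def effective_partition_key(event_data, partition_key=None):
    """Return the key that will be used, rejecting a conflicting explicit key."""
    if isinstance(event_data, Batch):
        if partition_key and partition_key != event_data.partition_key:
            # Fail loudly instead of silently ignoring the caller's key.
            raise EventDataError("partition_key does not match the batch's partition_key")
        return event_data.partition_key
    return partition_key
```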
Do not instantiate an EventDataBatch object directly.
"""

log = logging.getLogger(__name__)
This needs to be at the module level, not class.
Code updated.
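A sketch of the module-level logger layout requested here, using a heavily simplified class. Everything except the `logging.getLogger(__name__)` call and the warning text is illustrative and not taken from the final code.

```python
import logging

log = logging.getLogger(__name__)  # created once, at module import time


class EventDataBatch:
    """Simplified illustration; the real class tracks encoded message sizes."""

    def __init__(self):
        self._events = []

    def try_add(self, event_data):
        if event_data is None:
            # Uses the module-level logger rather than a class attribute.
            log.warning("event_data is None when calling EventDataBatch.try_add. Ignored")
            return
        self._events.append(event_data)
```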
log = logging.getLogger(__name__)

def __init__(self, max_size=None, partition_key=None):
    self.max_size = max_size if max_size else constants.MAX_MESSAGE_LENGTH_BYTES
This can be a simple or-statement:
self.max_size = max_size or constants.MAX_MESSAGE_LENGTH_BYTES
thanks for the suggestion, code updated.
:return:
"""
if event_data is None:
    self.log.warning("event_data is None when calling EventDataBatch.try_add. Ignored")
Should use module-level logger
Code updated.
* Update according to the review * Update comment
aece9c2 to dfd4e39
* Change back to normal number writings as not supported by python under 3.6 * small fix
* Eventhubs conn (#6394) * Shared connection (sync) draft * Shared connection (sync) draft 2 * Shared connection (sync) test update * Shared connection * Fix an issue * add retry exponential delay and timeout to exception handling * put module method before class def * fixed Client.get_properties error * Improve send timeout (#6481) * Add timeout information to the link prop during link creation process (#6485) * Update optional parameters in public api into kwargs and update comments (#6510) * Update optional parameters in public api into kwargs and update some comments * Update more optional parameters into kwargs paramter * create_batch feature implementation (#6256) (#6324) * create_batch feature implementation (#6256) * Init create batch event * create_batch implementation * Revert _set_partition_key method and update comment * Refacor EventDataBatch class * Revert logic when setting partition_key of event_data * Misc fixes from code review * Rename max_message_size to max_size Other small fixes * Warn if event_data is None when call try_add * Create batch event update (#6509) * Update according to the review * Update comment * Revert some kwargs backto optional parameters as it may cause breaking changes * Small fixes (#6520) * Change back to normal number writings as not supported by python under 3.6 * small fix * Add missing return (#6522) * Fix livetest (#6523) * Eventhubs new EventProcessor (#6550) * Shared connection (sync) draft * Shared connection (sync) draft 2 * Shared connection (sync) test update * Shared connection * Fix an issue * add retry exponential delay and timeout to exception handling * put module method before class def * fixed Client.get_properties error * new eph (draft) * new eph (draft2) * remove in memory partition manager * EventProcessor draft 3 * small format change * Fix logging * Add EventProcessor example * use decorator to implement retry logic and update some tests (#6544) * Update livetest (#6547) * Remove legacy code and 
update livetest (#6549) * Update livetest * Remove legacy code and update livetest * make sync longrunning multi-threaded * small changes on async long running test * reset retry_count for iterator * Don't return early when open a ReceiveClient or SendClient * type annotation change * Update kwargs and remove unused import * Misc changes from EventProcessor PR review * raise asyncio.CancelledError out instead of supressing it. * Update livetest and small fixed (#6594) * Add missing close in livetest * Update livetest to wait longer * Close handler each time before retry * Fix feedback from PR (1) * Revert "Merge branch 'eventhubs_dev' into eventhubs_eph" This reverts commit 19a5539, reversing changes made to 9d18dd9. * Fix feedback from PR (2) * Update code according to the review (#6623) * Wait longer for reconnect op * Raise authentication error when open timeout * Optimize retry decorator * Update code according to review * Small fix * Fix feedback from PR (3) * small bug fixing * Remove old EPH * Update decorator implementation (#6642) * Update decorator implementation * Remove old EPH pytest * Revert "Revert "Merge branch 'eventhubs_dev' into eventhubs_eph"" This reverts commit d688090. * Update sample codes and docstring (#6643) * Check tablename to prevent sql injection * PR review update * Removed old EPH stuffs. Added new EPH stuffs. 
* Small fix (#6650) * Draft for changelog * Improve syntax for kwargs * keep partition manager open for call restart() again * Example to process async operations * Update version to 5.0.0b2 * fix mypy problem * fix small issue on max_retries * compatible with python < 3.7 * Update docstring of event processor * small changes on max_retries * small changes on max_retries * small changes * new EventProcessor long-running live test * change offset to text * Updating docstings, docs, samples (#6673) * Draft for updating documentations * Small improvement * Updating docstrings, docs and sample * support 3.5 type hint * fix 3.5 compatibility * Update docs (#6678) * Add a run_awhile example function * small fix on example * small fix on example * Update documentations (#6694) * Small update (#6696)