-
Notifications
You must be signed in to change notification settings - Fork 118
[WIP] Batching #539
base: dev
Are you sure you want to change the base?
[WIP] Batching #539
Conversation
@makam this is the approach I'd like to take. If you see no issues, I'll continue with the rest of TODOs. |
@vinaysurya I cannot trigger rebuilds for https://ci.appveyor.com/project/vinaysurya/azure-service-bus-dotnet. Could you please modify my permissions to allow me to do so? Thank you. |
6d45105
to
78ed01d
Compare
@makam @vinaysurya I've got everything except diagnostics. I'd like to start a review of this PR. The feature could be used with a caveat of not having full Diagnostics available if users require this feature until alternative diagnostics registration is available and added later. |
@nemakam @vinaysurya ping |
@nemakam @vinaysurya let me know when you're ready to start reviewing. Thanks. |
/// </summary> | ||
/// <param name="maximumBatchSize">Maximum batch size allowed for batch.</param> | ||
/// <param name="pluginsCallback">Plugins callback to invoke on outgoing messages regisered with batch.</param> | ||
public Batch(ulong maximumBatchSize, Func<Message, Task<Message>> pluginsCallback) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
public Batch [](start = 8, length = 12)
I think having this constructor will cause more confusion. We already have a way to create a batch on the sender.
For testing as of now, we could make this internal.
For customers who need to test this, they could mock a Sender (in the future version) and then create a batch accordingly.
Having this constructor exposed like this will definitely create confusion to customers which is just explained in remarks (which is not the most accessible comment).
I would think especially the second param - callback is going to be even more confusing for a beginner user.
Lets make this internal for now.
// Licensed under the MIT license. See LICENSE file in the project root for full license information. | ||
|
||
using System.Threading.Tasks; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: inside
using Microsoft.Azure.ServiceBus.Diagnostics; | ||
|
||
[DebuggerDisplay("{" + nameof(DebuggerDisplay) + ",nq}")] | ||
public class Batch : IDisposable |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Batch [](start = 17, length = 5)
Message - MessageBatch.
Sounds more consistent. Maybe rename?
/// Convert batch to AMQP message. | ||
/// </summary> | ||
/// <returns></returns> | ||
public AmqpMessage ToAmqpMessage() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
public [](start = 8, length = 6)
This doesn't need to be public
@@ -149,5 +150,14 @@ internal static bool TryExtractContext(this Message message, out IList<KeyValueP | |||
} | |||
return false; | |||
} | |||
|
|||
|
|||
internal static void VerifyMessageIsNotPreviouslyReceived(this Message message) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
VerifyMessageIsNotPreviouslyReceived [](start = 29, length = 36)
This class is dedicated to diagnostics. Lets move this out of this class
|
||
if (originalMessageData.messageId != null) | ||
{ | ||
result.Properties.MessageId = originalMessageData.messageId; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
result.Properties.MessageId = originalMessageData.messageId; [](start = 16, length = 60)
What if TryAdd
succeeded, and then you tried to copy over MessageId
and other properties, and you crossed your size limit?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alternatively, the moment you receive your second TryAdd
, form the result along with its updated properties so that you can validate the size.
In reply to: 219366896 [](ancestors = 219366896)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Absolutely. It also removes the need to "remember" original message system properties and tuple that stores those. Nice!
{ | ||
if (result == null) | ||
{ | ||
throw new Exception("Batch is has been disposed and cannot be re-used."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Exception [](start = 26, length = 9)
ObjectDisposedException
{ | ||
if (result == null) | ||
{ | ||
throw new Exception("Batch is has been disposed and cannot be re-used."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Batch is [](start = 37, length = 8)
typo
} | ||
} | ||
|
||
private string DebuggerDisplay => $"Batch: size={Size} message count={datas.Count} maximum size={maximumBatchSize}"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
size={Size} message count= [](start = 51, length = 26)
Add a .
or ;
or any identifier between two properties so that its easier to read
const string PartitionIdName = "x-opt-partition-id"; | ||
const string ViaPartitionKeyName = "x-opt-via-partition-key"; | ||
internal const string PartitionKeyName = "x-opt-partition-key"; | ||
internal const string ViaPartitionKeyName = "x-opt-via-partition-key"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reorder them to the beginning so that internals would come before private
5a67d61
to
323d7cd
Compare
@nemakam integrated your feedback |
Marked PR as |
Adding extra info to identify why the test is failing on full framework
…ded in the safe size calculation.
@SeanFeldman - we are moving the underlying issue to the [Azure SDK for .NET] (https://github.com/Azure/azure-sdk-for-net/tree/master/sdk/servicebus/Microsoft.Azure.ServiceBus) repo. Once we archive this repo, the PR will be read only and we can leverage this test case in the new repo. |
Fixes #538
The client allows to send a collection of messages (
SendAsync(IList<Message>)
) which is problematic when there's a large number of messages that would not fit into the maximum message size. As a result of that, an exception is thrown when the send operation is invoked.This PR introduces a safe batching option. A
Batch
is constructed before send operation is invoked and messages are added usingbool TryAdd(Message)
API. As long as batch total size doesn't exceed the maximum, messages are added to it. Otherwise, message is not added and can be carried over to the next batch. With this API messages sent will not exceed maximum message size.What's left:
BatchTests
andSenderReceiverTests
)Batch
(logic fromMessageSender.ValidateMessage()
)SendAsync(Batch)
toISenderClient
--> Define SendAsync(Batch) and CreateBatch() on ISenderClient #543SendAsync()
overloadsBLOCKED* Support Diagnostics similar to howSendAsync(IList<Message>)
does it* blocked until Diagnostics can support a different approach of registering messages. See an experimental PR comments here.
Opening this PR to discuss further implementation details