Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make handling of publisher confirmations transparent to the user #1682

Closed
4 tasks done
lukebakken opened this issue Sep 18, 2024 · 32 comments · Fixed by #1687
Closed
4 tasks done

Make handling of publisher confirmations transparent to the user #1682

lukebakken opened this issue Sep 18, 2024 · 32 comments · Fixed by #1687
Assignees
Milestone

Comments

@lukebakken
Copy link
Contributor

lukebakken commented Sep 18, 2024

Is your feature request related to a problem? Please describe.

Mentioned here.

cc @danielmarbach @bording @Tornhoof

Describe the solution you'd like

  • Creating a channel via CreateChannelAsync will allow enabling publisher confirmations for the channel, and will allow enabling tracking of confirmations. An options class will be used.
  • When confirmations and tracking are enabled, BasicPublishAsync calls will wait indefinitely for the response from the broker. Users may implement a timeout via the cancellationToken parameter. If a Nack or Return is sent to the client, a PublishException will be thrown.
  • There will be a maximum count of outstanding confirmations allowed (also configurable). When this maximum is reached, calls to BasicPublishAsync will block.
  • (optional) when the current count of outstanding confirmations starts to near the maximum, delays could be introduced in publish attempts to allow the broker to catch up.

Describe alternatives you've considered

Users will be allowed to disable automatic tracking and do it themselves via BasicAcksAsync and BasicNacksAsync

Additional context

#1676 (comment)

@bording
Copy link
Collaborator

bording commented Sep 19, 2024

This seems like a good list of features/changes for the client to make regarding publisher confirms now that everything is async.

After the investigation Daniel and I did regarding #1676, I have some observations about the existing confirmation tracking that I think make sense to include here.

The current confirmation tracking that was added relies on the separate use of the WaitForConfirms* APIs. This API structure is a leftover relic of the non-async past of this client, and integrating the tracking directly into the BasicPublish call as this issue suggests is definitely the right call.

When the tracking is moved into the BasicPublish, it enables the ability to track confirmations on per-message basis, which the current tracking code does not do.

We added this sort of per-message tracking to the NServiceBus RabbitMQ transport a while back, and you can see how this works here.

All BasicPublishAsync calls will have publisher confirmations tracked, and the mandatory flag will also be set to true

It's interesting that you mention the mandatory flag here, because that's another area that the current tracking code seems to be deficient.

Any BasicReturn messages that are sent from the broker need to also be accounted for when doing confirmation tracking in the BasicPublish calls. Otherwise, the BasicReturn is ignored, and then the BasicPublish would process the ack and indicate the message was published.

In the tracking code we added to the transport, we handle the following things and map them back to the TaskCompletionSource:

  • ack
  • nack
  • return
  • channel shutdown

The complication about handling BasicReturn messages is that the broker does not currently return the confirmation id for correlation like ack/nack do. We solved this in our transport by recording the sequence number in a header that we can extract from the returned message. I'm not sure how the client would be able to do this, but maybe a header wouldn't be a terrible option here either? Of course it would be great if the broker could include the id in the return message, maybe include it in the ReplyText? No clue how feasible such a change would be.

@lukebakken
Copy link
Contributor Author

Of course it would be great if the broker could include the id in the return message, maybe include it in the ReplyText? No clue how feasible such a change would be.

I will look into this today.

@lukebakken
Copy link
Contributor Author

lukebakken commented Sep 19, 2024

Yes, I can see where adding the delivery tag to basic.return would be possible, but the protocol doesn't allow adding that information.

However, the protocol does guarantee that if a basic.return is sent to the client, the broker MUST immediately send basic.ack for that same message, which would allow correlation, client-side, in a very awkward way.

@bording
Copy link
Collaborator

bording commented Sep 19, 2024

Yes, I can see where adding the delivery tag to basic.return would be possible, but the protocol doesn't allow adding that information.

Can you explain a bit more about what this means? What's preventing it from being added to the existing text field of ReplyText? That would require parsing the text to find it, but that's not too terrible.

Is there something that makes the id just not available at all?

@lukebakken
Copy link
Contributor Author

I just added this comment when you commented 😸 👍

AMQP 0.9.1 does guarantee that if a basic.return is sent to the client, the broker MUST immediately send basic.ack for that same message, which would allow correlation, client-side, in a very awkward way.

@lukebakken
Copy link
Contributor Author

Here is where basic.return is constructed:

https://github.com/rabbitmq/rabbitmq-server/blob/main/deps/rabbit/src/rabbit_channel.erl#L1832-L1842

The Reason parameter is always not_found in this case, so the following function call...

https://github.com/rabbitmq/rabbitmq-server/blob/main/deps/rabbit/src/rabbit_channel.erl#L1836

...always returns NO_ROUTE for ReplyText. I suppose we could add more information to that field, but there's probably some application, somewhere, that depends on the existing behavior.

Adding a header is the best way to address this, or depend on the way the protocol works as I described.

@bording
Copy link
Collaborator

bording commented Sep 19, 2024

Adding a header is the best way to address this, or depend on the way the protocol works as I described.

Relying on the return + ack feels like something really hard to achieve. The header is a bit of a hack, but at least it's reliable and easy to implement.

I understand the concerns around adding something to the ReplyText, but that would be such a nice improvement!

@lukebakken
Copy link
Contributor Author

lukebakken commented Sep 19, 2024

Relying on the return + ack feels like something really hard to achieve

Maybe not ... ??? Thoughts? #1686

Update: nope, no guarantee of when the ack happens (https://www.rabbitmq.com/resources/specs/amqp0-9-1.extended.xml)

image

@bording
Copy link
Collaborator

bording commented Sep 20, 2024

Maybe not ... ??? Thoughts? #1686

Update: nope, no guarantee of when the ack happens (https://www.rabbitmq.com/resources/specs/amqp0-9-1.extended.xml)

I was going to say that I wasn't aware the broker had that strong of a guarantee of the sequencing of those, just that return would happen before ack, but I guess it doesn't!

@lukebakken
Copy link
Contributor Author

lukebakken commented Sep 20, 2024

I mean, the spec does say

Unroutable mandatory or immediate messages are acknowledged right after the Basic.Return method

...and this is what I see in my test program. I'm tracing the code rabbit_channel to see if this is in fact guaranteed.

I think the time guarantees section of the spec applies to "normal" acks, and not those that happen after basic.return cc @michaelklishin @kjnilsson @ansd may have some input.

Yes, it does appear that the state flow in rabbit_channel does guarantee that basic.ack for a basic.return-ed message will happen next but I want a second opinion.

@michaelklishin
Copy link
Member

michaelklishin commented Sep 20, 2024

The docs say the same thing, basic.return is sent before a (publisher confirms) basic.ack.

@bording
Copy link
Collaborator

bording commented Sep 20, 2024

Publisher confirms are a RabbitMQ-specific extension of the AMQP spec, so I'm not sure how much the spec itself would help here anyway.

It does seem prudent to know what the broker actually guarantees and if that's actually something "guaranteed" or just an implementation detail.

The spike PR was relying on an assumption that the matching basic.ack would be the very next thing received on the channel after the basic.return with no possibility of there being something else in between.

@lukebakken
Copy link
Contributor Author

lukebakken commented Sep 20, 2024

Publisher confirms are a RabbitMQ-specific extension of the AMQP spec, so I'm not sure how much the spec itself would help here anyway.

I forgot about that bit 🤦

The spike PR was relying on an assumption that the matching basic.ack would be the very next thing received on the channel after the basic.return with no possibility of there being something else in between.

Yep, wouldn't that be nice?

@kjnilsson
Copy link
Contributor

kjnilsson commented Sep 20, 2024

Shouldn't BasicPublishAsync block/wait when the maximum outstanding has been reached until the pending confirms fall below the limit?

@ansd
Copy link
Member

ansd commented Sep 20, 2024

@danielmarbach
Copy link
Collaborator

It seems the client currently doesn't set automatically a message id if non has been set. If we had a message ID that could be used to track states of the return and ACK/NACK sequence by having a secondary index from publish sequence number to IDs.

But then I wonder if it wouldn't be better to enforce a sequence number header when the confirms mode is enabled starting from v7. The overhead of that header is super minimal from a size perspective and comparably cheap compared to the decision of doing confirms anyway.

@danielmarbach
Copy link
Collaborator

So what about the following (raw ideas for discussion)

Change the ConfirmSelectAsync method to something like

        /// <summary>
        /// Asynchronously enable publisher confirmations.
        /// </summary>
        /// <param name="maximumNumberOfOutstandingConfirmations">Set to <c>null</c> if tracking via <see cref="BasicAcksAsync"/> and <see cref="BasicNacksAsync"/> yourself.</param>
        /// <param name="cancellationToken">CancellationToken for this operation.</param>
        Task ConfirmSelectAsync(ushort? maximumNumberOfOutstandingConfirmations = 10,
            CancellationToken cancellationToken = default);

by default it is set to some upper number of confirms that can be pending. If you want to track it yourself you can set it to null.

Internally the channel then makes sure the sequence number acquisition is properly guarded and enforcing the upper limit of number of oustanding confirms. When the upper limits are reached further publish attempts are hold up until for slots are available or the cancellation token triggers.

ConfirmSelectAsync would throw when it is called with new values when there are outstanding confirms.

In non confirms mode the publishes behave like today. In confirms mode they behave in the following way:

  • When the number of outstanding confirms is below the threshold the publish only returns when either ACK, NACK or BasicReturn happened. Throwing in all cases when there is no ackknowledgement or when the cancellation token triggers
  • When the number of oustanding confirms is reached the publish waits for available slots or until the cancellation token is triggered
  • In all exception cases the channel is never closed. Making that judgement is something the user can decide by calling CloseAsync
  • The sequence number is added to the properties using a well established known x- header

The methods WaitForConfirmsAsync and WaitForConfirmsOrDieAsync will be removed from the channel

Subsequent major releases might flip the default to have confirms enabled but that decision can be postponed.

@danielmarbach
Copy link
Collaborator

danielmarbach commented Sep 21, 2024

I'm also starting to wonder now that acquiring the sequence number is async whether that method should return a scope that makes sure the sequence number cannot be acquired while the scope is active.

That would give people tracking acks and nacks manually a chance to have guaranteed exclusive access to the sequence number value on the channel for the duration of the scope (internally acquiring / releasing the consumer semaphore)

@danielmarbach
Copy link
Collaborator

I think it would also make sense to configure the confirms defaults directly during the async channel creation instead of having a dedicated method and deal with all the ramifications of guarding against the value flipping mid way

@michaelklishin
Copy link
Member

@danielmarbach there is no way to disable publisher confirms once enabled. But I have no objections to having it as a constructor argument (or equivalent) at channel creation time.

@danielmarbach
Copy link
Collaborator

danielmarbach commented Sep 21, 2024

Sorry I was thinking about the tracking but wrote confirms defaults (which is ambiguous) because you can call the method multiple times with different flag values today

@rabbitmq rabbitmq deleted a comment from alrz Sep 22, 2024
lukebakken added a commit that referenced this issue Sep 22, 2024
Fixes #1682

* Remove `ConfirmSelectAsync` from `IChannel`
* Add parameters to enable confirmations on `IConnection.CreateChannelAsync`
@lukebakken
Copy link
Contributor Author

@danielmarbach @bording I've done a little bit of work to get the ball rolling here:

#1687

Basically, just start working to remove the use of ConfirmSelectAsync, but still use the existing confirmation tracking code.

lukebakken added a commit that referenced this issue Oct 2, 2024
Fixes #1682

* Remove `ConfirmSelectAsync` from `IChannel`
* Add parameters to enable confirmations on `IConnection.CreateChannelAsync`
lukebakken added a commit that referenced this issue Oct 3, 2024
Fixes #1682

* Remove `ConfirmSelectAsync` from `IChannel`
* Add parameters to enable confirmations on `IConnection.CreateChannelAsync`
lukebakken added a commit that referenced this issue Oct 4, 2024
Fixes #1682

* Remove `ConfirmSelectAsync` from `IChannel`
* Add parameters to enable confirmations on `IConnection.CreateChannelAsync`
lukebakken added a commit that referenced this issue Oct 8, 2024
Fixes #1682

* Remove `ConfirmSelectAsync` from `IChannel`
* Add parameters to enable confirmations on `IConnection.CreateChannelAsync`
* Remove / comment out all use of `WaitForConfirms...` methods.
* Dispose -> DisposeAsync
* Implement confirmation tracking and await-ing in `BasicPublishAsync`
* Ensure exceptions make into inner exception for `HardProtocolException`
* Remove commented-out code related to `WaitForConfirms...` methods.
* Unblock so that `CloseAsync` can succeed.
* Introduce channel creation options
* Allow cancellation of final await for publisher confirmation in `BasicPublishAsync`
* Fix `dotnet format` verification error
* Make `ConfirmSelectAsync` `private` and assume that semaphore is held.
* Track sequence number for basic.return
* Implement `basic.return` support.
* Fix the code that adds OTel and publish sequence number headers.
* Add publish sequence number as long as `_publisherConfirmationsEnabled` is true
* Fix the `PublisherConfirms` test program
* Add license headers
* Enable publisher confirms
* Spike an exception based approach (misses removing the bool value return type)
* Extend the use of `_confirmSemaphore` to the duration of when exceptions could be caught.
* Restore how @danielmarbach serialized the publish sequence number.
* Fix bug in how headers are added to `BasicProperties` that don't already have them.
* Use `ValueTask` as the `BasicPublishAsync` return value.

---------

Co-authored-by: Daniel Marbach <danielmarbach@users.noreply.github.com>
Co-authored-by: Luke Bakken <luke@bakken.io>

* Add `PublishException` class.

* Test that non-routable messages result in `PublishException` with `IsReturn = true`

* Code documentation

Simplify code.
@lukebakken lukebakken reopened this Oct 8, 2024
@lukebakken
Copy link
Contributor Author

Re-opening because #1687 implemented only some of the features of this issue.

@danielmarbach
Copy link
Collaborator

I'm also starting to wonder now that acquiring the sequence number is async whether that method should return a scope that makes sure the sequence number cannot be acquired while the scope is active.

That would give people tracking acks and nacks manually a chance to have guaranteed exclusive access to the sequence number value on the channel for the duration of the scope (internally acquiring / releasing the consumer semaphore)

Any thoughts on this one? Or is the current thinking with the improved tracking manually doing the sequence acquisition is not longer something desirable and therefore the API usage between acquiring the number and doing the publish doesn't require additional safety nets?

@lukebakken
Copy link
Contributor Author

Hey thanks for bringing that up again.

Yes I think that for the vast majority of users, the new tracking features will be more than sufficient. Also, in the history of this project the current sequence number acquisition behavior has been just fine, so we won't need anything else going forward. If we do at some point, it could be added to the existing API, presumably.

@danielmarbach
Copy link
Collaborator

danielmarbach commented Oct 23, 2024

@lukebakken It seems the x-seq is always added

            channel = await connection.CreateChannelAsync(new CreateChannelOptions
            {
                PublisherConfirmationsEnabled = true,
            }, cancellationToken: cancellationToken).ConfigureAwait(false);

image

Was that deliberate or should it only be added when tracking is enabled?

@lukebakken
Copy link
Contributor Author

If a user enables publisher confirmations, but does not enable tracking, and wants to handle acks/nacks/returns via the events, I thought it would be handy to have that sequence number. Of course, we could also say it's up to the user to add it in the DIY tracking case, too.

@lukebakken
Copy link
Contributor Author

@danielmarbach - #1710

@danielmarbach
Copy link
Collaborator

If a user enables publisher confirmations, but does not enable tracking, and wants to handle acks/nacks/returns via the events, I thought it would be handy to have that sequence number. Of course, we could also say it's up to the user to add it in the DIY tracking case, too.

I'm really not that deep in the weeds to be able to say what is best, that's why I formulated it as a question. In my head I was thinking that for manual tracking you need to acquire the next sequence number anyway to store it somewhere to be able to correlate things back. That's why I got curious and started thinking it might only need to be added when publisher confirms tracking is enabled. In all other cases, they can use the constant defined and do things on their own. But again not 100% sure if my mental model holds up

@danielmarbach
Copy link
Collaborator

FYI this is the latest RC upgrade PR Particular/NServiceBus.RabbitMQ#1472. It really simplifies a ton and passes the tests 🙉

@lukebakken
Copy link
Contributor Author

That's great news!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging a pull request may close this issue.

6 participants