Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce async consumer dispatcher #307

Closed

Conversation

danielmarbach
Copy link
Collaborator

@danielmarbach danielmarbach commented Feb 3, 2017

I was following an idea to see if it would be possible to make a tiny step in a minor version to become async on the path where the client dispatches to the consumers. I was able to get it going by doing the following:

  • Introduce an internal IAsyncConnection
  • Introduce an Async flag on the connection factory which allows to opt-in for the async consumer dispatcher
  • Decorate the IConnection with an IAsyncConnection when the connection factory flag is set
  • Introduce IAsyncBasicConsumer for fully asynchronous consumers
  • Basic consume ensures when IAsyncConnection is used you can only use the IAsyncBasicConsumer implementations
  • AsyncConsumerDispatcher stick to IConsumerDispatcher but internally enqueues Func<Task> instead of Action
  • The worker pool can now use Task.Run with an async body. The idle phase is implemented with a TaskCompletionSource and a Task.WhenAny that either completes when the delay is due or the TCS is set. Use atomic field reference assigned to create a new TaskCompletionSource after each iteration (this should be fine since we only have one loop and concurrent enqueues should still work due to TrySetResult)
  • Removed closure allocation in dispatcher. Removes the Func<Task> allocations per call on the consumer dispatcher. So that means we save one allocation per call

Before

public void HandleBasicDeliver(IBasicConsumer consumer, string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties basicProperties, byte[] body)
{
    AsyncConsumerDispatcher.\u003C\u003Ec__DisplayClass11_0 cDisplayClass110 = new AsyncConsumerDispatcher.\u003C\u003Ec__DisplayClass11_0();
    cDisplayClass110.\u003C\u003E4__this = this;
    cDisplayClass110.consumer = consumer;
    cDisplayClass110.consumerTag = consumerTag;
    cDisplayClass110.deliveryTag = deliveryTag;
    cDisplayClass110.redelivered = redelivered;
    cDisplayClass110.exchange = exchange;
    cDisplayClass110.routingKey = routingKey;
    cDisplayClass110.basicProperties = basicProperties;
    cDisplayClass110.body = body;
    // ISSUE: method pointer
    this.UnlessShuttingDown(new Func<Task>((object) cDisplayClass110, __methodptr(\u003CHandleBasicDeliver\u003Eb__0)));
}

After

public void HandleBasicDeliver(IBasicConsumer consumer, string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties basicProperties, byte[] body)
{
    this.ScheduleUnlessShuttingDown((Work) new BasicDeliver(consumer, consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body));
}

Benefits

  • Fully async for the consumer implementation
  • Consumer worker thread is no longer blocked at any time and can participate in consumer implementation execution and continuations
  • Can be implemented as a minor withouth breaking anything while introducing slightly less efficient consumer dispatching

Tradeoffs

  • Some evil internal casting (but I think this is necessary if we want to make it none breaking)
  • Introduced AsyncEventHandler to stick as much as possible to the current way how consumers work although probably a Func<Args, Task> would be more elegant.
  • Requires a few GetAwaiter().GetResult() calls (shudder)
  • Changed the shutdown phase to wait for the work pool to complete (happy to remove if not desired)

@danielmarbach
Copy link
Collaborator Author

Marked it as WIP since I would like to get some feedback first and then take it to finish. Thoughts? // cc @bording @adamralph

@danielmarbach
Copy link
Collaborator Author

Btw. in my local machine I exclude the WinRT projects for now until I get 👍 or 👎

@danielmarbach danielmarbach mentioned this pull request Feb 4, 2017
@danielmarbach danielmarbach force-pushed the async-consumer-dispatch branch from 5dc48ee to 3ce5e10 Compare February 4, 2017 20:18
@danielmarbach
Copy link
Collaborator Author

The current ConsumerDispatcher has an issue of closure allocation. Because this PR introduces a dedicated path that is internal we can no optimise that path to reduce the closure allocation. I added a commit that removes the Func<Task> allocations per call on the consumer dispatcher. So that means we save one allocation per call

Before

public void HandleBasicDeliver(IBasicConsumer consumer, string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties basicProperties, byte[] body)
{
    AsyncConsumerDispatcher.\u003C\u003Ec__DisplayClass11_0 cDisplayClass110 = new AsyncConsumerDispatcher.\u003C\u003Ec__DisplayClass11_0();
    cDisplayClass110.\u003C\u003E4__this = this;
    cDisplayClass110.consumer = consumer;
    cDisplayClass110.consumerTag = consumerTag;
    cDisplayClass110.deliveryTag = deliveryTag;
    cDisplayClass110.redelivered = redelivered;
    cDisplayClass110.exchange = exchange;
    cDisplayClass110.routingKey = routingKey;
    cDisplayClass110.basicProperties = basicProperties;
    cDisplayClass110.body = body;
    // ISSUE: method pointer
    this.UnlessShuttingDown(new Func<Task>((object) cDisplayClass110, __methodptr(\u003CHandleBasicDeliver\u003Eb__0)));
}

After

public void HandleBasicDeliver(IBasicConsumer consumer, string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties basicProperties, byte[] body)
{
    this.ScheduleUnlessShuttingDown((Work) new BasicDeliver(consumer, consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body));
}

Will update the description

@danielmarbach
Copy link
Collaborator Author

Switched to generic method dispatch where possible. It is slightly faster than casting in the async consumer dispatcher

BenchmarkDotNet=v0.10.1, OS=Microsoft Windows NT 6.2.9200.0
Processor=Intel(R) Core(TM) i7-4980HQ CPU 2.80GHz, ProcessorCount=8
Frequency=2728066 Hz, Resolution=366.5600 ns, Timer=TSC
  [Host]     : Clr 4.0.30319.42000, 32bit LegacyJIT-v4.6.1586.0
  Job-FPAYGT : Clr 4.0.30319.42000, 64bit RyuJIT-v4.6.1586.0
  Job-VVRXBX : Clr 4.0.30319.42000, 32bit LegacyJIT-v4.6.1586.0

Method Platform Mean StdDev StdErr Min Q1 Median Q3 Max Op/s Scaled Scaled-StdDev Gen 0 Allocated
MethodBase X64 4.4247 ns 0.0270 ns 0.0072 ns 4.3820 ns 4.4066 ns 4.4185 ns 4.4348 ns 4.4701 ns 226006054.95 1.00 0.00 0.0036 24 B
MethodGeneric X64 4.3599 ns 0.0466 ns 0.0120 ns 4.2800 ns 4.3351 ns 4.3578 ns 4.3992 ns 4.4618 ns 229364553.2 0.99 0.01 0.0034 24 B
MethodBase X86 4.4090 ns 0.0246 ns 0.0068 ns 4.3598 ns 4.3921 ns 4.4157 ns 4.4256 ns 4.4526 ns 226810688.98 1.00 0.00 0.0021 12 B
MethodGeneric X86 4.3581 ns 0.0213 ns 0.0059 ns 4.3281 ns 4.3425 ns 4.3538 ns 4.3709 ns 4.4053 ns 229459580.13 0.99 0.01 0.0019 12 B

Implementing an internal interface is a non-breaking change from an API perspective
…now inherits from Connection

AutorecoveringConnection and ProtocolBase create correct connection depending on settings
@kjnilsson
Copy link
Contributor

Thank you! I will take a proper look on Monday. Bear in mind the next release will most likely be a major version bump so some breaking changes could be accommodated.

@danielmarbach
Copy link
Collaborator Author

@kjnilsson Ok then I can remove a few trickeries here.

@michaelklishin
Copy link
Member

@danielmarbach thank you! Do you have any numbers that compare this version with what's currently in master?

Also, would be really great to get some feedback from @bording :)

@danielmarbach
Copy link
Collaborator Author

danielmarbach commented Feb 5, 2017 via email

@kjnilsson
Copy link
Contributor

@danielmarbach I've taken a bit of a look at your PR today and the overall approach of adding async consumers as an additional, optional feature looks good to me and in terms of that you've got a 👍 .

You've outlined some of the benefits above. Once it is a bit more finished it would be nice to try to gather some numbers on any scalability benefits this would provide.

General feedback:

  • I think it is fine to replace AsyncEventHandlers with Func<Args, Task>. Never felt comfortable with mixing multicast delegates with manual acks.
  • Not sure about the new flag on the ConnectionFactory to enable the special async mode. I'm wondering if there is a more intuitive api we can provide. An AsyncConnectionFactory naturally wouldn't be a completely accurate given only consumption being async. Will think about it a bit more.

Cheers
Karl

@danielmarbach
Copy link
Collaborator Author

danielmarbach commented Feb 6, 2017 via email

@bording
Copy link
Collaborator

bording commented Feb 11, 2017

@michaelklishin Just wanted to say that I've been looking at this with @danielmarbach and so far I like the direction it's heading in. I do think some additional changes are going to be needed, because there's some weird performance issues that we've not been able to sort out yet.

@michaelklishin
Copy link
Member

@bording sounds promising, thank you both!

@danielmarbach
Copy link
Collaborator Author

I hesitated to do a benchmark with Benchmark.NET because it would burn too many of my CPU cycles ;). Here is a simple test

https://gist.github.com/danielmarbach/80b2c60cced407e1d3500db8c85de543

Big caveat:

Depending on the clock resolution (on my machine something around 15 miliseconds) the Task.Delay version can be slower. @bording and I ended up playing around with it and pretty much saw good or better results with the async version as longs as Thread.Sleep vs. Task.Delay was a factor of 15 milliseconds. So Thread.Sleep(15) would be Task.Delay(1). If you set Thread.Sleep(1) and Task.Delay(1) then the await version will be 15 times slower because of the internal timer resolution that is used with Task.Delay. So that would be an unfair comparison. Set Thread.Sleep and Task.Delay to a factor of 15 milliseconds. Try to remember that if you play with the results. Here is a local run on my machine

When it comes down to the decision whether you should take this PR in or not it is difficult to give a good general advice. You can argue that from an async programming perspective the newer API provides a better async model and in combination with multiple consumers more efficient resource usage. On the other hand it is also a redundant code path which comes with maintainability costs.

I'm leaving this up to you.

@michaelklishin
Copy link
Member

@danielmarbach thank you. We would definitely consider it. @kjnilsson is on vacation at the moment, so this can take about a week.

By the way, I don't see any benchmark results in you comment. Should there be any?

@kjnilsson
Copy link
Contributor

kjnilsson commented Feb 14, 2017 via email

@danielmarbach
Copy link
Collaborator Author

@kjnilsson that is exactly the reasoning why I didn't provide any comparison benchmarks. When you compare a single synchronous consumer with a single asynchronous consumer then essentially we are comparing apple with oranges. It is very likely that a single blocking consumer for small operations is faster than the asynchronous one. It is also likely that a single asynchronous consumer is always a bit slower than the sync one. When it starts shining is in terms of resource utilization, lower CPU usage and better API usage with asynchronous implementors in multi-consumer scenarios.

@michaelklishin michaelklishin added this to the 5.0.0 milestone Feb 14, 2017
@bording
Copy link
Collaborator

bording commented Feb 14, 2017

Generally I think async consumers is a nice option that some users may
appreciate irrespectively of minor performance differences. @danielmarbach
@bording would you look to update the NSB RabbitMQ binding to use async
consumers?

@kjnilsson That was certainly the intent behind investigating this PR. We do already have a pretty viable approach with how things are without this, so I'd definitely want to do some additional perf testing to see if ended up making sense to move over to this.

@kjnilsson
Copy link
Contributor

@bording ok thanks, I took a look at your current code for this. It will be interesting to see how it compares. :)

@michaelklishin
Copy link
Member

Should we consider this for merging or there's more to come?

@danielmarbach
Copy link
Collaborator Author

danielmarbach commented Feb 24, 2017 via email

@kjnilsson
Copy link
Contributor

Apologies @danielmarbach , I was holding back a bit as I got the impression you and @bording were looking at a performance issue. I will spend some time on this PR this week.

We would need some tests for this before merging. I will probably add some as part of the review.

@michaelklishin michaelklishin changed the title [WIP] Async consumer dispatcher Introduce async consumer dispatcher Feb 27, 2017
@bording
Copy link
Collaborator

bording commented Feb 27, 2017

@kjnilsson The performance issue I mentioned here ended up being related to the Task.Delay stuff that @danielmarbach mentioned here, so it turned out to not actually be an issue once we had our tests comparing the same results. Sorry for the confusion if that wasn't clear enough!

I do still want to compare the usage of the async consumer to the original in our NSB RabbitMQ transport, but I haven't gotten to that yet. I wouldn't expect that decision to hold this PR up, since there is value in having the async consumer regardless.

@kjnilsson kjnilsson mentioned this pull request Mar 3, 2017
Copy link
Contributor

@kjnilsson kjnilsson left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks good to me and I'm happy to merge this into master and release a preview. I have republished the changes in a separate branch where I am adding some tests. I want to write a couple of more tests then I will merge.

@kjnilsson
Copy link
Contributor

Ok this has been manually merged with a couple of added tests. Closing.

Thank you!

@kjnilsson kjnilsson closed this Mar 6, 2017
@kjnilsson
Copy link
Contributor

I've published a pre-release with these changes here: https://www.nuget.org/packages/RabbitMQ.Client/5.0.0-pre3

@danielmarbach danielmarbach deleted the async-consumer-dispatch branch March 6, 2017 22:21
@danielmarbach
Copy link
Collaborator Author

Btw @kjnilsson the changes could go into a minor. doesn't have to be a major bump.

@adamralph
Copy link
Contributor

@kjnilsson why has the major version been bumped to 5?

@kjnilsson
Copy link
Contributor

kjnilsson commented Mar 15, 2017 via email

@thtp
Copy link

thtp commented Jul 25, 2017

In the WorkPool, why do you need to spin every 100ms? Why cant you use Timeout.Infinite and await cancellation from Stop() and new message from Enqueue(...)? It seems to me that spinning is useless.
If you are trying to overcome race over messageArrived result, consider using something like this:
Interlocked.Exchange(ref messageArrived, new TaskCompletionSource<bool>())?.TrySetResult(true);
And just reset completion source on each dequeue iteration.

@michaelklishin
Copy link
Member

@thtp this PR is a result of at least 2 different approaches that were investigated and a fair amount of benchmarking. Do you have any numbers to back your claim or is it a gut feeling? I'd recommend submitting a pull request over claiming that the approach that was adopted is useless. It would be a lot more convincing than random comments.

@danielmarbach
Copy link
Collaborator Author

danielmarbach commented Jul 25, 2017 via email

@michaelklishin
Copy link
Member

Before anyone spends a lot of time on the sync dispatcher, keep in mind that we still hope to begin working on a new from scratch .NET client this year and there may or may not be a place for the sync dispatcher there.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

8 participants