Skip to content

Commit

Permalink
* @danielmarbach noticed an improvement. Do not elide await here.
Browse files Browse the repository at this point in the history
  • Loading branch information
lukebakken committed Jan 10, 2025
1 parent ee161bd commit ea3dbe6
Showing 1 changed file with 12 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ internal ConsumerDispatcherChannelBase(Impl.Channel channel, ushort concurrency)

public ushort Concurrency => _concurrency;

public ValueTask HandleBasicConsumeOkAsync(IAsyncBasicConsumer consumer, string consumerTag, CancellationToken cancellationToken)
public async ValueTask HandleBasicConsumeOkAsync(IAsyncBasicConsumer consumer, string consumerTag, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();

Expand All @@ -93,23 +93,18 @@ public ValueTask HandleBasicConsumeOkAsync(IAsyncBasicConsumer consumer, string
{
AddConsumer(consumer, consumerTag);
WorkStruct work = WorkStruct.CreateConsumeOk(consumer, consumerTag);

cancellationToken.ThrowIfCancellationRequested();
return _writer.WriteAsync(work, cancellationToken);
await _writer.WriteAsync(work, cancellationToken)
.ConfigureAwait(false);
}
catch
{
_ = GetAndRemoveConsumer(consumerTag);
throw;
}
}
else
{
return default;
}
}

public ValueTask HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered,
public async ValueTask HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered,
string exchange, string routingKey, IReadOnlyBasicProperties basicProperties, RentedMemory body,
CancellationToken cancellationToken)
{
Expand All @@ -119,49 +114,34 @@ public ValueTask HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag,
{
IAsyncBasicConsumer consumer = GetConsumerOrDefault(consumerTag);
var work = WorkStruct.CreateDeliver(consumer, consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body);

cancellationToken.ThrowIfCancellationRequested();
return _writer.WriteAsync(work, cancellationToken);
}
else
{
return default;
await _writer.WriteAsync(work, cancellationToken)
.ConfigureAwait(false);
}
}

public ValueTask HandleBasicCancelOkAsync(string consumerTag, CancellationToken cancellationToken)
public async ValueTask HandleBasicCancelOkAsync(string consumerTag, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();

if (false == _disposed && false == _quiesce)
{
IAsyncBasicConsumer consumer = GetAndRemoveConsumer(consumerTag);
WorkStruct work = WorkStruct.CreateCancelOk(consumer, consumerTag);

cancellationToken.ThrowIfCancellationRequested();
return _writer.WriteAsync(work, cancellationToken);
}
else
{
return default;
await _writer.WriteAsync(work, cancellationToken)
.ConfigureAwait(false);
}
}

public ValueTask HandleBasicCancelAsync(string consumerTag, CancellationToken cancellationToken)
public async ValueTask HandleBasicCancelAsync(string consumerTag, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();

if (false == _disposed && false == _quiesce)
{
IAsyncBasicConsumer consumer = GetAndRemoveConsumer(consumerTag);
WorkStruct work = WorkStruct.CreateCancel(consumer, consumerTag);

cancellationToken.ThrowIfCancellationRequested();
return _writer.WriteAsync(work, cancellationToken);
}
else
{
return default;
await _writer.WriteAsync(work, cancellationToken)
.ConfigureAwait(false);
}
}

Expand Down

0 comments on commit ea3dbe6

Please sign in to comment.