Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Patch TaskExtensions bug in Azure.Messaging.ServiceBus #14320

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 7 additions & 29 deletions sdk/core/Azure.Core/src/Shared/TaskExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,6 @@ public static T EnsureCompleted<T>(this Task<T> task)
{
#if DEBUG
VerifyTaskCompleted(task.IsCompleted);
#else
if (HasSynchronizationContext())
{
throw new InvalidOperationException("Synchronously waiting on non-completed task isn't allowed.");
}
#endif
#pragma warning disable AZC0102 // Do not use GetAwaiter().GetResult(). Use the TaskExtensions.EnsureCompleted() extension method instead.
return task.GetAwaiter().GetResult();
Expand All @@ -40,11 +35,6 @@ public static void EnsureCompleted(this Task task)
{
#if DEBUG
VerifyTaskCompleted(task.IsCompleted);
#else
if (HasSynchronizationContext())
{
throw new InvalidOperationException("Synchronously waiting on non-completed task isn't allowed.");
}
#endif
#pragma warning disable AZC0102 // Do not use GetAwaiter().GetResult(). Use the TaskExtensions.EnsureCompleted() extension method instead.
task.GetAwaiter().GetResult();
Expand All @@ -53,31 +43,22 @@ public static void EnsureCompleted(this Task task)

public static T EnsureCompleted<T>(this ValueTask<T> task)
{
if (!task.IsCompleted)
{
#pragma warning disable AZC0107 // public asynchronous method shouldn't be called in synchronous scope. Use synchronous version of the method if it is available.
return EnsureCompleted(task.AsTask());
#pragma warning restore AZC0107 // public asynchronous method shouldn't be called in synchronous scope. Use synchronous version of the method if it is available.
}
#if DEBUG
VerifyTaskCompleted(task.IsCompleted);
#endif
#pragma warning disable AZC0102 // Do not use GetAwaiter().GetResult(). Use the TaskExtensions.EnsureCompleted() extension method instead.
return task.GetAwaiter().GetResult();
#pragma warning restore AZC0102 // Do not use GetAwaiter().GetResult(). Use the TaskExtensions.EnsureCompleted() extension method instead.
}

public static void EnsureCompleted(this ValueTask task)
{
if (!task.IsCompleted)
{
#pragma warning disable AZC0107 // public asynchronous method shouldn't be called in synchronous scope. Use synchronous version of the method if it is available.
EnsureCompleted(task.AsTask());
#pragma warning restore AZC0107 // public asynchronous method shouldn't be called in synchronous scope. Use synchronous version of the method if it is available.
}
else
{
#if DEBUG
VerifyTaskCompleted(task.IsCompleted);
#endif
#pragma warning disable AZC0102 // Do not use GetAwaiter().GetResult(). Use the TaskExtensions.EnsureCompleted() extension method instead.
task.GetAwaiter().GetResult();
task.GetAwaiter().GetResult();
#pragma warning restore AZC0102 // Do not use GetAwaiter().GetResult(). Use the TaskExtensions.EnsureCompleted() extension method instead.
}
}

public static Enumerable<T> EnsureSyncEnumerable<T>(this IAsyncEnumerable<T> asyncEnumerable) => new Enumerable<T>(asyncEnumerable);
Expand Down Expand Up @@ -120,9 +101,6 @@ private static void VerifyTaskCompleted(bool isCompleted)
}
}

private static bool HasSynchronizationContext()
=> SynchronizationContext.Current != null && SynchronizationContext.Current.GetType() != typeof(SynchronizationContext) || TaskScheduler.Current != TaskScheduler.Default;

/// <summary>
/// Both <see cref="Enumerable{T}"/> and <see cref="Enumerator{T}"/> are defined as public structs so that foreach can use duck typing
/// to call <see cref="Enumerable{T}.GetEnumerator"/> and avoid heap memory allocation.
Expand Down
181 changes: 0 additions & 181 deletions sdk/core/Azure.Core/tests/TaskExtensionsTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,144 +4,13 @@
using Azure.Core.Pipeline;
using NUnit.Framework;
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace Azure.Core.Tests
{
public class TaskExtensionsTest
{
[Test]
public void TaskExtensions_TaskEnsureCompleted()
{
var task = Task.CompletedTask;
task.EnsureCompleted();
}

[Test]
public void TaskExtensions_TaskOfTEnsureCompleted()
{
var task = Task.FromResult(42);
Assert.AreEqual(42, task.EnsureCompleted());
}

[Test]
public void TaskExtensions_ValueTaskEnsureCompleted()
{
var task = new ValueTask();
task.EnsureCompleted();
}

[Test]
public void TaskExtensions_ValueTaskOfTEnsureCompleted()
{
var task = new ValueTask<int>(42);
Assert.AreEqual(42, task.EnsureCompleted());
}

[Test]
public async Task TaskExtensions_TaskEnsureCompleted_NotCompletedNoSyncContext()
{
var tcs = new TaskCompletionSource<int>();
Task task = tcs.Task;
#if DEBUG
Assert.Catch<InvalidOperationException>(() => task.EnsureCompleted());
await Task.CompletedTask;
#else
Task runningTask = Task.Run(() => task.EnsureCompleted());
Assert.IsFalse(runningTask.IsCompleted);
tcs.SetResult(0);
await runningTask;
#endif
}

[Test]
public async Task TaskExtensions_TaskOfTEnsureCompleted_NotCompletedNoSyncContext()
{
var tcs = new TaskCompletionSource<int>();
#if DEBUG
Assert.Catch<InvalidOperationException>(() => tcs.Task.EnsureCompleted());
await Task.CompletedTask;
#else
Task<int> runningTask = Task.Run(() => tcs.Task.EnsureCompleted());
Assert.IsFalse(runningTask.IsCompleted);
tcs.SetResult(42);
Assert.AreEqual(42, await runningTask);
#endif
}

[Test]
public async Task TaskExtensions_ValueTaskEnsureCompleted_NotCompletedNoSyncContext()
{
var tcs = new TaskCompletionSource<int>();
ValueTask task = new ValueTask(tcs.Task);
#if DEBUG
Assert.Catch<InvalidOperationException>(() => task.EnsureCompleted());
await Task.CompletedTask;
#else
Task runningTask = Task.Run(() => task.EnsureCompleted());
Assert.IsFalse(runningTask.IsCompleted);
tcs.SetResult(0);
await runningTask;
#endif
}

[Test]
public async Task TaskExtensions_ValueTaskOfTEnsureCompleted_NotCompletedNoSyncContext()
{
var tcs = new TaskCompletionSource<int>();
ValueTask<int> task = new ValueTask<int>(tcs.Task);
#if DEBUG
Assert.Catch<InvalidOperationException>(() => task.EnsureCompleted());
await Task.CompletedTask;
#else
Task<int> runningTask = Task.Run(() => task.EnsureCompleted());
Assert.IsFalse(runningTask.IsCompleted);
tcs.SetResult(42);
Assert.AreEqual(42, await runningTask);
#endif
}

[Test]
public void TaskExtensions_TaskEnsureCompleted_NotCompletedInSyncContext()
{
using SingleThreadedSynchronizationContext syncContext = new SingleThreadedSynchronizationContext();
var tcs = new TaskCompletionSource<int>();
Task task = tcs.Task;

syncContext.Post(t => { Assert.Catch<InvalidOperationException>(() => task.EnsureCompleted()); }, null);
}

[Test]
public void TaskExtensions_TaskOfTEnsureCompleted_NotCompletedInSyncContext()
{
using SingleThreadedSynchronizationContext syncContext = new SingleThreadedSynchronizationContext();
var tcs = new TaskCompletionSource<int>();

syncContext.Post(t => { Assert.Catch<InvalidOperationException>(() => tcs.Task.EnsureCompleted()); }, null);
}

[Test]
public void TaskExtensions_ValueTaskEnsureCompleted_NotCompletedInSyncContext()
{
using SingleThreadedSynchronizationContext syncContext = new SingleThreadedSynchronizationContext();
var tcs = new TaskCompletionSource<int>();
ValueTask task = new ValueTask(tcs.Task);

syncContext.Post(t => { Assert.Catch<InvalidOperationException>(() => task.EnsureCompleted()); }, null);
}

[Test]
public void TaskExtensions_ValueTaskOfTEnsureCompleted_NotCompletedInSyncContext()
{
using SingleThreadedSynchronizationContext syncContext = new SingleThreadedSynchronizationContext();
var tcs = new TaskCompletionSource<int>();
var task = new ValueTask<int>(tcs.Task);

syncContext.Post(t => { Assert.Catch<InvalidOperationException>(() => task.EnsureCompleted()); }, null);
}

[Test]
public void TaskExtensions_TaskWithCancellationDefault()
{
Expand Down Expand Up @@ -323,55 +192,5 @@ public void TaskExtensions_ValueTaskWithCancellationFailedAfterContinuationSched
Assert.AreEqual(true, awaiter.IsCompleted);
Assert.Catch<OperationCanceledException>(() => awaiter.GetResult(), "Error");
}

private sealed class SingleThreadedSynchronizationContext : SynchronizationContext, IDisposable
{
private readonly Task _task;
private readonly BlockingCollection<Action> _queue;
private readonly ConcurrentQueue<Exception> _exceptions;

public SingleThreadedSynchronizationContext()
{
_queue = new BlockingCollection<Action>();
_exceptions = new ConcurrentQueue<Exception>();
_task = Task.Run(RunLoop);
}

private void RunLoop()
{
try
{
SetSynchronizationContext(this);
while (!_queue.IsCompleted)
{
Action action = _queue.Take();
try
{
action();
}
catch (Exception e)
{
_exceptions.Enqueue(e);
}
}
}
catch (InvalidOperationException) { }
catch (OperationCanceledException) { }
finally
{
SetSynchronizationContext(null);
}
}

public override void Post(SendOrPostCallback d, object state) => _queue.Add(() => d(state));

public void Dispose()
{
_queue.CompleteAdding();
_task.Wait();
}

public AggregateException Exceptions => new AggregateException(_exceptions);
}
}
}
5 changes: 5 additions & 0 deletions sdk/servicebus/Azure.Messaging.ServiceBus/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Release History

## 7.0.0-preview.6 (2020-08-18)

### Fixed
- Bug in TaskExtensions.EnsureCompleted method that causes it to unconditionally throw an exception in the environments with synchronization context

## 7.0.0-preview.5 (2020-08-11)

### Acknowledgements
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<Description>Azure Service Bus is a fully managed enterprise integration message broker. Service Bus can decouple applications and services. Service Bus offers a reliable and secure platform for asynchronous transfer of data and state. This client library allows for both sending and receiving messages using Azure Service Bus. For more information about Service Bus, see https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-messaging-overview</Description>
<Version>7.0.0-preview.5</Version>
<Version>7.0.0-preview.6</Version>
<PackageTags>Azure;Service Bus;ServiceBus;.NET;AMQP;$(PackageCommonTags)</PackageTags>
<TargetFrameworks>$(RequiredTargetFrameworks)</TargetFrameworks>
</PropertyGroup>
Expand Down