Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Synchronize OperationInternalBase calls #27639

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions eng/Directory.Build.Common.targets
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@

<!-- *********** Files needed for LRO ************* -->
<ItemGroup Condition="'$(IncludeOperationsSharedSource)' == 'true'">
<Compile Include="$(AzureCoreSharedSources)AsyncLockWithValue.cs" LinkBase="Shared/Core" />
<Compile Include="$(AzureCoreSharedSources)OperationHelpers.cs" LinkBase="Shared/Core" />
<Compile Include="$(AzureCoreSharedSources)OperationInternal.cs" LinkBase="Shared/Core" />
<Compile Include="$(AzureCoreSharedSources)OperationInternalBase.cs" LinkBase="Shared/Core" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,13 @@ async ValueTask<OperationState> IOperation.UpdateStateAsync(bool async, Cancella
.ConfigureAwait(false)
: _client.GetTransactionStatus(Id, new RequestContext { CancellationToken = cancellationToken, ErrorOptions = ErrorOptions.NoThrow });

_operationInternal.RawResponse = statusResponse;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not needed, _operationInternal updates its RawResponse


if (statusResponse.Status != (int)HttpStatusCode.OK)
{
var error = new ResponseError(null, exceptionMessage);
var ex = async
? await _client.ClientDiagnostics.CreateRequestFailedExceptionAsync(statusResponse, error).ConfigureAwait(false)
: _client.ClientDiagnostics.CreateRequestFailedException(statusResponse, error);
return OperationState.Failure(GetRawResponse(), new RequestFailedException(exceptionMessage, ex));
return OperationState.Failure(statusResponse, new RequestFailedException(exceptionMessage, ex));
}

string status = JsonDocument.Parse(statusResponse.Content)
Expand Down
227 changes: 227 additions & 0 deletions sdk/core/Azure.Core/src/Shared/AsyncLockWithValue.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

#nullable enable

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Azure.Core.Pipeline;

namespace Azure.Core
{
/// <summary>
/// Primitive that combines async lock and value cache
/// </summary>
/// <typeparam name="T"></typeparam>
internal sealed class AsyncLockWithValue<T>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This type is a copy of type from Azure.Identity with added HasValue property and TryGetValue method.

Copy link
Member

@christothes christothes Mar 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assuming we make this change, we should change Identity's reference to this shared source version and delete its copy?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can do that.

{
private readonly object _syncObj = new object();
private Queue<TaskCompletionSource<Lock>>? _waiters;
private bool _isLocked;
private bool _hasValue;
private T? _value;

public bool HasValue
{
get
{
lock (_syncObj)
{
return _hasValue;
}
}
}

public AsyncLockWithValue() { }

public AsyncLockWithValue(T value)
{
_hasValue = true;
_value = value;
}

public bool TryGetValue(
out T? value)
{
lock (_syncObj)
{
if (_hasValue)
{
value = _value;
return true;
}
}

value = default;
return false;
}

/// <summary>
/// Method that either returns cached value or acquire a lock.
/// If one caller has acquired a lock, other callers will be waiting for the lock to be released.
/// If value is set, lock is released and all waiters get that value.
/// If value isn't set, the next waiter in the queue will get the lock.
/// </summary>
/// <param name="async"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async ValueTask<Lock> GetLockOrValueAsync(bool async, CancellationToken cancellationToken = default)
{
TaskCompletionSource<Lock> valueTcs;
lock (_syncObj)
{
// If there is a value, just return it
if (_hasValue)
{
return new Lock(_value!);
}

// If lock isn't acquire yet, acquire it and return to the caller
if (!_isLocked)
{
_isLocked = true;
return new Lock(this);
}

// Check cancellationToken before instantiating waiter
cancellationToken.ThrowIfCancellationRequested();

// If lock is already taken, create a waiter and wait either until value is set or lock can be acquired by this waiter
_waiters ??= new Queue<TaskCompletionSource<Lock>>();
// if async == false, valueTcs will be waited only in this thread and only synchronously, so RunContinuationsAsynchronously isn't needed.
valueTcs = new TaskCompletionSource<Lock>(async ? TaskCreationOptions.RunContinuationsAsynchronously : TaskCreationOptions.None);
_waiters.Enqueue(valueTcs);
}

try
{
if (async)
{
return await valueTcs.Task.AwaitWithCancellation(cancellationToken);
}

#pragma warning disable AZC0104 // Use EnsureCompleted() directly on asynchronous method return value.
#pragma warning disable AZC0111 // DO NOT use EnsureCompleted in possibly asynchronous scope.
valueTcs.Task.Wait(cancellationToken);
return valueTcs.Task.EnsureCompleted();
#pragma warning restore AZC0111 // DO NOT use EnsureCompleted in possibly asynchronous scope.
#pragma warning restore AZC0104 // Use EnsureCompleted() directly on asynchronous method return value.
}
catch (OperationCanceledException)
{
// Throw OperationCanceledException only if another thread hasn't set a value to this waiter
// by calling either Reset or SetValue
if (valueTcs.TrySetCanceled(cancellationToken))
{
throw;
}

return valueTcs.Task.Result;
}
}

/// <summary>
/// Set value to the cache and to all the waiters
/// </summary>
/// <param name="value"></param>
private void SetValue(T value)
{
Queue<TaskCompletionSource<Lock>> waiters;
lock (_syncObj)
{
_value = value;
_hasValue = true;
_isLocked = false;
if (_waiters == default)
{
return;
}

waiters = _waiters;
_waiters = default;
}

while (waiters.Count > 0)
{
waiters.Dequeue().TrySetResult(new Lock(value));
}
}

/// <summary>
/// Release the lock and allow next waiter acquire it
/// </summary>
private void Reset()
{
TaskCompletionSource<Lock>? nextWaiter = UnlockOrGetNextWaiter();
while (nextWaiter != default && !nextWaiter.TrySetResult(new Lock(this)))
{
nextWaiter = UnlockOrGetNextWaiter();
}
}

private TaskCompletionSource<Lock>? UnlockOrGetNextWaiter()
{
lock (_syncObj)
{
if (!_isLocked)
{
return default;
}

if (_waiters == default)
{
_isLocked = false;
return default;
}

while (_waiters.Count > 0)
{
var nextWaiter = _waiters.Dequeue();
if (!nextWaiter.Task.IsCompleted)
{
// Return the waiter only if it wasn't canceled already
return nextWaiter;
}
}

_isLocked = false;
return default;
}
}

public readonly struct Lock : IDisposable
{
private readonly AsyncLockWithValue<T>? _owner;
public bool HasValue => _owner == default;
public T? Value { get; }

public Lock(T value)
{
_owner = default;
Value = value;
}

public Lock(AsyncLockWithValue<T> owner)
{
_owner = owner;
Value = default;
}

public void SetValue(T value)
{
if (_owner != null)
{
_owner.SetValue(value);
}
else
{
throw new InvalidOperationException("Value for the lock is set already");
}
}

public void Dispose() => _owner?.Reset();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be worth a comment here clarifying what this is intended to do. I believe the goal here is just to release any queued lock - not necessarily this one being disposed - and cycle through until we get another lock that hasn't completed, effectively like a semaphore, right?

}
}
}
26 changes: 9 additions & 17 deletions sdk/core/Azure.Core/src/Shared/OperationInternal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,32 +68,22 @@ internal class OperationInternal : OperationInternalBase
/// </param>
/// <param name="scopeAttributes">The attributes to use during diagnostic scope creation.</param>
/// <param name="fallbackStrategy">The fallback delay strategy when Retry-After header is not present. When it is present, the longer of the two delays will be used. Default is <see cref="ConstantDelayStrategy"/>.</param>
/// <param name="finalState">Final state of the operation for the case when underlying service request is completed before LRO instance is created.</param>
public OperationInternal(
ClientDiagnostics clientDiagnostics,
IOperation operation,
Response rawResponse,
string? operationTypeName = null,
IEnumerable<KeyValuePair<string, string>>? scopeAttributes = null,
DelayStrategy? fallbackStrategy = null)
:base(clientDiagnostics, rawResponse, operationTypeName ?? operation.GetType().Name, scopeAttributes, fallbackStrategy)
DelayStrategy? fallbackStrategy = null,
OperationState? finalState = null)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we worry about the conflict between the response in this final state and the response as the parameter? Should we take one or the other?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@heaths , is it possible that these responses are different in keyvault?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the operations you changed, they are all pseudo-LROs anyway. The service backed endpoints aren't actually LROs, but we poll the final resource URI until it's not 404. Given the response and the final state should be agreed in that case, I don't think it should be possible.

:base(clientDiagnostics, rawResponse, operationTypeName ?? operation.GetType().Name, scopeAttributes, fallbackStrategy, finalState)
{
_operation = operation;
}

/// <summary>
/// Sets the <see cref="OperationInternal"/> state immediately.
/// </summary>
/// <param name="state">The <see cref="OperationState"/> used to set <see cref="OperationInternalBase.HasCompleted"/> and other members.</param>
public void SetState(OperationState state)
{
ApplyStateAsync(false, state.RawResponse, state.HasCompleted, state.HasSucceeded, state.OperationFailedException, throwIfFailed: false).EnsureCompleted();
}

protected override async ValueTask<Response> UpdateStateAsync(bool async, CancellationToken cancellationToken)
{
OperationState state = await _operation.UpdateStateAsync(async, cancellationToken).ConfigureAwait(false);
return await ApplyStateAsync(async, state.RawResponse, state.HasCompleted, state.HasSucceeded, state.OperationFailedException).ConfigureAwait(false);
}
protected override async ValueTask<OperationState> UpdateStateAsync(bool async, CancellationToken cancellationToken)
=> await _operation.UpdateStateAsync(async, cancellationToken).ConfigureAwait(false);
}

/// <summary>
Expand Down Expand Up @@ -160,6 +150,9 @@ private OperationState(Response rawResponse, bool hasCompleted, bool hasSucceede

public RequestFailedException? OperationFailedException { get; }

public static OperationState FromTyped<T>(OperationState<T> operationState)
=> new(operationState.RawResponse, operationState.HasCompleted, operationState.HasSucceeded, operationState.OperationFailedException);

/// <summary>
/// Instantiates an <see cref="OperationState"/> indicating the operation has completed successfully.
/// </summary>
Expand All @@ -169,7 +162,6 @@ private OperationState(Response rawResponse, bool hasCompleted, bool hasSucceede
public static OperationState Success(Response rawResponse)
{
Argument.AssertNotNull(rawResponse, nameof(rawResponse));

return new OperationState(rawResponse, true, true, default);
}

Expand Down
Loading