Skip to content

Commit

Permalink
Search Fancy Batching
Browse files Browse the repository at this point in the history
Adds SearchIndexingBufferedSender to index search documents with intelligent
batching, automatic flushing, and retries for failed indexing actions.

Fixes Azure#11161.
  • Loading branch information
tg-msft committed Oct 9, 2020
1 parent d9e73da commit cb0c1f1
Show file tree
Hide file tree
Showing 17 changed files with 2,145 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public SearchClient(System.Uri endpoint, string indexName, Azure.AzureKeyCredent
public virtual string ServiceName { get { throw null; } }
public virtual Azure.Response<Azure.Search.Documents.Models.AutocompleteResults> Autocomplete(string searchText, string suggesterName, Azure.Search.Documents.AutocompleteOptions options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task<Azure.Response<Azure.Search.Documents.Models.AutocompleteResults>> AutocompleteAsync(string searchText, string suggesterName, Azure.Search.Documents.AutocompleteOptions options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual Azure.Search.Documents.SearchIndexingBufferedSender<T> CreateIndexingBufferedSender<T>(Azure.Search.Documents.SearchIndexingBufferedSenderOptions<T> options = null) { throw null; }
public virtual Azure.Response<Azure.Search.Documents.Models.IndexDocumentsResult> DeleteDocuments(string keyName, System.Collections.Generic.IEnumerable<string> keyValues, Azure.Search.Documents.IndexDocumentsOptions options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task<Azure.Response<Azure.Search.Documents.Models.IndexDocumentsResult>> DeleteDocumentsAsync(string keyName, System.Collections.Generic.IEnumerable<string> keyValues, Azure.Search.Documents.IndexDocumentsOptions options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task<Azure.Response<Azure.Search.Documents.Models.IndexDocumentsResult>> DeleteDocumentsAsync<T>(System.Collections.Generic.IEnumerable<T> documents, Azure.Search.Documents.IndexDocumentsOptions options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
Expand Down Expand Up @@ -68,6 +69,45 @@ public static partial class SearchFilter
public static string Create(System.FormattableString filter) { throw null; }
public static string Create(System.FormattableString filter, System.IFormatProvider formatProvider) { throw null; }
}
// API-surface listing entry: the bodies below are placeholders (empty or
// `throw null`), not the real implementation.
// Declares sync/async DeleteDocuments extension methods that apply only to
// SearchIndexingBufferedSender<SearchDocument> and take a `key` string plus a
// sequence of `documentKeys` strings.
// NOTE(review): `key` is presumably the name of the index key field - confirm
// against the implementation.
public static partial class SearchIndexingBufferedSenderExtensions
{
public static void DeleteDocuments(this Azure.Search.Documents.SearchIndexingBufferedSender<Azure.Search.Documents.Models.SearchDocument> indexer, string key, System.Collections.Generic.IEnumerable<string> documentKeys, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { }
public static System.Threading.Tasks.Task DeleteDocumentsAsync(this Azure.Search.Documents.SearchIndexingBufferedSender<Azure.Search.Documents.Models.SearchDocument> indexer, string key, System.Collections.Generic.IEnumerable<string> documentKeys, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
}
// API-surface listing entry: property accessors are placeholders, not the
// real implementation.
// Options type accepted (optionally, may be null) by the
// SearchIndexingBufferedSender<T> constructor and by
// SearchClient.CreateIndexingBufferedSender<T>.
public partial class SearchIndexingBufferedSenderOptions<T>
{
public SearchIndexingBufferedSenderOptions() { }
public bool AutoFlush { get { throw null; } set { } }
// Nullable: no interval has to be supplied.
public System.TimeSpan? AutoFlushInterval { get { throw null; } set { } }
public System.Threading.CancellationToken FlushCancellationToken { get { throw null; } set { } }
// Maps a document of type T to a string.
// NOTE(review): presumably the document's key value - confirm.
public System.Func<T, string> KeyFieldAccessor { get { throw null; } set { } }
}
// API-surface listing entry: every body below is a placeholder (empty or
// `throw null`), not the real implementation.
// The type implements IAsyncDisposable and IDisposable explicitly, declares a
// finalizer, and has no public constructor (the only one listed is
// `protected internal`).
public partial class SearchIndexingBufferedSender<T> : System.IAsyncDisposable, System.IDisposable
{
protected internal SearchIndexingBufferedSender(Azure.Search.Documents.SearchClient searchClient, Azure.Search.Documents.SearchIndexingBufferedSenderOptions<T> options = null) { }
public virtual System.Uri Endpoint { get { throw null; } }
public virtual string IndexName { get { throw null; } }
public virtual string ServiceName { get { throw null; } }
// Events are Func-based handlers that return a Task and receive a
// CancellationToken as their last argument.
public event System.Func<Azure.Search.Documents.Models.IndexDocumentsAction<T>, System.Threading.CancellationToken, System.Threading.Tasks.Task> ActionAddedAsync { add { } remove { } }
public event System.Func<Azure.Search.Documents.Models.IndexDocumentsAction<T>, Azure.Search.Documents.Models.IndexingResult, System.Threading.CancellationToken, System.Threading.Tasks.Task> ActionCompletedAsync { add { } remove { } }
public event System.Func<Azure.Search.Documents.Models.IndexDocumentsAction<T>, Azure.Search.Documents.Models.IndexingResult, System.Exception, System.Threading.CancellationToken, System.Threading.Tasks.Task> ActionFailedAsync { add { } remove { } }
public event System.Func<Azure.Search.Documents.Models.IndexDocumentsAction<T>, System.Threading.CancellationToken, System.Threading.Tasks.Task> ActionSentAsync { add { } remove { } }
public virtual void DeleteDocuments(System.Collections.Generic.IEnumerable<T> documents, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { }
public virtual System.Threading.Tasks.Task DeleteDocumentsAsync(System.Collections.Generic.IEnumerable<T> documents, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
~SearchIndexingBufferedSender() { }
// Unlike the document operations, Flush/FlushAsync are not virtual.
public void Flush(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { }
public System.Threading.Tasks.Task FlushAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual void IndexDocuments(Azure.Search.Documents.Models.IndexDocumentsBatch<T> batch, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { }
public virtual System.Threading.Tasks.Task IndexDocumentsAsync(Azure.Search.Documents.Models.IndexDocumentsBatch<T> batch, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual void MergeDocuments(System.Collections.Generic.IEnumerable<T> documents, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { }
public virtual System.Threading.Tasks.Task MergeDocumentsAsync(System.Collections.Generic.IEnumerable<T> documents, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual void MergeOrUploadDocuments(System.Collections.Generic.IEnumerable<T> documents, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { }
public virtual System.Threading.Tasks.Task MergeOrUploadDocumentsAsync(System.Collections.Generic.IEnumerable<T> documents, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
System.Threading.Tasks.ValueTask System.IAsyncDisposable.DisposeAsync() { throw null; }
void System.IDisposable.Dispose() { }
public virtual void UploadDocuments(System.Collections.Generic.IEnumerable<T> documents, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { }
public virtual System.Threading.Tasks.Task UploadDocumentsAsync(System.Collections.Generic.IEnumerable<T> documents, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
}
public partial class SearchOptions
{
public SearchOptions() { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,6 @@
<Import Project="$(MSBuildThisFileDirectory)..\..\..\core\Azure.Core\src\Azure.Core.props" />
<ItemGroup>
<PackageReference Include="System.Text.Json" />
<PackageReference Include="System.Threading.Channels" />
</ItemGroup>
</Project>
109 changes: 109 additions & 0 deletions sdk/search/Azure.Search.Documents/src/Batching/AsyncEventExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Azure.Search.Documents.Batching
{
/// <summary>
/// Extensions to raise <see cref="Func{T, CancellationToken, Task}"/>
/// events that wait for every Task to complete and throw every exception.
/// Every handler in the invocation list is always invoked, even if an
/// earlier handler throws synchronously or returns a null Task.
/// </summary>
internal static class AsyncEventExtensions
{
    /// <summary>
    /// Invoke a single handler and normalize its outcome into a Task so
    /// one misbehaving handler cannot prevent the others from running.
    /// </summary>
    /// <param name="invoke">Closure that invokes the handler.</param>
    /// <returns>
    /// The handler's Task, a completed Task if the handler returned
    /// <see langword="null"/>, or a faulted Task if it threw synchronously.
    /// </returns>
    private static Task InvokeHandler(Func<Task> invoke)
    {
        try
        {
            // A non-async handler may return null; Task.WhenAll rejects
            // null entries, so treat that as "already finished".
            return invoke() ?? Task.CompletedTask;
        }
        catch (Exception ex)
        {
            // Surface synchronous failures the same way as asynchronous
            // ones so they are aggregated with the other handlers' errors.
            return Task.FromException(ex);
        }
    }

    /// <summary>
    /// Wait for all tasks to be completed and throw every exception.
    /// </summary>
    /// <param name="tasks">
    /// The tasks to execute. May be <see langword="null"/> (nothing to
    /// wait for); null entries are ignored.
    /// </param>
    /// <returns>A Task representing completion of all handlers.</returns>
    /// <exception cref="AggregateException">
    /// Thrown when more than one task failed. A single failure is
    /// rethrown as its original exception.
    /// </exception>
    private static async Task JoinAsync(IEnumerable<Task> tasks)
    {
        if (tasks == null)
        {
            return;
        }

        // Materialize exactly once: enumerating the sequence is what
        // invokes the handlers, so it must not be enumerated again.
        Task[] started = tasks.Select(t => t ?? Task.CompletedTask).ToArray();
        Task joined = Task.WhenAll(started);
        try
        {
            await joined.ConfigureAwait(false);
        }
        catch (Exception)
        {
            // awaiting will unwrap the AggregateException which we
            // don't want if there were multiple failures that should
            // be surfaced
            if (joined.Exception?.InnerExceptions?.Count > 1)
            {
                throw joined.Exception;
            }

            throw;
        }
    }

    /// <summary>
    /// Raise the event.
    /// </summary>
    /// <typeparam name="T">Type of the event argument.</typeparam>
    /// <param name="evt">The event to raise. May be null (no handlers).</param>
    /// <param name="args">The event arguments.</param>
    /// <param name="cancellationToken">A cancellation token.</param>
    /// <returns>A Task representing completion of all handlers.</returns>
    public static async Task RaiseAsync<T>(
        this Func<T, CancellationToken, Task> evt,
        T args,
        CancellationToken cancellationToken = default) =>
        await JoinAsync(
            evt?.GetInvocationList().Select(
                f => InvokeHandler(
                    () => ((Func<T, CancellationToken, Task>)f)(args, cancellationToken))))
            .ConfigureAwait(false);

    /// <summary>
    /// Raise the event.
    /// </summary>
    /// <typeparam name="T">Type of the first event argument.</typeparam>
    /// <typeparam name="U">Type of the second event argument.</typeparam>
    /// <param name="evt">The event to raise. May be null (no handlers).</param>
    /// <param name="first">The first event argument.</param>
    /// <param name="second">The second event argument.</param>
    /// <param name="cancellationToken">A cancellation token.</param>
    /// <returns>A Task representing completion of all handlers.</returns>
    public static async Task RaiseAsync<T, U>(
        this Func<T, U, CancellationToken, Task> evt,
        T first,
        U second,
        CancellationToken cancellationToken = default) =>
        await JoinAsync(
            evt?.GetInvocationList().Select(
                f => InvokeHandler(
                    () => ((Func<T, U, CancellationToken, Task>)f)(first, second, cancellationToken))))
            .ConfigureAwait(false);

    /// <summary>
    /// Raise the event.
    /// </summary>
    /// <typeparam name="T">Type of the first event argument.</typeparam>
    /// <typeparam name="U">Type of the second event argument.</typeparam>
    /// <typeparam name="V">Type of the third event argument.</typeparam>
    /// <param name="evt">The event to raise. May be null (no handlers).</param>
    /// <param name="first">The first event argument.</param>
    /// <param name="second">The second event argument.</param>
    /// <param name="third">The third event argument.</param>
    /// <param name="cancellationToken">A cancellation token.</param>
    /// <returns>A Task representing completion of all handlers.</returns>
    public static async Task RaiseAsync<T, U, V>(
        this Func<T, U, V, CancellationToken, Task> evt,
        T first,
        U second,
        V third,
        CancellationToken cancellationToken = default) =>
        await JoinAsync(
            evt?.GetInvocationList().Select(
                f => InvokeHandler(
                    () => ((Func<T, U, V, CancellationToken, Task>)f)(first, second, third, cancellationToken))))
            .ConfigureAwait(false);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.Threading;
using System.Threading.Tasks;

namespace Azure.Search.Documents.Batching
{
/// <summary>
/// InitializationBarrierSlim allows for synchronized lazy initialization. An
/// AsyncLazy wouldn't work in these circumstances because we need the
/// results of the initialization before anyone else can proceed.
/// </summary>
internal class InitializationBarrierSlim : IDisposable
{
    /// <summary>
    /// Flag indicating whether we've entered the barrier once yet. It is
    /// set by <see cref="Release"/>, i.e. once the first caller is done.
    /// </summary>
    private volatile bool _entered = false;

    /// <summary>
    /// Synchronize entry to the barrier. The first to arrive is allowed
    /// through and everyone else waits for us to finish. Set to null by
    /// <see cref="Dispose"/>.
    /// </summary>
    private SemaphoreSlim _semaphore = new SemaphoreSlim(1);

    /// <summary>
    /// Try to enter the barrier. Only the first caller will be allowed
    /// through and everyone else will wait until it's finished. If this
    /// returns <see langword="true"/> then you need to call
    /// <see cref="Release"/>. (Do not call <see cref="Release"/> if you
    /// did not enter the barrier!)
    /// </summary>
    /// <param name="async">Whether to invoke sync or async.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>
    /// A Task that will either allow you to enter or wait until whoever
    /// made it through has already completed.
    /// </returns>
    /// <exception cref="ObjectDisposedException">
    /// Thrown if the barrier has been disposed.
    /// </exception>
    public async Task<bool> TryEnterAsync(bool async, CancellationToken cancellationToken)
    {
        // Read the field once so a concurrent Dispose cannot null it out
        // between the check and the use.
        SemaphoreSlim semaphore = _semaphore;
        if (semaphore == null)
        {
            throw new ObjectDisposedException(nameof(InitializationBarrierSlim));
        }

        if (_entered)
        {
            // Short circuit if we've already entered
            return false;
        }

        if (async)
        {
            await semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
        }
        else
        {
            semaphore.Wait(cancellationToken);
        }

        // Double check whether we were first or just woke up after waiting
        if (_entered)
        {
            // We only waited for the first caller to finish and the
            // caller is told not to call Release, so hand the semaphore
            // straight on. Otherwise any other waiter stays blocked
            // forever.
            semaphore.Release();
            return false;
        }

        return true;
    }

    /// <summary>
    /// Allow entry into the barrier for anyone waiting and any future
    /// callers. This needs to be called whenever TryEnterAsync returned
    /// true.
    /// </summary>
    /// <exception cref="ObjectDisposedException">
    /// Thrown if the barrier has been disposed.
    /// </exception>
    public void Release()
    {
        SemaphoreSlim semaphore = _semaphore;
        if (semaphore == null)
        {
            throw new ObjectDisposedException(nameof(InitializationBarrierSlim));
        }

        // Unblock anyone else who called this at the same time. The flag
        // must be set before releasing so woken waiters observe it.
        _entered = true;
        semaphore.Release();
    }

    /// <summary>
    /// Dispose the barrier.
    /// </summary>
    public void Dispose()
    {
        _semaphore?.Dispose();
        _semaphore = null;
    }
}
}
86 changes: 86 additions & 0 deletions sdk/search/Azure.Search.Documents/src/Batching/ManualRetryDelay.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.Threading;
using System.Threading.Tasks;

namespace Azure.Search.Documents.Batching
{
/// <summary>
/// Implements a "manual" version of an exponential back off retry policy
/// that allows us to block before sending the next request instead of
/// immediately after we get a response.
/// </summary>
internal class ManualRetryDelay
{
    /// <summary>
    /// Largest power of two used as the back off multiplier. Clamping the
    /// exponent keeps the shift well defined no matter how many
    /// consecutive attempts were throttled.
    /// </summary>
    private const int MaxBackoffExponent = 30;

    /// <summary>
    /// The initial delay we use for calculating back off.
    /// </summary>
    public TimeSpan Delay { get; set; } = TimeSpan.FromSeconds(0.8);

    /// <summary>
    /// The maximum delay between attempts.
    /// </summary>
    public TimeSpan MaxDelay { get; set; } = TimeSpan.FromMinutes(1);

    /// <summary>
    /// Randomize with jitter.
    /// </summary>
    private readonly Random _jitter = new Random();

    /// <summary>
    /// The number of recent attempts that have been throttled.
    /// </summary>
    private int _throttledAttempts = 0;

    /// <summary>
    /// When we should wait until before sending our next request.
    /// </summary>
    private DateTimeOffset? _waitUntil;

    /// <summary>
    /// Update whether the last request was throttled.
    /// </summary>
    /// <param name="throttled">
    /// Whether the last request was throttled. A value of
    /// <see langword="false"/> resets the back off.
    /// </param>
    public void Update(bool throttled)
    {
        if (!throttled)
        {
            _throttledAttempts = 0;
            _waitUntil = null;
            return;
        }

        if (_throttledAttempts < int.MaxValue)
        {
            _throttledAttempts++;
        }

        // Same shape as Azure.Core's RetryPolicy: 2^(attempts - 1) times
        // a jittered base delay (+/- 20%), capped at MaxDelay. The
        // multiplication is done in double with a clamped exponent
        // because the 32-bit form overflows after ~20 consecutive
        // throttles and produces a zero or negative delay.
        int exponent = Math.Min(_throttledAttempts - 1, MaxBackoffExponent);
        double baseMilliseconds = Delay.TotalMilliseconds;
        int jittered = _jitter.Next(
            (int)(baseMilliseconds * 0.8),
            (int)(baseMilliseconds * 1.2));
        TimeSpan delay = TimeSpan.FromMilliseconds(
            Math.Min(
                (double)(1L << exponent) * jittered,
                MaxDelay.TotalMilliseconds));

        // Instead of blocking now, let's figure out how long we should
        // block until and we can do that before the next request if it
        // comes in too soon.
        _waitUntil = DateTimeOffset.UtcNow.Add(delay);
    }

    /// <summary>
    /// Wait until our retry delay has elapsed, if needed.
    /// </summary>
    /// <param name="cancellationToken">The cancellation token.</param>
    /// <returns>A Task that will delay if needed.</returns>
    public async Task WaitIfNeededAsync(CancellationToken cancellationToken)
    {
        // Null when the last request was not throttled; the lifted
        // comparison below is then false and we return immediately.
        TimeSpan? wait = _waitUntil - DateTimeOffset.UtcNow;
        if (wait > TimeSpan.Zero)
        {
            await Task.Delay(wait.Value, cancellationToken).ConfigureAwait(false);
        }
    }
}
}
Loading

0 comments on commit cb0c1f1

Please sign in to comment.