diff --git a/sdk/search/Azure.Search.Documents/api/Azure.Search.Documents.netstandard2.0.cs b/sdk/search/Azure.Search.Documents/api/Azure.Search.Documents.netstandard2.0.cs index 1c6925d881bd..f98a676dfefd 100644 --- a/sdk/search/Azure.Search.Documents/api/Azure.Search.Documents.netstandard2.0.cs +++ b/sdk/search/Azure.Search.Documents/api/Azure.Search.Documents.netstandard2.0.cs @@ -32,6 +32,7 @@ public SearchClient(System.Uri endpoint, string indexName, Azure.AzureKeyCredent public virtual string ServiceName { get { throw null; } } public virtual Azure.Response Autocomplete(string searchText, string suggesterName, Azure.Search.Documents.AutocompleteOptions options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } public virtual System.Threading.Tasks.Task> AutocompleteAsync(string searchText, string suggesterName, Azure.Search.Documents.AutocompleteOptions options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + public virtual Azure.Search.Documents.SearchIndexingBufferedSender CreateIndexingBufferedSender(Azure.Search.Documents.SearchIndexingBufferedSenderOptions options = null) { throw null; } public virtual Azure.Response DeleteDocuments(string keyName, System.Collections.Generic.IEnumerable keyValues, Azure.Search.Documents.IndexDocumentsOptions options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } public virtual System.Threading.Tasks.Task> DeleteDocumentsAsync(string keyName, System.Collections.Generic.IEnumerable keyValues, Azure.Search.Documents.IndexDocumentsOptions options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } public virtual System.Threading.Tasks.Task> DeleteDocumentsAsync(System.Collections.Generic.IEnumerable documents, Azure.Search.Documents.IndexDocumentsOptions options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } @@ -68,6 +69,45 @@ public static partial class SearchFilter public static string Create(System.FormattableString filter) { throw null; } public static string Create(System.FormattableString filter, System.IFormatProvider formatProvider) { throw null; } } + public static partial class SearchIndexingBufferedSenderExtensions + { + public static void DeleteDocuments(this Azure.Search.Documents.SearchIndexingBufferedSender indexer, string key, System.Collections.Generic.IEnumerable documentKeys, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { } + public static System.Threading.Tasks.Task DeleteDocumentsAsync(this Azure.Search.Documents.SearchIndexingBufferedSender indexer, string key, System.Collections.Generic.IEnumerable documentKeys, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + } + public partial class SearchIndexingBufferedSenderOptions + { + public SearchIndexingBufferedSenderOptions() { } + public bool AutoFlush { get { throw null; } set { } } + public System.TimeSpan? AutoFlushInterval { get { throw null; } set { } } + public System.Threading.CancellationToken FlushCancellationToken { get { throw null; } set { } } + public System.Func KeyFieldAccessor { get { throw null; } set { } } + } + public partial class SearchIndexingBufferedSender : System.IAsyncDisposable, System.IDisposable + { + protected internal SearchIndexingBufferedSender(Azure.Search.Documents.SearchClient searchClient, Azure.Search.Documents.SearchIndexingBufferedSenderOptions options = null) { } + public virtual System.Uri Endpoint { get { throw null; } } + public virtual string IndexName { get { throw null; } } + public virtual string ServiceName { get { throw null; } } + public event System.Func, System.Threading.CancellationToken, System.Threading.Tasks.Task> ActionAddedAsync { add { } remove { } } + public event System.Func, Azure.Search.Documents.Models.IndexingResult, System.Threading.CancellationToken, System.Threading.Tasks.Task> ActionCompletedAsync { add { } remove { } } + public event System.Func, Azure.Search.Documents.Models.IndexingResult, System.Exception, System.Threading.CancellationToken, System.Threading.Tasks.Task> ActionFailedAsync { add { } remove { } } + public event System.Func, System.Threading.CancellationToken, System.Threading.Tasks.Task> ActionSentAsync { add { } remove { } } + public virtual void DeleteDocuments(System.Collections.Generic.IEnumerable documents, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { } + public virtual System.Threading.Tasks.Task DeleteDocumentsAsync(System.Collections.Generic.IEnumerable documents, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + ~SearchIndexingBufferedSender() { } + public void Flush(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { } + public System.Threading.Tasks.Task FlushAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + public virtual void IndexDocuments(Azure.Search.Documents.Models.IndexDocumentsBatch batch, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { } + public virtual System.Threading.Tasks.Task IndexDocumentsAsync(Azure.Search.Documents.Models.IndexDocumentsBatch batch, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + public virtual void MergeDocuments(System.Collections.Generic.IEnumerable documents, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { } + public virtual System.Threading.Tasks.Task MergeDocumentsAsync(System.Collections.Generic.IEnumerable documents, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + public virtual void MergeOrUploadDocuments(System.Collections.Generic.IEnumerable documents, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { } + public virtual System.Threading.Tasks.Task MergeOrUploadDocumentsAsync(System.Collections.Generic.IEnumerable documents, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + System.Threading.Tasks.ValueTask System.IAsyncDisposable.DisposeAsync() { throw null; } + void System.IDisposable.Dispose() { } + public virtual void UploadDocuments(System.Collections.Generic.IEnumerable documents, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { } + public virtual System.Threading.Tasks.Task UploadDocumentsAsync(System.Collections.Generic.IEnumerable documents, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + } public partial class SearchOptions { public SearchOptions() { } diff --git a/sdk/search/Azure.Search.Documents/src/Azure.Search.Documents.csproj b/sdk/search/Azure.Search.Documents/src/Azure.Search.Documents.csproj index 89fc9ccc2e7f..9a09ad0d18f2 100644 --- a/sdk/search/Azure.Search.Documents/src/Azure.Search.Documents.csproj +++ b/sdk/search/Azure.Search.Documents/src/Azure.Search.Documents.csproj @@ -37,5 +37,6 @@ + diff --git a/sdk/search/Azure.Search.Documents/src/Batching/AsyncEventExtensions.cs b/sdk/search/Azure.Search.Documents/src/Batching/AsyncEventExtensions.cs new file mode 100644 index 000000000000..6dedf5d0d77c --- /dev/null +++ b/sdk/search/Azure.Search.Documents/src/Batching/AsyncEventExtensions.cs @@ -0,0 +1,109 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; + +namespace Azure.Search.Documents.Batching +{ + /// + /// Extensions to raise + /// events that wait for every Task to complete and throw every exception. + /// + internal static class AsyncEventExtensions + { + /// + /// Wait for all tasks to be completed and throw every exception. + /// + /// The tasks to execute. + /// A Task representing completion of all handlers. + private static async Task JoinAsync(IEnumerable tasks) + { + if (tasks != null) + { + Task joined = Task.WhenAll(tasks); + try + { + await joined.ConfigureAwait(false); + } + catch (Exception) + { + // awaiting will unwrap the AggregateException which we + // don't want if there were multiple failures that should + // be surfaced + if (joined.Exception?.InnerExceptions?.Count > 1) + { + throw joined.Exception; + } + else + { + throw; + } + } + } + } + + /// + /// Raise the event. + /// + /// Type of the event argument. + /// The event to raise. + /// The event arguments. + /// A cancellation token. + /// A Task representing completion of all handlers. + public static async Task RaiseAsync( + this Func evt, + T args, + CancellationToken cancellationToken = default) => + await JoinAsync( + evt?.GetInvocationList()?.Select( + f => (f as Func)?.Invoke(args, cancellationToken))) + .ConfigureAwait(false); + + /// + /// Raise the event. + /// + /// Type of the first event argument. + /// Type of the second event argument. + /// The event to raise. + /// The first event argument. + /// The second event argument. + /// A cancellation token. + /// A Task representing completion of all handlers. + public static async Task RaiseAsync( + this Func evt, + T first, + U second, + CancellationToken cancellationToken = default) => + await JoinAsync( + evt?.GetInvocationList()?.Select( + f => (f as Func)?.Invoke(first, second, cancellationToken))) + .ConfigureAwait(false); + + /// + /// Raise the event. + /// + /// Type of the first event argument. + /// Type of the second event argument. + /// Type of the third event argument. + /// The event to raise. + /// The first event argument. + /// The second event argument. + /// The third event argument. + /// A cancellation token. + /// A Task representing completion of all handlers. + public static async Task RaiseAsync( + this Func evt, + T first, + U second, + V third, + CancellationToken cancellationToken = default) => + await JoinAsync( + evt?.GetInvocationList()?.Select( + f => (f as Func)?.Invoke(first, second, third, cancellationToken))) + .ConfigureAwait(false); + } +} diff --git a/sdk/search/Azure.Search.Documents/src/Batching/InitializationBarrierSlim.cs b/sdk/search/Azure.Search.Documents/src/Batching/InitializationBarrierSlim.cs new file mode 100644 index 000000000000..83572a7020f3 --- /dev/null +++ b/sdk/search/Azure.Search.Documents/src/Batching/InitializationBarrierSlim.cs @@ -0,0 +1,92 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace Azure.Search.Documents.Batching +{ + /// + /// InitializationBarrierSlim allows for synchronized lazy initialization. An + /// AsyncLazy wouldn't work in these circumstances because we need the + /// results of the initialization before anyone else can proceed. + /// + internal class InitializationBarrierSlim : IDisposable + { + /// + /// Flag indicating whether we've entered the barrier once yet. + /// + private volatile bool _entered = false; + + /// + /// Synchronize entry to the barrier. The first to arrive is allowed + /// through and everyone else waits for us to finish. + /// + private SemaphoreSlim _semaphore = new SemaphoreSlim(1); + + /// + /// Try to enter the barrier. Only the first caller will be allowed + /// through and everyone else will wait until it's finished. If this + /// returns then you need to call + /// . (Do not call if you + /// did not enter the barrier!) + /// + /// Whether to invoke sync or async. + /// Cancellation token. + /// + /// A Task that will either allow you to enter or wait until whoever + /// made it through has already completed. + /// + public async Task TryEnterAsync(bool async, CancellationToken cancellationToken) + { + if (_semaphore == null) + { + throw new ObjectDisposedException(nameof(InitializationBarrierSlim)); + } + + if (_entered) + { + // Short circuit if we've already entered + return false; + } + else if (async) + { + await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + } + else + { + _semaphore.Wait(cancellationToken); + } + + // Double check whether we were first or just woke up after waiting + return !_entered; + } + + /// + /// Allow entry into the barrier for anyone waiting and any future + /// callers. This needs to be called whenever TryEnterAsync returned + /// true. + /// + public void Release() + { + if (_semaphore == null) + { + throw new ObjectDisposedException(nameof(InitializationBarrierSlim)); + } + + // Unblock anyone else who called this at the same time + _entered = true; + _semaphore.Release(); + } + + /// + /// Dispose the barrier. + /// + public void Dispose() + { + _semaphore?.Dispose(); + _semaphore = null; + } + } +} diff --git a/sdk/search/Azure.Search.Documents/src/Batching/ManualRetryDelay.cs b/sdk/search/Azure.Search.Documents/src/Batching/ManualRetryDelay.cs new file mode 100644 index 000000000000..16f9cf331333 --- /dev/null +++ b/sdk/search/Azure.Search.Documents/src/Batching/ManualRetryDelay.cs @@ -0,0 +1,86 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace Azure.Search.Documents.Batching +{ + /// + /// Implements a "manual" version of an exponential back off retry policy + /// that allows us to block before sending the next request instead of + /// immediately after we get a response. + /// + internal class ManualRetryDelay + { + /// + /// The initial delay we use for calculating back off. + /// + public TimeSpan Delay { get; set; } = TimeSpan.FromSeconds(0.8); + + /// + /// The maximum delay between attempts. + /// + public TimeSpan MaxDelay { get; set; } = TimeSpan.FromMinutes(1); + + /// + /// Randomize with jitter. + /// + private readonly Random _jitter = new Random(); + + /// + /// The number of recent attempts that have been throttled. + /// + private int _throttledAttempts = 0; + + /// + /// When we should wait until before sending our next request. + /// + private DateTimeOffset? _waitUntil; + + /// + /// Update whether the last request was throttled. + /// + /// + /// Whether the last request was throttled. + /// + public void Update(bool throttled) + { + if (throttled) + { + _throttledAttempts++; + + // Use the same logic from Azure.Core's RetryPolicy + TimeSpan delay = TimeSpan.FromMilliseconds( + Math.Min( + (1 << (_throttledAttempts - 1)) * _jitter.Next((int)(Delay.TotalMilliseconds * 0.8), (int)(Delay.TotalMilliseconds * 1.2)), + MaxDelay.TotalMilliseconds)); + + // Instead of blocking now, let's figure out how long we should + // block until and we can do that before the next request if it + // comes in too soon. + _waitUntil = DateTimeOffset.Now.Add(delay); + } + else + { + _throttledAttempts = 0; + _waitUntil = null; + } + } + + /// + /// Wait until our retry delay has elapsed, if needed. + /// + /// The cancellation token. + /// A Task that will delay if needed. + public async Task WaitIfNeededAsync(CancellationToken cancellationToken) + { + TimeSpan? wait = _waitUntil - DateTimeOffset.Now; + if (wait >= TimeSpan.Zero) + { + await Task.Delay(wait.Value, cancellationToken).ConfigureAwait(false); + } + } + } +} diff --git a/sdk/search/Azure.Search.Documents/src/Batching/ProducerBarrierSlim.cs b/sdk/search/Azure.Search.Documents/src/Batching/ProducerBarrierSlim.cs new file mode 100644 index 000000000000..5b294bde300b --- /dev/null +++ b/sdk/search/Azure.Search.Documents/src/Batching/ProducerBarrierSlim.cs @@ -0,0 +1,94 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Diagnostics; +using System.Threading; +using System.Threading.Tasks; + +namespace Azure.Search.Documents.Batching +{ + /// + /// While most writers of a Channel only want to push work and return, + /// occasionally we'll have writers like Flush that want to block until + /// the reader does something with that work. ProducerBarrierSlim allows + /// them to wait and be released after the work has completed. + /// + internal class ProducerBarrierSlim : IDisposable + { + /// + /// Semaphore to enforce the barrier. + /// + private SemaphoreSlim _semaphore = new SemaphoreSlim(0); + + /// + /// The number of tasks waiting on the submission semaphore. This + /// determines how many times we release the semaphore. + /// + internal volatile int _waiters = 0; + + /// + /// Wait for the barrier to be lifted. + /// + /// Whether to call sync or async. + /// Cancellation token. + /// + /// A Task that will wait until the barrier is lifted. + /// + public async Task WaitAsync(bool async, CancellationToken cancellationToken) + { + if (_semaphore == null) + { + throw new ObjectDisposedException(nameof(ProducerBarrierSlim)); + } + + Interlocked.Increment(ref _waiters); + try + { + if (async) + { + await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false); + } + else + { + _semaphore.Wait(cancellationToken); + } + } + finally + { + Interlocked.Decrement(ref _waiters); + } + } + + /// + /// Lift the barrier. + /// + public void Release() + { + if (_semaphore == null) + { + throw new ObjectDisposedException(nameof(ProducerBarrierSlim)); + } + + // Wake up anyone who was blocked on a flush finishing + int waiting; + while ((waiting = _waiters) > 0) + { + _semaphore.Release(waiting); + } + } + + /// + /// Dispose the barrier. + /// + public void Dispose() + { + Debug.Assert( + _waiters == 0, + $"{nameof(ProducerBarrierSlim)} disposed with tasks still waiting! Somebody didn't call {nameof(Release)}."); + + _semaphore?.Dispose(); + _semaphore = null; + } + } +} diff --git a/sdk/search/Azure.Search.Documents/src/Batching/PublisherAction{T}.cs b/sdk/search/Azure.Search.Documents/src/Batching/PublisherAction{T}.cs new file mode 100644 index 000000000000..8155d770cd89 --- /dev/null +++ b/sdk/search/Azure.Search.Documents/src/Batching/PublisherAction{T}.cs @@ -0,0 +1,48 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System.Diagnostics; +using Azure.Search.Documents.Models; + +namespace Azure.Search.Documents.Batching +{ + /// + /// Tracks actions that the publisher hasn't submitted yet. + /// + /// + /// The .NET type that maps to the index schema. Instances of this + /// type can be retrieved as documents from the index. + /// + internal class PublisherAction + { + /// + /// Gets the action to submit. + /// + public T Document { get; } + + /// + /// The key used to identify the document. + /// + public string Key { get; } + + /// + /// Gets the number of retry attempts. + /// + public int RetryAttempts { get; set; } + + // TODO: Add the serialized payload in an ArrayPool buffer + + /// + /// Creates a new PublisherAction. + /// + /// The action to submit. + /// Key of the action's document. + public PublisherAction(T document, string key) + { + Debug.Assert(document != null); + Debug.Assert(key != null); + Document = document; + Key = key; + } + } +} diff --git a/sdk/search/Azure.Search.Documents/src/Batching/Publisher{T}.Message.cs b/sdk/search/Azure.Search.Documents/src/Batching/Publisher{T}.Message.cs new file mode 100644 index 000000000000..17788b1158bd --- /dev/null +++ b/sdk/search/Azure.Search.Documents/src/Batching/Publisher{T}.Message.cs @@ -0,0 +1,46 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System.Collections.Generic; + +namespace Azure.Search.Documents.Batching +{ + internal partial class Publisher + { + /// + /// A publishing message passed from a writer to a reader in a Channel. + /// + private class Message + { + /// + /// Gets the type of operation. + /// + public MessageOperation Operation { get; private set; } + + /// + /// Gets any documents to be published. Could be null depending on + /// the operation. + /// + public IEnumerable Documents { get; private set; } + + /// + /// Create a message to publish documents. + /// + /// The documents to publish. + /// A message to publish documents. + public static Message Publish(IEnumerable documents) => + new Message + { + Operation = MessageOperation.Publish, + Documents = documents + }; + + /// + /// Create a message to flush the publisher. + /// + /// A message to flush the publisher. + public static Message Flush() => + new Message { Operation = MessageOperation.Flush }; + } + } +} diff --git a/sdk/search/Azure.Search.Documents/src/Batching/Publisher{T}.MessageOperation.cs b/sdk/search/Azure.Search.Documents/src/Batching/Publisher{T}.MessageOperation.cs new file mode 100644 index 000000000000..47e5b966071b --- /dev/null +++ b/sdk/search/Azure.Search.Documents/src/Batching/Publisher{T}.MessageOperation.cs @@ -0,0 +1,18 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +namespace Azure.Search.Documents.Batching +{ + internal partial class Publisher + { + /// + /// Types of operations that can be sent from writers to a reader in a + /// publisher. + /// + private enum MessageOperation + { + Publish, + Flush + } + } +} diff --git a/sdk/search/Azure.Search.Documents/src/Batching/Publisher{T}.cs b/sdk/search/Azure.Search.Documents/src/Batching/Publisher{T}.cs new file mode 100644 index 000000000000..315abe19d7b3 --- /dev/null +++ b/sdk/search/Azure.Search.Documents/src/Batching/Publisher{T}.cs @@ -0,0 +1,493 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; +using Azure.Core.Pipeline; + +namespace Azure.Search.Documents.Batching +{ + /// + /// Implement a Producer/Consumer pattern where we (potentially) have + /// multiple document producers and a single consumer that submits those + /// events to a service. This class is mostly focused on synchronizing + /// behavior between document producers and relies on abstract methods to + /// implement service specific semantics. + /// + /// The type of document being published. + internal abstract partial class Publisher : IDisposable, IAsyncDisposable + { + /// + /// Flag indicating whether the publisher has been disposed. + /// + private volatile int _disposed = 0; + + /// + /// Channel used to communicate between the sender and publisher. + /// + private Channel _channel = Channel.CreateUnbounded( + new UnboundedChannelOptions { SingleReader = true }); + + /// + /// "Blocking" semaphore used to wait for flushing to complete. + /// + private ProducerBarrierSlim _flushBarrier = new ProducerBarrierSlim(); + + /// + /// Task that's acting as our channel's event loop to read publishing + /// actions. + /// + private Task _readerLoop; + + /// + /// A timer that we can use to automatically flush after a period of + /// inactivity. + /// + private Timer _timer; + + /// + /// Queue of pending actions. + /// + private Queue> _pending = new Queue>(); + + /// + /// Queue of actions that need to be retried. + /// + private Queue> _retry = new Queue>(); + + /// + /// Manual retry policy to add exponential back-off after throttled + /// requests. + /// + private ManualRetryDelay _manualRetries = new ManualRetryDelay(); + + /// + /// Gets the number of indexing actions currently awaiting submission. + /// + public int IndexingActionsCount + { + get => _pending.Count + _retry.Count; + } + + /// + /// Gets a value indicating whether the publisher should automatically + /// flush any indexing actions that have been added. This will happen + /// when the batch is full or when the + /// has elapsed. + /// + public bool AutoFlush { get; } + + /// + /// Gets an optional amount of time to wait before automatically + /// flushing any remaining indexing actions. + /// + public TimeSpan? AutoFlushInterval { get; } + + /// + /// Gets a to use when publishing. + /// + protected CancellationToken PublisherCancellationToken { get; } + + /// + /// Gets a value indicating the number of actions to group into a batch + /// when tuning the behavior of the publisher. + /// + protected int BatchActionSize { get; } // Note: Not automatically tuning yet + + /// + /// Gets a value indicating the number of bytes to use when tuning the + /// behavior of the publisher. + /// + protected int BatchPayloadSize { get; } // Note: Not used yet + + /// + /// Gets the number of times to retry a failed document. + /// + protected int RetryCount { get; } // Note: Not configurable yet + + /// + /// Creates a new Publisher which immediately starts listening to + /// process requests. + /// + /// + /// A value indicating whether the publisher should automatically flush. + /// + /// + /// An optional amount of time to wait before automatically flushing. + /// + /// + /// The number of actions to group into a batch. + /// + /// + /// The number of bytes to use when tuning the behavior of the + /// publisher. + /// + /// + /// The number of times to retry a failed document. + /// + /// + /// A to use when publishing. + /// + public Publisher( + bool autoFlush, + TimeSpan? autoFlushInterval, + int? batchActionSize, + int? batchPayloadSize, + int? retryCount, + CancellationToken publisherCancellationToken) + { + AutoFlush = autoFlush; + AutoFlushInterval = autoFlushInterval; + PublisherCancellationToken = publisherCancellationToken; + BatchActionSize = batchActionSize ?? SearchIndexingBufferedSenderOptions.DefaultBatchActionSize; + BatchPayloadSize = batchPayloadSize ?? SearchIndexingBufferedSenderOptions.DefaultBatchPayloadSize; + RetryCount = retryCount ?? SearchIndexingBufferedSenderOptions.DefaultRetryCount; + + // Start the message loop + _readerLoop = Task.Run(ProcessMessagesAsync, publisherCancellationToken); + } + + #region Message Producers - only called externally and do not touch state + /// + /// Add documents to be published. This should only be called outside + /// of the Publisher itself. + /// + /// The documents to publish. + /// Whether to call sync or async. + /// Cancellation token. + /// Task for adding the documents. + public async Task AddDocumentsAsync(IEnumerable documents, bool async, CancellationToken cancellationToken = default) + { + EnsureNotDisposed(); + await SendMessageInternal(Message.Publish(documents), async, cancellationToken).ConfigureAwait(false); + } + + /// + /// Flush any pending documents. This should only be called outside of + /// the Publisher itself. + /// + /// Whether to call sync or async. + /// Cancellation token. + /// Task for adding the documents. + public async Task FlushAsync(bool async, CancellationToken cancellationToken = default) + { + await SendMessageInternal(Message.Flush(), async, cancellationToken).ConfigureAwait(false); + await _flushBarrier.WaitAsync(async, cancellationToken).ConfigureAwait(false); + } + + /// + /// Send a message from a producer to the channel. + /// + /// The message to send. + /// Whether to execute sync or async. + /// Cancellation token. + /// Task for the sending the message. + private async Task SendMessageInternal( + Message message, + bool async, + CancellationToken cancellationToken = default) + { + // Send the message + if (async) + { + await _channel.Writer.WriteAsync(message, cancellationToken).ConfigureAwait(false); + } + else + { + // It's an unbounded channel so spinning should be fine + while (!_channel.Writer.TryWrite(message)) { /* spin */ } + } + } + #endregion + + #region Message Consumer + /// + /// Listen for and process messages. + /// + /// + /// A Task that will run for the lifetime of the publisher. + /// + private async Task ProcessMessagesAsync() + { + CancellationToken cancellationToken = PublisherCancellationToken; + while (await _channel.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false)) + { + Message message = await _channel.Reader.ReadAsync(cancellationToken).ConfigureAwait(false); + switch (message.Operation) + { + case MessageOperation.Publish: + await OnDocumentsAddedAsync(message.Documents, cancellationToken).ConfigureAwait(false); + break; + case MessageOperation.Flush: + await OnFlushedAsync(cancellationToken).ConfigureAwait(false); + break; + default: + throw new InvalidOperationException($"Unexpected value {message.Operation} of type {nameof(MessageOperation)}."); + } + } + StopTimer(); + } + + /// + /// Get a key used to identify a given document. + /// + /// The document. + /// A key for that document. + protected abstract string GetDocumentKey(T document); + + /// + /// Process any documents that have been added for publishing. + /// + /// The documents to publish. + /// Cancellation token. + /// Task for processing the added documents. + protected virtual async Task OnDocumentsAddedAsync(IEnumerable documents, CancellationToken cancellationToken) + { + // Add all of the documents to our queue + Debug.Assert(documents != null); + foreach (T document in documents) + { + PublisherAction action = new PublisherAction(document, GetDocumentKey(document)); + _pending.Enqueue(action); + } + + // Determine whether or not that crossed the threshold and we need + // to automatically submit the next batch + if (AutoFlush && HasFullBatch()) + { + await PublishAsync(cancellationToken).ConfigureAwait(false); + } + + // If there's anything left unsent, start a timer to call to flush + // after the it elapses. + if (IndexingActionsCount > 0) + { + StartTimer(); + } + } + + /// + /// Process any requests to flush. After submitting, this will unblock + /// anyone waiting behind the Flush barrier. + /// + /// Cancellation token. + /// Task for flushing. + protected virtual async Task OnFlushedAsync(CancellationToken cancellationToken) + { + await PublishAsync(cancellationToken).ConfigureAwait(false); + + // Wake up anyone who was blocked on a flush finishing + _flushBarrier.Release(); + } + #endregion Message Consumer + + #region Dispose + /// + /// Clean up any resources. + /// + async ValueTask IAsyncDisposable.DisposeAsync() => + await DisposeAsync(async: true).ConfigureAwait(false); + + /// + /// Clean up any resources. + /// + void IDisposable.Dispose() => + DisposeAsync(async: false).EnsureCompleted(); + + /// + /// Dispose the sender and flush any remaining indexing actions that + /// haven't been sent yet. This will block until everything's been + /// sent. + /// + /// Whether to call this sync or async. + /// A Task that will wait until we're disposed. + public async Task DisposeAsync(bool async) + { + if (Interlocked.CompareExchange(ref _disposed, 1, 0) == 0) + { + StopTimer(); + await FlushAsync(async, PublisherCancellationToken).ConfigureAwait(false); + _channel?.Writer.TryComplete(); + if (async) + { + await _readerLoop.ConfigureAwait(false); + } + else + { + #pragma warning disable AZC0104 // Use EnsureCompleted() directly on asynchronous method return value. + _readerLoop.EnsureCompleted(); + #pragma warning restore AZC0104 // Use EnsureCompleted() directly on asynchronous method return value. + } + _flushBarrier.Dispose(); + } + } + + /// + /// Ensure nobody's trying to use this after it's been disposed. + /// + private void EnsureNotDisposed() + { + if (_disposed > 0) + { + throw new ObjectDisposedException(nameof(SearchIndexingBufferedSender)); + } + } + #endregion Dispose + + #region Publishing + /// + /// Check if we have a full batch ready to send. + /// + /// If we have a full batch ready to send. + private bool HasFullBatch() => IndexingActionsCount >= BatchActionSize; + + /// + /// Publish as many batches are ready. + /// + /// A cancellation token. + /// A task that represents the operation. + private async Task PublishAsync(CancellationToken cancellationToken) + { + // There's no need to let the timer keep running since we're + // already submitting + StopTimer(); + + do + { + // Prefer pulling from the _retry queue first + List> batch = new List>(); + _ = FillBatchFromQueue(batch, _retry) && FillBatchFromQueue(batch, _pending); + + // Submit the batch + await SubmitBatchAsync( + batch, + cancellationToken) + .ConfigureAwait(false); + + // Keep going if we have more full batches ready to submit + } while (HasFullBatch()); + + // Fill as much of the batch as possible from the given queue. + // Returns whether the batch is now full. + bool FillBatchFromQueue(List> batch, Queue< PublisherAction> queue) + { + // TODO: Consider tracking the keys in the batch and requiring them + // to be unique to avoid error alignment problems + + while (queue.Count > 0) + { + if (batch.Count < BatchActionSize) + { + batch.Add(queue.Dequeue()); + } + else + { + return false; + } + } + return true; + } + } + + /// + /// Send indexing actions to be processed by the service. + /// + /// The batch of actions to submit. + /// A cancellation token. + /// Whether the submission was throttled. + protected async Task SubmitBatchAsync(IList> batch, CancellationToken cancellationToken) + { + // If Submit is called before our last retry delay elapsed, we'll + // wait before sending the next request + await _manualRetries.WaitIfNeededAsync(cancellationToken).ConfigureAwait(false); + + // Send the request + bool throttled = await OnSubmitBatchAsync(batch, cancellationToken).ConfigureAwait(false); + + // Update whether or not the request was throttled to update our + // retry delay + _manualRetries.Update(throttled); + } + + /// + /// Send indexing actions to be processed by the service. + /// + /// The batch of actions to submit. + /// A cancellation token. + /// Whether the submission was throttled. + protected abstract Task OnSubmitBatchAsync(IList> batch, CancellationToken cancellationToken); + + /// + /// Enqueue an action to be retried. + /// + /// The action to be retried. + /// + /// Whether we should skip incrementing the retry attempts because the + /// failure wasn't related to the action. + /// + /// + /// A value indicating whether the action is retriable. + /// + protected bool EnqueueRetry( + PublisherAction action, + bool skipIncrement = false) + { + bool retriable = skipIncrement || action.RetryAttempts++ < RetryCount; + if (retriable) + { + _retry.Enqueue(action); + } + return retriable; + } + #endregion Publishing + + #region Timer + /// + /// Start the timer if it's not already running. + /// + private void StartTimer() + { + if (AutoFlushInterval != null) + { + int intervalInMs = (int)AutoFlushInterval.Value.TotalMilliseconds; + if (_timer == null) + { + _timer = new Timer( + OnTimerElapsed, + state: null, + dueTime: intervalInMs, + period: Timeout.Infinite); + } + } + } + + /// + /// Stop the timer. + /// + private void StopTimer() + { + if (_timer != null) + { + _timer.Dispose(); + _timer = null; + } + } + + /// + /// Trigger a flush when the timer elapses. + /// + /// + private void OnTimerElapsed(object timerState) + { + // We'll do this synchronously since it's an unbounded channel and + // we can just spin if we need to. + Message message = Message.Flush(); + while (!_channel.Writer.TryWrite(message)) { /* spin */ } + } + #endregion Timer + } +} diff --git a/sdk/search/Azure.Search.Documents/src/Batching/SearchIndexingBufferedSenderExtensions.cs b/sdk/search/Azure.Search.Documents/src/Batching/SearchIndexingBufferedSenderExtensions.cs new file mode 100644 index 000000000000..8a61c17629cc --- /dev/null +++ b/sdk/search/Azure.Search.Documents/src/Batching/SearchIndexingBufferedSenderExtensions.cs @@ -0,0 +1,98 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Azure.Core; +using Azure.Core.Pipeline; +using Azure.Search.Documents.Models; + +namespace Azure.Search.Documents +{ + /// + /// Extension methods for . + /// + public static class SearchIndexingBufferedSenderExtensions + { + /// + /// Adds a batch of delete actions to eventually send to the search + /// index. + /// + /// The sender. + /// The name of the key field. + /// + /// The keys of the documents to delete. + /// + /// + /// Optional to propagate notifications + /// that the operation should be canceled. + /// + public static void DeleteDocuments( + this SearchIndexingBufferedSender indexer, + string key, + IEnumerable documentKeys, + CancellationToken cancellationToken = default) => + DeleteDocumentsInternal( + indexer, + key, + documentKeys, + async: false, + cancellationToken) + .EnsureCompleted(); + + /// + /// Adds a batch of delete actions to eventually send to the search + /// index. + /// + /// The sender. + /// The name of the key field. + /// + /// The keys of the documents to delete. + /// + /// + /// Optional to propagate notifications + /// that the operation should be canceled. + /// + /// + /// A task that completes when the indexing actions have been added but + /// not yet sent. + /// + public static async Task DeleteDocumentsAsync( + this SearchIndexingBufferedSender indexer, + string key, + IEnumerable documentKeys, + CancellationToken cancellationToken = default) => + await DeleteDocumentsInternal( + indexer, + key, + documentKeys, + async: true, + cancellationToken) + .ConfigureAwait(false); + + private static async Task DeleteDocumentsInternal( + SearchIndexingBufferedSender indexer, + string key, + IEnumerable documentKeys, + bool async, + CancellationToken cancellationToken = default) + { + Argument.AssertNotNull(indexer, nameof(indexer)); + Argument.AssertNotNull(key, nameof(key)); + Argument.AssertNotNull(documentKeys, nameof(documentKeys)); + + var batch = IndexDocumentsBatch.Delete( + documentKeys.Select(k => new SearchDocument { [key] = k })); + if (async) + { + await indexer.IndexDocumentsAsync(batch, cancellationToken).ConfigureAwait(false); + } + else + { + indexer.IndexDocuments(batch, cancellationToken); + } + } + } +} diff --git a/sdk/search/Azure.Search.Documents/src/Batching/SearchIndexingBufferedSenderOptions{T}.cs b/sdk/search/Azure.Search.Documents/src/Batching/SearchIndexingBufferedSenderOptions{T}.cs new file mode 100644 index 000000000000..31b684e2cbec --- /dev/null +++ b/sdk/search/Azure.Search.Documents/src/Batching/SearchIndexingBufferedSenderOptions{T}.cs @@ -0,0 +1,86 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Threading; +using Azure.Search.Documents.Indexes; + +namespace Azure.Search.Documents +{ + /// + /// Provides the configuration options for + /// . + /// + /// + /// The .NET type that maps to the index schema. Instances of this type + /// can be retrieved as documents from the index. + /// + public class SearchIndexingBufferedSenderOptions + { + /// + /// Gets or sets a value indicating whether the sender should + /// automatically flush any indexing actions that have been added. + /// This will happen when the batch is full or when the + /// has elapsed. The default value is + /// . + /// + public bool AutoFlush { get; set; } = true; + + /// + /// Gets or sets an optional amount of time to wait before + /// automatically flushing any remaining indexing actions. The default + /// value is 60 seconds. + /// + public TimeSpan? AutoFlushInterval + { + get => _autoFlushInterval; + set + { + // Treat any non-positive values as null. + _autoFlushInterval = + value.HasValue && value.Value <= TimeSpan.Zero ? + null : + value; + } + } + private TimeSpan? _autoFlushInterval = TimeSpan.FromSeconds(60); + + /// + /// Gets or sets a to use when + /// submitting indexing actions. + /// + public CancellationToken FlushCancellationToken { get; set; } + + /// + /// Gets or sets a value indicating the number of actions to group into + /// a batch when tuning the behavior of the sender. The default value + /// is 512. The current service maximum is 32000. + /// + internal int? BatchActionSize { get; set; } = DefaultBatchActionSize; + internal const int DefaultBatchActionSize = 512; + + /// + /// Gets or sets a value indicating the number of bytes to use when + /// tuning the behavior of the sender. The default value is 512. The + /// current service maximum is 32000. + /// + internal int? BatchPayloadSize { get; set; } = DefaultBatchPayloadSize; + internal const int DefaultBatchPayloadSize = 500 * 1024; + + /// + /// Gets or sets the number of times to retry a failed document. + /// + internal int? RetryCount { get; set; } = DefaultRetryCount; + internal const int DefaultRetryCount = 3; + + /// + /// Gets or sets a function that can be used to access the index key + /// value of a document. Any indexing errors are identified by key and + /// you can use this function to provide that mapping. Otherwise we + /// will look for or call + /// + /// to help automatically determine the key. + /// + public Func KeyFieldAccessor { get; set; } + } +} diff --git a/sdk/search/Azure.Search.Documents/src/Batching/SearchIndexingBufferedSender{T}.cs b/sdk/search/Azure.Search.Documents/src/Batching/SearchIndexingBufferedSender{T}.cs new file mode 100644 index 000000000000..f3f39e7a83a4 --- /dev/null +++ b/sdk/search/Azure.Search.Documents/src/Batching/SearchIndexingBufferedSender{T}.cs @@ -0,0 +1,606 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Linq.Expressions; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; +using Azure.Core; +using Azure.Core.Pipeline; +using Azure.Search.Documents.Batching; +using Azure.Search.Documents.Indexes; +using Azure.Search.Documents.Indexes.Models; +using Azure.Search.Documents.Models; + +namespace Azure.Search.Documents +{ + /// + /// Index search documents with intelligent batching, automatic flushing, + /// and retries for failed indexing actions. + /// + /// + /// The .NET type that maps to the index schema. Instances of this + /// type can be retrieved as documents from the index. You can use + /// for dynamic documents. + /// + public class SearchIndexingBufferedSender : IDisposable, IAsyncDisposable + { + /// + /// Flag indicating whether the sender has been disposed. + /// + private volatile int _disposed = 0; + + /// + /// The single publisher responsible for submitting requests. + /// + private SearchIndexingPublisher _publisher; + + /// + /// Gets the used to send requests to the + /// service. + /// + internal virtual SearchClient SearchClient { get; } + + /// + /// Gets the URI endpoint of the Search Service. This is likely + /// to be similar to "https://{search_service}.search.windows.net". + /// + /// + /// This is not the URI of the Search Index. You could construct that + /// URI with "{Endpoint}/indexes/{IndexName}" if needed. + /// + public virtual Uri Endpoint => SearchClient.Endpoint; + + /// + /// Gets the name of the Search Service. + /// + public virtual string ServiceName => SearchClient.ServiceName; + + /// + /// Gets the name of the Search Index. + /// + public virtual string IndexName => SearchClient.IndexName; + + /// + /// Gets a function that can be used to access the index key value of a + /// document. + /// + internal virtual Func KeyFieldAccessor { get; private set; } + + /// + /// Ensure we only lookup the key field accessor once, but that + /// everyone waits until we have one. + /// + private InitializationBarrierSlim _keyFieldBarrier = new InitializationBarrierSlim(); + + /// + /// Async event raised whenever an indexing action is added to the + /// sender. + /// + public event Func, CancellationToken, Task> ActionAddedAsync; + + /// + /// Async event raised whenever an indexing action is sent by the + /// sender. + /// + public event Func, CancellationToken, Task> ActionSentAsync; + + /// + /// Async event raised whenever an indexing action was submitted + /// successfully. + /// + public event Func, IndexingResult, CancellationToken, Task> ActionCompletedAsync; + + /// + /// Async event raised whenever an indexing action failed. The + /// or may be null + /// depending on the failure. + /// + public event Func, IndexingResult, Exception, CancellationToken, Task> ActionFailedAsync; + + /// + /// Creates a new instance of the SearchIndexingBufferedSender. + /// + /// + /// The SearchClient used to send requests to the service. + /// + /// + /// Provides the configuration options for the sender. + /// + protected internal SearchIndexingBufferedSender( + SearchClient searchClient, + SearchIndexingBufferedSenderOptions options = null) + { + Argument.AssertNotNull(searchClient, nameof(searchClient)); + SearchClient = searchClient; + + options ??= new SearchIndexingBufferedSenderOptions(); + KeyFieldAccessor = options.KeyFieldAccessor; + _publisher = new SearchIndexingPublisher( + this, + options.AutoFlush, + options.AutoFlushInterval, + options.BatchActionSize, + options.BatchPayloadSize, + options.RetryCount, + options.FlushCancellationToken); + } + + #region Dispose + /// + /// Flush any remaining work and clean up resources. + /// + #pragma warning disable CA1816 // Dispose methods should call SuppressFinalize + void IDisposable.Dispose() => + DisposeAsync(async: false).EnsureCompleted(); + #pragma warning restore CA1816 // Dispose methods should call SuppressFinalize + + /// + /// Flush any remaining work and clean up resources. + /// + /// + /// A task that will complete when the object has finished disposing. + /// + async ValueTask IAsyncDisposable.DisposeAsync() => + await DisposeAsync(async: true).ConfigureAwait(false); + + /// + /// Dispose the sender and flush any remaining indexing actions that + /// haven't been sent yet. This will block until everything's been + /// sent. + /// + /// Whether to call this sync or async. + /// A Task that will wait until we're disposed. + internal async Task DisposeAsync(bool async) + { + if (Interlocked.CompareExchange(ref _disposed, 1, 0) == 0) + { + await _publisher.DisposeAsync(async).ConfigureAwait(false); + _keyFieldBarrier.Dispose(); + } + } + + /// + /// Ensure the sender was properly disposed. + /// + ~SearchIndexingBufferedSender() + { + if (_publisher?.IndexingActionsCount > 0) + { + try + { + #pragma warning disable CA1065 // Do not raise exceptions in unexpected locations + throw new InvalidOperationException( + $"{nameof(SearchIndexingBufferedSender)} has {_publisher.IndexingActionsCount} unsent indexing actions."); + #pragma warning restore CA1065 // Do not raise exceptions in unexpected locations + } + catch (InvalidOperationException) + { + } + } + } + + /// + /// Ensure nobody's trying to use this after it's been disposed. + /// + private void EnsureNotDisposed() + { + if (_disposed > 0) + { + throw new ObjectDisposedException(nameof(SearchIndexingBufferedSender)); + } + } + #endregion Dispose + + #region Get Key Field + /// + /// Ensure we have a valid KeyFieldAccessor (and probe the type or + /// service to find one). + /// + /// Whether to run sync or async. + /// The Cancellation token. + /// A task that will complete after we've checked. + private async Task EnsureKeyFieldAccessorAsync(bool async, CancellationToken cancellationToken) + { + // Skip initialization if we already have one + if (KeyFieldAccessor != null) { return; } + + // Make sure we only run the initialization code once + if (await _keyFieldBarrier.TryEnterAsync(async, cancellationToken).ConfigureAwait(false)) + { + // Release no matter what + try + { + // Case 1: The user provided an explicit accessor and we're done + if (KeyFieldAccessor != null) { return; } + + // Case 2: Infer the accessor from FieldBuilder + try + { + FieldBuilder builder = new FieldBuilder { Serializer = SearchClient.Serializer }; + IDictionary fields = builder.BuildMapping(typeof(T)); + KeyValuePair keyField = fields.First(pair => pair.Value.IsKey == true); + KeyFieldAccessor = CompileAccessor(keyField.Key); + return; + } + catch + { + // Ignore any errors because this type might not have been + // designed with FieldBuilder in mind + } + + // Case 3: Fetch the index to find the key + Exception failure = null; + try + { + // Call the service to find the name of the key + SearchIndexClient indexClient = SearchClient.GetSearchIndexClient(); + SearchIndex index = async ? + await indexClient.GetIndexAsync(IndexName, cancellationToken).ConfigureAwait(false) : + indexClient.GetIndex(IndexName, cancellationToken); + SearchField keyField = index.Fields.Single(f => f.IsKey == true); + string key = keyField.Name; + + if (typeof(T).IsAssignableFrom(typeof(SearchDocument))) + { + // Case 3a: If it's a dynamic SearchDocument, lookup + // the name of the key in the dictionary + KeyFieldAccessor = (T doc) => (doc as SearchDocument)?.GetString(key); + return; + } + else + { + // Case 3b: We'll see if there's a property with the + // same name and use that as the accessor + if (typeof(T).GetProperty(key) != null || + typeof(T).GetField(key) != null) + { + KeyFieldAccessor = CompileAccessor(key); + return; + } + } + } + catch (Exception ex) + { + // We'll provide any exceptions as a hint because it could + // be something like using the wrong API Key type when + // moving from SearchClient up to SearchIndexClient that + // potentially could be addressed if the user really wanted + failure = ex; + } + + // Case 4: Throw and tell the user to provide an explicit accessor. + throw new InvalidOperationException( + $"Failed to discover the Key field of document type {typeof(T).Name} for Azure Cognitive Search index {IndexName}. " + + $"Please set {typeof(SearchIndexingBufferedSenderOptions).Name}.{nameof(SearchIndexingBufferedSenderOptions.KeyFieldAccessor)} explicitly.", + failure); + } + finally + { + // Unblock anyone else who called this at the same time + _keyFieldBarrier.Release(); + } + } + + // Build an accessor for a property named key on type T. Compiling + // is kind of heavyweight, but we'll be calling this so many times + // it's worth it. + static Func CompileAccessor(string key) + { + ParameterExpression param = Expression.Parameter(typeof(T), "doc"); + Expression> accessor = Expression.Lambda>( + Expression.PropertyOrField(param, key), + param); + return accessor.Compile(); + } + } + #endregion Get Key Field + + #region Notifications + /// + /// Raise the ActionAddedAsync event. + /// + /// The action being added. + /// Cancellation token. + /// + /// A task that will not complete until every handler does. + /// + internal async Task RaiseActionAddedAsync( + IndexDocumentsAction action, + CancellationToken cancellationToken) => + await ActionAddedAsync + .RaiseAsync(action, cancellationToken) + .ConfigureAwait(false); + + /// + /// Raise the ActionSentAsync event. + /// + /// The action being added. + /// Cancellation token. + /// + /// A task that will not complete until every handler does. + /// + internal async Task RaiseActionSentAsync( + IndexDocumentsAction action, + CancellationToken cancellationToken) => + await ActionSentAsync + .RaiseAsync(action, cancellationToken) + .ConfigureAwait(false); + + /// + /// Raise the ActionCompletedAsync event. + /// + /// The action being added. + /// The result of indexing. + /// Cancellation token. + /// + /// A task that will not complete until every handler does. + /// + internal async Task RaiseActionCompletedAsync( + IndexDocumentsAction action, + IndexingResult result, + CancellationToken cancellationToken) => + await ActionCompletedAsync + .RaiseAsync(action, result, cancellationToken) + .ConfigureAwait(false); + + /// + /// Raise the ActionFailedAsync event. + /// + /// The action being added. + /// The result of indexing. + /// An exception that was thrown. + /// Cancellation token. + /// + /// A task that will not complete until every handler does. + /// + internal async Task RaiseActionFailedAsync( + IndexDocumentsAction action, + IndexingResult result, + Exception exception, + CancellationToken cancellationToken) => + await ActionFailedAsync + .RaiseAsync(action, result, exception, cancellationToken) + .ConfigureAwait(false); + #endregion + + #region Index Documents + /// + /// Send documents to the publisher. All the public APIs run through + /// this method. + /// + /// The batch to send. + /// Whether to call sync or async. + /// + /// Optional to propagate notifications + /// that the operation should be canceled. + /// + /// + /// A Task that will complete after the batch is added. + /// + private async Task AddIndexingActionsInternal( + IndexDocumentsBatch batch, + bool async, + CancellationToken cancellationToken) + { + EnsureNotDisposed(); + if (batch != null) + { + await _publisher.AddDocumentsAsync( + batch.Actions, + async, + cancellationToken) + .ConfigureAwait(false); + } + } + + /// + /// Adds a batch of upload, merge, and/or delete actions to eventually + /// send to the search index. + /// + /// The batch of document index actions. + /// + /// Optional to propagate notifications + /// that the operation should be canceled. + /// + public virtual void IndexDocuments( + IndexDocumentsBatch batch, + CancellationToken cancellationToken = default) => + AddIndexingActionsInternal(batch, async: false, cancellationToken).EnsureCompleted(); + + /// + /// Adds a batch of upload, merge, and/or delete actions to eventually + /// send to the search index. + /// + /// The batch of document index actions. + /// + /// Optional to propagate notifications + /// that the operation should be canceled. + /// + /// + /// A task that completes when the indexing actions have been added but + /// not yet sent. + /// + public virtual async Task IndexDocumentsAsync( + IndexDocumentsBatch batch, + CancellationToken cancellationToken = default) => + await AddIndexingActionsInternal(batch, async: true, cancellationToken).ConfigureAwait(false); + + /// + /// Adds a batch of upload actions to eventually send to the search + /// index. + /// + /// The documents to upload. + /// + /// Optional to propagate notifications + /// that the operation should be canceled. + /// + public virtual void UploadDocuments( + IEnumerable documents, + CancellationToken cancellationToken = default) => + IndexDocuments( + IndexDocumentsBatch.Upload(documents), + cancellationToken); + + /// + /// Adds a batch of upload actions to eventually send to the search + /// index. + /// + /// The documents to upload. + /// + /// Optional to propagate notifications + /// that the operation should be canceled. + /// + /// + /// A task that completes when the indexing actions have been added but + /// not yet sent. + /// + public virtual async Task UploadDocumentsAsync( + IEnumerable documents, + CancellationToken cancellationToken = default) => + await IndexDocumentsAsync( + IndexDocumentsBatch.Upload(documents), + cancellationToken) + .ConfigureAwait(false); + + /// + /// Adds a batch of merge actions to eventually send to the search + /// index. + /// + /// The documents to merge. + /// + /// Optional to propagate notifications + /// that the operation should be canceled. + /// + public virtual void MergeDocuments( + IEnumerable documents, + CancellationToken cancellationToken = default) => + IndexDocuments( + IndexDocumentsBatch.Merge(documents), + cancellationToken); + + /// + /// Adds a batch of merge actions to eventually send to the search + /// index. + /// + /// The documents to merge. + /// + /// Optional to propagate notifications + /// that the operation should be canceled. + /// . + /// + /// A task that completes when the indexing actions have been added but + /// not yet sent. + /// + public virtual async Task MergeDocumentsAsync( + IEnumerable documents, + CancellationToken cancellationToken = default) => + await IndexDocumentsAsync( + IndexDocumentsBatch.Merge(documents), + cancellationToken) + .ConfigureAwait(false); + + /// + /// Adds a batch of merge or upload actions to eventually send to the + /// search index. + /// + /// The documents to merge or upload. + /// + /// Optional to propagate notifications + /// that the operation should be canceled. + /// + public virtual void MergeOrUploadDocuments( + IEnumerable documents, + CancellationToken cancellationToken = default) => + IndexDocuments( + IndexDocumentsBatch.MergeOrUpload(documents), + cancellationToken); + + /// + /// Adds a batch of merge or upload actions to eventually send to the + /// search index. + /// + /// The documents to merge or upload. + /// + /// Optional to propagate notifications + /// that the operation should be canceled. + /// + /// + /// A task that completes when the indexing actions have been added but + /// not yet sent. + /// + public virtual async Task MergeOrUploadDocumentsAsync( + IEnumerable documents, + CancellationToken cancellationToken = default) => + await IndexDocumentsAsync( + IndexDocumentsBatch.MergeOrUpload(documents), + cancellationToken) + .ConfigureAwait(false); + + /// + /// Adds a batch of delete actions to eventually send to the search + /// index. + /// + /// The documents to delete. + /// + /// Optional to propagate notifications + /// that the operation should be canceled. + /// + public virtual void DeleteDocuments( + IEnumerable documents, + CancellationToken cancellationToken = default) => + IndexDocuments( + IndexDocumentsBatch.Delete(documents), + cancellationToken); + + /// + /// Adds a batch of delete actions to eventually send to the search + /// index. + /// + /// The documents to delete. + /// + /// Optional to propagate notifications + /// that the operation should be canceled. + /// + /// + /// A task that completes when the indexing actions have been added but + /// not yet sent. + /// + public virtual async Task DeleteDocumentsAsync( + IEnumerable documents, + CancellationToken cancellationToken = default) => + await IndexDocumentsAsync( + IndexDocumentsBatch.Delete(documents), + cancellationToken) + .ConfigureAwait(false); + #endregion Index Documents + + #region Flush + /// + /// Flush any pending indexing actions. This will wait until + /// everything has been sent before returning. + /// + /// + /// Optional to propagate notifications + /// that the operation should be canceled. + /// + public void Flush(CancellationToken cancellationToken = default) => + _publisher.FlushAsync(async: false, cancellationToken).EnsureCompleted(); + + /// + /// Flush any pending indexing actions. This will wait until + /// everything has been sent before returning. + /// + /// + /// Optional to propagate notifications + /// that the operation should be canceled. + /// + /// A Task that will complete after flushing. + public async Task FlushAsync(CancellationToken cancellationToken = default) => + await _publisher.FlushAsync(async: true, cancellationToken).ConfigureAwait(false); + #endregion Flush + } +} diff --git a/sdk/search/Azure.Search.Documents/src/Batching/SearchIndexingPublisher{T}.cs b/sdk/search/Azure.Search.Documents/src/Batching/SearchIndexingPublisher{T}.cs new file mode 100644 index 000000000000..c55ca968a169 --- /dev/null +++ b/sdk/search/Azure.Search.Documents/src/Batching/SearchIndexingPublisher{T}.cs @@ -0,0 +1,240 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Azure.Search.Documents.Models; + +namespace Azure.Search.Documents.Batching +{ + /// + /// The SearchIndexingPublisher is responsible for submitting documents to + /// the service for indexing. + /// + /// + /// The .NET type that maps to the index schema. Instances of this + /// type can be retrieved as documents from the index. You can use + /// for dynamic documents. + /// + internal class SearchIndexingPublisher : Publisher> + { + /// + /// The sender, which we mostly use for raising events. + /// + private SearchIndexingBufferedSender _sender; + + /// + /// Creates a new SearchIndexingPublisher which immediately starts + /// listening to process requests. + /// + /// The sender that produces actions. + /// + /// A value indicating whether the publisher should automatically flush. + /// + /// + /// An optional amount of time to wait before automatically flushing. + /// + /// + /// The number of actions to group into a batch. + /// + /// + /// The number of bytes to use when tuning the behavior of the + /// publisher. + /// + /// + /// The number of times to retry a failed document. + /// + /// + /// A to use when publishing. + /// + public SearchIndexingPublisher( + SearchIndexingBufferedSender sender, + bool autoFlush, + TimeSpan? autoFlushInterval, + int? batchActionSize, + int? batchPayloadSize, + int? retryCount, + CancellationToken publisherCancellationToken) + : base(autoFlush, autoFlushInterval, batchActionSize, batchPayloadSize, retryCount, publisherCancellationToken) + { + _sender = sender; + } + + /// + /// Get a key used to identify a given document. + /// + /// The document. + /// A key for that document. + protected override string GetDocumentKey(IndexDocumentsAction document) => + _sender.KeyFieldAccessor(document.Document); + + /// + /// Process any documents that have been added for publishing. + /// + /// The documents to publish. + /// Cancellation token. + /// Task for processing the added documents. + protected override async Task OnDocumentsAddedAsync(IEnumerable> documents, CancellationToken cancellationToken) + { + // Raise notifications + foreach (IndexDocumentsAction document in documents) + { + await _sender.RaiseActionAddedAsync(document, cancellationToken).ConfigureAwait(false); + } + + // Add all of the documents and possibly auto flush + await base.OnDocumentsAddedAsync(documents, cancellationToken).ConfigureAwait(false); + } + + /// + /// Send indexing actions to be processed by the service. + /// + /// The batch of actions to submit. + /// A cancellation token. + /// Whether the submission was throttled. + protected override async Task OnSubmitBatchAsync(IList>> batch, CancellationToken cancellationToken) + { + // Bail early if someone sent an empty batch + if (batch.Count == 0) { return false; } + + // Notify the action is being sent + foreach (PublisherAction> action in batch) + { + await _sender.RaiseActionSentAsync(action.Document, cancellationToken).ConfigureAwait(false); + } + + // Send the request to the service + Response response = null; + try + { + response = await _sender.SearchClient.IndexDocumentsAsync( + IndexDocumentsBatch.Create(batch.Select(a => a.Document).ToArray()), + cancellationToken: cancellationToken) + .ConfigureAwait(false); + } + // Handle batch level failures + catch (RequestFailedException ex) when (ex.Status == 413) // Payload Too Large + { + // Split the batch and try with smaller payloads + int half = (int)Math.Floor((double)batch.Count / 2.0); + var smaller = new List>>(batch.Take(half)); + foreach (PublisherAction> action in batch.Skip(half)) + { + // Add the second half to the retry queue without + // counting this as a retry attempt + _ = EnqueueRetry(action, skipIncrement: true); + } + + // Try resubmitting with just the smaller half + await SubmitBatchAsync(smaller, cancellationToken).ConfigureAwait(false); + return false; + } + catch (Exception ex) + { + // Retry the whole batch using the same exception for everything + foreach (PublisherAction> action in batch) + { + await EnqueueOrFailRetryAsync(action, null, ex, cancellationToken).ConfigureAwait(false); + } + + // Search currently uses 503s for throttling + return (ex is RequestFailedException failure && failure.Status == 503); + } + + // Handle individual responses which might be success or failure + bool throttled = false; + foreach ((PublisherAction> action, IndexingResult result) in + AssociateResults(batch, response.Value.Results)) + { + // Search currently uses 503s for throttling + throttled |= (result.Status == 503); + + Debug.Assert(action.Key == result.Key); + if (result.Succeeded) + { + await _sender.RaiseActionCompletedAsync( + action.Document, + result, + cancellationToken) + .ConfigureAwait(false); + } + else if (IsRetriable(result.Status)) + { + await EnqueueOrFailRetryAsync( + action, + result, + exception: null, + cancellationToken) + .ConfigureAwait(false); + } + else + { + await _sender.RaiseActionFailedAsync( + action.Document, + result, + exception: null, + cancellationToken) + .ConfigureAwait(false); + } + } + return throttled; + } + + /// + /// Attempt to add an item to the retry queue or raise a failure + /// notification if it's been retried too many times. + /// + /// The action to retry. + /// Result of executing the action. + /// An exception raised by the action. + /// A cancellation token. + /// Task. + private async Task EnqueueOrFailRetryAsync( + PublisherAction> action, + IndexingResult result, + Exception exception, + CancellationToken cancellationToken) + { + if (!EnqueueRetry(action)) + { + await _sender.RaiseActionFailedAsync( + action.Document, + result, + exception, + cancellationToken) + .ConfigureAwait(false); + } + } + + /// + /// Associate the results of a submission with the pending actions. + /// + /// The batch of actions. + /// The results. + /// Actions paired with their result. + private static IEnumerable<(PublisherAction>, IndexingResult)> AssociateResults( + IList>> actions, + IReadOnlyList results) + { + // In the worst case scenario with multiple actions on the + // same key, we'll treat the results as ordered. We'll do a stable + // sort on both collections and then pair them up. + return actions.OrderBy(a => a.Key) + .Zip(results.OrderBy(r => r.Key), (a, r) => (a, r)); + } + + /// + /// Check if a status code for an individual failure is retriable. + /// + /// The status code. + /// Whether we should retry. + private static bool IsRetriable(int status) => + status == 422 || + status == 409 || + status == 503; + } +} diff --git a/sdk/search/Azure.Search.Documents/src/Indexes/FieldBuilder.cs b/sdk/search/Azure.Search.Documents/src/Indexes/FieldBuilder.cs index 970831ad5963..19c185d83b5e 100644 --- a/sdk/search/Azure.Search.Documents/src/Indexes/FieldBuilder.cs +++ b/sdk/search/Azure.Search.Documents/src/Indexes/FieldBuilder.cs @@ -55,7 +55,7 @@ public FieldBuilder() /// /// Gets or sets the to use to generate field names that match JSON property names. - /// You should use hte same value as . + /// You should use the same value as . /// will be used if no value is provided. /// public ObjectSerializer Serializer { get; set; } @@ -69,7 +69,19 @@ public FieldBuilder() /// /// A collection of fields. /// . - public IList Build(Type modelType) + public IList Build(Type modelType) => + BuildMapping(modelType).Values.ToList(); + + /// + /// Creates a dictionary mapping property names to the objects corresponding to the properties of the type supplied. + /// + /// + /// The type for which fields will be created, based on its properties. + /// + /// A collection of fields. + /// . + /// This overload is used to find the Key field of a SearchIndex so we can associate indexing failures with actions. + internal IDictionary BuildMapping(Type modelType) { Argument.AssertNotNull(modelType, nameof(modelType)); @@ -99,7 +111,7 @@ ArgumentException FailOnNonObjectDataType() throw FailOnNonObjectDataType(); } - private static IList Build( + private static IDictionary Build( Type modelType, ObjectInfo info, IMemberNameConverter nameProvider, @@ -129,7 +141,7 @@ SearchField CreateComplexField(SearchFieldDataType dataType, Type underlyingClrT try { IList subFields = - Build(underlyingClrType, info, nameProvider, processedTypes); + Build(underlyingClrType, info, nameProvider, processedTypes).Values.ToList(); if (prop.SerializedName is null) { @@ -214,7 +226,10 @@ ArgumentException FailOnUnknownDataType() onComplexDataType: CreateComplexField); } - return info.Properties.Select(BuildField).Where(field => field != null).ToList(); + return info.Properties + .Select(prop => (prop.Name, BuildField(prop))) + .Where(pair => pair.Item2 != null) + .ToDictionary(pair => pair.Item1, pair => pair.Item2); } private static IDataTypeInfo GetDataTypeInfo(Type propertyType, IMemberNameConverter nameProvider) diff --git a/sdk/search/Azure.Search.Documents/src/Indexes/SearchIndexClient.cs b/sdk/search/Azure.Search.Documents/src/Indexes/SearchIndexClient.cs index fcb24696000e..3a0c37027ff6 100644 --- a/sdk/search/Azure.Search.Documents/src/Indexes/SearchIndexClient.cs +++ b/sdk/search/Azure.Search.Documents/src/Indexes/SearchIndexClient.cs @@ -3,6 +3,7 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -78,6 +79,36 @@ public SearchIndexClient( _version = options.Version; } + /// + /// Initializes a new instance of the class. + /// + /// Required. The URI endpoint of the Search service. This is likely to be similar to "https://{search_service}.search.windows.net". The URI must use HTTPS. + /// An optional customized serializer to use for search documents. + /// The authenticated used for sending requests to the Search Service. + /// The used to provide tracing support for the client library. + /// The REST API version of the Search Service to use when making requests. + internal SearchIndexClient( + Uri endpoint, + ObjectSerializer serializer, + HttpPipeline pipeline, + ClientDiagnostics diagnostics, + SearchClientOptions.ServiceVersion version) + { + Debug.Assert(endpoint != null); + Debug.Assert(string.Equals(endpoint.Scheme, Uri.UriSchemeHttps, StringComparison.OrdinalIgnoreCase)); + Debug.Assert(pipeline != null); + Debug.Assert(diagnostics != null); + Debug.Assert( + SearchClientOptions.ServiceVersion.V2020_06_30 <= version && + version <= SearchClientOptions.LatestVersion); + + Endpoint = endpoint; + _serializer = serializer; + _clientDiagnostics = diagnostics; + _pipeline = pipeline; + _version = version; + } + /// /// Gets the URI endpoint of the Search service. This is likely /// to be similar to "https://{search_service}.search.windows.net". diff --git a/sdk/search/Azure.Search.Documents/src/SearchClient.cs b/sdk/search/Azure.Search.Documents/src/SearchClient.cs index db37388ec457..853037ea2d45 100644 --- a/sdk/search/Azure.Search.Documents/src/SearchClient.cs +++ b/sdk/search/Azure.Search.Documents/src/SearchClient.cs @@ -52,7 +52,7 @@ public class SearchClient /// Gets an that can be used to /// customize the serialization of strongly typed models. /// - private ObjectSerializer Serializer { get; } + internal ObjectSerializer Serializer { get; } /// /// Gets the authenticated used for sending @@ -180,7 +180,6 @@ public SearchClient( Version.ToVersionString()); } -#pragma warning disable CS1573 // Not all parameters will be used depending on feature flags /// /// Initializes a new instance of the SearchClient class from a /// . @@ -193,6 +192,9 @@ public SearchClient( /// /// Required. The name of the Search Index. /// + /// + /// An optional customized serializer to use for search documents. + /// /// /// The authenticated used for sending /// requests to the Search Service. @@ -212,7 +214,6 @@ internal SearchClient( HttpPipeline pipeline, ClientDiagnostics diagnostics, SearchClientOptions.ServiceVersion version) - #pragma warning restore CS1573 { Debug.Assert(endpoint != null); Debug.Assert(string.Equals(endpoint.Scheme, Uri.UriSchemeHttps, StringComparison.OrdinalIgnoreCase)); @@ -238,6 +239,18 @@ internal SearchClient( null, Version.ToVersionString()); } + + /// + /// Get a SearchIndexClient with the same pipeline. + /// + /// A SearchIndexClient. + internal SearchIndexClient GetSearchIndexClient() => + new SearchIndexClient( + Endpoint, + Serializer, + Pipeline, + ClientDiagnostics, + Version); #endregion ctors #region GetDocumentCount @@ -1886,5 +1899,26 @@ public virtual async Task> DeleteDocumentsAsync( } } #endregion Index Documents Conveniences + + /// + /// Creates a new that + /// can be used to index search documents with intelligent batching, + /// automatic flushing, and retries for failed indexing actions. + /// + /// + /// The .NET type that maps to the index schema. Instances of this + /// type can be retrieved as documents from the index. You can use + /// for dynamic documents. + /// + /// + /// The to + /// customize the sender's behavior. + /// + /// + /// A new . + /// + public virtual SearchIndexingBufferedSender CreateIndexingBufferedSender( + SearchIndexingBufferedSenderOptions options = null) => + new SearchIndexingBufferedSender(this, options); } }