Skip to content

Commit

Permalink
feat(csharp): add chunked batch (#2795)
Browse files Browse the repository at this point in the history
  • Loading branch information
morganleroi authored Feb 27, 2024
1 parent 1348364 commit 18e8248
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 22 deletions.
2 changes: 1 addition & 1 deletion .github/.cache_version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.0.18
1.0.19
Original file line number Diff line number Diff line change
Expand Up @@ -425,12 +425,15 @@ private static async Task<T> RetryUntil<T>(Func<Task<T>> func, Func<T, bool> val
/// </summary>
/// <param name="client"></param>
/// <param name="indexName">The index in which to perform the request.</param>
/// <param name="data">The list of records to replace.</param>
/// <param name="objects">The list of `objects` to store in the given Algolia `indexName`.</param>
/// <param name="batchSize">The size of the chunk of `objects`. The number of `batch` calls will be equal to `length(objects) / batchSize`. Defaults to 1000.</param>
/// <param name="options">Add extra http header or query parameters to Algolia.</param>
/// <param name="cancellationToken">Cancellation Token to cancel the request.</param>
public static List<long> ReplaceAllObjects<T>(this SearchClient client, string indexName,
IEnumerable<T> data, RequestOptions options = null, CancellationToken cancellationToken = default) where T : class
=> AsyncHelper.RunSync(() => client.ReplaceAllObjectsAsync(indexName, data, options, cancellationToken));
public static ReplaceAllObjectsResponse ReplaceAllObjects<T>(this SearchClient client, string indexName,
IEnumerable<T> objects, int batchSize = 1000, RequestOptions options = null,
CancellationToken cancellationToken = default) where T : class
=> AsyncHelper.RunSync(() =>
client.ReplaceAllObjectsAsync(indexName, objects, batchSize, options, cancellationToken));

/// <summary>
/// Push a new set of objects and remove all previous ones. Settings, synonyms and query rules are untouched.
Expand All @@ -439,15 +442,17 @@ public static List<long> ReplaceAllObjects<T>(this SearchClient client, string i
/// </summary>
/// <param name="client"></param>
/// <param name="indexName">The index in which to perform the request.</param>
/// <param name="data">The list of records to replace.</param>
/// <param name="objects">The list of `objects` to store in the given Algolia `indexName`.</param>
/// <param name="batchSize">The size of the chunk of `objects`. The number of `batch` calls will be equal to `length(objects) / batchSize`. Defaults to 1000.</param>
/// <param name="options">Add extra http header or query parameters to Algolia.</param>
/// <param name="cancellationToken">Cancellation Token to cancel the request.</param>
public static async Task<List<long>> ReplaceAllObjectsAsync<T>(this SearchClient client, string indexName,
IEnumerable<T> data, RequestOptions options = null, CancellationToken cancellationToken = default) where T : class
public static async Task<ReplaceAllObjectsResponse> ReplaceAllObjectsAsync<T>(this SearchClient client,
string indexName, IEnumerable<T> objects, int batchSize = 1000, RequestOptions options = null,
CancellationToken cancellationToken = default) where T : class
{
if (data == null)
if (objects == null)
{
throw new ArgumentNullException(nameof(data));
throw new ArgumentNullException(nameof(objects));
}

var rnd = new Random();
Expand All @@ -456,26 +461,84 @@ public static async Task<List<long>> ReplaceAllObjectsAsync<T>(this SearchClient
// Copy settings, synonyms and query rules into the temporary index
var copyResponse = await client.OperationIndexAsync(indexName,
new OperationIndexParams(OperationType.Copy, tmpIndexName)
{ Scope = [ScopeType.Rules, ScopeType.Settings, ScopeType.Synonyms] }, options, cancellationToken)
{ Scope = [ScopeType.Rules, ScopeType.Settings, ScopeType.Synonyms] }, options, cancellationToken)
.ConfigureAwait(false);

await client.WaitForTaskAsync(indexName, copyResponse.TaskID, requestOptions: options, ct: cancellationToken).ConfigureAwait(false);
await client.WaitForTaskAsync(indexName, copyResponse.TaskID, requestOptions: options, ct: cancellationToken)
.ConfigureAwait(false);

// Add objects to the temporary index
var batchResponse = await client.BatchAsync(tmpIndexName,
new BatchWriteParams(data.Select(x => new BatchRequest(Action.AddObject, x)).ToList()),
var batchResponse = await client.ChunkedBatchAsync(tmpIndexName, objects, Action.AddObject, batchSize,
options, cancellationToken).ConfigureAwait(false);

await client.WaitForTaskAsync(tmpIndexName, batchResponse.TaskID, requestOptions: options, ct: cancellationToken).ConfigureAwait(false);

// Move the temporary index to the main one
var moveResponse = await client.OperationIndexAsync(tmpIndexName,
new OperationIndexParams(OperationType.Move, indexName), options, cancellationToken)
.ConfigureAwait(false);

await client.WaitForTaskAsync(tmpIndexName, moveResponse.TaskID, requestOptions: options, ct: cancellationToken).ConfigureAwait(false);
await client.WaitForTaskAsync(tmpIndexName, moveResponse.TaskID, requestOptions: options, ct: cancellationToken)
.ConfigureAwait(false);

return new ReplaceAllObjectsResponse
{
CopyOperationResponse = copyResponse,
MoveOperationResponse = moveResponse,
BatchResponses = batchResponse
};
}

/// <summary>
/// Helper: Chunks the given `objects` list in subset of 1000 elements max in order to make it fit in `batch` requests. (Synchronous version)
/// </summary>
/// <param name="client"></param>
/// <param name="indexName">The index in which to perform the request.</param>
/// <param name="objects">The list of `objects` to store in the given Algolia `indexName`.</param>
/// <param name="action">The `batch` `action` to perform on the given array of `objects`, defaults to `addObject`.</param>
/// <param name="batchSize">The size of the chunk of `objects`. The number of `batch` calls will be equal to `length(objects) / batchSize`. Defaults to 1000.</param>
/// <param name="options">Add extra http header or query parameters to Algolia.</param>
/// <param name="cancellationToken">Cancellation Token to cancel the request.</param>
/// <typeparam name="T"></typeparam>
public static List<BatchResponse> ChunkedBatch<T>(this SearchClient client, string indexName,
IEnumerable<T> objects, Action action = Action.AddObject, int batchSize = 1000, RequestOptions options = null,
CancellationToken cancellationToken = default) where T : class =>
AsyncHelper.RunSync(() =>
client.ChunkedBatchAsync(indexName, objects, action, batchSize, options, cancellationToken));

/// <summary>
/// Helper: Chunks the given `objects` list in subset of 1000 elements max in order to make it fit in `batch` requests.
/// </summary>
/// <param name="client"></param>
/// <param name="indexName">The index in which to perform the request.</param>
/// <param name="objects">The list of `objects` to store in the given Algolia `indexName`.</param>
/// <param name="action">The `batch` `action` to perform on the given array of `objects`, defaults to `addObject`.</param>
/// <param name="batchSize">The size of the chunk of `objects`. The number of `batch` calls will be equal to `length(objects) / batchSize`. Defaults to 1000.</param>
/// <param name="options">Add extra http header or query parameters to Algolia.</param>
/// <param name="cancellationToken">Cancellation Token to cancel the request.</param>
/// <typeparam name="T"></typeparam>
public static async Task<List<BatchResponse>> ChunkedBatchAsync<T>(this SearchClient client, string indexName,
IEnumerable<T> objects, Action action = Action.AddObject, int batchSize = 1000, RequestOptions options = null,
CancellationToken cancellationToken = default) where T : class
{
var batchCount = (int)Math.Ceiling((double)objects.Count() / batchSize);
var responses = new List<BatchResponse>();

return [copyResponse.TaskID, batchResponse.TaskID, moveResponse.TaskID];
for (var i = 0; i < batchCount; i++)
{
var chunk = objects.Skip(i * batchSize).Take(batchSize);
var batchResponse = await client.BatchAsync(indexName,
new BatchWriteParams(chunk.Select(x => new BatchRequest(action, x)).ToList()),
options, cancellationToken).ConfigureAwait(false);

responses.Add(batchResponse);
}

foreach (var batch in responses)
{
await client.WaitForTaskAsync(indexName, batch.TaskID, requestOptions: options, ct: cancellationToken)
.ConfigureAwait(false);
}

return responses;
}

private static int NextDelay(int retryCount)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
using System.Collections.Generic;
using Algolia.Search.Models.Search;

namespace Algolia.Search.Utils;

/// <summary>
/// All responses from the ReplaceAllObjects calls.
/// </summary>
public class ReplaceAllObjectsResponse
{
/// <summary>
/// The response of the copy operation.
/// </summary>
public UpdatedAtResponse CopyOperationResponse { get; set; }

/// <summary>
/// The response of the batch operations.
/// </summary>
public List<BatchResponse> BatchResponses { get; set; }

/// <summary>
/// The response of the move operation.
/// </summary>
public UpdatedAtResponse MoveOperationResponse { get; set; }
}
25 changes: 21 additions & 4 deletions tests/output/csharp/src/ClientExtensionsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ public async Task ShouldReplaceAllObjects()
HttpStatusCode = 200,
Body = new MemoryStream(
Encoding.UTF8.GetBytes(
serializer.Serialize(new UpdatedAtResponse(3, "2021-01-01T00:00:00Z"))
serializer.Serialize(new UpdatedAtResponse(4, "2021-01-01T00:00:00Z"))
)
)
}
Expand All @@ -635,6 +635,10 @@ public async Task ShouldReplaceAllObjects()
r.Uri.AbsolutePath,
"1\\/indexes\\/my-test-index_tmp_[0-9]+\\/task\\/3"
)
|| Regex.IsMatch(
r.Uri.AbsolutePath,
"1\\/indexes\\/my-test-index_tmp_[0-9]+\\/task\\/4"
)
),
It.IsAny<TimeSpan>(),
It.IsAny<TimeSpan>(),
Expand All @@ -657,7 +661,7 @@ public async Task ShouldReplaceAllObjects()
);

httpMock
.Setup(c =>
.SetupSequence(c =>
c.SendRequestAsync(
It.Is<Request>(r =>
Regex.IsMatch(r.Uri.AbsolutePath, "1\\/indexes\\/my-test-index_tmp_[0-9]+\\/batch")
Expand All @@ -677,12 +681,25 @@ public async Task ShouldReplaceAllObjects()
)
}
)
).Returns(
Task.FromResult(
new AlgoliaHttpResponse
{
HttpStatusCode = 200,
Body = new MemoryStream(
Encoding.UTF8.GetBytes(serializer.Serialize(new BatchResponse(3, [])))
)
}
)
);

var results = await client.ReplaceAllObjectsAsync("my-test-index", new List<object> { });
var results =
await client.ReplaceAllObjectsAsync("my-test-index", new List<object> { new(), new(), new() }, batchSize: 2);

httpMock.VerifyAll();

Assert.Equivalent(results, new List<long> { 1, 2, 3 });
Assert.Equal(1, results.CopyOperationResponse.TaskID);
Assert.Equal([2,3], results.BatchResponses.Select(r => r.TaskID));
Assert.Equal(4, results.MoveOperationResponse.TaskID);
}
}

0 comments on commit 18e8248

Please sign in to comment.