Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(clients): safer replaceAllObjects + metis compliant #3164

Merged
merged 11 commits into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@ private static async Task<T> RetryUntil<T>(Func<Task<T>> func, Func<T, bool> val
/// Push a new set of objects and remove all previous ones. Settings, synonyms and query rules are untouched.
/// Replace all objects in an index without any downtime. Internally, this method copies the existing index settings, synonyms and query rules and indexes all passed objects.
/// Finally, the temporary one replaces the existing index. (Synchronous version)
/// See https://api-clients-automation.netlify.app/docs/contributing/add-new-api-client#5-helpers for implementation details.
/// </summary>
/// <param name="indexName">The index in which to perform the request.</param>
/// <param name="objects">The list of `objects` to store in the given Algolia `indexName`.</param>
Expand All @@ -424,6 +425,7 @@ public ReplaceAllObjectsResponse ReplaceAllObjects<T>(string indexName, IEnumera
/// Push a new set of objects and remove all previous ones. Settings, synonyms and query rules are untouched.
/// Replace all objects in an index without any downtime. Internally, this method copies the existing index settings, synonyms and query rules and indexes all passed objects.
/// Finally, the temporary one replaces the existing index.
/// See https://api-clients-automation.netlify.app/docs/contributing/add-new-api-client#5-helpers for implementation details.
/// </summary>
/// <param name="indexName">The index in which to perform the request.</param>
/// <param name="objects">The list of `objects` to store in the given Algolia `indexName`.</param>
Expand All @@ -441,20 +443,24 @@ public async Task<ReplaceAllObjectsResponse> ReplaceAllObjectsAsync<T>(string in
var rnd = new Random();
var tmpIndexName = $"{indexName}_tmp_{rnd.Next(100)}";

// Copy settings, synonyms and query rules into the temporary index
var copyResponse = await OperationIndexAsync(indexName,
new OperationIndexParams(OperationType.Copy, tmpIndexName)
{ Scope = [ScopeType.Rules, ScopeType.Settings, ScopeType.Synonyms] }, options, cancellationToken)
.ConfigureAwait(false);

await WaitForTaskAsync(indexName, copyResponse.TaskID, requestOptions: options, ct: cancellationToken)
.ConfigureAwait(false);

// Add objects to the temporary index
var batchResponse = await ChunkedBatchAsync(tmpIndexName, objects, Action.AddObject, batchSize,
options, cancellationToken).ConfigureAwait(false);

// Move the temporary index to the main one
await WaitForTaskAsync(tmpIndexName, copyResponse.TaskID, requestOptions: options, ct: cancellationToken)
.ConfigureAwait(false);

copyResponse = await OperationIndexAsync(indexName,
new OperationIndexParams(OperationType.Copy, tmpIndexName)
{ Scope = [ScopeType.Rules, ScopeType.Settings, ScopeType.Synonyms] }, options, cancellationToken)
.ConfigureAwait(false);
await WaitForTaskAsync(tmpIndexName, copyResponse.TaskID, requestOptions: options, ct: cancellationToken)
.ConfigureAwait(false);

var moveResponse = await OperationIndexAsync(tmpIndexName,
new OperationIndexParams(OperationType.Move, indexName), options, cancellationToken)
.ConfigureAwait(false);
Expand Down
2 changes: 1 addition & 1 deletion clients/algoliasearch-client-go/.golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ linters:
- wsl
- varnamelen
- nlreturn
- goerr113
- err113
- gochecknoglobals
- exhaustruct
- exhaustive
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,8 @@ public suspend fun SearchClient.searchForFacets(
* Internally, this method copies the existing index settings, synonyms and query rules and indexes all
* passed objects. Finally, the temporary one replaces the existing index.
*
* See https://api-clients-automation.netlify.app/docs/contributing/add-new-api-client#5-helpers for implementation details.
*
* @param serializer [KSerializer] of type [T] for serialization.
* @param records The list of records to replace.
* @return intermediate operations (index name to task ID).
Expand All @@ -298,37 +300,44 @@ public suspend fun <T> SearchClient.replaceAllObjects(
val body = options.json.encodeToJsonElement(serializer, record).jsonObject
BatchRequest(action = Action.AddObject, body = body)
}
val destinationIndex = "${indexName}_tmp_${Random.nextInt(from = 0, until = 100)}"
val tmpIndexName = "${indexName}_tmp_${Random.nextInt(from = 0, until = 100)}"

// 1. Copy index resources
val copy = operationIndex(
var copy = operationIndex(
indexName = indexName,
operationIndexParams = OperationIndexParams(
operation = OperationType.Copy,
destination = destinationIndex,
destination = tmpIndexName,
scope = listOf(ScopeType.Settings, ScopeType.Rules, ScopeType.Synonyms),
),
requestOptions = requestOptions,
)
waitTask(indexName = indexName, taskID = copy.taskID)

// 2. Save new objects
val batch = batch(
indexName = destinationIndex,
indexName = tmpIndexName,
batchWriteParams = BatchWriteParams(requests),
requestOptions = requestOptions,
)
waitTask(indexName = destinationIndex, taskID = batch.taskID)
waitTask(indexName = tmpIndexName, taskID = batch.taskID)
waitTask(indexName = tmpIndexName, taskID = copy.taskID)

copy = operationIndex(
indexName = indexName,
operationIndexParams = OperationIndexParams(
operation = OperationType.Copy,
destination = tmpIndexName,
scope = listOf(ScopeType.Settings, ScopeType.Rules, ScopeType.Synonyms),
),
requestOptions = requestOptions,
)
waitTask(indexName = tmpIndexName, taskID = copy.taskID)

// 3. Move temporary index to source index
val move = operationIndex(
indexName = destinationIndex,
indexName = tmpIndexName,
operationIndexParams = OperationIndexParams(operation = OperationType.Move, destination = indexName),
requestOptions = requestOptions,
)
waitTask(indexName = destinationIndex, taskID = move.taskID)
waitTask(indexName = tmpIndexName, taskID = move.taskID)

// 4. Return the list of operations
return listOf(copy.taskID, batch.taskID, move.taskID)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,9 @@ package object extension {
* settings, synonyms and query rules and indexes all passed objects. Finally, the temporary one replaces the
* existing index.
*
* See https://api-clients-automation.netlify.app/docs/contributing/add-new-api-client#5-helpers for implementation
* details.
*
* @param indexName
* The index in which to perform the request.
* @param records
Expand All @@ -215,31 +218,44 @@ package object extension {
val requests = records.map { record =>
BatchRequest(action = Action.AddObject, body = record)
}
val destinationIndex = s"${indexName}_tmp_${scala.util.Random.nextInt(100)}"
val tmpIndexName = s"${indexName}_tmp_${scala.util.Random.nextInt(100)}"

for {
copy <- client.operationIndex(
indexName = indexName,
operationIndexParams = OperationIndexParams(
operation = OperationType.Copy,
destination = destinationIndex,
destination = tmpIndexName,
scope = Some(Seq(ScopeType.Settings, ScopeType.Rules, ScopeType.Synonyms))
),
requestOptions = requestOptions
)
_ <- client.waitTask(indexName = destinationIndex, taskID = copy.taskID, requestOptions = requestOptions)

batch <- client.batch(
indexName = destinationIndex,
indexName = tmpIndexName,
batchWriteParams = BatchWriteParams(requests),
requestOptions = requestOptions
) // 3. update the copy
_ <- client.waitTask(indexName = destinationIndex, taskID = batch.taskID, requestOptions = requestOptions)
)
_ <- client.waitTask(indexName = tmpIndexName, taskID = batch.taskID, requestOptions = requestOptions)
_ <- client.waitTask(indexName = tmpIndexName, taskID = copy.taskID, requestOptions = requestOptions)

copy <- client.operationIndex(
indexName = indexName,
operationIndexParams = OperationIndexParams(
operation = OperationType.Copy,
destination = tmpIndexName,
scope = Some(Seq(ScopeType.Settings, ScopeType.Rules, ScopeType.Synonyms))
),
requestOptions = requestOptions
)
_ <- client.waitTask(indexName = tmpIndexName, taskID = copy.taskID, requestOptions = requestOptions)

replace <- client.operationIndex(
indexName = destinationIndex,
indexName = tmpIndexName,
operationIndexParams = OperationIndexParams(operation = OperationType.Move, destination = indexName),
requestOptions = requestOptions
)
_ <- client.waitTask(indexName = indexName, taskID = replace.taskID, requestOptions = requestOptions)
_ <- client.waitTask(indexName = tmpIndexName, taskID = replace.taskID, requestOptions = requestOptions)
} yield Seq(copy.taskID, batch.taskID, replace.taskID)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,8 @@ public extension SearchClient {
}

/// Replace all objects in an index
///
/// See https://api-clients-automation.netlify.app/docs/contributing/add-new-api-client#5-helpers for implementation details.
/// - parameter objects: The new objects
/// - parameter indexName: The name of the index where to replace the objects
/// - parameter requestOptions: The request options
Expand All @@ -470,8 +472,7 @@ public extension SearchClient {
) async throws -> ReplaceAllObjectsResponse {
let tmpIndexName = try "\(indexName)_tmp_\(randomString())"

// Copy all index resources from production index
let copyOperationResponse = try await operationIndex(
var copyOperationResponse = try await operationIndex(
indexName: indexName,
operationIndexParams: OperationIndexParams(
operation: .copy,
Expand All @@ -481,18 +482,26 @@ public extension SearchClient {
requestOptions: requestOptions
)

try await self.waitForTask(with: copyOperationResponse.taskID, in: indexName)

// Send records to the tmp index (batched)
let batchResponses = try await self.chunkedBatch(
indexName: tmpIndexName,
objects: objects,
waitForTasks: true,
batchSize: batchSize,
requestOptions: requestOptions
)
try await self.waitForTask(with: copyOperationResponse.taskID, in: tmpIndexName)

copyOperationResponse = try await operationIndex(
indexName: indexName,
operationIndexParams: OperationIndexParams(
operation: .copy,
destination: tmpIndexName,
scope: [.rules, .settings, .synonyms]
),
requestOptions: requestOptions
)
try await self.waitForTask(with: copyOperationResponse.taskID, in: tmpIndexName)

// Move the temporary index to replace the main one
let moveOperationResponse = try await self.operationIndex(
indexName: tmpIndexName,
operationIndexParams: OperationIndexParams(
Expand All @@ -501,7 +510,6 @@ public extension SearchClient {
),
requestOptions: requestOptions
)

try await self.waitForTask(with: moveOperationResponse.taskID, in: tmpIndexName)

return ReplaceAllObjectsResponse(
Expand Down
2 changes: 1 addition & 1 deletion playground/python/app/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ async def main():

try:
resp = await client.replace_all_objects(
index_name="test_replace_all_objects",
index_name="newoneeverytime",
objects=[{"name": f"John Doe{i}", "objectID": f"fff2bd4d-bb17-4e21-a0c4-0a8ea5e363f2{i}" } for i in range(33)],
batch_size=10
)
Expand Down
25 changes: 18 additions & 7 deletions templates/go/search_helpers.mustache
Original file line number Diff line number Diff line change
Expand Up @@ -516,32 +516,43 @@ func (c *APIClient) ChunkedBatch(indexName string, objects []map[string]any, act
}

// ReplaceAllObjects replaces all objects (records) in the given `indexName` with the given `objects`. A temporary index is created during this process in order to backup your data.
// See https://api-clients-automation.netlify.app/docs/contributing/add-new-api-client#5-helpers for implementation details.
func (c *APIClient) ReplaceAllObjects(indexName string, objects []map[string]any, batchSize *int) (*ReplaceAllObjectsResponse, error) {
tmpIndex := fmt.Sprintf("%s_tmp_%d", indexName, time.Now().UnixNano())
tmpIndexName := fmt.Sprintf("%s_tmp_%d", indexName, time.Now().UnixNano())

copyResp, err := c.OperationIndex(c.NewApiOperationIndexRequest(indexName, NewOperationIndexParams(OPERATIONTYPE_COPY, tmpIndex, WithOperationIndexParamsScope([]ScopeType{SCOPETYPE_RULES, SCOPETYPE_SETTINGS, SCOPETYPE_SYNONYMS}))))
copyResp, err := c.OperationIndex(c.NewApiOperationIndexRequest(indexName, NewOperationIndexParams(OPERATIONTYPE_COPY, tmpIndexName, WithOperationIndexParamsScope([]ScopeType{SCOPETYPE_RULES, SCOPETYPE_SETTINGS, SCOPETYPE_SYNONYMS}))))
if err != nil {
return nil, err
}

_, err = c.WaitForTask(indexName, copyResp.TaskID, nil, nil, nil)
waitForTask := true

batchResp, err := c.ChunkedBatch(tmpIndexName, objects, nil, &waitForTask, batchSize)
if err != nil {
return nil, err
}

waitForTask := true
_, err = c.WaitForTask(tmpIndexName, copyResp.TaskID, nil, nil, nil)
if err != nil {
return nil, err
}

copyResp, err = c.OperationIndex(c.NewApiOperationIndexRequest(indexName, NewOperationIndexParams(OPERATIONTYPE_COPY, tmpIndexName, WithOperationIndexParamsScope([]ScopeType{SCOPETYPE_RULES, SCOPETYPE_SETTINGS, SCOPETYPE_SYNONYMS}))))
if err != nil {
return nil, err
}

batchResp, err := c.ChunkedBatch(tmpIndex, objects, nil, &waitForTask, batchSize)
_, err = c.WaitForTask(tmpIndexName, copyResp.TaskID, nil, nil, nil)
if err != nil {
return nil, err
}

moveResp, err := c.OperationIndex(c.NewApiOperationIndexRequest(tmpIndex, NewOperationIndexParams(OPERATIONTYPE_MOVE, indexName)))
moveResp, err := c.OperationIndex(c.NewApiOperationIndexRequest(tmpIndexName, NewOperationIndexParams(OPERATIONTYPE_MOVE, indexName)))
if err != nil {
return nil, err
}

_, err = c.WaitForTask(indexName, moveResp.TaskID, nil, nil, nil)
_, err = c.WaitForTask(tmpIndexName, moveResp.TaskID, nil, nil, nil)
if err != nil {
return nil, err
}
Expand Down
18 changes: 16 additions & 2 deletions templates/java/api_helpers.mustache
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,7 @@ return responses;
/**
* Push a new set of objects and remove all previous ones. Settings, synonyms and query rules are
* untouched. Replace all records in an index without any downtime.
* See https://api-clients-automation.netlify.app/docs/contributing/add-new-api-client#5-helpers for implementation details.
*
* @param indexName The `indexName` to replace `objects` in.
* @param objects The array of `objects` to store in the given Algolia `indexName`.
Expand Down Expand Up @@ -608,18 +609,31 @@ UpdatedAtResponse copyOperationResponse = operationIndex(
.addScope(ScopeType.SYNONYMS),
requestOptions
);
waitForTask(indexName, copyOperationResponse.getTaskID(), requestOptions);

// Save new objects
List<BatchResponse> batchResponses = chunkedBatch(tmpIndexName, objects, Action.ADD_OBJECT, true, batchSize, requestOptions);

waitForTask(tmpIndexName, copyOperationResponse.getTaskID(), requestOptions);

copyOperationResponse = operationIndex(
indexName,
new OperationIndexParams()
.setOperation(OperationType.COPY)
.setDestination(tmpIndexName)
.addScope(ScopeType.SETTINGS)
.addScope(ScopeType.RULES)
.addScope(ScopeType.SYNONYMS),
requestOptions
);
waitForTask(tmpIndexName, copyOperationResponse.getTaskID(), requestOptions);

// Move temporary index to source index
UpdatedAtResponse moveOperationResponse = operationIndex(
tmpIndexName,
new OperationIndexParams().setOperation(OperationType.MOVE).setDestination(indexName),
requestOptions
);
waitForTask(indexName, moveOperationResponse.getTaskID(), requestOptions);
waitForTask(tmpIndexName, moveOperationResponse.getTaskID(), requestOptions);

return new ReplaceAllObjectsResponse()
.setCopyOperationResponse(copyOperationResponse)
Expand Down
Loading
Loading