From fc1752b6e37d9db8918887a16457392ee82311ad Mon Sep 17 00:00:00 2001
From: Thomas Raffray
Date: Tue, 18 Jun 2024 17:34:32 +0200
Subject: [PATCH 1/5] feat(clients): add chunkedBatch helper for Kotlin and Scala (#3206)

---
 .../algolia/client/extensions/SearchClient.kt | 72 +++++++++++++----
 .../algoliasearch/extension/package.scala     | 81 ++++++++++++++++---
 2 files changed, 129 insertions(+), 24 deletions(-)

diff --git a/clients/algoliasearch-client-kotlin/client/src/commonMain/kotlin/com/algolia/client/extensions/SearchClient.kt b/clients/algoliasearch-client-kotlin/client/src/commonMain/kotlin/com/algolia/client/extensions/SearchClient.kt
index 2ffa489f26..ac71e48077 100644
--- a/clients/algoliasearch-client-kotlin/client/src/commonMain/kotlin/com/algolia/client/extensions/SearchClient.kt
+++ b/clients/algoliasearch-client-kotlin/client/src/commonMain/kotlin/com/algolia/client/extensions/SearchClient.kt
@@ -276,6 +276,49 @@ public suspend fun SearchClient.searchForFacets(
   ).results.map { it as SearchForFacetValuesResponse }
 }
 
+/**
+ * Helper: Chunks the given `records` list into subsets of at most `batchSize` elements (1000 by default) so that they fit into `batch` requests.
+ *
+ * @param indexName The index in which to perform the request.
+ * @param records The list of objects to index.
+ * @param serializer The serializer to use for the objects.
+ * @param action The action to perform on the objects. Default is `Action.AddObject`.
+ * @param waitForTask If true, wait for each batch task to complete.
+ * @param batchSize The size of the batch. Default is 1000.
+ * @param requestOptions The requestOptions to send along with the query; they will be merged with the transporter requestOptions.
+ * @return The list of responses from the batch requests.
+ */
+public suspend fun <T> SearchClient.chunkedBatch(
+  indexName: String,
+  records: List<T>,
+  serializer: KSerializer<T>,
+  action: Action = Action.AddObject,
+  waitForTask: Boolean,
+  batchSize: Int = 1000,
+  requestOptions: RequestOptions? = null,
+): List<BatchResponse> {
+  val tasks = mutableListOf<BatchResponse>()
+  records.chunked(batchSize).forEach { chunk ->
+    val requests = chunk.map {
+      BatchRequest(
+        action = action,
+        body = options.json.encodeToJsonElement(serializer, it).jsonObject
+      )
+    }
+    val batch = batch(
+      indexName = indexName,
+      batchWriteParams = BatchWriteParams(requests),
+      requestOptions = requestOptions,
+    )
+    tasks.add(batch)
+  }
+  if (waitForTask) {
+    tasks.forEach { waitTask(indexName, it.taskID) }
+  }
+  return tasks
+}
+
 /**
  * Push a new set of objects and remove all previous ones. Settings, synonyms and query rules are untouched.
  * Replace all objects in an index without any downtime.
@@ -284,22 +327,19 @@ public suspend fun SearchClient.searchForFacets(
  *
  * See https://api-clients-automation.netlify.app/docs/contributing/add-new-api-client#5-helpers for implementation details.
  *
- * @param serializer [KSerializer] of type [T] for serialization.
+ * @param indexName The index in which to perform the request.
  * @param records The list of records to replace.
- * @return intermediate operations (index name to task ID).
+ * @param serializer [KSerializer] of type [T] for serialization.
+ * @param batchSize The size of the batch. Default is 1000.
+ * @return responses from the three-step operations: copy, batch, move.
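+ *
+ * A hypothetical usage sketch (the `Product` type, the `products` list and the index name are illustrative
+ * assumptions, not part of this patch):
+ *
+ * ```
+ * @Serializable
+ * data class Product(val objectID: String, val name: String)
+ *
+ * val response: ReplaceAllObjectsResponse = client.replaceAllObjects(
+ *   indexName = "products",
+ *   records = products,
+ *   serializer = Product.serializer(),
+ *   batchSize = 1000,
+ *   requestOptions = null,
+ * )
+ * ```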
 */
 public suspend fun <T> SearchClient.replaceAllObjects(
   indexName: String,
-  serializer: KSerializer<T>,
   records: List<T>,
+  serializer: KSerializer<T>,
+  batchSize: Int = 1000,
   requestOptions: RequestOptions?,
-): List<Long> {
-  if (records.isEmpty()) return emptyList()
-
-  val requests = records.map { record ->
-    val body = options.json.encodeToJsonElement(serializer, record).jsonObject
-    BatchRequest(action = Action.AddObject, body = body)
-  }
+): ReplaceAllObjectsResponse {
   val tmpIndexName = "${indexName}_tmp_${Random.nextInt(from = 0, until = 100)}"
 
   var copy = operationIndex(
     indexName = indexName,
@@ -312,12 +352,16 @@ public suspend fun <T> SearchClient.replaceAllObjects(
     requestOptions = requestOptions,
   )
 
-  val batch = batch(
+  val batchResponses = this.chunkedBatch(
     indexName = tmpIndexName,
-    batchWriteParams = BatchWriteParams(requests),
+    records = records,
+    serializer = serializer,
+    action = Action.AddObject,
+    waitForTask = true,
+    batchSize = batchSize,
     requestOptions = requestOptions,
   )
-  waitTask(indexName = tmpIndexName, taskID = batch.taskID)
+  waitTask(indexName = tmpIndexName, taskID = copy.taskID)
 
   copy = operationIndex(
@@ -338,7 +382,7 @@ public suspend fun <T> SearchClient.replaceAllObjects(
   )
   waitTask(indexName = tmpIndexName, taskID = move.taskID)
 
-  return listOf(copy.taskID, batch.taskID, move.taskID)
+  return ReplaceAllObjectsResponse(copy, batchResponses, move)
 }
 
 /**
diff --git a/clients/algoliasearch-client-scala/src/main/scala/algoliasearch/extension/package.scala b/clients/algoliasearch-client-scala/src/main/scala/algoliasearch/extension/package.scala
index fb9335d296..71abc85ffb 100644
--- a/clients/algoliasearch-client-scala/src/main/scala/algoliasearch/extension/package.scala
+++ b/clients/algoliasearch-client-scala/src/main/scala/algoliasearch/extension/package.scala
@@ -193,6 +193,57 @@ package object extension {
     Future.successful(true)
   }
 
+  /** Helper: Chunks the given `records` list into subsets of at most `batchSize` elements (1000 by default) so that
+    * they fit into `batch` requests.
+    *
+    * @param indexName
+    *   The index in which to perform the request.
+    * @param records
+    *   The list of records to index.
+    * @param action
+    *   The action to perform on the records.
+    * @param waitForTasks
+    *   Whether to wait for the tasks to complete.
+    * @param batchSize
+    *   The size of the batch. Default is 1000.
+    * @param requestOptions
+    *   Additional request configuration.
+    * @return
+    *   A future containing the responses of the batch operations.
+    */
+  def chunkedBatch(
+      indexName: String,
+      records: Seq[Any],
+      action: Action = Action.AddObject,
+      waitForTasks: Boolean,
+      batchSize: Int = 1000,
+      requestOptions: Option[RequestOptions] = None
+  )(implicit ec: ExecutionContext): Future[Seq[BatchResponse]] = {
+    var futures = Seq.empty[Future[BatchResponse]]
+    records.grouped(batchSize).foreach { chunk =>
+      val requests = chunk.map { record =>
+        BatchRequest(action = action, body = record)
+      }
+      val future = client.batch(
+        indexName = indexName,
+        batchWriteParams = BatchWriteParams(requests),
+        requestOptions = requestOptions
+      )
+      futures = futures :+ future
+    }
+
+    val responses = Future.sequence(futures)
+
+    if (waitForTasks) {
+      responses.foreach { tasks =>
+        tasks.foreach { task =>
+          client.waitTask(indexName, task.taskID, requestOptions = requestOptions)
+        }
+      }
+    }
+
+    responses
+  }
+
   /** Push a new set of objects and remove all previous ones. Settings, synonyms and query rules are untouched.
     * Replace all objects in an index without any downtime.
Internally, this method copies the existing index * settings, synonyms and query rules and indexes all passed objects. Finally, the temporary one replaces the @@ -205,16 +256,19 @@ package object extension { * The index in which to perform the request. * @param records * The list of records to replace. + * @param batchSize + * The size of the batch. Default is 1000. + * @param requestOptions + * Additional request configuration. * @return - * intermediate operations (task IDs). + * A future containing the response of the three-step operations: copy, batch and move. */ def replaceAllObjects( indexName: String, records: Seq[Any], + batchSize: Int = 1000, requestOptions: Option[RequestOptions] = None - )(implicit ec: ExecutionContext): Future[Seq[Long]] = { - if (records.isEmpty) return Future.successful(Seq.empty) - + )(implicit ec: ExecutionContext): Future[ReplaceAllObjectsResponse] = { val requests = records.map { record => BatchRequest(action = Action.AddObject, body = record) } @@ -231,12 +285,15 @@ package object extension { requestOptions = requestOptions ) - batch <- client.batch( + batchResponses <- chunkedBatch( indexName = tmpIndexName, - batchWriteParams = BatchWriteParams(requests), + records = records, + action = Action.AddObject, + waitForTasks = true, + batchSize = batchSize, requestOptions = requestOptions ) - _ <- client.waitTask(indexName = tmpIndexName, taskID = batch.taskID, requestOptions = requestOptions) + _ <- client.waitTask(indexName = tmpIndexName, taskID = copy.taskID, requestOptions = requestOptions) copy <- client.operationIndex( @@ -250,13 +307,17 @@ package object extension { ) _ <- client.waitTask(indexName = tmpIndexName, taskID = copy.taskID, requestOptions = requestOptions) - replace <- client.operationIndex( + move <- client.operationIndex( indexName = tmpIndexName, operationIndexParams = OperationIndexParams(operation = OperationType.Move, destination = indexName), requestOptions = requestOptions ) - _ <- client.waitTask(indexName = tmpIndexName, taskID = replace.taskID, requestOptions = requestOptions) - } yield Seq(copy.taskID, batch.taskID, replace.taskID) + _ <- client.waitTask(indexName = tmpIndexName, taskID = move.taskID, requestOptions = requestOptions) + } yield ReplaceAllObjectsResponse( + copyOperationResponse = copy, + batchResponses = batchResponses, + moveOperationResponse = move + ) } } } From 4058d33b4db2e297a99bb2980e22a10fcdd813fa Mon Sep 17 00:00:00 2001 From: algolia-bot Date: Tue, 18 Jun 2024 15:47:09 +0000 Subject: [PATCH 2/5] chore: generated code for commit fc1752b6e37d9db8918887a16457392ee82311ad. 
[skip ci] Co-authored-by: Thomas Raffray --- .../kotlin/com/algolia/client/extensions/SearchClient.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/algoliasearch-client-kotlin/client/src/commonMain/kotlin/com/algolia/client/extensions/SearchClient.kt b/clients/algoliasearch-client-kotlin/client/src/commonMain/kotlin/com/algolia/client/extensions/SearchClient.kt index ac71e48077..e511f6a1a5 100644 --- a/clients/algoliasearch-client-kotlin/client/src/commonMain/kotlin/com/algolia/client/extensions/SearchClient.kt +++ b/clients/algoliasearch-client-kotlin/client/src/commonMain/kotlin/com/algolia/client/extensions/SearchClient.kt @@ -303,7 +303,7 @@ public suspend fun SearchClient.chunkedBatch( val requests = chunk.map { BatchRequest( action = action, - body = options.json.encodeToJsonElement(serializer, it).jsonObject + body = options.json.encodeToJsonElement(serializer, it).jsonObject, ) } val batch = batch( From 3b4be7a3de3d4ec482172cf01f6e3220b98ed035 Mon Sep 17 00:00:00 2001 From: Pierre Millot Date: Tue, 18 Jun 2024 20:59:29 +0200 Subject: [PATCH 3/5] feat(ruby): add chunked_batch and replace_all_objects helpers (#3208) --- playground/ruby/Gemfile.lock | 2 +- templates/ruby/search_helpers.mustache | 100 +++++++++++++++++++++++++ 2 files changed, 101 insertions(+), 1 deletion(-) diff --git a/playground/ruby/Gemfile.lock b/playground/ruby/Gemfile.lock index 8fd6bf86ac..bdece442fd 100644 --- a/playground/ruby/Gemfile.lock +++ b/playground/ruby/Gemfile.lock @@ -1,7 +1,7 @@ PATH remote: ../../clients/algoliasearch-client-ruby specs: - algolia (3.0.0.beta.2) + algolia (3.0.0.beta.4) faraday (>= 1.0.1, < 3.0) faraday-net_http_persistent (>= 0.15, < 3) net-http-persistent diff --git a/templates/ruby/search_helpers.mustache b/templates/ruby/search_helpers.mustache index 7a06e4b5d4..c8828e8628 100644 --- a/templates/ruby/search_helpers.mustache +++ b/templates/ruby/search_helpers.mustache @@ -201,4 +201,104 @@ def get_secured_api_key_remaining_validity(secured_api_key) valid_until = matches[1].to_i valid_until - now +end + +# Helper: Chunks the given `objects` list in subset of 1000 elements max in order to make it fit in `batch` requests. +# +# @param index_name [String] the `index_name` where the operation will be performed. +# @param objects [Array] The array of `objects` to store in the given Algolia `index_name`. +# @param action [Action] The `batch` `action` to perform on the given array of `objects`, defaults to `addObject`. +# @param wait_for_tasks [Boolean] Whether or not we should wait until every `batch` tasks has been processed, this operation may slow the total execution time of this method but is more reliable. +# @param batch_size [int] The size of the chunk of `objects`. The number of `batch` calls will be equal to `length(objects) / batchSize`. Defaults to 1000. +# @param request_options: The request options to send along with the query, they will be merged with the transporter base parameters (headers, query params, timeouts, etc.). 
(optional) +# +# @return [Array] +# +def chunked_batch(index_name, objects, action = Action::ADD_OBJECT, wait_for_tasks = false, batch_size = 1000, request_options = {}) + responses = [] + objects.each_slice(batch_size) do |chunk| + requests = chunk.map do |object| + Search::BatchRequest.new(action: action, body: object) + end + + responses.append(batch(index_name, Search::BatchWriteParams.new(requests: requests), request_options)) + end + + if wait_for_tasks + responses.each do |response| + wait_for_task(index_name, response.task_id) + end + end + + responses +end + +# Helper: Replaces all objects (records) in the given `index_name` with the given `objects`. A temporary index is created during this process in order to backup your data. +# +# @param index_name [String] The `index_name` to replace `objects` in. +# @param objects [Array] The array of `objects` to store in the given Algolia `index_name`. +# @param batch_size [int] The size of the chunk of `objects`. The number of `batch` calls will be equal to `length(objects) / batchSize`. Defaults to 1000. +# @param request_options: The request options to send along with the query, they will be merged with the transporter base parameters (headers, query params, timeouts, etc.). (optional) +# +# @return [Array] +def replace_all_objects(index_name, objects, batch_size = 1000, request_options = {}) + tmp_index_name = index_name + '_tmp_' + rand(10_000_000).to_s + + copy_operation_response = operation_index( + index_name, + Search::OperationIndexParams.new( + operation: Search::OperationType::COPY, + destination: tmp_index_name, + scope: [ + Search::ScopeType::SETTINGS, + Search::ScopeType::SYNONYMS, + Search::ScopeType::RULES + ] + ), + request_options + ) + + batch_responses = chunked_batch( + tmp_index_name, + objects, + Search::Action::ADD_OBJECT, + true, + batch_size, + request_options + ) + + wait_for_task(tmp_index_name, copy_operation_response.task_id) + + copy_operation_response = operation_index( + index_name, + Search::OperationIndexParams.new( + operation: Search::OperationType::COPY, + destination: tmp_index_name, + scope: [ + Search::ScopeType::SETTINGS, + Search::ScopeType::SYNONYMS, + Search::ScopeType::RULES + ] + ), + request_options + ) + + wait_for_task(tmp_index_name, copy_operation_response.task_id) + + move_operation_response = operation_index( + tmp_index_name, + Search::OperationIndexParams.new( + operation: Search::OperationType::MOVE, + destination: index_name + ), + request_options + ) + + wait_for_task(tmp_index_name, move_operation_response.task_id) + + Search::ReplaceAllObjectsResponse.new( + copy_operation_response: copy_operation_response, + batch_responses: batch_responses, + move_operation_response: move_operation_response + ) end \ No newline at end of file From bcfdf51e0ef8f83dcc1f462d66db9b06d2d2cc41 Mon Sep 17 00:00:00 2001 From: algolia-bot Date: Tue, 18 Jun 2024 19:12:08 +0000 Subject: [PATCH 4/5] chore: generated code for commit 3b4be7a3de3d4ec482172cf01f6e3220b98ed035. 
[skip ci] Co-authored-by: Pierre Millot --- .../lib/algolia/api/search_client.rb | 100 ++++++++++++++++++ 1 file changed, 100 insertions(+) diff --git a/clients/algoliasearch-client-ruby/lib/algolia/api/search_client.rb b/clients/algoliasearch-client-ruby/lib/algolia/api/search_client.rb index a7d12de5bd..56b7b45159 100644 --- a/clients/algoliasearch-client-ruby/lib/algolia/api/search_client.rb +++ b/clients/algoliasearch-client-ruby/lib/algolia/api/search_client.rb @@ -3177,5 +3177,105 @@ def get_secured_api_key_remaining_validity(secured_api_key) valid_until - now end + + # Helper: Chunks the given `objects` list in subset of 1000 elements max in order to make it fit in `batch` requests. + # + # @param index_name [String] the `index_name` where the operation will be performed. + # @param objects [Array] The array of `objects` to store in the given Algolia `index_name`. + # @param action [Action] The `batch` `action` to perform on the given array of `objects`, defaults to `addObject`. + # @param wait_for_tasks [Boolean] Whether or not we should wait until every `batch` tasks has been processed, this operation may slow the total execution time of this method but is more reliable. + # @param batch_size [int] The size of the chunk of `objects`. The number of `batch` calls will be equal to `length(objects) / batchSize`. Defaults to 1000. + # @param request_options: The request options to send along with the query, they will be merged with the transporter base parameters (headers, query params, timeouts, etc.). (optional) + # + # @return [Array] + # + def chunked_batch(index_name, objects, action = Action::ADD_OBJECT, wait_for_tasks = false, batch_size = 1000, request_options = {}) + responses = [] + objects.each_slice(batch_size) do |chunk| + requests = chunk.map do |object| + Search::BatchRequest.new(action: action, body: object) + end + + responses.append(batch(index_name, Search::BatchWriteParams.new(requests: requests), request_options)) + end + + if wait_for_tasks + responses.each do |response| + wait_for_task(index_name, response.task_id) + end + end + + responses + end + + # Helper: Replaces all objects (records) in the given `index_name` with the given `objects`. A temporary index is created during this process in order to backup your data. + # + # @param index_name [String] The `index_name` to replace `objects` in. + # @param objects [Array] The array of `objects` to store in the given Algolia `index_name`. + # @param batch_size [int] The size of the chunk of `objects`. The number of `batch` calls will be equal to `length(objects) / batchSize`. Defaults to 1000. + # @param request_options: The request options to send along with the query, they will be merged with the transporter base parameters (headers, query params, timeouts, etc.). 
(optional) + # + # @return [Array] + def replace_all_objects(index_name, objects, batch_size = 1000, request_options = {}) + tmp_index_name = index_name + '_tmp_' + rand(10_000_000).to_s + + copy_operation_response = operation_index( + index_name, + Search::OperationIndexParams.new( + operation: Search::OperationType::COPY, + destination: tmp_index_name, + scope: [ + Search::ScopeType::SETTINGS, + Search::ScopeType::SYNONYMS, + Search::ScopeType::RULES + ] + ), + request_options + ) + + batch_responses = chunked_batch( + tmp_index_name, + objects, + Search::Action::ADD_OBJECT, + true, + batch_size, + request_options + ) + + wait_for_task(tmp_index_name, copy_operation_response.task_id) + + copy_operation_response = operation_index( + index_name, + Search::OperationIndexParams.new( + operation: Search::OperationType::COPY, + destination: tmp_index_name, + scope: [ + Search::ScopeType::SETTINGS, + Search::ScopeType::SYNONYMS, + Search::ScopeType::RULES + ] + ), + request_options + ) + + wait_for_task(tmp_index_name, copy_operation_response.task_id) + + move_operation_response = operation_index( + tmp_index_name, + Search::OperationIndexParams.new( + operation: Search::OperationType::MOVE, + destination: index_name + ), + request_options + ) + + wait_for_task(tmp_index_name, move_operation_response.task_id) + + Search::ReplaceAllObjectsResponse.new( + copy_operation_response: copy_operation_response, + batch_responses: batch_responses, + move_operation_response: move_operation_response + ) + end end end From b2b7cd235cdcd0ed8fec9b29e3e49e8f168c59d9 Mon Sep 17 00:00:00 2001 From: Pierre Millot Date: Tue, 18 Jun 2024 22:13:05 +0200 Subject: [PATCH 5/5] chore(deps): automatically update swiftformat (#3207) --- .github/actions/setup/action.yml | 10 ++++++++-- renovate.json | 12 ++++++++++++ scripts/docker/Dockerfile.swift | 3 ++- 3 files changed, 22 insertions(+), 3 deletions(-) diff --git a/.github/actions/setup/action.yml b/.github/actions/setup/action.yml index b7cb4319fd..0f90a31845 100644 --- a/.github/actions/setup/action.yml +++ b/.github/actions/setup/action.yml @@ -202,12 +202,18 @@ runs: with: swift-version: ${{ steps.versions.outputs.SWIFT_VERSION }} + - name: Set swiftformat version + if: ${{ inputs.language == 'swift' }} + id: swiftformat-version + shell: bash + run: echo "SWIFTFORMAT_VERSION=0.54.0" >> $GITHUB_OUTPUT + - name: Checkout swiftformat if: ${{ inputs.language == 'swift' }} uses: actions/checkout@v4 with: repository: nicklockwood/SwiftFormat - ref: 0.53.0 + ref: ${{ steps.swiftformat-version.outputs.SWIFTFORMAT_VERSION }} path: swiftformat - name: Cache the build folder @@ -216,7 +222,7 @@ runs: uses: actions/cache@v4 with: path: swiftformat/.build - key: swiftformat-build-0.53.0-${{ runner.os }} + key: swiftformat-build-${{ steps.swiftformat-version.outputs.SWIFTFORMAT_VERSION }}-${{ runner.os }} - name: Build swiftformat if: ${{ inputs.language == 'swift' && steps.cache-swiftformat.outputs.cache-hit != 'true' }} diff --git a/renovate.json b/renovate.json index e27f5b3c58..3b4ab6135d 100644 --- a/renovate.json +++ b/renovate.json @@ -171,6 +171,18 @@ ], "datasourceTemplate": "github-tags", "depNameTemplate": "php/php-src" + }, + { + "description": "Update swiftformat version", + "fileMatch": [ + "Dockerfile", + ".github/actions/setup/action.yml" + ], + "matchStrings": [ + "SWIFTFORMAT_VERSION=(?\\d+\\.\\d+\\.\\d+)" + ], + "depNameTemplate": "nicklockwood/SwiftFormat", + "datasourceTemplate": "github-releases" } ], "github-actions": { diff --git 
a/scripts/docker/Dockerfile.swift b/scripts/docker/Dockerfile.swift
index 3fca7c4571..81b81abc3b 100644
--- a/scripts/docker/Dockerfile.swift
+++ b/scripts/docker/Dockerfile.swift
@@ -1,6 +1,7 @@
 ARG SWIFT_VERSION
+ARG SWIFTFORMAT_VERSION=0.54.0
 
-FROM ghcr.io/nicklockwood/swiftformat:latest as swiftFormat
+FROM ghcr.io/nicklockwood/swiftformat:${SWIFTFORMAT_VERSION} AS swiftFormat
 FROM swift:${SWIFT_VERSION}-jammy
 
 COPY --from=swiftFormat /usr/bin/swiftformat /usr/bin/swiftformat
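
Usage note for the helpers introduced in patches 1-4: the sketch below shows how the new Kotlin `chunkedBatch` is meant to be driven. The `Product` type, the `products` list, the index name and the import paths are assumptions for illustration only; the Scala `chunkedBatch` and Ruby `chunked_batch` helpers accept the same parameters in the same order.

    import com.algolia.client.api.SearchClient
    import com.algolia.client.extensions.chunkedBatch
    import com.algolia.client.model.search.Action
    import kotlinx.serialization.Serializable

    @Serializable
    data class Product(val objectID: String, val name: String)

    suspend fun indexAll(client: SearchClient, products: List<Product>) {
        // Splits `products` into chunks of at most 1000 records and issues
        // one `batch` call per chunk; with waitForTask = true it then waits
        // for every resulting task before returning.
        val responses = client.chunkedBatch(
            indexName = "products",
            records = products,
            serializer = Product.serializer(),
            action = Action.AddObject,
            waitForTask = true,
            batchSize = 1000,
        )
        println("Sent ${responses.size} batch request(s)")
    }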
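
Note on patch 5: the new Renovate custom manager greps the two listed files for `SWIFTFORMAT_VERSION=<major.minor.patch>` and compares the captured version against nicklockwood/SwiftFormat releases; Renovate requires that capture group to be named `currentValue`. A minimal Kotlin sketch of what the pattern extracts from the lines touched above (sample lines copied from the diffs; the sketch itself is illustrative only):

    fun main() {
        // The matchStrings pattern from renovate.json, with Renovate's
        // required `currentValue` named group spelled out.
        val pattern = Regex("SWIFTFORMAT_VERSION=(?<currentValue>\\d+\\.\\d+\\.\\d+)")
        val samples = listOf(
            "ARG SWIFTFORMAT_VERSION=0.54.0",                              // Dockerfile.swift
            "run: echo \"SWIFTFORMAT_VERSION=0.54.0\" >> \$GITHUB_OUTPUT", // action.yml
        )
        for (line in samples) {
            val version = pattern.find(line)?.groups?.get("currentValue")?.value
            println("$version  <-  $line")
        }
    }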