diff --git a/clients/algoliasearch-client-kotlin/client/src/commonMain/kotlin/com/algolia/client/extensions/SearchClient.kt b/clients/algoliasearch-client-kotlin/client/src/commonMain/kotlin/com/algolia/client/extensions/SearchClient.kt
index 2ffa489f26..ac71e48077 100644
--- a/clients/algoliasearch-client-kotlin/client/src/commonMain/kotlin/com/algolia/client/extensions/SearchClient.kt
+++ b/clients/algoliasearch-client-kotlin/client/src/commonMain/kotlin/com/algolia/client/extensions/SearchClient.kt
@@ -276,6 +276,48 @@ public suspend fun SearchClient.searchForFacets(
   ).results.map { it as SearchForFacetValuesResponse }
 }
 
+/**
+ * Helper: Splits `records` into chunks of at most `batchSize` elements and sends each chunk as a `batch` request.
+ *
+ * @param indexName The index in which to perform the request.
+ * @param records The list of objects to index.
+ * @param serializer The serializer to use for the objects.
+ * @param action The action to perform on the objects. Default is `Action.AddObject`.
+ * @param waitForTask If true, wait for every batch task to complete before returning.
+ * @param batchSize The size of the batch. Default is 1000.
+ * @param requestOptions The requestOptions to send along with the query, they will be merged with the transporter requestOptions.
+ * @return The list of responses from the batch requests.
+ *
+ */
+public suspend fun <T> SearchClient.chunkedBatch(
+  indexName: String,
+  records: List<T>,
+  serializer: KSerializer<T>,
+  action: Action = Action.AddObject,
+  waitForTask: Boolean,
+  batchSize: Int = 1000,
+  requestOptions: RequestOptions? = null,
+): List<BatchResponse> {
+  // Send one `batch` request per chunk, one after the other and in chunk order.
+  val tasks = records.chunked(batchSize).map { chunk ->
+    val requests = chunk.map {
+      BatchRequest(
+        action = action,
+        body = options.json.encodeToJsonElement(serializer, it).jsonObject
+      )
+    }
+    batch(
+      indexName = indexName,
+      batchWriteParams = BatchWriteParams(requests),
+      requestOptions = requestOptions,
+    )
+  }
+  if (waitForTask) {
+    tasks.forEach { waitTask(indexName, it.taskID) }
+  }
+  return tasks
+}
+
 /**
  * Push a new set of objects and remove all previous ones. Settings, synonyms and query rules are untouched.
  * Replace all objects in an index without any downtime.
@@ -284,22 +326,20 @@ public suspend fun SearchClient.searchForFacets(
  *
  * See https://api-clients-automation.netlify.app/docs/contributing/add-new-api-client#5-helpers for implementation details.
  *
- * @param serializer [KSerializer] of type [T] for serialization.
+ * @param indexName The index in which to perform the request.
  * @param records The list of records to replace.
- * @return intermediate operations (index name to task ID).
+ * @param serializer [KSerializer] of type [T] for serialization.
+ * @param batchSize The size of the batch. Default is 1000.
+ * @param requestOptions The requestOptions to send along with the query, they will be merged with the transporter requestOptions.
+ * @return responses from the three-step operations: copy, batch, move.
  */
 public suspend fun <T> SearchClient.replaceAllObjects(
   indexName: String,
-  serializer: KSerializer<T>,
   records: List<T>,
-  requestOptions: RequestOptions?,
-): List<Long> {
-  if (records.isEmpty()) return emptyList()
-
-  val requests = records.map { record ->
-    val body = options.json.encodeToJsonElement(serializer, record).jsonObject
-    BatchRequest(action = Action.AddObject, body = body)
-  }
+  serializer: KSerializer<T>,
+  batchSize: Int = 1000,
+  requestOptions: RequestOptions? = null,
+): ReplaceAllObjectsResponse {
   val tmpIndexName = "${indexName}_tmp_${Random.nextInt(from = 0, until = 100)}"
 
   var copy = operationIndex(
@@ -312,12 +352,16 @@ public suspend fun <T> SearchClient.replaceAllObjects(
     requestOptions = requestOptions,
   )
 
-  val batch = batch(
+  val batchResponses = this.chunkedBatch(
     indexName = tmpIndexName,
-    batchWriteParams = BatchWriteParams(requests),
+    records = records,
+    serializer = serializer,
+    action = Action.AddObject,
+    waitForTask = true,
+    batchSize = batchSize,
     requestOptions = requestOptions,
   )
-  waitTask(indexName = tmpIndexName, taskID = batch.taskID)
+
   waitTask(indexName = tmpIndexName, taskID = copy.taskID)
 
   copy = operationIndex(
@@ -338,7 +382,7 @@ public suspend fun <T> SearchClient.replaceAllObjects(
   )
   waitTask(indexName = tmpIndexName, taskID = move.taskID)
 
-  return listOf(copy.taskID, batch.taskID, move.taskID)
+  return ReplaceAllObjectsResponse(copy, batchResponses, move)
 }
 
 /**
diff --git a/clients/algoliasearch-client-scala/src/main/scala/algoliasearch/extension/package.scala b/clients/algoliasearch-client-scala/src/main/scala/algoliasearch/extension/package.scala
index fb9335d296..71abc85ffb 100644
--- a/clients/algoliasearch-client-scala/src/main/scala/algoliasearch/extension/package.scala
+++ b/clients/algoliasearch-client-scala/src/main/scala/algoliasearch/extension/package.scala
@@ -193,6 +193,58 @@ package object extension {
       Future.successful(true)
     }
 
+    /** Helper: Splits `records` into chunks of at most `batchSize` elements and sends each chunk as a `batch` request.
+      *
+      * @param indexName
+      *   The index in which to perform the request.
+      * @param records
+      *   The list of records to send.
+      * @param action
+      *   The action to perform on the records.
+      * @param waitForTasks
+      *   Whether to wait for the tasks to complete.
+      * @param batchSize
+      *   The size of the batch. Default is 1000.
+      * @param requestOptions
+      *   Additional request configuration.
+      * @return
+      *   A future containing the response of the batch operations.
+      */
+    def chunkedBatch(
+        indexName: String,
+        records: Seq[Any],
+        action: Action = Action.AddObject,
+        waitForTasks: Boolean,
+        batchSize: Int = 1000,
+        requestOptions: Option[RequestOptions] = None
+    )(implicit ec: ExecutionContext): Future[Seq[BatchResponse]] = {
+      // One `batch` request per chunk; `toList` forces the iterator so every request is started right away.
+      val futures = records.grouped(batchSize).toList.map { chunk =>
+        val requests = chunk.map { record =>
+          BatchRequest(action = action, body = record)
+        }
+        client.batch(
+          indexName = indexName,
+          batchWriteParams = BatchWriteParams(requests),
+          requestOptions = requestOptions
+        )
+      }
+
+      val responses = Future.sequence(futures)
+
+      if (waitForTasks) {
+        // Chain the waits into the returned future: a bare `responses.foreach` would start them in the background,
+        // so callers (e.g. `replaceAllObjects`) could continue before the batch tasks have completed.
+        responses.flatMap { tasks =>
+          Future
+            .traverse(tasks)(task => client.waitTask(indexName, task.taskID, requestOptions = requestOptions))
+            .map(_ => tasks)
+        }
+      } else {
+        responses
+      }
+    }
+
     /** Push a new set of objects and remove all previous ones. Settings, synonyms and query rules are untouched.
       * Replace all objects in an index without any downtime. Internally, this method copies the existing index
       * settings, synonyms and query rules and indexes all passed objects. Finally, the temporary one replaces the
@@ -205,16 +257,19 @@ package object extension {
       *   The index in which to perform the request.
       * @param records
       *   The list of records to replace.
+      * @param batchSize
+      *   The size of the batch. Default is 1000.
+      * @param requestOptions
+      *   Additional request configuration.
       * @return
-      *   intermediate operations (task IDs).
+      *   A future containing the response of the three-step operations: copy, batch and move.
       */
     def replaceAllObjects(
         indexName: String,
         records: Seq[Any],
+        batchSize: Int = 1000,
         requestOptions: Option[RequestOptions] = None
-    )(implicit ec: ExecutionContext): Future[Seq[Long]] = {
-      if (records.isEmpty) return Future.successful(Seq.empty)
-
+    )(implicit ec: ExecutionContext): Future[ReplaceAllObjectsResponse] = {
       val requests = records.map { record =>
         BatchRequest(action = Action.AddObject, body = record)
       }
@@ -231,12 +286,15 @@ package object extension {
           requestOptions = requestOptions
         )
 
-        batch <- client.batch(
+        batchResponses <- chunkedBatch(
           indexName = tmpIndexName,
-          batchWriteParams = BatchWriteParams(requests),
+          records = records,
+          action = Action.AddObject,
+          waitForTasks = true,
+          batchSize = batchSize,
           requestOptions = requestOptions
         )
-        _ <- client.waitTask(indexName = tmpIndexName, taskID = batch.taskID, requestOptions = requestOptions)
+
         _ <- client.waitTask(indexName = tmpIndexName, taskID = copy.taskID, requestOptions = requestOptions)
 
         copy <- client.operationIndex(
@@ -250,13 +308,17 @@ package object extension {
         )
         _ <- client.waitTask(indexName = tmpIndexName, taskID = copy.taskID, requestOptions = requestOptions)
 
-        replace <- client.operationIndex(
+        move <- client.operationIndex(
           indexName = tmpIndexName,
           operationIndexParams = OperationIndexParams(operation = OperationType.Move, destination = indexName),
           requestOptions = requestOptions
         )
-        _ <- client.waitTask(indexName = tmpIndexName, taskID = replace.taskID, requestOptions = requestOptions)
-      } yield Seq(copy.taskID, batch.taskID, replace.taskID)
+        _ <- client.waitTask(indexName = tmpIndexName, taskID = move.taskID, requestOptions = requestOptions)
+      } yield ReplaceAllObjectsResponse(
+        copyOperationResponse = copy,
+        batchResponses = batchResponses,
+        moveOperationResponse = move
+      )
     }
   }
 }