Merge branch 'main' into fix/js-flaky
millotp authored Jun 18, 2024
2 parents a161ddf + b2b7cd2 commit 0e3992c
Showing 8 changed files with 352 additions and 28 deletions.
10 changes: 8 additions & 2 deletions .github/actions/setup/action.yml
@@ -202,12 +202,18 @@ runs:
with:
swift-version: ${{ steps.versions.outputs.SWIFT_VERSION }}

- name: Set swiftformat version
if: ${{ inputs.language == 'swift' }}
id: swiftformat-version
shell: bash
run: echo "SWIFTFORMAT_VERSION=0.54.0" >> $GITHUB_OUTPUT

- name: Checkout swiftformat
if: ${{ inputs.language == 'swift' }}
uses: actions/checkout@v4
with:
repository: nicklockwood/SwiftFormat
-         ref: 0.53.0
+         ref: ${{ steps.swiftformat-version.outputs.SWIFTFORMAT_VERSION }}
path: swiftformat

- name: Cache the build folder
@@ -216,7 +222,7 @@ runs:
uses: actions/cache@v4
with:
path: swiftformat/.build
-         key: swiftformat-build-0.53.0-${{ runner.os }}
+         key: swiftformat-build-${{ steps.swiftformat-version.outputs.SWIFTFORMAT_VERSION }}-${{ runner.os }}

- name: Build swiftformat
if: ${{ inputs.language == 'swift' && steps.cache-swiftformat.outputs.cache-hit != 'true' }}
@@ -276,6 +276,49 @@ public suspend fun SearchClient.searchForFacets(
).results.map { it as SearchForFacetValuesResponse }
}

/**
* Helper: Chunks the given `records` list into subsets of at most `batchSize` elements, so that each chunk fits in a single `batch` request.
*
* @param indexName The index in which to perform the request.
* @param records The list of objects to index.
* @param serializer The serializer to use for the objects.
* @param action The action to perform on the records. Default is `Action.AddObject`.
* @param waitForTask If true, wait for each batch task to complete.
* @param batchSize The size of the batch. Default is 1000.
* @param requestOptions The request options to send along with the query; they will be merged with the transporter request options.
* @return The list of responses from the batch requests.
*
*/
public suspend fun <T> SearchClient.chunkedBatch(
indexName: String,
records: List<T>,
serializer: KSerializer<T>,
action: Action = Action.AddObject,
waitForTask: Boolean,
batchSize: Int = 1000,
requestOptions: RequestOptions? = null,
): List<BatchResponse> {
val tasks = mutableListOf<BatchResponse>()
records.chunked(batchSize).forEach { chunk ->
val requests = chunk.map {
BatchRequest(
action = action,
body = options.json.encodeToJsonElement(serializer, it).jsonObject,
)
}
val batch = batch(
indexName = indexName,
batchWriteParams = BatchWriteParams(requests),
requestOptions = requestOptions,
)
tasks.add(batch)
}
if (waitForTask) {
tasks.forEach { waitTask(indexName, it.taskID) }
}
return tasks
}
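
For illustration, here is a minimal usage sketch of this helper. The `Product` type, the `products` index name, and the import paths are hypothetical, not part of this diff, and assume a standard kotlinx.serialization setup.

import com.algolia.client.api.SearchClient
import com.algolia.client.model.search.Action
import kotlinx.serialization.Serializable

@Serializable
data class Product(val objectID: String, val name: String)

suspend fun indexProducts(client: SearchClient, products: List<Product>) {
    // 2500 records with batchSize = 1000 yield three batch calls
    // (1000 + 1000 + 500); waitForTask = true blocks until each task is done.
    val responses = client.chunkedBatch(
        indexName = "products",
        records = products,
        serializer = Product.serializer(),
        action = Action.AddObject,
        waitForTask = true,
        batchSize = 1000,
    )
    println("Sent ${responses.size} batch request(s)")
}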

/**
* Push a new set of objects and remove all previous ones. Settings, synonyms and query rules are untouched.
* Replace all objects in an index without any downtime.
@@ -284,22 +327,19 @@ public suspend fun SearchClient.searchForFacets(
*
* See https://api-clients-automation.netlify.app/docs/contributing/add-new-api-client#5-helpers for implementation details.
*
- * @param serializer [KSerializer] of type [T] for serialization.
* @param indexName The index in which to perform the request.
* @param records The list of records to replace.
- * @return intermediate operations (index name to task ID).
+ * @param serializer [KSerializer] of type [T] for serialization.
+ * @param batchSize The size of the batch. Default is 1000.
+ * @return responses from the three-step operations: copy, batch, move.
*/
public suspend fun <T> SearchClient.replaceAllObjects(
indexName: String,
- serializer: KSerializer<T>,
records: List<T>,
+ serializer: KSerializer<T>,
+ batchSize: Int = 1000,
requestOptions: RequestOptions?,
- ): List<Long> {
- if (records.isEmpty()) return emptyList()
-
- val requests = records.map { record ->
- val body = options.json.encodeToJsonElement(serializer, record).jsonObject
- BatchRequest(action = Action.AddObject, body = body)
- }
+ ): ReplaceAllObjectsResponse {
val tmpIndexName = "${indexName}_tmp_${Random.nextInt(from = 0, until = 100)}"

var copy = operationIndex(
@@ -312,12 +352,16 @@ public suspend fun <T> SearchClient.replaceAllObjects(
requestOptions = requestOptions,
)

- val batch = batch(
+ val batchResponses = this.chunkedBatch(
indexName = tmpIndexName,
- batchWriteParams = BatchWriteParams(requests),
+ records = records,
+ serializer = serializer,
+ action = Action.AddObject,
+ waitForTask = true,
+ batchSize = batchSize,
requestOptions = requestOptions,
)
- waitTask(indexName = tmpIndexName, taskID = batch.taskID)

waitTask(indexName = tmpIndexName, taskID = copy.taskID)

copy = operationIndex(
@@ -338,7 +382,7 @@ public suspend fun <T> SearchClient.replaceAllObjects(
)
waitTask(indexName = tmpIndexName, taskID = move.taskID)

- return listOf(copy.taskID, batch.taskID, move.taskID)
+ return ReplaceAllObjectsResponse(copy, batchResponses, move)
}
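
A matching sketch for the replace flow (same hypothetical `Product` type and index name as above). The helper copies settings, synonyms, and rules to a temporary index, pushes the records there in chunks via chunkedBatch, then moves the temporary index over the original, so searchers never see a partially indexed state.

suspend fun refreshCatalog(client: SearchClient, products: List<Product>) {
    // requestOptions has no default in this signature, so pass null explicitly
    // when no per-request overrides are needed.
    val response = client.replaceAllObjects(
        indexName = "products",
        records = products,
        serializer = Product.serializer(),
        batchSize = 1000,
        requestOptions = null,
    )
    println("Replaced all objects: $response")
}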

/**
100 changes: 100 additions & 0 deletions clients/algoliasearch-client-ruby/lib/algolia/api/search_client.rb
@@ -3177,5 +3177,105 @@ def get_secured_api_key_remaining_validity(secured_api_key)

valid_until - now
end

# Helper: Chunks the given `objects` array into subsets of at most `batch_size` elements, so that each chunk fits in a single `batch` request.
#
# @param index_name [String] the `index_name` where the operation will be performed.
# @param objects [Array] The array of `objects` to store in the given Algolia `index_name`.
# @param action [Action] The `batch` `action` to perform on the given array of `objects`, defaults to `addObject`.
# @param wait_for_tasks [Boolean] Whether to wait until every `batch` task has been processed; waiting may slow the total execution time of this method but is more reliable.
# @param batch_size [int] The size of each chunk of `objects`. The number of `batch` calls will be `(objects.length / batch_size.to_f).ceil`. Defaults to 1000.
# @param request_options: The request options to send along with the query, they will be merged with the transporter base parameters (headers, query params, timeouts, etc.). (optional)
#
# @return [Array<BatchResponse>]
#
def chunked_batch(index_name, objects, action = Action::ADD_OBJECT, wait_for_tasks = false, batch_size = 1000, request_options = {})
responses = []
objects.each_slice(batch_size) do |chunk|
requests = chunk.map do |object|
Search::BatchRequest.new(action: action, body: object)
end

responses.append(batch(index_name, Search::BatchWriteParams.new(requests: requests), request_options))
end

if wait_for_tasks
responses.each do |response|
wait_for_task(index_name, response.task_id)
end
end

responses
end

# Helper: Replaces all objects (records) in the given `index_name` with the given `objects`. A temporary index is created during this process in order to back up your data.
#
# @param index_name [String] The `index_name` to replace `objects` in.
# @param objects [Array] The array of `objects` to store in the given Algolia `index_name`.
# @param batch_size [int] The size of each chunk of `objects`. The number of `batch` calls will be `(objects.length / batch_size.to_f).ceil`. Defaults to 1000.
# @param request_options: The request options to send along with the query, they will be merged with the transporter base parameters (headers, query params, timeouts, etc.). (optional)
#
# @return [ReplaceAllObjectsResponse]
def replace_all_objects(index_name, objects, batch_size = 1000, request_options = {})
tmp_index_name = index_name + '_tmp_' + rand(10_000_000).to_s

copy_operation_response = operation_index(
index_name,
Search::OperationIndexParams.new(
operation: Search::OperationType::COPY,
destination: tmp_index_name,
scope: [
Search::ScopeType::SETTINGS,
Search::ScopeType::SYNONYMS,
Search::ScopeType::RULES
]
),
request_options
)

batch_responses = chunked_batch(
tmp_index_name,
objects,
Search::Action::ADD_OBJECT,
true,
batch_size,
request_options
)

wait_for_task(tmp_index_name, copy_operation_response.task_id)

copy_operation_response = operation_index(
index_name,
Search::OperationIndexParams.new(
operation: Search::OperationType::COPY,
destination: tmp_index_name,
scope: [
Search::ScopeType::SETTINGS,
Search::ScopeType::SYNONYMS,
Search::ScopeType::RULES
]
),
request_options
)

wait_for_task(tmp_index_name, copy_operation_response.task_id)

move_operation_response = operation_index(
tmp_index_name,
Search::OperationIndexParams.new(
operation: Search::OperationType::MOVE,
destination: index_name
),
request_options
)

wait_for_task(tmp_index_name, move_operation_response.task_id)

Search::ReplaceAllObjectsResponse.new(
copy_operation_response: copy_operation_response,
batch_responses: batch_responses,
move_operation_response: move_operation_response
)
end
end
end
@@ -193,6 +193,57 @@ package object extension {
Future.successful(true)
}

/** Helper: Chunks the given `records` sequence into subsets of at most `batchSize` elements, so that each chunk fits in a single `batch` request.
*
* @param indexName
* The index in which to perform the request.
* @param records
*   The list of records to push.
* @param action
* The action to perform on the records.
* @param waitForTasks
* Whether to wait for the tasks to complete.
* @param batchSize
* The size of the batch. Default is 1000.
* @param requestOptions
* Additional request configuration.
* @return
*   A future containing the responses of the batch operations.
*/
def chunkedBatch(
indexName: String,
records: Seq[Any],
action: Action = Action.AddObject,
waitForTasks: Boolean,
batchSize: Int = 1000,
requestOptions: Option[RequestOptions] = None
)(implicit ec: ExecutionContext): Future[Seq[BatchResponse]] = {
var futures = Seq.empty[Future[BatchResponse]]
records.grouped(batchSize).foreach { chunk =>
val requests = chunk.map { record =>
BatchRequest(action = action, body = record)
}
val future = client.batch(
indexName = indexName,
batchWriteParams = BatchWriteParams(requests),
requestOptions = requestOptions
)
futures = futures :+ future
}

    val responses = Future.sequence(futures)

    if (waitForTasks) {
      // Sequence the waitTask calls so the returned future completes only
      // after every batch task has been processed.
      responses.flatMap { tasks =>
        Future
          .sequence(tasks.map(task => client.waitTask(indexName, task.taskID, requestOptions = requestOptions)))
          .map(_ => tasks)
      }
    } else {
      responses
    }
}

/** Push a new set of objects and remove all previous ones. Settings, synonyms and query rules are untouched.
* Replace all objects in an index without any downtime. Internally, this method copies the existing index
* settings, synonyms and query rules and indexes all passed objects. Finally, the temporary one replaces the
@@ -205,16 +256,19 @@ package object extension {
* The index in which to perform the request.
* @param records
* The list of records to replace.
* @param batchSize
* The size of the batch. Default is 1000.
* @param requestOptions
* Additional request configuration.
* @return
- *   intermediate operations (task IDs).
+ *   A future containing the response of the three-step operations: copy, batch and move.
*/
def replaceAllObjects(
indexName: String,
records: Seq[Any],
batchSize: Int = 1000,
requestOptions: Option[RequestOptions] = None
- )(implicit ec: ExecutionContext): Future[Seq[Long]] = {
- if (records.isEmpty) return Future.successful(Seq.empty)
-
+ )(implicit ec: ExecutionContext): Future[ReplaceAllObjectsResponse] = {
val requests = records.map { record =>
BatchRequest(action = Action.AddObject, body = record)
}
@@ -231,12 +285,15 @@ def replaceAllObjects(
requestOptions = requestOptions
)

- batch <- client.batch(
+ batchResponses <- chunkedBatch(
indexName = tmpIndexName,
- batchWriteParams = BatchWriteParams(requests),
+ records = records,
+ action = Action.AddObject,
+ waitForTasks = true,
+ batchSize = batchSize,
requestOptions = requestOptions
)
- _ <- client.waitTask(indexName = tmpIndexName, taskID = batch.taskID, requestOptions = requestOptions)

_ <- client.waitTask(indexName = tmpIndexName, taskID = copy.taskID, requestOptions = requestOptions)

copy <- client.operationIndex(
@@ -250,13 +307,17 @@
)
_ <- client.waitTask(indexName = tmpIndexName, taskID = copy.taskID, requestOptions = requestOptions)

- replace <- client.operationIndex(
+ move <- client.operationIndex(
indexName = tmpIndexName,
operationIndexParams = OperationIndexParams(operation = OperationType.Move, destination = indexName),
requestOptions = requestOptions
)
- _ <- client.waitTask(indexName = tmpIndexName, taskID = replace.taskID, requestOptions = requestOptions)
- } yield Seq(copy.taskID, batch.taskID, replace.taskID)
+ _ <- client.waitTask(indexName = tmpIndexName, taskID = move.taskID, requestOptions = requestOptions)
+ } yield ReplaceAllObjectsResponse(
+   copyOperationResponse = copy,
+   batchResponses = batchResponses,
+   moveOperationResponse = move
+ )
}
}
}
2 changes: 1 addition & 1 deletion playground/ruby/Gemfile.lock
@@ -1,7 +1,7 @@
PATH
remote: ../../clients/algoliasearch-client-ruby
specs:
-    algolia (3.0.0.beta.2)
+    algolia (3.0.0.beta.4)
faraday (>= 1.0.1, < 3.0)
faraday-net_http_persistent (>= 0.15, < 3)
net-http-persistent
12 changes: 12 additions & 0 deletions renovate.json
@@ -171,6 +171,18 @@
],
"datasourceTemplate": "github-tags",
"depNameTemplate": "php/php-src"
},
{
"description": "Update swiftformat version",
"fileMatch": [
"Dockerfile",
".github/actions/setup/action.yml"
],
"matchStrings": [
"SWIFTFORMAT_VERSION=(?<currentValue>\\d+\\.\\d+\\.\\d+)"
],
"depNameTemplate": "nicklockwood/SwiftFormat",
"datasourceTemplate": "github-releases"
}
],
"github-actions": {