Skip to content

Commit

Permalink
Merge branch 'main' into feat/DI-1832/chunkedbatch-php
Browse files Browse the repository at this point in the history
  • Loading branch information
damcou authored Feb 27, 2024
2 parents 4561b7d + 28e583b commit 0e55195
Show file tree
Hide file tree
Showing 6 changed files with 296 additions and 53 deletions.
2 changes: 1 addition & 1 deletion .github/.cache_version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.0.17
1.0.18
35 changes: 19 additions & 16 deletions clients/algoliasearch-client-python/algoliasearch/search/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ async def chunked_batch(
objects: List[Dict[str, Any]],
action: Action = "addObject",
wait_for_tasks: bool = False,
batch_size: int = 1000,
request_options: Optional[Union[dict, RequestOptions]] = None,
) -> List[BatchResponse]:
"""
Expand All @@ -426,7 +427,7 @@ async def chunked_batch(
responses: List[BatchResponse] = []
for i, obj in enumerate(objects):
requests.append(BatchRequest(action=action, body=obj))
if i % 1000 == 0:
if i % batch_size == 0:
responses.append(
await self.batch(
index_name=index_name,
Expand All @@ -446,14 +447,15 @@ async def replace_all_objects(
self,
index_name: str,
objects: List[Dict[str, Any]],
batch_size: int = 1000,
request_options: Optional[Union[dict, RequestOptions]] = None,
) -> List[ApiResponse[str]]:
"""
Helper: Replaces all objects (records) in the given `index_name` with the given `objects`. A temporary index is created during this process in order to backup your data.
"""
tmp_index_name = self.create_temporary_name(index_name)
responses: List[ApiResponse[str]] = []
copy_resp = await self.operation_index(

copy_operation_response = await self.operation_index(
index_name=index_name,
operation_index_params=OperationIndexParams(
operation="copy",
Expand All @@ -466,34 +468,35 @@ async def replace_all_objects(
),
request_options=request_options,
)
await self.wait_for_task(
index_name=index_name, task_id=copy_operation_response.task_id
)

responses.append(copy_resp)

await self.wait_for_task(index_name=index_name, task_id=copy_resp.task_id)

save_resps = await self.chunked_batch(
batch_responses = await self.chunked_batch(
index_name=tmp_index_name,
objects=objects,
wait_for_tasks=True,
batch_size=batch_size,
request_options=request_options,
)

responses += save_resps

move_resp = await self.operation_index(
move_operation_response = await self.operation_index(
index_name=tmp_index_name,
operation_index_params=OperationIndexParams(
operation="move",
destination=index_name,
),
request_options=request_options,
)
await self.wait_for_task(
index_name=tmp_index_name, task_id=move_operation_response.task_id
)

responses.append(move_resp)

await self.wait_for_task(index_name=tmp_index_name, task_id=move_resp.task_id)

return responses
return {
"copy_operation_response": copy_operation_response,
"batch_responses": batch_responses,
"move_operation_response": move_operation_response,
}

async def add_api_key_with_http_info(
self,
Expand Down
82 changes: 47 additions & 35 deletions templates/python/search_helpers.mustache
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@
objects: List[Dict[str, Any]],
action: Action = "addObject",
wait_for_tasks: bool = False,
batch_size: int = 1000,
request_options: Optional[Union[dict, RequestOptions]] = None,
) -> List[BatchResponse]:
"""
Expand All @@ -222,7 +223,7 @@
responses: List[BatchResponse] = []
for i, obj in enumerate(objects):
requests.append(BatchRequest(action=action, body=obj))
if i % 1000 == 0:
if i % batch_size == 0:
responses.append(
await self.batch(
index_name=index_name,
Expand All @@ -238,46 +239,57 @@
)
return responses

async def replace_all_objects(self, index_name: str, objects: List[Dict[str, Any]], request_options: Optional[Union[dict, RequestOptions]] = None) -> List[ApiResponse[str]]:
async def replace_all_objects(
self,
index_name: str,
objects: List[Dict[str, Any]],
batch_size: int = 1000,
request_options: Optional[Union[dict, RequestOptions]] = None,
) -> List[ApiResponse[str]]:
"""
Helper: Replaces all objects (records) in the given `index_name` with the given `objects`. A temporary index is created during this process in order to backup your data.
"""
tmp_index_name = self.create_temporary_name(index_name)
responses: List[ApiResponse[str]] = []
copy_resp = await self.operation_index(
index_name=index_name,
operation_index_params=OperationIndexParams(
operation="copy",
destination=tmp_index_name,
scope=[ScopeType("settings"), ScopeType("synonyms"), ScopeType("rules")]
),
request_options=request_options,
)

responses.append(copy_resp)

await self.wait_for_task(index_name=index_name, task_id=copy_resp.task_id)

save_resps = await self.chunked_batch(
index_name=tmp_index_name,
objects=objects,
wait_for_tasks=True,
request_options=request_options,
copy_operation_response = await self.operation_index(
index_name=index_name,
operation_index_params=OperationIndexParams(
operation="copy",
destination=tmp_index_name,
scope=[
ScopeType("settings"),
ScopeType("synonyms"),
ScopeType("rules"),
],
),
request_options=request_options,
)
await self.wait_for_task(
index_name=index_name, task_id=copy_operation_response.task_id
)

responses += save_resps

move_resp = await self.operation_index(
index_name=tmp_index_name,
operation_index_params=OperationIndexParams(
operation="move",
destination=index_name,
),
request_options=request_options,
)

responses.append(move_resp)
batch_responses = await self.chunked_batch(
index_name=tmp_index_name,
objects=objects,
wait_for_tasks=True,
batch_size=batch_size,
request_options=request_options,
)

await self.wait_for_task(index_name=tmp_index_name, task_id=move_resp.task_id)
move_operation_response = await self.operation_index(
index_name=tmp_index_name,
operation_index_params=OperationIndexParams(
operation="move",
destination=index_name,
),
request_options=request_options,
)
await self.wait_for_task(
index_name=tmp_index_name, task_id=move_operation_response.task_id
)

return responses
return {
"copy_operation_response": copy_operation_response,
"batch_responses": batch_responses,
"move_operation_response": move_operation_response,
}
112 changes: 111 additions & 1 deletion templates/python/tests/requests/helpers.mustache
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,114 @@ def test_generate_secured_api_key_0(self):
self._client.get_secured_api_key_remaining_validity("foo")
assert False
except Exception as e:
assert str(e) == "Incorrect padding"
assert str(e) == "Incorrect padding"

async def test_replace_all_objects_0(self):
    """
    executes with minimal parameters

    All network-facing client methods are replaced with AsyncMocks, so this
    only verifies the helper's orchestration (copy -> chunked batch -> move)
    and the shape of the aggregated response dict — no HTTP traffic occurs.
    """
    # Canned replies for the three API methods the helper calls.
    self._client.batch = AsyncMock(
        return_value=BatchResponse(task_id=42, object_ids=["foo", "bar"])
    )
    self._client.operation_index = AsyncMock(
        return_value=UpdatedAtResponse(task_id=21, updated_at="foobar")
    )
    # "published" terminates the task polling (presumably driven by
    # wait_for_task via get_task — confirm against the helper implementation).
    self._client.get_task = AsyncMock(
        return_value=GetTaskResponse(status="published")
    )
    _resp = await self._client.replace_all_objects(
        index_name="foo", objects=[{"name": "John Doe"}]
    )
    self._client.operation_index.assert_called()
    self._client.batch.assert_called()
    self._client.operation_index.assert_called()
    # The helper aggregates one entry per phase: copy, batch(es), move.
    assert _resp == {
        "batch_responses": [BatchResponse(task_id=42, object_ids=["foo", "bar"])],
        "copy_operation_response": UpdatedAtResponse(
            task_id=21, updated_at="foobar"
        ),
        "move_operation_response": UpdatedAtResponse(
            task_id=21, updated_at="foobar"
        ),
    }

async def test_replace_all_objects_1(self):
    """
    does many calls when len(objects) > batchSize
    """
    # Canned replies shared by every mocked call; structural equality on the
    # response models makes them safe to reuse in the final assertion.
    batch_reply = BatchResponse(task_id=42, object_ids=["foo", "bar"])
    operation_reply = UpdatedAtResponse(task_id=21, updated_at="foobar")
    self._client.batch = AsyncMock(return_value=batch_reply)
    self._client.operation_index = AsyncMock(return_value=operation_reply)
    self._client.get_task = AsyncMock(
        return_value=GetTaskResponse(status="published")
    )

    # 33 records with batch_size=10: the chunking condition
    # `i % batch_size == 0` fires at i = 0, 10, 20 and 30, hence four
    # `batch` responses are expected.
    records = []
    for i in range(33):
        records.append(
            {
                "name": f"John Doe{i}",
                "objectID": f"fff2bd4d-bb17-4e21-a0c4-0a8ea5e363f2{i}",
            }
        )

    response = await self._client.replace_all_objects(
        index_name="foo",
        objects=records,
        batch_size=10,
    )

    self._client.operation_index.assert_called()
    self._client.batch.assert_called()
    self._client.operation_index.assert_called()
    assert response == {
        "batch_responses": [batch_reply, batch_reply, batch_reply, batch_reply],
        "copy_operation_response": operation_reply,
        "move_operation_response": operation_reply,
    }

async def test_replace_all_objects_2(self):
    """
    batchSize is 1000 by default
    """
    # Canned replies for the mocked client calls (setup order is irrelevant).
    batch_reply = BatchResponse(task_id=42, object_ids=["foo", "bar"])
    operation_reply = UpdatedAtResponse(task_id=21, updated_at="foobar")
    self._client.get_task = AsyncMock(
        return_value=GetTaskResponse(status="published")
    )
    self._client.operation_index = AsyncMock(return_value=operation_reply)
    self._client.batch = AsyncMock(return_value=batch_reply)

    # 1001 records, no explicit batch_size: with the default of 1000 the
    # chunking condition `i % batch_size == 0` fires at i = 0 and i = 1000,
    # hence two `batch` responses are expected.
    response = await self._client.replace_all_objects(
        index_name="foo",
        objects=[
            {
                "name": f"John Doe{i}",
                "objectID": f"fff2bd4d-bb17-4e21-a0c4-0a8ea5e363f2{i}",
            }
            for i in range(1001)
        ],
    )

    self._client.operation_index.assert_called()
    self._client.batch.assert_called()
    self._client.operation_index.assert_called()
    assert response == {
        "batch_responses": [batch_reply, batch_reply],
        "copy_operation_response": operation_reply,
        "move_operation_response": operation_reply,
    }
4 changes: 4 additions & 0 deletions templates/python/tests/requests/requests.mustache
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
from time import time
from os import environ
from json import loads
from unittest.mock import AsyncMock
from algoliasearch.http.transporter import EchoTransporter
from algoliasearch.http.helpers import SecuredApiKeyRestrictions
from algoliasearch.{{{import}}}.client import {{#lambda.pascalcase}}{{{client}}}{{/lambda.pascalcase}}
from algoliasearch.{{{import}}}.config import {{#lambda.pascalcase}}{{clientPrefix}}Config{{/lambda.pascalcase}}
from algoliasearch.search.models.batch_response import BatchResponse
from algoliasearch.search.models.updated_at_response import UpdatedAtResponse
from algoliasearch.search.models.get_task_response import GetTaskResponse
{{#hasE2E}}
from ..helpers import Helpers
from dotenv import load_dotenv
Expand Down
Loading

0 comments on commit 0e55195

Please sign in to comment.