From fe73aa3cee24773c7f0d122791e75e7fe75cd66f Mon Sep 17 00:00:00 2001 From: Arpit-Bandejiya <109738717+Arpit-Bandejiya@users.noreply.github.com> Date: Thu, 27 Oct 2022 17:17:54 +0530 Subject: [PATCH] Fix for point in time (#207) * fix for point in time Signed-off-by: Arpit Bandejiya * remove extra file Signed-off-by: Arpit Bandejiya * fix unit tests Signed-off-by: Arpit Bandejiya * delete pit to be used without composite Signed-off-by: Arpit Bandejiya * changes for updates in low level client Signed-off-by: Arpit Bandejiya * Add tests Signed-off-by: Arpit Bandejiya * fixing tests Signed-off-by: Arpit Bandejiya * fix in delete pit Signed-off-by: Arpit Bandejiya * fix for integ tests Signed-off-by: Arpit Bandejiya * fix for integ tests Signed-off-by: Arpit Bandejiya Signed-off-by: Arpit Bandejiya --- .github/workflows/manual-integ.yml | 2 + osbenchmark/worker_coordinator/runner.py | 51 ++++++++++------ osbenchmark/workload/workload.py | 15 +++-- tests/worker_coordinator/runner_test.py | 75 ++++++++++++++++++++---- 4 files changed, 108 insertions(+), 35 deletions(-) diff --git a/.github/workflows/manual-integ.yml b/.github/workflows/manual-integ.yml index 788178f9a..e683e545f 100644 --- a/.github/workflows/manual-integ.yml +++ b/.github/workflows/manual-integ.yml @@ -13,6 +13,8 @@ jobs: uses: actions/checkout@v2 - name: Clone pyenv run: git clone https://github.com/pyenv/pyenv.git ~/.pyenv + - name: Clone pyenv-virtualenv + run: git clone https://github.com/pyenv/pyenv-virtualenv.git ~/.pyenv/plugins/pyenv-virtualenv - name: Install JDK 8 uses: actions/setup-java@v2 with: diff --git a/osbenchmark/worker_coordinator/runner.py b/osbenchmark/worker_coordinator/runner.py index 1c8fff182..5baff2bf4 100644 --- a/osbenchmark/worker_coordinator/runner.py +++ b/osbenchmark/worker_coordinator/runner.py @@ -61,8 +61,9 @@ def register_default_runners(): register_runner(workload.OperationType.SubmitAsyncSearch, SubmitAsyncSearch(), async_runner=True) register_runner(workload.OperationType.GetAsyncSearch, Retry(GetAsyncSearch(), retry_until_success=True), async_runner=True) register_runner(workload.OperationType.DeleteAsyncSearch, DeleteAsyncSearch(), async_runner=True) - register_runner(workload.OperationType.OpenPointInTime, OpenPointInTime(), async_runner=True) - register_runner(workload.OperationType.ClosePointInTime, ClosePointInTime(), async_runner=True) + register_runner(workload.OperationType.CreatePointInTime, CreatePointInTime(), async_runner=True) + register_runner(workload.OperationType.DeletePointInTime, DeletePointInTime(), async_runner=True) + register_runner(workload.OperationType.ListAllPointInTime, ListAllPointInTime(), async_runner=True) # This is an administrative operation but there is no need for a retry here as we don't issue a request register_runner(workload.OperationType.Sleep, Sleep(), async_runner=True) @@ -1848,32 +1849,46 @@ def __repr__(self, *args, **kwargs): return "delete-async-search" -class OpenPointInTime(Runner): +class CreatePointInTime(Runner): async def __call__(self, opensearch, params): op_name = mandatory(params, "name", self) index = mandatory(params, "index", self) keep_alive = params.get("keep-alive", "1m") - response = await opensearch.open_point_in_time(index=index, - params=params.get("request-params"), - keep_alive=keep_alive) - id = response.get("id") + response = await opensearch.create_point_in_time(index=index, + params=params.get("request-params"), + keep_alive=keep_alive) + id = response.get("pit_id") CompositeContext.put(op_name, id) def __repr__(self, *args, **kwargs): - return "open-point-in-time" + return "create-point-in-time" -class ClosePointInTime(Runner): +class DeletePointInTime(Runner): async def __call__(self, opensearch, params): - pit_op = mandatory(params, "with-point-in-time-from", self) - pit_id = CompositeContext.get(pit_op) + pit_op = params.get("with-point-in-time-from", None) request_params = params.get("request-params", {}) - body = {"id": pit_id} - await opensearch.close_point_in_time(body=body, params=request_params, headers=None) - CompositeContext.remove(pit_op) + if pit_op is None: + await opensearch.delete_point_in_time(body=None, all=True, params=request_params, headers=None) + else: + pit_id = CompositeContext.get(pit_op) + body = { + "pit_id": [pit_id] + } + await opensearch.delete_point_in_time(body=body, params=request_params, headers=None) + CompositeContext.remove(pit_op) + + def __repr__(self, *args, **kwargs): + return "delete-point-in-time" + + +class ListAllPointInTime(Runner): + async def __call__(self, opensearch, params): + request_params = params.get("request-params", {}) + await opensearch.list_all_point_in_time(params=request_params, headers=None) def __repr__(self, *args, **kwargs): - return "close-point-in-time" + return "list-all-point-in-time" class CompositeContext: @@ -1925,9 +1940,11 @@ class Composite(Runner): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.supported_op_types = [ - "open-point-in-time", - "close-point-in-time", + "create-point-in-time", + "delete-point-in-time", + "list-all-point-in-time", "search", + "paginated-search", "raw-request", "sleep", "submit-async-search", diff --git a/osbenchmark/workload/workload.py b/osbenchmark/workload/workload.py index 6f0941382..b854178d1 100644 --- a/osbenchmark/workload/workload.py +++ b/osbenchmark/workload/workload.py @@ -581,8 +581,9 @@ class OperationType(Enum): DeleteAsyncSearch = 11 PaginatedSearch = 12 ScrollSearch = 13 - OpenPointInTime = 14 - ClosePointInTime = 15 + CreatePointInTime = 14 + DeletePointInTime = 15 + ListAllPointInTime = 16 # administrative actions ForceMerge = 1001 @@ -703,10 +704,12 @@ def from_hyphenated_string(cls, v): return OperationType.GetAsyncSearch elif v == "delete-async-search": return OperationType.DeleteAsyncSearch - elif v == "open-point-in-time": - return OperationType.OpenPointInTime - elif v == "close-point-in-time": - return OperationType.ClosePointInTime + elif v == "create-point-in-time": + return OperationType.CreatePointInTime + elif v == "delete-point-in-time": + return OperationType.DeletePointInTime + elif v == "list-all-point-in-time": + return OperationType.ListAllPointInTime else: raise KeyError(f"No enum value for [{v}]") diff --git a/tests/worker_coordinator/runner_test.py b/tests/worker_coordinator/runner_test.py index ebc74fcda..0e8e6c8f6 100644 --- a/tests/worker_coordinator/runner_test.py +++ b/tests/worker_coordinator/runner_test.py @@ -4393,7 +4393,7 @@ async def test_delete_async_search(self, opensearch): ]) -class OpenPointInTimeTests(TestCase): +class CreatePointInTimeTests(TestCase): @mock.patch("opensearchpy.OpenSearch") @run_async async def test_creates_point_in_time(self, opensearch): @@ -4403,9 +4403,9 @@ async def test_creates_point_in_time(self, opensearch): "index": "test-index" } - opensearch.open_point_in_time.return_value = as_future({"id": pit_id}) + opensearch.create_point_in_time.return_value = as_future({"pit_id": pit_id}) - r = runner.OpenPointInTime() + r = runner.CreatePointInTime() async with runner.CompositeContext(): await r(opensearch, params) self.assertEqual(pit_id, runner.CompositeContext.get("open-pit-test")) @@ -4419,30 +4419,80 @@ async def test_can_only_be_run_in_composite(self, opensearch): "index": "test-index" } - opensearch.open_point_in_time.return_value = as_future({"id": pit_id}) + opensearch.create_point_in_time.return_value = as_future({"pit_id": pit_id}) - r = runner.OpenPointInTime() + r = runner.CreatePointInTime() with self.assertRaises(exceptions.BenchmarkAssertionError) as ctx: await r(opensearch, params) self.assertEqual("This operation is only allowed inside a composite operation.", ctx.exception.args[0]) -class ClosePointInTimeTests(TestCase): +class DeletePointInTimeTests(TestCase): @mock.patch("opensearchpy.OpenSearch") @run_async - async def test_closes_point_in_time(self, opensearch): + async def test_delete_point_in_time(self, opensearch): pit_id = "0123456789abcdef" params = { "name": "close-pit-test", "with-point-in-time-from": "open-pit-task1", } - opensearch.close_point_in_time.return_value=(as_future()) - r = runner.ClosePointInTime() + opensearch.delete_point_in_time.return_value=(as_future()) + r = runner.DeletePointInTime() async with runner.CompositeContext(): runner.CompositeContext.put("open-pit-task1", pit_id) await r(opensearch, params) - opensearch.close_point_in_time.assert_called_once_with(body={"id": "0123456789abcdef"}, params={}, headers=None) + opensearch.delete_point_in_time.assert_called_once_with(body={"pit_id": ["0123456789abcdef"]}, params={}, headers=None) + + @mock.patch("opensearchpy.OpenSearch") + @run_async + async def test_delete_point_in_time_without_context(self, opensearch): + params = { + "name": "close-pit-test", + } + opensearch.delete_point_in_time.return_value=(as_future()) + r = runner.DeletePointInTime() + await r(opensearch, params) + opensearch.delete_point_in_time.assert_called_once_with(body=None, all=True, params={}, headers=None) + +class ListAllPointInTimeTests(TestCase): + @mock.patch("opensearchpy.OpenSearch") + @run_async + async def test_get_all_point_in_time(self, opensearch): + pit_id = "0123456789abcdef" + params = {} + opensearch.list_all_point_in_time.return_value = as_future({ + "pits": [ + { + "pit_id": pit_id, + "keepAlive": 60000 + } + ] + }) + + r = runner.ListAllPointInTime() + await r(opensearch, params) + opensearch.list_all_point_in_time.assert_called_once() + + @mock.patch("opensearchpy.OpenSearch") + @run_async + async def test_get_all_point_in_time_in_composite(self, opensearch): + pit_id = "0123456789abcdef" + params = {} + opensearch.list_all_point_in_time.return_value = as_future({ + "pits": [ + { + "pit_id": pit_id, + "keepAlive": 60000 + } + ] + }) + + r = runner.ListAllPointInTime() + async with runner.CompositeContext(): + await r(opensearch, params) + + opensearch.list_all_point_in_time.assert_called_once() class QueryWithSearchAfterScrollTests(TestCase): @@ -5145,8 +5195,9 @@ async def test_rejects_unsupported_operations(self, opensearch): with self.assertRaises(exceptions.BenchmarkAssertionError) as ctx: await r(opensearch, params) - self.assertEqual("Unsupported operation-type [bulk]. Use one of [open-point-in-time, close-point-in-time, " - "search, raw-request, sleep, submit-async-search, get-async-search, delete-async-search].", + self.assertEqual("Unsupported operation-type [bulk]. Use one of [create-point-in-time, delete-point-in-time," + " list-all-point-in-time, search, paginated-search, raw-request, sleep, submit-async-search," + " get-async-search, delete-async-search].", ctx.exception.args[0])