Skip to content

Commit

Permalink
Fix for point in time (#207)
Browse files Browse the repository at this point in the history
* fix for point in time

Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>

* remove extra file

Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>

* fix unit tests

Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>

* delete pit to be used without composite

Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>

* changes for updates in low level client

Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>

* Add tests

Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>

* fixing tests

Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>

* fix in delete pit

Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>

* fix for integ tests

Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>

* fix for integ tests

Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>

Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>
  • Loading branch information
Arpit-Bandejiya authored Oct 27, 2022
1 parent 589bcfd commit fe73aa3
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 35 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/manual-integ.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ jobs:
uses: actions/checkout@v2
- name: Clone pyenv
run: git clone https://github.com/pyenv/pyenv.git ~/.pyenv
- name: Clone pyenv-virtualenv
run: git clone https://github.com/pyenv/pyenv-virtualenv.git ~/.pyenv/plugins/pyenv-virtualenv
- name: Install JDK 8
uses: actions/setup-java@v2
with:
Expand Down
51 changes: 34 additions & 17 deletions osbenchmark/worker_coordinator/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ def register_default_runners():
register_runner(workload.OperationType.SubmitAsyncSearch, SubmitAsyncSearch(), async_runner=True)
register_runner(workload.OperationType.GetAsyncSearch, Retry(GetAsyncSearch(), retry_until_success=True), async_runner=True)
register_runner(workload.OperationType.DeleteAsyncSearch, DeleteAsyncSearch(), async_runner=True)
register_runner(workload.OperationType.OpenPointInTime, OpenPointInTime(), async_runner=True)
register_runner(workload.OperationType.ClosePointInTime, ClosePointInTime(), async_runner=True)
register_runner(workload.OperationType.CreatePointInTime, CreatePointInTime(), async_runner=True)
register_runner(workload.OperationType.DeletePointInTime, DeletePointInTime(), async_runner=True)
register_runner(workload.OperationType.ListAllPointInTime, ListAllPointInTime(), async_runner=True)

# This is an administrative operation but there is no need for a retry here as we don't issue a request
register_runner(workload.OperationType.Sleep, Sleep(), async_runner=True)
Expand Down Expand Up @@ -1848,32 +1849,46 @@ def __repr__(self, *args, **kwargs):
return "delete-async-search"


class OpenPointInTime(Runner):
class CreatePointInTime(Runner):
async def __call__(self, opensearch, params):
op_name = mandatory(params, "name", self)
index = mandatory(params, "index", self)
keep_alive = params.get("keep-alive", "1m")
response = await opensearch.open_point_in_time(index=index,
params=params.get("request-params"),
keep_alive=keep_alive)
id = response.get("id")
response = await opensearch.create_point_in_time(index=index,
params=params.get("request-params"),
keep_alive=keep_alive)
id = response.get("pit_id")
CompositeContext.put(op_name, id)

def __repr__(self, *args, **kwargs):
return "open-point-in-time"
return "create-point-in-time"


class ClosePointInTime(Runner):
class DeletePointInTime(Runner):
async def __call__(self, opensearch, params):
pit_op = mandatory(params, "with-point-in-time-from", self)
pit_id = CompositeContext.get(pit_op)
pit_op = params.get("with-point-in-time-from", None)
request_params = params.get("request-params", {})
body = {"id": pit_id}
await opensearch.close_point_in_time(body=body, params=request_params, headers=None)
CompositeContext.remove(pit_op)
if pit_op is None:
await opensearch.delete_point_in_time(body=None, all=True, params=request_params, headers=None)
else:
pit_id = CompositeContext.get(pit_op)
body = {
"pit_id": [pit_id]
}
await opensearch.delete_point_in_time(body=body, params=request_params, headers=None)
CompositeContext.remove(pit_op)

def __repr__(self, *args, **kwargs):
return "delete-point-in-time"


class ListAllPointInTime(Runner):
async def __call__(self, opensearch, params):
request_params = params.get("request-params", {})
await opensearch.list_all_point_in_time(params=request_params, headers=None)

def __repr__(self, *args, **kwargs):
return "close-point-in-time"
return "list-all-point-in-time"


class CompositeContext:
Expand Down Expand Up @@ -1925,9 +1940,11 @@ class Composite(Runner):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.supported_op_types = [
"open-point-in-time",
"close-point-in-time",
"create-point-in-time",
"delete-point-in-time",
"list-all-point-in-time",
"search",
"paginated-search",
"raw-request",
"sleep",
"submit-async-search",
Expand Down
15 changes: 9 additions & 6 deletions osbenchmark/workload/workload.py
Original file line number Diff line number Diff line change
Expand Up @@ -581,8 +581,9 @@ class OperationType(Enum):
DeleteAsyncSearch = 11
PaginatedSearch = 12
ScrollSearch = 13
OpenPointInTime = 14
ClosePointInTime = 15
CreatePointInTime = 14
DeletePointInTime = 15
ListAllPointInTime = 16

# administrative actions
ForceMerge = 1001
Expand Down Expand Up @@ -703,10 +704,12 @@ def from_hyphenated_string(cls, v):
return OperationType.GetAsyncSearch
elif v == "delete-async-search":
return OperationType.DeleteAsyncSearch
elif v == "open-point-in-time":
return OperationType.OpenPointInTime
elif v == "close-point-in-time":
return OperationType.ClosePointInTime
elif v == "create-point-in-time":
return OperationType.CreatePointInTime
elif v == "delete-point-in-time":
return OperationType.DeletePointInTime
elif v == "list-all-point-in-time":
return OperationType.ListAllPointInTime
else:
raise KeyError(f"No enum value for [{v}]")

Expand Down
75 changes: 63 additions & 12 deletions tests/worker_coordinator/runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4393,7 +4393,7 @@ async def test_delete_async_search(self, opensearch):
])


class OpenPointInTimeTests(TestCase):
class CreatePointInTimeTests(TestCase):
@mock.patch("opensearchpy.OpenSearch")
@run_async
async def test_creates_point_in_time(self, opensearch):
Expand All @@ -4403,9 +4403,9 @@ async def test_creates_point_in_time(self, opensearch):
"index": "test-index"
}

opensearch.open_point_in_time.return_value = as_future({"id": pit_id})
opensearch.create_point_in_time.return_value = as_future({"pit_id": pit_id})

r = runner.OpenPointInTime()
r = runner.CreatePointInTime()
async with runner.CompositeContext():
await r(opensearch, params)
self.assertEqual(pit_id, runner.CompositeContext.get("open-pit-test"))
Expand All @@ -4419,30 +4419,80 @@ async def test_can_only_be_run_in_composite(self, opensearch):
"index": "test-index"
}

opensearch.open_point_in_time.return_value = as_future({"id": pit_id})
opensearch.create_point_in_time.return_value = as_future({"pit_id": pit_id})

r = runner.OpenPointInTime()
r = runner.CreatePointInTime()
with self.assertRaises(exceptions.BenchmarkAssertionError) as ctx:
await r(opensearch, params)

self.assertEqual("This operation is only allowed inside a composite operation.", ctx.exception.args[0])

class ClosePointInTimeTests(TestCase):
class DeletePointInTimeTests(TestCase):
@mock.patch("opensearchpy.OpenSearch")
@run_async
async def test_closes_point_in_time(self, opensearch):
async def test_delete_point_in_time(self, opensearch):
pit_id = "0123456789abcdef"
params = {
"name": "close-pit-test",
"with-point-in-time-from": "open-pit-task1",
}
opensearch.close_point_in_time.return_value=(as_future())
r = runner.ClosePointInTime()
opensearch.delete_point_in_time.return_value=(as_future())
r = runner.DeletePointInTime()
async with runner.CompositeContext():
runner.CompositeContext.put("open-pit-task1", pit_id)
await r(opensearch, params)

opensearch.close_point_in_time.assert_called_once_with(body={"id": "0123456789abcdef"}, params={}, headers=None)
opensearch.delete_point_in_time.assert_called_once_with(body={"pit_id": ["0123456789abcdef"]}, params={}, headers=None)

@mock.patch("opensearchpy.OpenSearch")
@run_async
async def test_delete_point_in_time_without_context(self, opensearch):
params = {
"name": "close-pit-test",
}
opensearch.delete_point_in_time.return_value=(as_future())
r = runner.DeletePointInTime()
await r(opensearch, params)
opensearch.delete_point_in_time.assert_called_once_with(body=None, all=True, params={}, headers=None)

class ListAllPointInTimeTests(TestCase):
@mock.patch("opensearchpy.OpenSearch")
@run_async
async def test_get_all_point_in_time(self, opensearch):
pit_id = "0123456789abcdef"
params = {}
opensearch.list_all_point_in_time.return_value = as_future({
"pits": [
{
"pit_id": pit_id,
"keepAlive": 60000
}
]
})

r = runner.ListAllPointInTime()
await r(opensearch, params)
opensearch.list_all_point_in_time.assert_called_once()

@mock.patch("opensearchpy.OpenSearch")
@run_async
async def test_get_all_point_in_time_in_composite(self, opensearch):
pit_id = "0123456789abcdef"
params = {}
opensearch.list_all_point_in_time.return_value = as_future({
"pits": [
{
"pit_id": pit_id,
"keepAlive": 60000
}
]
})

r = runner.ListAllPointInTime()
async with runner.CompositeContext():
await r(opensearch, params)

opensearch.list_all_point_in_time.assert_called_once()


class QueryWithSearchAfterScrollTests(TestCase):
Expand Down Expand Up @@ -5145,8 +5195,9 @@ async def test_rejects_unsupported_operations(self, opensearch):
with self.assertRaises(exceptions.BenchmarkAssertionError) as ctx:
await r(opensearch, params)

self.assertEqual("Unsupported operation-type [bulk]. Use one of [open-point-in-time, close-point-in-time, "
"search, raw-request, sleep, submit-async-search, get-async-search, delete-async-search].",
self.assertEqual("Unsupported operation-type [bulk]. Use one of [create-point-in-time, delete-point-in-time,"
" list-all-point-in-time, search, paginated-search, raw-request, sleep, submit-async-search,"
" get-async-search, delete-async-search].",
ctx.exception.args[0])


Expand Down

0 comments on commit fe73aa3

Please sign in to comment.