Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for point in time #207

Merged
merged 10 commits into from
Oct 27, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 34 additions & 17 deletions osbenchmark/worker_coordinator/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ def register_default_runners():
register_runner(workload.OperationType.SubmitAsyncSearch, SubmitAsyncSearch(), async_runner=True)
register_runner(workload.OperationType.GetAsyncSearch, Retry(GetAsyncSearch(), retry_until_success=True), async_runner=True)
register_runner(workload.OperationType.DeleteAsyncSearch, DeleteAsyncSearch(), async_runner=True)
register_runner(workload.OperationType.OpenPointInTime, OpenPointInTime(), async_runner=True)
register_runner(workload.OperationType.ClosePointInTime, ClosePointInTime(), async_runner=True)
Comment on lines -64 to -65
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we confirm none of the workloads here do not use these operations

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I have checked and verified that these operations are not called in the workloads repo as of now

register_runner(workload.OperationType.CreatePointInTime, CreatePointInTime(), async_runner=True)
register_runner(workload.OperationType.DeletePointInTime, DeletePointInTime(), async_runner=True)
register_runner(workload.OperationType.ListAllPointInTime, ListAllPointInTime(), async_runner=True)

# This is an administrative operation but there is no need for a retry here as we don't issue a request
register_runner(workload.OperationType.Sleep, Sleep(), async_runner=True)
Expand Down Expand Up @@ -1848,32 +1849,46 @@ def __repr__(self, *args, **kwargs):
return "delete-async-search"


class OpenPointInTime(Runner):
class CreatePointInTime(Runner):
async def __call__(self, opensearch, params):
op_name = mandatory(params, "name", self)
index = mandatory(params, "index", self)
keep_alive = params.get("keep-alive", "1m")
response = await opensearch.open_point_in_time(index=index,
params=params.get("request-params"),
keep_alive=keep_alive)
id = response.get("id")
response = await opensearch.create_point_in_time(index=index,
params=params.get("request-params"),
keep_alive=keep_alive)
id = response.get("pit_id")
CompositeContext.put(op_name, id)

def __repr__(self, *args, **kwargs):
return "open-point-in-time"
return "create-point-in-time"


class ClosePointInTime(Runner):
class DeletePointInTime(Runner):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I remember us discussing to have deleteAllPointInTime outside composite context - lets see if we can add that as well to code repository.

async def __call__(self, opensearch, params):
pit_op = mandatory(params, "with-point-in-time-from", self)
pit_id = CompositeContext.get(pit_op)
pit_op = params.get("with-point-in-time-from", None)
request_params = params.get("request-params", {})
body = {"id": pit_id}
await opensearch.close_point_in_time(body=body, params=request_params, headers=None)
CompositeContext.remove(pit_op)
if pit_op is None:
await opensearch.delete_point_in_time(all=True, params=request_params)
else:
pit_id = CompositeContext.get(pit_op)
body = {
"pit_id": [pit_id]
}
await opensearch.delete_point_in_time(body=body, params=request_params, headers=None)
CompositeContext.remove(pit_op)

def __repr__(self, *args, **kwargs):
return "delete-point-in-time"


class ListAllPointInTime(Runner):
async def __call__(self, opensearch, params):
request_params = params.get("request-params", {})
await opensearch.list_all_point_in_time(params=request_params, headers=None)

def __repr__(self, *args, **kwargs):
return "close-point-in-time"
return "list-all-point-in-time"


class CompositeContext:
Expand Down Expand Up @@ -1925,9 +1940,11 @@ class Composite(Runner):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.supported_op_types = [
"open-point-in-time",
"close-point-in-time",
"create-point-in-time",
"delete-point-in-time",
"list-all-point-in-time",
"search",
"paginated-search",

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where it the code for this ?

Copy link
Contributor Author

@Arpit-Bandejiya Arpit-Bandejiya Aug 19, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code for paginated-search functionality is already present in the code base, though the paginated-search was not enabled for CompositeContext.

"raw-request",
"sleep",
"submit-async-search",
Expand Down
15 changes: 9 additions & 6 deletions osbenchmark/workload/workload.py
Original file line number Diff line number Diff line change
Expand Up @@ -581,8 +581,9 @@ class OperationType(Enum):
DeleteAsyncSearch = 11
PaginatedSearch = 12
ScrollSearch = 13
OpenPointInTime = 14
ClosePointInTime = 15
CreatePointInTime = 14
DeletePointInTime = 15
ListAllPointInTime = 16

# administrative actions
ForceMerge = 1001
Expand Down Expand Up @@ -703,10 +704,12 @@ def from_hyphenated_string(cls, v):
return OperationType.GetAsyncSearch
elif v == "delete-async-search":
return OperationType.DeleteAsyncSearch
elif v == "open-point-in-time":
return OperationType.OpenPointInTime
elif v == "close-point-in-time":
return OperationType.ClosePointInTime
elif v == "create-point-in-time":
return OperationType.CreatePointInTime
elif v == "delete-point-in-time":
return OperationType.DeletePointInTime
elif v == "list-all-point-in-time":
return OperationType.ListAllPointInTime
else:
raise KeyError(f"No enum value for [{v}]")

Expand Down
75 changes: 63 additions & 12 deletions tests/worker_coordinator/runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4393,7 +4393,7 @@ async def test_delete_async_search(self, opensearch):
])


class OpenPointInTimeTests(TestCase):
class CreatePointInTimeTests(TestCase):
@mock.patch("opensearchpy.OpenSearch")
@run_async
async def test_creates_point_in_time(self, opensearch):
Expand All @@ -4403,9 +4403,9 @@ async def test_creates_point_in_time(self, opensearch):
"index": "test-index"
}

opensearch.open_point_in_time.return_value = as_future({"id": pit_id})
opensearch.create_point_in_time.return_value = as_future({"pit_id": pit_id})

r = runner.OpenPointInTime()
r = runner.CreatePointInTime()
async with runner.CompositeContext():
await r(opensearch, params)
self.assertEqual(pit_id, runner.CompositeContext.get("open-pit-test"))
Expand All @@ -4419,30 +4419,80 @@ async def test_can_only_be_run_in_composite(self, opensearch):
"index": "test-index"
}

opensearch.open_point_in_time.return_value = as_future({"id": pit_id})
opensearch.create_point_in_time.return_value = as_future({"pit_id": pit_id})

r = runner.OpenPointInTime()
r = runner.CreatePointInTime()
with self.assertRaises(exceptions.BenchmarkAssertionError) as ctx:
await r(opensearch, params)

self.assertEqual("This operation is only allowed inside a composite operation.", ctx.exception.args[0])

class ClosePointInTimeTests(TestCase):
class DeletePointInTimeTests(TestCase):
@mock.patch("opensearchpy.OpenSearch")
@run_async
async def test_closes_point_in_time(self, opensearch):
async def test_delete_point_in_time(self, opensearch):
pit_id = "0123456789abcdef"
params = {
"name": "close-pit-test",
"with-point-in-time-from": "open-pit-task1",
}
opensearch.close_point_in_time.return_value=(as_future())
r = runner.ClosePointInTime()
opensearch.delete_point_in_time.return_value=(as_future())
r = runner.DeletePointInTime()
async with runner.CompositeContext():
runner.CompositeContext.put("open-pit-task1", pit_id)
await r(opensearch, params)

opensearch.close_point_in_time.assert_called_once_with(body={"id": "0123456789abcdef"}, params={}, headers=None)
opensearch.delete_point_in_time.assert_called_once_with(body={"pit_id": ["0123456789abcdef"]}, params={}, headers=None)

@mock.patch("opensearchpy.OpenSearch")
@run_async
async def test_delete_point_in_time_without_context(self, opensearch):
params = {
"name": "close-pit-test",
}
opensearch.delete_point_in_time.return_value=(as_future())
r = runner.DeletePointInTime()
await r(opensearch, params)
opensearch.delete_point_in_time.assert_called_once_with(body=None, all=True, params={}, headers=None)

class ListAllPointInTimeTests(TestCase):
@mock.patch("opensearchpy.OpenSearch")
@run_async
async def test_get_all_point_in_time(self, opensearch):
pit_id = "0123456789abcdef"
params = {}
opensearch.list_all_point_in_time.return_value = as_future({
"pits": [
{
"pit_id": pit_id,
"keepAlive": 60000
}
]
})

r = runner.ListAllPointInTime()
await r(opensearch, params)
opensearch.list_all_point_in_time.assert_called_once()

@mock.patch("opensearchpy.OpenSearch")
@run_async
async def test_get_all_point_in_time_in_composite(self, opensearch):
pit_id = "0123456789abcdef"
params = {}
opensearch.list_all_point_in_time.return_value = as_future({
"pits": [
{
"pit_id": pit_id,
"keepAlive": 60000
}
]
})

r = runner.ListAllPointInTime()
async with runner.CompositeContext():
await r(opensearch, params)

opensearch.list_all_point_in_time.assert_called_once()


class QueryWithSearchAfterScrollTests(TestCase):
Expand Down Expand Up @@ -5145,8 +5195,9 @@ async def test_rejects_unsupported_operations(self, opensearch):
with self.assertRaises(exceptions.BenchmarkAssertionError) as ctx:
await r(opensearch, params)

self.assertEqual("Unsupported operation-type [bulk]. Use one of [open-point-in-time, close-point-in-time, "
"search, raw-request, sleep, submit-async-search, get-async-search, delete-async-search].",
self.assertEqual("Unsupported operation-type [bulk]. Use one of [create-point-in-time, delete-point-in-time,"
" list-all-point-in-time, search, paginated-search, raw-request, sleep, submit-async-search,"
" get-async-search, delete-async-search].",
ctx.exception.args[0])


Expand Down