Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve the telemetry device "Segment Replication Stats" by changing to parse JSON format and using numeric bytes value #356

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 14 additions & 17 deletions osbenchmark/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -1686,7 +1686,7 @@ def __init__(self, telemetry_params, clients, metrics_store):
"""
:param telemetry_params: The configuration object for telemetry_params.
May optionally specify:
``searchable-stats-indices``: str with index/index-pattern or list of indices or index-patterns
``segment-replication-stats-indices``: str with index/index-pattern or list of indices or index-patterns
that stats should be collected from.
Not all clusters need to be specified, but any name used must be present in target.hosts.
Alternatively, the index or index pattern can be specified as a string in case only one cluster is involved.
Expand Down Expand Up @@ -1788,32 +1788,29 @@ def record(self):
for index in self.indices:
try:
stats_api_endpoint = "/_cat/segment_replication/"
stats = self.client.transport.perform_request("GET", stats_api_endpoint + index, params={"time": "ms"})
stats = self.client.transport.perform_request(
"GET", stats_api_endpoint + index, params={"time": "ms", "bytes": "b", "format": "JSON"})
except opensearchpy.TransportError:
raise exceptions.BenchmarkError(
f"A transport error occurred while collecting segment replication stats on cluster "
f"[{self.cluster_name}]") from None

# parse the REST API response, each field will be an array element
stats_arr = []
for line_of_shard_stats in stats.splitlines():
stats_arr.append(line_of_shard_stats.split(" "))

for shard_stats_arr in stats_arr:
self._push_stats(stats=shard_stats_arr, index=index)
# parse the REST API response, each element in the list is a shard
for shard_stats in stats:
self._push_stats(stats=shard_stats, index=index)

def _push_stats(self, stats, index=None):

doc = {
"name": "segment-replication-stats",
"shard_id": stats[0],
"target_node": stats[1],
"target_host": stats[2],
"checkpoints_behind": stats[3],
"bytes_behind": stats[4],
"current_lag_in_millis": stats[5],
"last_completed_lag_in_millis": stats[6],
"rejected_requests": stats[7]
"shard_id": stats["shardId"],
"target_node": stats["target_node"],
"target_host": stats["target_host"],
"checkpoints_behind": int(stats["checkpoints_behind"]),
"bytes_behind": int(stats["bytes_behind"]),
"current_lag_in_millis": int(stats["current_lag"]),
"last_completed_lag_in_millis": int(stats["last_completed_lag"]),
"rejected_requests": int(stats["rejected_requests"])
}

meta_data = {
Expand Down
54 changes: 36 additions & 18 deletions tests/telemetry_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3413,8 +3413,26 @@ def test_cluster_name_can_be_ingored_in_segment_replication_stats_indices_when_o
telemetry.SegmentReplicationStats(telemetry_params, clients, metrics_store)

class SegmentReplicationStatsRecorderTests(TestCase):
stats_response = """[so][0] node-1 127.0.0.1 1 2b 3 25 4
[so][1] node-2 127.0.0.1 5 6b 7 12 8"""
stats_response = [{
"shardId" : "[logs-211998][6]",
"target_node" : "ip-10-0-4-187.internal",
"target_host" : "10.0.4.187",
"checkpoints_behind" : "1",
"bytes_behind" : "47801",
"current_lag" : "691",
"last_completed_lag" : "821",
"rejected_requests" : "0"
},
{
"shardId" : "[logs-211998][7]",
"target_node" : "ip-10-0-5-230.internal",
"target_host" : "10.0.5.230",
"checkpoints_behind" : "2",
"bytes_behind" : "46623",
"current_lag" : "2039",
"last_completed_lag" : "323",
"rejected_requests" : "1"
}]

@mock.patch("osbenchmark.metrics.OsMetricsStore.put_doc")
def test_stores_default_stats(self, metrics_store_put_doc):
Expand All @@ -3431,27 +3449,27 @@ def test_stores_default_stats(self, metrics_store_put_doc):

metrics_store_put_doc.assert_has_calls([call({
"name": "segment-replication-stats",
"shard_id": "[so][0]",
"target_node": "node-1",
"target_host": "127.0.0.1",
"checkpoints_behind": "1",
"bytes_behind": "2b",
"current_lag_in_millis": "3",
"last_completed_lag_in_millis": "25",
"rejected_requests": "4"},
"shard_id": "[logs-211998][6]",
Copy link
Member

@dreamer-89 dreamer-89 Aug 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Is shard_id, target_node change needed in tests ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dreamer-89 Thanks a lot for your review! 👏 The change is indeed not necessary, I just updated the test with a real API response from my recent performance test. I think it's fine to keep the changes.

"target_node": "ip-10-0-4-187.internal",
"target_host": "10.0.4.187",
"checkpoints_behind": 1,
"bytes_behind": 47801,
"current_lag_in_millis": 691,
"last_completed_lag_in_millis": 821,
"rejected_requests": 0},
level=MetaInfoScope.cluster,
meta_data={
"cluster": "default", "index": ""}),
call({
"name": "segment-replication-stats",
"shard_id": "[so][1]",
"target_node": "node-2",
"target_host": "127.0.0.1",
"checkpoints_behind": "5",
"bytes_behind": "6b",
"current_lag_in_millis": "7",
"last_completed_lag_in_millis": "12",
"rejected_requests": "8"},
"shard_id": "[logs-211998][7]",
"target_node": "ip-10-0-5-230.internal",
"target_host": "10.0.5.230",
"checkpoints_behind": 2,
"bytes_behind": 46623,
"current_lag_in_millis": 2039,
"last_completed_lag_in_millis": 323,
"rejected_requests": 1},
level=MetaInfoScope.cluster,
meta_data={
"cluster": "default", "index": ""})
Expand Down