Add a telemetry device - Segment Replication Stats (#346)
Signed-off-by: Tianli Feng <ftianli@amazon.com>
Signed-off-by: Tianli Feng <ftl94@live.com>
Tianli Feng authored Aug 2, 2023
1 parent f11bc69 commit e04fa7f
Showing 3 changed files with 246 additions and 2 deletions.
154 changes: 153 additions & 1 deletion osbenchmark/telemetry.py
@@ -41,7 +41,8 @@ def list_telemetry():
devices = [[device.command, device.human_name, device.help] for device in [JitCompiler, Gc, FlightRecorder,
Heapdump, NodeStats, RecoveryStats,
CcrStats, SegmentStats, TransformStats,
SearchableSnapshotsStats]]
SearchableSnapshotsStats,
SegmentReplicationStats]]
console.println(tabulate.tabulate(devices, ["Command", "Name", "Description"]))
console.println("\nKeep in mind that each telemetry device may incur a runtime overhead which can skew results.")

@@ -1670,3 +1671,154 @@ def detach_from_node(self, node, running):
def store_system_metrics(self, node, metrics_store):
if self.index_size_bytes:
metrics_store.put_value_node_level(node.node_name, "final_index_size_bytes", self.index_size_bytes, "byte")

class SegmentReplicationStats(TelemetryDevice):
internal = False
command = "segment-replication-stats"
human_name = "Segment Replication Stats"
help = "Regularly samples segment replication stats"

"""
Gathers Segment Replication stats
"""

def __init__(self, telemetry_params, clients, metrics_store):
"""
:param telemetry_params: The configuration object for telemetry_params.
May optionally specify:
``segment-replication-stats-indices``: str with index/index-pattern or list of indices or index-patterns
that stats should be collected from.
Not all clusters need to be specified, but any name used must be present in target.hosts.
Alternatively, the index or index pattern can be specified as a string in case only one cluster is involved.
Examples:
--telemetry-params="segment-replication-stats-indices:opensearchlogs-*"
--telemetry-params=./telemetry-params.json
where telemetry-params.json is:
{
"segment-replication-stats-indices": {
"default": ["leader-opensearchlogs-*"],
"follower": ["follower-opensearchlogs-*"]
}
}
``segment-replication-stats-sample-interval``: positive integer controlling the sampling interval.
Default: 1 second.
:param clients: A dict of clients to all clusters.
:param metrics_store: The configured metrics store we write to.
"""
super().__init__()

self.telemetry_params = telemetry_params
self.clients = clients
self.sample_interval = telemetry_params.get("segment-replication-stats-sample-interval", 1)
if self.sample_interval <= 0:
raise exceptions.SystemSetupError(
f"The telemetry parameter 'segment-replication-stats-sample-interval' must be greater than zero "
f"but was [{self.sample_interval}].")
self.specified_cluster_names = self.clients.keys()
indices_per_cluster = self.telemetry_params.get("segment-replication-stats-indices", None)
# allow the user to specify either an index pattern as string or as a JSON object
if isinstance(indices_per_cluster, str):
self.indices_per_cluster = {opts.TargetHosts.DEFAULT: [indices_per_cluster]}
else:
self.indices_per_cluster = indices_per_cluster

if self.indices_per_cluster:
for cluster_name in self.indices_per_cluster.keys():
if cluster_name not in clients:
raise exceptions.SystemSetupError(
f"The telemetry parameter 'segment-replication-stats-indices' must be a JSON Object with keys "
f"matching the cluster names [{','.join(sorted(clients.keys()))}] specified in --target-hosts "
f"but it had [{cluster_name}].")
self.specified_cluster_names = self.indices_per_cluster.keys()

self.metrics_store = metrics_store
self.samplers = []

def on_benchmark_start(self):
for cluster_name in self.specified_cluster_names:
recorder = SegmentReplicationStatsRecorder(
cluster_name, self.clients[cluster_name], self.metrics_store, self.sample_interval,
self.indices_per_cluster[cluster_name] if self.indices_per_cluster else None)
sampler = SamplerThread(recorder)
self.samplers.append(sampler)
sampler.daemon = True
# we don't require starting recorders precisely at the same time
sampler.start()

def on_benchmark_stop(self):
if self.samplers:
for sampler in self.samplers:
sampler.finish()

class SegmentReplicationStatsRecorder:
"""
Collects and pushes segment replication stats for the specified cluster to the metric store.
"""

def __init__(self, cluster_name, client, metrics_store, sample_interval, indices=None):
"""
:param cluster_name: The cluster_name that the client connects to, as specified in target.hosts.
:param client: The OpenSearch client for this cluster.
:param metrics_store: The configured metrics store we write to.
:param sample_interval: integer controlling the interval, in seconds, between collecting samples.
:param indices: optional list of indices to filter results from.
"""

self.cluster_name = cluster_name
self.client = client
self.metrics_store = metrics_store
self.sample_interval = sample_interval
self.indices = indices
self.logger = logging.getLogger(__name__)

def __str__(self):
return "segment replication stats"

def record(self):
"""
Collect segment replication stats for the indices (optionally) specified in telemetry parameters
and push them to the metrics store.
"""
if not self.indices:
# if no indices were specified, use an empty string so the API returns stats for all indices
self.indices = [""]
for index in self.indices:
try:
stats_api_endpoint = "/_cat/segment_replication/"
stats = self.client.transport.perform_request("GET", stats_api_endpoint + index, params={"time": "ms"})
except opensearchpy.TransportError:
raise exceptions.BenchmarkError(
f"A transport error occurred while collecting segment replication stats on cluster "
f"[{self.cluster_name}]") from None

# parse the plain-text _cat API response; each space-separated field becomes a list element
stats_arr = []
for line_of_shard_stats in stats.splitlines():
stats_arr.append(line_of_shard_stats.split(" "))

for shard_stats_arr in stats_arr:
self._push_stats(stats=shard_stats_arr, index=index)

def _push_stats(self, stats, index=None):

doc = {
"name": "segment-replication-stats",
"shard_id": stats[0],
"target_node": stats[1],
"target_host": stats[2],
"checkpoints_behind": stats[3],
"bytes_behind": stats[4],
"current_lag_in_millis": stats[5],
"last_completed_lag_in_millis": stats[6],
"rejected_requests": stats[7]
}

meta_data = {
"cluster": self.cluster_name,
"index": index
}

self.metrics_store.put_doc(doc, level=MetaInfoScope.cluster, meta_data=meta_data)
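
For orientation, a minimal sketch (not part of the commit) of constructing the new device directly. The unittest.mock stand-ins for the per-cluster clients and the metrics store are assumptions for illustration only; in a real run OSB builds these objects and attaches the device through the worker coordinator shown in the next file.

from unittest import mock

from osbenchmark import telemetry

# one entry per cluster named in --target-hosts; Mock() is only a placeholder here
clients = {"default": mock.Mock()}
metrics_store = mock.Mock()

telemetry_params = {
    # sample every 5 seconds instead of the default 1 second
    "segment-replication-stats-sample-interval": 5,
    # restrict stats collection to one index pattern on the "default" cluster
    "segment-replication-stats-indices": {"default": ["opensearchlogs-*"]},
}

device = telemetry.SegmentReplicationStats(telemetry_params, clients, metrics_store)
# on_benchmark_start() launches one daemon SamplerThread per configured cluster;
# a non-positive interval or an unknown cluster key raises SystemSetupError instead.

In normal use the device is enabled with the --telemetry and --telemetry-params command line options rather than constructed by hand; the worker_coordinator.py change below is what attaches it for a benchmark run.
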
3 changes: 2 additions & 1 deletion osbenchmark/worker_coordinator/worker_coordinator.py
@@ -580,7 +580,8 @@ def prepare_telemetry(self, opensearch, enable):
telemetry.CcrStats(telemetry_params, opensearch, self.metrics_store),
telemetry.RecoveryStats(telemetry_params, opensearch, self.metrics_store),
telemetry.TransformStats(telemetry_params, opensearch, self.metrics_store),
telemetry.SearchableSnapshotsStats(telemetry_params, opensearch, self.metrics_store)
telemetry.SearchableSnapshotsStats(telemetry_params, opensearch, self.metrics_store),
telemetry.SegmentReplicationStats(telemetry_params, opensearch, self.metrics_store)
]
else:
devices = []
91 changes: 91 additions & 0 deletions tests/telemetry_test.py
@@ -3372,3 +3372,94 @@ def test_stores_nothing_if_no_data_path(self, run_subprocess, metrics_store_clus
self.assertEqual(0, run_subprocess.call_count)
self.assertEqual(0, metrics_store_cluster_value.call_count)
self.assertEqual(0, get_size.call_count)

class SegmentReplicationStatsTests(TestCase):
def test_negative_sample_interval_forbidden(self):
clients = {"default": Client(), "cluster_b": Client()}
cfg = create_config()
metrics_store = metrics.OsMetricsStore(cfg)
telemetry_params = {
"segment-replication-stats-sample-interval": -1 * random.random()
}
with self.assertRaisesRegex(exceptions.SystemSetupError,
r"The telemetry parameter 'segment-replication-stats-sample-interval' must be "
r"greater than zero but was .*\."):
telemetry.SegmentReplicationStats(telemetry_params, clients, metrics_store)

def test_wrong_cluster_name_in_segment_replication_stats_indices_forbidden(self):
clients = {"default": Client(), "cluster_b": Client()}
cfg = create_config()
metrics_store = metrics.OsMetricsStore(cfg)
telemetry_params = {
"segment-replication-stats-indices":{
"default": ["index-1"],
"wrong_cluster_name": ["index-2"]
}
}
with self.assertRaisesRegex(exceptions.SystemSetupError,
r"The telemetry parameter 'segment-replication-stats-indices' must be a JSON Object"
r" with keys matching the cluster names \[{}] specified in --target-hosts "
r"but it had \[wrong_cluster_name\].".format(",".join(sorted(clients.keys())))
):
telemetry.SegmentReplicationStats(telemetry_params, clients, metrics_store)

def test_cluster_name_can_be_ignored_in_segment_replication_stats_indices_when_only_one_cluster_is_involved(self):
clients = {"default": Client()}
cfg = create_config()
metrics_store = metrics.OsMetricsStore(cfg)
telemetry_params = {
"segment-replication-stats-indices": "index"
}
telemetry.SegmentReplicationStats(telemetry_params, clients, metrics_store)

class SegmentReplicationStatsRecorderTests(TestCase):
stats_response = """[so][0] node-1 127.0.0.1 1 2b 3 25 4
[so][1] node-2 127.0.0.1 5 6b 7 12 8"""

@mock.patch("osbenchmark.metrics.OsMetricsStore.put_doc")
def test_stores_default_stats(self, metrics_store_put_doc):
cfg = create_config()
metrics_store = metrics.OsMetricsStore(cfg)
client = Client(transport_client=TransportClient(responses=[SegmentReplicationStatsRecorderTests.stats_response]))

recorder = telemetry.SegmentReplicationStatsRecorder(
cluster_name="default",
client=client,
metrics_store=metrics_store,
sample_interval=1)
recorder.record()

metrics_store_put_doc.assert_has_calls([call({
"name": "segment-replication-stats",
"shard_id": "[so][0]",
"target_node": "node-1",
"target_host": "127.0.0.1",
"checkpoints_behind": "1",
"bytes_behind": "2b",
"current_lag_in_millis": "3",
"last_completed_lag_in_millis": "25",
"rejected_requests": "4"},
level=MetaInfoScope.cluster,
meta_data={
"cluster": "default", "index": ""}),
call({
"name": "segment-replication-stats",
"shard_id": "[so][1]",
"target_node": "node-2",
"target_host": "127.0.0.1",
"checkpoints_behind": "5",
"bytes_behind": "6b",
"current_lag_in_millis": "7",
"last_completed_lag_in_millis": "12",
"rejected_requests": "8"},
level=MetaInfoScope.cluster,
meta_data={
"cluster": "default", "index": ""})
], any_order=True)

def test_exception_on_transport_error(self):
client = Client(transport_client=TransportClient(responses=[], force_error=True))
metrics_store = metrics.OsMetricsStore(create_config())
with self.assertRaisesRegex(exceptions.BenchmarkError,
r"A transport error occurred while collecting segment replication stats on cluster \[default\]"):
telemetry.SegmentReplicationStatsRecorder("default", client, metrics_store, 1).record()
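
For reference, a standalone sketch (mirroring the test fixture above, not part of the commit) showing how the recorder splits one line of the sample _cat/segment_replication response into the document that _push_stats stores:

# one line taken from the stats_response fixture above
line = "[so][0] node-1 127.0.0.1 1 2b 3 25 4"
fields = line.split(" ")
doc = {
    "name": "segment-replication-stats",
    "shard_id": fields[0],                      # "[so][0]"
    "target_node": fields[1],                   # "node-1"
    "target_host": fields[2],                   # "127.0.0.1"
    "checkpoints_behind": fields[3],            # "1"
    "bytes_behind": fields[4],                  # "2b"
    "current_lag_in_millis": fields[5],         # "3"
    "last_completed_lag_in_millis": fields[6],  # "25"
    "rejected_requests": fields[7],             # "4"
}
print(doc)  # matches the first expected put_doc call in test_stores_default_stats
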
