add telemetry device of segment replication stats
tlfeng committed Jul 12, 2023
1 parent 30c8043 commit aa3d2de
Showing 2 changed files with 150 additions and 2 deletions.
149 changes: 148 additions & 1 deletion osbenchmark/telemetry.py
@@ -41,7 +41,8 @@ def list_telemetry():
devices = [[device.command, device.human_name, device.help] for device in [JitCompiler, Gc, FlightRecorder,
Heapdump, NodeStats, RecoveryStats,
CcrStats, SegmentStats, TransformStats,
- SearchableSnapshotsStats]]
+ SearchableSnapshotsStats,
+ SegmentReplicationStats]]
console.println(tabulate.tabulate(devices, ["Command", "Name", "Description"]))
console.println("\nKeep in mind that each telemetry device may incur a runtime overhead which can skew results.")

@@ -1669,3 +1670,149 @@ def detach_from_node(self, node, running):
def store_system_metrics(self, node, metrics_store):
if self.index_size_bytes:
metrics_store.put_value_node_level(node.node_name, "final_index_size_bytes", self.index_size_bytes, "byte")

class SegmentReplicationStats(TelemetryDevice):
internal = False
command = "segment-replication-stats"
human_name = "Segment Replication Stats"
help = "Regularly samples segment replication stats"

"""
Gathers Segment Replication stats
"""

def __init__(self, telemetry_params, clients, metrics_store):
"""
:param telemetry_params: The configuration object for telemetry devices.
May optionally specify:
``segment-replication-stats-indices``: str with index/index-pattern or list of indices or index-patterns
that stats should be collected from.
Not all clusters need to be specified, but any name used must be present in target.hosts.
Alternatively, the index or index pattern can be specified as a string in case only one cluster is involved.
Examples:
--telemetry-params="segment-replication-stats-indices:opensearchlogs-*"
--telemetry-params=./telemetry-params.json
where telemetry-params.json is:
{
"segment-replication-stats-indices": {
"default": ["leader-opensearchlogs-*"],
"follower": ["follower-opensearchlogs-*"]
}
}
``segment-replication-stats-sample-interval``: positive integer controlling the sampling interval.
Default: 1 second.
:param clients: A dict of clients to all clusters.
:param metrics_store: The configured metrics store we write to.
"""
super().__init__()

self.telemetry_params = telemetry_params
self.clients = clients
self.sample_interval = telemetry_params.get("segment-replication-stats-sample-interval", 1)
if self.sample_interval <= 0:
raise exceptions.SystemSetupError(
f"The telemetry parameter 'segment-replication-stats-sample-interval' must be greater than zero "
f"but was [{self.sample_interval}].")
self.specified_cluster_names = self.clients.keys()
indices_per_cluster = self.telemetry_params.get("segment-replication-stats-indices", None)
# allow the user to specify either an index pattern as string or as a JSON object
if isinstance(indices_per_cluster, str):
self.indices_per_cluster = {opts.TargetHosts.DEFAULT: [indices_per_cluster]}
else:
self.indices_per_cluster = indices_per_cluster

if self.indices_per_cluster:
for cluster_name in self.indices_per_cluster.keys():
if cluster_name not in clients:
raise exceptions.SystemSetupError(
f"The telemetry parameter 'segment-replication-stats-indices' must be a JSON Object with keys "
f"matching the cluster names [{','.join(sorted(clients.keys()))}] specified in --target-hosts "
f"but it had [{cluster_name}].")
self.specified_cluster_names = self.indices_per_cluster.keys()

self.metrics_store = metrics_store
self.samplers = []

def on_benchmark_start(self):
for cluster_name in self.specified_cluster_names:
recorder = SegmentReplicationStatsRecorder(
cluster_name, self.clients[cluster_name], self.metrics_store, self.sample_interval,
self.indices_per_cluster[cluster_name] if self.indices_per_cluster else None)
sampler = SamplerThread(recorder)
self.samplers.append(sampler)
sampler.daemon = True
# we don't require starting recorders precisely at the same time
sampler.start()

def on_benchmark_stop(self):
if self.samplers:
for sampler in self.samplers:
sampler.finish()

class SegmentReplicationStatsRecorder:
"""
Collects and pushes segment replication stats for the specified cluster to the metric store.
"""

def __init__(self, cluster_name, client, metrics_store, sample_interval, indices=None):
"""
:param cluster_name: The cluster_name that the client connects to, as specified in target.hosts.
:param client: The OpenSearch client for this cluster.
:param metrics_store: The configured metrics store we write to.
:param sample_interval: integer controlling the interval, in seconds, between collecting samples.
:param indices: optional list of indices to filter results from.
"""

self.cluster_name = cluster_name
self.client = client
self.metrics_store = metrics_store
self.sample_interval = sample_interval
self.indices = indices
self.logger = logging.getLogger(__name__)

def __str__(self):
return "segment replication stats"

def record(self):
"""
Collect segment replication stats for indexes (optionally) specified in telemetry parameters
and push to metrics store.
"""
if not self.indices:
# if no indices were specified, query with an empty index name so the API returns stats for all indices
self.indices = [""]
for index in self.indices:
try:
stats_api_endpoint = "/_cat/segment_replication/"
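# "time": "ms" makes the _cat API report time-based columns in milliseconds, matching the *_in_millis fields stored below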
stats = self.client.transport.perform_request("GET", stats_api_endpoint + index, params={"time": "ms"})
except opensearchpy.TransportError:
raise exceptions.BenchmarkError(
    f"A transport error occurred while collecting segment replication stats on cluster "
    f"[{self.cluster_name}]") from None

# parse the plain-text REST API response; split each line on whitespace so every column becomes one array element
stats_arr = []
for line_of_shard_stats in stats.splitlines():
    stats_arr.append(line_of_shard_stats.split())

for shard_stats_arr in stats_arr:
self._push_stats(stats=shard_stats_arr, index=index)

def _push_stats(self, stats, index=None):
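# fields are read positionally; with the default _cat/segment_replication column order
# (shardId, target_node, target_host, checkpoints_behind, bytes_behind, current_lag,
# last_completed_lag, rejected_requests), index 0 is the shard id, 5 the current lag and 6 the last completed lag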

doc = {
"name": "segment-replication-stats",
"shard_id": stats[0],
"current_lag_in_millis": stats[5],
"last_completed_lag_in_millis": stats[6],
}

meta_data = {
"cluster": self.cluster_name,
"index": index
}

self.metrics_store.put_doc(doc, level=MetaInfoScope.cluster, meta_data=meta_data)
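
As a reading aid (not part of the commit), here is a minimal sketch of how one line of the plain-text _cat/segment_replication response maps onto the document that _push_stats stores; the sample line, index name and lag values are illustrative assumptions:

# hypothetical response line from GET /_cat/segment_replication?time=ms
sample_line = "[stocks][0] runTask-1 127.0.0.1 0 0b 0 817 0"
fields = sample_line.split()
doc = {
    "name": "segment-replication-stats",
    "shard_id": fields[0],                      # "[stocks][0]"
    "current_lag_in_millis": fields[5],         # "0"
    "last_completed_lag_in_millis": fields[6],  # "817"
}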
3 changes: 2 additions & 1 deletion osbenchmark/worker_coordinator/worker_coordinator.py
@@ -580,7 +580,8 @@ def prepare_telemetry(self, opensearch, enable):
telemetry.CcrStats(telemetry_params, opensearch, self.metrics_store),
telemetry.RecoveryStats(telemetry_params, opensearch, self.metrics_store),
telemetry.TransformStats(telemetry_params, opensearch, self.metrics_store),
- telemetry.SearchableSnapshotsStats(telemetry_params, opensearch, self.metrics_store)
+ telemetry.SearchableSnapshotsStats(telemetry_params, opensearch, self.metrics_store),
+ telemetry.SegmentReplicationStats(telemetry_params, opensearch, self.metrics_store)
]
else:
devices = []
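
To illustrate how the new device consumes its parameters (see the __init__ docstring above), a minimal sketch of the two accepted shapes of the telemetry parameters; the cluster names mirror the docstring example and are assumptions:

# single cluster: a plain index pattern plus an explicit sample interval in seconds
telemetry_params = {
    "segment-replication-stats-indices": "opensearchlogs-*",
    "segment-replication-stats-sample-interval": 5,
}

# multiple clusters: a JSON object keyed by the cluster names from --target-hosts
telemetry_params = {
    "segment-replication-stats-indices": {
        "default": ["leader-opensearchlogs-*"],
        "follower": ["follower-opensearchlogs-*"],
    }
}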
