diff --git a/osbenchmark/telemetry.py b/osbenchmark/telemetry.py
index d9b3f02fd..750ab4d3a 100644
--- a/osbenchmark/telemetry.py
+++ b/osbenchmark/telemetry.py
@@ -41,7 +41,8 @@ def list_telemetry():
     devices = [[device.command, device.human_name, device.help] for device in
                [JitCompiler, Gc, FlightRecorder, Heapdump, NodeStats, RecoveryStats, CcrStats, SegmentStats, TransformStats,
-                SearchableSnapshotsStats]]
+                SearchableSnapshotsStats,
+                SegmentReplicationStats]]
     console.println(tabulate.tabulate(devices, ["Command", "Name", "Description"]))
     console.println("\nKeep in mind that each telemetry device may incur a runtime overhead which can skew results.")
 
 
@@ -1669,3 +1670,149 @@ def detach_from_node(self, node, running):
     def store_system_metrics(self, node, metrics_store):
         if self.index_size_bytes:
             metrics_store.put_value_node_level(node.node_name, "final_index_size_bytes", self.index_size_bytes, "byte")
+
+class SegmentReplicationStats(TelemetryDevice):
+    """
+    Gathers segment replication stats.
+    """
+    internal = False
+    command = "segment-replication-stats"
+    human_name = "Segment Replication Stats"
+    help = "Regularly samples segment replication stats"
+
+    def __init__(self, telemetry_params, clients, metrics_store):
+        """
+        :param telemetry_params: The configuration object for telemetry_params.
+            May optionally specify:
+            ``segment-replication-stats-indices``: str with index/index-pattern or list of indices or index-patterns
+            that stats should be collected from.
+            Not all clusters need to be specified, but any name used must be present in target.hosts.
+            Alternatively, the index or index pattern can be specified as a string in case only one cluster is involved.
+
+            Examples:
+
+            --telemetry-params="segment-replication-stats-indices:opensearchlogs-*"
+
+            --telemetry-params=./telemetry-params.json
+            where telemetry-params.json is:
+            {
+              "segment-replication-stats-indices": {
+                "default": ["leader-opensearchlogs-*"],
+                "follower": ["follower-opensearchlogs-*"]
+              }
+            }
+
+            ``segment-replication-stats-sample-interval``: positive integer controlling the sampling interval.
+            Default: 1 second.
+        :param clients: A dict of clients to all clusters.
+        :param metrics_store: The configured metrics store we write to.
+ """ + super().__init__() + + self.telemetry_params = telemetry_params + self.clients = clients + self.sample_interval = telemetry_params.get("segment-replication-sample-interval", 1) + if self.sample_interval <= 0: + raise exceptions.SystemSetupError( + f"The telemetry parameter 'segment-replication-stats-sample-interval' must be greater than zero " + f"but was [{self.sample_interval}].") + self.specified_cluster_names = self.clients.keys() + indices_per_cluster = self.telemetry_params.get("segment-replication-stats-indices", None) + # allow the user to specify either an index pattern as string or as a JSON object + if isinstance(indices_per_cluster, str): + self.indices_per_cluster = {opts.TargetHosts.DEFAULT: [indices_per_cluster]} + else: + self.indices_per_cluster = indices_per_cluster + + if self.indices_per_cluster: + for cluster_name in self.indices_per_cluster.keys(): + if cluster_name not in clients: + raise exceptions.SystemSetupError( + f"The telemetry parameter 'segment-replication-stats-indices' must be a JSON Object with keys " + f"matching the cluster names [{','.join(sorted(clients.keys()))}] specified in --target-hosts " + f"but it had [{cluster_name}].") + self.specified_cluster_names = self.indices_per_cluster.keys() + + self.metrics_store = metrics_store + self.samplers = [] + + def on_benchmark_start(self): + for cluster_name in self.specified_cluster_names: + recorder = SegmentReplicationStatsRecorder( + cluster_name, self.clients[cluster_name], self.metrics_store, self.sample_interval, + self.indices_per_cluster[cluster_name] if self.indices_per_cluster else None) + sampler = SamplerThread(recorder) + self.samplers.append(sampler) + sampler.setDaemon(True) + # we don't require starting recorders precisely at the same time + sampler.start() + + def on_benchmark_stop(self): + if self.samplers: + for sampler in self.samplers: + sampler.finish() + +class SegmentReplicationStatsRecorder: + """ + Collects and pushes segment replication stats for the specified cluster to the metric store. + """ + + def __init__(self, cluster_name, client, metrics_store, sample_interval, indices=None): + """ + :param cluster_name: The cluster_name that the client connects to, as specified in target.hosts. + :param client: The OpenSearch client for this cluster. + :param metrics_store: The configured metrics store we write to. + :param sample_interval: integer controlling the interval, in seconds, between collecting samples. + :param indices: optional list of indices to filter results from. + """ + + self.cluster_name = cluster_name + self.client = client + self.metrics_store = metrics_store + self.sample_interval = sample_interval + self.indices = indices + self.logger = logging.getLogger(__name__) + + def __str__(self): + return "segment replication stats" + + def record(self): + """ + Collect segment replication stats for indexes (optionally) specified in telemetry parameters + and push to metrics store. 
+ """ + if not self.indices: + # if 'index' parameter is not assigned, set it as empty string to allow getting API response for all indexes + self.indices = [""] + for index in self.indices: + try: + stats_api_endpoint = "/_cat/segment_replication/" + stats = self.client.transport.perform_request("GET", stats_api_endpoint + index, params={"time": "ms"}) + except opensearchpy.TransportError: + raise exceptions.BenchmarkError( + f"A transport error occurred while collecting searchable snapshots stats on cluster " + f"[{self.cluster_name}]") from None + + # parse the REST API response, each field will be an array element + stats_arr = [] + for line_of_shard_stats in stats.splitlines(): + stats_arr.append(line_of_shard_stats.split(" ")) + + for shard_stats_arr in stats_arr: + self._push_stats(stats=shard_stats_arr, index=index) + + def _push_stats(self, stats, index=None): + + doc = { + "name": "segment-replication-stats", + "shard_id": stats[0], + "current_lag_in_millis": stats[5], + "last_completed_lag_in_millis": stats[6], + } + + meta_data = { + "cluster": self.cluster_name, + "index": index + } + + self.metrics_store.put_doc(doc, level=MetaInfoScope.cluster, meta_data=meta_data) diff --git a/osbenchmark/worker_coordinator/worker_coordinator.py b/osbenchmark/worker_coordinator/worker_coordinator.py index 7722f0bd0..a445f7514 100644 --- a/osbenchmark/worker_coordinator/worker_coordinator.py +++ b/osbenchmark/worker_coordinator/worker_coordinator.py @@ -580,7 +580,8 @@ def prepare_telemetry(self, opensearch, enable): telemetry.CcrStats(telemetry_params, opensearch, self.metrics_store), telemetry.RecoveryStats(telemetry_params, opensearch, self.metrics_store), telemetry.TransformStats(telemetry_params, opensearch, self.metrics_store), - telemetry.SearchableSnapshotsStats(telemetry_params, opensearch, self.metrics_store) + telemetry.SearchableSnapshotsStats(telemetry_params, opensearch, self.metrics_store), + telemetry.SegmentReplicationStats(telemetry_params, opensearch, self.metrics_store) ] else: devices = []