Skip to content

Commit

Permalink
Addressing comments
Browse files Browse the repository at this point in the history
Signed-off-by: Bharathwaj G <bharath78910@gmail.com>
  • Loading branch information
bharath-techie committed Oct 6, 2023

Verified

This commit was signed with the committer’s verified signature.
1 parent e7b0c49 commit cd5299b
Showing 14 changed files with 153 additions and 104 deletions.
Original file line number Diff line number Diff line change
@@ -56,7 +56,7 @@
import org.opensearch.monitor.os.OsStats;
import org.opensearch.monitor.process.ProcessStats;
import org.opensearch.node.AdaptiveSelectionStats;
import org.opensearch.node.GlobalPerformanceStats;
import org.opensearch.node.NodesPerformanceStats;
import org.opensearch.script.ScriptCacheStats;
import org.opensearch.script.ScriptStats;
import org.opensearch.search.backpressure.stats.SearchBackpressureStats;
@@ -144,7 +144,7 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
private SearchPipelineStats searchPipelineStats;

@Nullable
private GlobalPerformanceStats globalPerformanceStats;
private NodesPerformanceStats nodesPerformanceStats;

public NodeStats(StreamInput in) throws IOException {
super(in);
@@ -202,10 +202,10 @@ public NodeStats(StreamInput in) throws IOException {
} else {
searchPipelineStats = null;
}
if (in.getVersion().onOrAfter(Version.V_3_0_0)) { // make it 2.11 when we backport
globalPerformanceStats = in.readOptionalWriteable(GlobalPerformanceStats::new);
if (in.getVersion().onOrAfter(Version.V_3_0_0)) { // make it 2.12 when we backport
nodesPerformanceStats = in.readOptionalWriteable(NodesPerformanceStats::new);
} else {
globalPerformanceStats = null;
nodesPerformanceStats = null;
}
}

@@ -225,7 +225,7 @@ public NodeStats(
@Nullable DiscoveryStats discoveryStats,
@Nullable IngestStats ingestStats,
@Nullable AdaptiveSelectionStats adaptiveSelectionStats,
@Nullable GlobalPerformanceStats globalPerformanceStats,
@Nullable NodesPerformanceStats nodesPerformanceStats,
@Nullable ScriptCacheStats scriptCacheStats,
@Nullable IndexingPressureStats indexingPressureStats,
@Nullable ShardIndexingPressureStats shardIndexingPressureStats,
@@ -251,7 +251,7 @@ public NodeStats(
this.discoveryStats = discoveryStats;
this.ingestStats = ingestStats;
this.adaptiveSelectionStats = adaptiveSelectionStats;
this.globalPerformanceStats = globalPerformanceStats;
this.nodesPerformanceStats = nodesPerformanceStats;
this.scriptCacheStats = scriptCacheStats;
this.indexingPressureStats = indexingPressureStats;
this.shardIndexingPressureStats = shardIndexingPressureStats;
@@ -356,8 +356,8 @@ public AdaptiveSelectionStats getAdaptiveSelectionStats() {
}

@Nullable
public GlobalPerformanceStats getNodesPerformanceStats() {
return globalPerformanceStats;
public NodesPerformanceStats getNodesPerformanceStats() {
return nodesPerformanceStats;
}

@Nullable
@@ -446,8 +446,8 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_9_0)) {
out.writeOptionalWriteable(searchPipelineStats);
}
if (out.getVersion().onOrAfter(Version.V_3_0_0)) { // TODO : make it 2.11 when we backport
out.writeOptionalWriteable(globalPerformanceStats);
if (out.getVersion().onOrAfter(Version.V_3_0_0)) { // make it 2.12 when we backport
out.writeOptionalWriteable(nodesPerformanceStats);
}
}

Original file line number Diff line number Diff line change
@@ -214,7 +214,7 @@ public enum Metric {
FILE_CACHE_STATS("file_cache"),
TASK_CANCELLATION("task_cancellation"),
SEARCH_PIPELINE("search_pipeline"),
GLOBAL_PERFORMANCE_STATS("performance_stats");
PERFORMANCE_STATS("performance_stats");

private String metricName;

Original file line number Diff line number Diff line change
@@ -125,7 +125,7 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) {
NodesStatsRequest.Metric.FILE_CACHE_STATS.containedIn(metrics),
NodesStatsRequest.Metric.TASK_CANCELLATION.containedIn(metrics),
NodesStatsRequest.Metric.SEARCH_PIPELINE.containedIn(metrics),
NodesStatsRequest.Metric.GLOBAL_PERFORMANCE_STATS.containedIn(metrics)
NodesStatsRequest.Metric.PERFORMANCE_STATS.containedIn(metrics)
);
}

14 changes: 7 additions & 7 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
@@ -1075,7 +1075,7 @@ protected Node(
settings,
clusterService.getClusterSettings()
);
final PerfStatsCollectorService perfStatsCollectorService = new PerfStatsCollectorService(
final PerformanceCollectorService performanceCollectorService = new PerformanceCollectorService(
nodePerformanceTracker,
clusterService,
threadPool
@@ -1102,7 +1102,7 @@ protected Node(
searchPipelineService,
fileCache,
taskCancellationMonitoringService,
perfStatsCollectorService
performanceCollectorService
);

final SearchService searchService = newSearchService(
@@ -1223,8 +1223,8 @@ protected Node(
b.bind(RerouteService.class).toInstance(rerouteService);
b.bind(ShardLimitValidator.class).toInstance(shardLimitValidator);
b.bind(FsHealthService.class).toInstance(fsHealthService);
b.bind(PerfStatsCollectorService.class).toInstance(perfStatsCollectorService);
b.bind(NodePerformanceTracker.class).toInstance(nodePerformanceTracker);
b.bind(PerformanceCollectorService.class).toInstance(performanceCollectorService);
b.bind(SystemIndices.class).toInstance(systemIndices);
b.bind(IdentityService.class).toInstance(identityService);
b.bind(Tracer.class).toInstance(tracer);
@@ -1342,7 +1342,7 @@ public Node start() throws NodeValidationException {
injector.getInstance(SearchService.class).start();
injector.getInstance(FsHealthService.class).start();
injector.getInstance(NodePerformanceTracker.class).start();
injector.getInstance(PerfStatsCollectorService.class).start();
injector.getInstance(PerformanceCollectorService.class).start();
nodeService.getMonitorService().start();
nodeService.getSearchBackpressureService().start();
nodeService.getTaskCancellationMonitoringService().start();
@@ -1506,7 +1506,7 @@ private Node stop() {
injector.getInstance(NodeConnectionsService.class).stop();
injector.getInstance(FsHealthService.class).stop();
injector.getInstance(NodePerformanceTracker.class).stop();
injector.getInstance(PerfStatsCollectorService.class).stop();
injector.getInstance(PerformanceCollectorService.class).stop();
nodeService.getMonitorService().stop();
nodeService.getSearchBackpressureService().stop();
injector.getInstance(GatewayService.class).stop();
@@ -1572,8 +1572,8 @@ public synchronized void close() throws IOException {
toClose.add(injector.getInstance(FsHealthService.class));
toClose.add(() -> stopWatch.stop().start("node_performance_tracker"));
toClose.add(injector.getInstance(NodePerformanceTracker.class));
toClose.add(() -> stopWatch.stop().start("perf_stats_collector"));
toClose.add(injector.getInstance(PerfStatsCollectorService.class));
toClose.add(() -> stopWatch.stop().start("performance_collector"));
toClose.add(injector.getInstance(PerformanceCollectorService.class));
toClose.add(() -> stopWatch.stop().start("gateway"));
toClose.add(injector.getInstance(GatewayService.class));
toClose.add(() -> stopWatch.stop().start("search"));
Original file line number Diff line number Diff line change
@@ -19,20 +19,20 @@
* This represents the performance stats of a node along with the timestamp at which the stats object was created
* in the respective node
*/
public class NodePerformanceStatistics implements Writeable {
public class NodePerformanceStats implements Writeable {
final String nodeId;
long timestamp;
double cpuUtilizationPercent;
double memoryUtilizationPercent;

public NodePerformanceStatistics(String nodeId, double cpuUtilizationPercent, double memoryUtilizationPercent, long timestamp) {
public NodePerformanceStats(String nodeId, long timestamp, double memoryUtilizationPercent, double cpuUtilizationPercent) {
this.nodeId = nodeId;
this.cpuUtilizationPercent = cpuUtilizationPercent;
this.memoryUtilizationPercent = memoryUtilizationPercent;
this.timestamp = timestamp;
}

public NodePerformanceStatistics(StreamInput in) throws IOException {
public NodePerformanceStats(StreamInput in) throws IOException {
this.nodeId = in.readString();
this.timestamp = in.readLong();
this.cpuUtilizationPercent = in.readDouble();
@@ -58,12 +58,10 @@ public String toString() {
return sb.toString();
}

NodePerformanceStatistics(NodePerformanceStatistics nodePerformanceStatistics) {
NodePerformanceStats(NodePerformanceStats nodePerformanceStats) {
this(
nodePerformanceStatistics.nodeId,
nodePerformanceStatistics.cpuUtilizationPercent,
nodePerformanceStatistics.memoryUtilizationPercent,
nodePerformanceStatistics.timestamp
nodePerformanceStats.nodeId,
nodePerformanceStats.timestamp, nodePerformanceStats.memoryUtilizationPercent, nodePerformanceStats.cpuUtilizationPercent
);
}

8 changes: 4 additions & 4 deletions server/src/main/java/org/opensearch/node/NodeService.java
Original file line number Diff line number Diff line change
@@ -83,7 +83,7 @@ public class NodeService implements Closeable {
private final ScriptService scriptService;
private final HttpServerTransport httpServerTransport;
private final ResponseCollectorService responseCollectorService;
private final PerfStatsCollectorService perfStatsCollectorService;
private final PerformanceCollectorService performanceCollectorService;
private final SearchTransportService searchTransportService;
private final IndexingPressureService indexingPressureService;
private final AggregationUsageService aggregationUsageService;
@@ -116,7 +116,7 @@ public class NodeService implements Closeable {
SearchPipelineService searchPipelineService,
FileCache fileCache,
TaskCancellationMonitoringService taskCancellationMonitoringService,
PerfStatsCollectorService perfStatsCollectorService
PerformanceCollectorService performanceCollectorService
) {
this.settings = settings;
this.threadPool = threadPool;
@@ -139,7 +139,7 @@ public class NodeService implements Closeable {
this.clusterService = clusterService;
this.fileCache = fileCache;
this.taskCancellationMonitoringService = taskCancellationMonitoringService;
this.perfStatsCollectorService = perfStatsCollectorService;
this.performanceCollectorService = performanceCollectorService;
clusterService.addStateApplier(ingestService);
clusterService.addStateApplier(searchPipelineService);
}
@@ -241,7 +241,7 @@ public NodeStats stats(
discoveryStats ? discovery.stats() : null,
ingest ? ingestService.stats() : null,
adaptiveSelection ? responseCollectorService.getAdaptiveStats(searchTransportService.getPendingSearchRequests()) : null,
nodesPerfStats ? perfStatsCollectorService.stats() : null,
nodesPerfStats ? performanceCollectorService.stats() : null,
scriptCache ? scriptService.cacheStats() : null,
indexingPressure ? this.indexingPressureService.nodeStats() : null,
shardIndexingPressure ? this.indexingPressureService.shardStats(indices) : null,
Original file line number Diff line number Diff line change
@@ -24,17 +24,17 @@
* This class represents performance stats such as CPU, Memory and IO resource usage of each node along with the time
* elapsed from when the stats were recorded.
*/
public class GlobalPerformanceStats implements Writeable, ToXContentFragment {
public class NodesPerformanceStats implements Writeable, ToXContentFragment {

// Map of node id to perf stats of the corresponding node.
private final Map<String, NodePerformanceStatistics> nodeIdToPerfStatsMap;
private final Map<String, NodePerformanceStats> nodeIdToPerfStatsMap;

public GlobalPerformanceStats(Map<String, NodePerformanceStatistics> nodeIdToPerfStatsMap) {
public NodesPerformanceStats(Map<String, NodePerformanceStats> nodeIdToPerfStatsMap) {
this.nodeIdToPerfStatsMap = nodeIdToPerfStatsMap;
}

public GlobalPerformanceStats(StreamInput in) throws IOException {
this.nodeIdToPerfStatsMap = in.readMap(StreamInput::readString, NodePerformanceStatistics::new);
public NodesPerformanceStats(StreamInput in) throws IOException {
this.nodeIdToPerfStatsMap = in.readMap(StreamInput::readString, NodePerformanceStats::new);
}

@Override
@@ -45,7 +45,7 @@ public void writeTo(StreamOutput out) throws IOException {
/**
* Returns map of node id to perf stats of the corresponding node.
*/
public Map<String, NodePerformanceStatistics> getNodeIdToNodePerfStatsMap() {
public Map<String, NodePerformanceStats> getNodeIdToNodePerfStatsMap() {
return nodeIdToPerfStatsMap;
}

@@ -54,7 +54,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.startObject("performance_stats");
for (String nodeId : nodeIdToPerfStatsMap.keySet()) {
builder.startObject(nodeId);
NodePerformanceStatistics perfStats = nodeIdToPerfStatsMap.get(nodeId);
NodePerformanceStats perfStats = nodeIdToPerfStatsMap.get(nodeId);
if (perfStats != null) {
builder.field(
"elapsed_time",
Original file line number Diff line number Diff line change
@@ -21,7 +21,6 @@
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@@ -31,24 +30,28 @@
* This collects node level performance statistics such as cpu, memory, IO of each node and makes it available for
* coordinator node to aid in throttling, ranking etc
*/
public class PerfStatsCollectorService extends AbstractLifecycleComponent implements ClusterStateListener {
public class PerformanceCollectorService extends AbstractLifecycleComponent implements ClusterStateListener {

/**
* This refresh interval denotes the polling interval of PerfStatsCollectorService to refresh the performance stats
* This refresh interval denotes the polling interval of PerformanceCollectorService to refresh the performance stats
* from local node
*/
private static long REFRESH_INTERVAL_IN_MILLIS = 1000;

private static final Logger logger = LogManager.getLogger(PerfStatsCollectorService.class);
private final ConcurrentMap<String, NodePerformanceStatistics> nodeIdToPerfStats = ConcurrentCollections.newConcurrentMap();
private static final Logger logger = LogManager.getLogger(PerformanceCollectorService.class);
private final ConcurrentMap<String, NodePerformanceStats> nodeIdToPerfStats = ConcurrentCollections.newConcurrentMap();

private ThreadPool threadPool;
private volatile Scheduler.Cancellable scheduledFuture;

private NodePerformanceTracker nodePerformanceTracker;
private ClusterService clusterService;

public PerfStatsCollectorService(NodePerformanceTracker nodePerformanceTracker, ClusterService clusterService, ThreadPool threadPool) {
public PerformanceCollectorService(
NodePerformanceTracker nodePerformanceTracker,
ClusterService clusterService,
ThreadPool threadPool
) {
this.threadPool = threadPool;
this.nodePerformanceTracker = nodePerformanceTracker;
this.clusterService = clusterService;
@@ -71,10 +74,10 @@ void removeNodePerfStatistics(String nodeId) {
/**
* Collect node performance statistics along with the timestamp
*/
public void collectNodePerfStatistics(String nodeId, double cpuUtilizationPercent, double memoryUtilizationPercent, long timestamp) {
public void collectNodePerfStatistics(String nodeId, long timestamp, double memoryUtilizationPercent, double cpuUtilizationPercent) {
nodeIdToPerfStats.compute(nodeId, (id, nodePerfStats) -> {
if (nodePerfStats == null) {
return new NodePerformanceStatistics(nodeId, cpuUtilizationPercent, memoryUtilizationPercent, timestamp);
return new NodePerformanceStats(nodeId, timestamp, memoryUtilizationPercent, cpuUtilizationPercent);
} else {
nodePerfStats.cpuUtilizationPercent = cpuUtilizationPercent;
nodePerfStats.memoryUtilizationPercent = memoryUtilizationPercent;
@@ -87,9 +90,9 @@ public void collectNodePerfStatistics(String nodeId, double cpuUtilizationPercen
/**
* Get all node statistics which will be used for node stats
*/
public Map<String, NodePerformanceStatistics> getAllNodeStatistics() {
Map<String, NodePerformanceStatistics> nodeStats = new HashMap<>(nodeIdToPerfStats.size());
nodeIdToPerfStats.forEach((nodeId, nodePerfStats) -> { nodeStats.put(nodeId, new NodePerformanceStatistics(nodePerfStats)); });
public Map<String, NodePerformanceStats> getAllNodeStatistics() {
Map<String, NodePerformanceStats> nodeStats = new HashMap<>(nodeIdToPerfStats.size());
nodeIdToPerfStats.forEach((nodeId, nodePerfStats) -> { nodeStats.put(nodeId, new NodePerformanceStats(nodePerfStats)); });
return nodeStats;
}

@@ -98,27 +101,27 @@ public Map<String, NodePerformanceStatistics> getAllNodeStatistics() {
* performance stats information exists for the given node. Returns an empty
* {@code Optional} if the node was not found.
*/
public Optional<NodePerformanceStatistics> getNodeStatistics(final String nodeId) {
return Optional.ofNullable(nodeIdToPerfStats.get(nodeId)).map(perfStats -> new NodePerformanceStatistics(perfStats));
public Optional<NodePerformanceStats> getNodeStatistics(final String nodeId) {
return Optional.ofNullable(nodeIdToPerfStats.get(nodeId)).map(perfStats -> new NodePerformanceStats(perfStats));
}

/**
* Returns collected performance statistics of all nodes
*/
public GlobalPerformanceStats stats() {
return new GlobalPerformanceStats(getAllNodeStatistics());
public NodesPerformanceStats stats() {
return new NodesPerformanceStats(getAllNodeStatistics());
}

/**
* Fetch local node performance statistics and add it to store along with the current timestamp
*/
private void getLocalNodePerformanceStats() {
private void collectLocalNodePerformanceStats() {
if (nodePerformanceTracker.isReady() && clusterService.state() != null) {
collectNodePerfStatistics(
clusterService.state().nodes().getLocalNodeId(),
nodePerformanceTracker.getCpuUtilizationPercent(),
System.currentTimeMillis(),
nodePerformanceTracker.getMemoryUtilizationPercent(),
System.currentTimeMillis()
nodePerformanceTracker.getCpuUtilizationPercent()
);
}
}
@@ -130,9 +133,9 @@ protected void doStart() {
*/
scheduledFuture = threadPool.scheduleWithFixedDelay(() -> {
try {
getLocalNodePerformanceStats();
collectLocalNodePerformanceStats();
} catch (Exception e) {
logger.warn("failure in PerfStatsCollectorService", e);
logger.warn("failure in PerformanceCollectorService", e);
}
}, new TimeValue(REFRESH_INTERVAL_IN_MILLIS), ThreadPool.Names.GENERIC);
}
@@ -145,5 +148,5 @@ protected void doStop() {
}

@Override
protected void doClose() throws IOException {}
protected void doClose() {}
}
Loading

0 comments on commit cd5299b

Please sign in to comment.