diff --git a/CHANGELOG.md b/CHANGELOG.md index 356d273a7850d..24d92cd810727 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add an individual setting of rate limiter for segment replication ([#12959](https://github.com/opensearch-project/OpenSearch/pull/12959)) - [Streaming Indexing] Ensure support of the new transport by security plugin ([#13174](https://github.com/opensearch-project/OpenSearch/pull/13174)) - Add cluster setting to dynamically configure the buckets for filter rewrite optimization. ([#13179](https://github.com/opensearch-project/OpenSearch/pull/13179)) +- Create listener to refresh search thread resource usage ([#14832](https://github.com/opensearch-project/OpenSearch/pull/14832)) ### Dependencies - Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896)) diff --git a/server/src/main/java/org/opensearch/action/search/SearchTaskRequestOperationsListener.java b/server/src/main/java/org/opensearch/action/search/SearchTaskRequestOperationsListener.java new file mode 100644 index 0000000000000..4434d71793b23 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/SearchTaskRequestOperationsListener.java @@ -0,0 +1,30 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.tasks.TaskResourceTrackingService; + +/** + * SearchTaskRequestOperationsListener subscriber for operations on search tasks resource usages. + * Listener ensures to refreshResourceStats on request end capturing the search task resource usage + * upon request completion. + * + */ +public final class SearchTaskRequestOperationsListener extends SearchRequestOperationsListener { + private final TaskResourceTrackingService taskResourceTrackingService; + + public SearchTaskRequestOperationsListener(TaskResourceTrackingService taskResourceTrackingService) { + this.taskResourceTrackingService = taskResourceTrackingService; + } + + @Override + public void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) { + taskResourceTrackingService.refreshResourceStats(context.getTask()); + } +} diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 712f73d21219a..3d6b08a5ef3d8 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -50,6 +50,7 @@ import org.opensearch.action.search.SearchRequestOperationsListener; import org.opensearch.action.search.SearchRequestSlowLog; import org.opensearch.action.search.SearchRequestStats; +import org.opensearch.action.search.SearchTaskRequestOperationsListener; import org.opensearch.action.search.SearchTransportService; import org.opensearch.action.support.TransportAction; import org.opensearch.action.update.UpdateHelper; @@ -795,8 +796,17 @@ protected Node( threadPool ); + final TaskResourceTrackingService taskResourceTrackingService = new TaskResourceTrackingService( + settings, + clusterService.getClusterSettings(), + threadPool + ); + final SearchRequestStats searchRequestStats = new SearchRequestStats(clusterService.getClusterSettings()); final SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService); + final SearchTaskRequestOperationsListener searchTaskRequestOperationsListener = new SearchTaskRequestOperationsListener( + taskResourceTrackingService + ); remoteStoreStatsTrackerFactory = new RemoteStoreStatsTrackerFactory(clusterService, settings); CacheModule cacheModule = new CacheModule(pluginsService.filterPlugins(CachePlugin.class), settings); @@ -898,7 +908,7 @@ protected Node( final SearchRequestOperationsCompositeListenerFactory searchRequestOperationsCompositeListenerFactory = new SearchRequestOperationsCompositeListenerFactory( Stream.concat( - Stream.of(searchRequestStats, searchRequestSlowLog), + Stream.of(searchRequestStats, searchRequestSlowLog, searchTaskRequestOperationsListener), pluginComponents.stream() .filter(p -> p instanceof SearchRequestOperationsListener) .map(p -> (SearchRequestOperationsListener) p) @@ -1026,12 +1036,6 @@ protected Node( // development. Then we can deprecate Getter and Setter for IndexingPressureService in ClusterService (#478). clusterService.setIndexingPressureService(indexingPressureService); - final TaskResourceTrackingService taskResourceTrackingService = new TaskResourceTrackingService( - settings, - clusterService.getClusterSettings(), - threadPool - ); - final SearchBackpressureSettings searchBackpressureSettings = new SearchBackpressureSettings( settings, clusterService.getClusterSettings()