p0 feature to support labeling in top queries
Signed-off-by: Chenyang Ji <cyji@amazon.com>
ansjcy committed Jun 6, 2024
1 parent 71a299f commit 3d982e8
Showing 11 changed files with 13 additions and 273 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -19,7 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add remote routing table for remote state publication with experimental feature flag ([#13304](https://github.com/opensearch-project/OpenSearch/pull/13304))
- [Remote Store] Add support to disable flush based on translog reader count ([#14027](https://github.com/opensearch-project/OpenSearch/pull/14027))
- [Query Insights] Add exporter support for top n queries ([#12982](https://github.com/opensearch-project/OpenSearch/pull/12982))
- Support rule-based labeling for search queries ([#13374](https://github.com/opensearch-project/OpenSearch/pull/13374))
- [Query Insights] Add X-Opaque-Id to search request metadata for top n queries ([#13374](https://github.com/opensearch-project/OpenSearch/pull/13374))

### Dependencies
- Bump `com.github.spullara.mustache.java:compiler` from 0.9.10 to 0.9.13 ([#13329](https://github.com/opensearch-project/OpenSearch/pull/13329), [#13559](https://github.com/opensearch-project/OpenSearch/pull/13559))
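As the changelog entry above describes, the only user-facing hook after this change is the X-Opaque-Id header: a caller attaches it to a search request, and it is recorded as a label on the matching top-N query record. Below is a minimal client-side sketch of that usage, assuming a local cluster on localhost:9200 and the Java REST high-level client; the index name and header value are illustrative, not part of this commit.

```java
import org.apache.http.HttpHost;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestClient;
import org.opensearch.client.RestHighLevelClient;

public class OpaqueIdSearchExample {
    public static void main(String[] args) throws Exception {
        // Assumed endpoint and index name; adjust for your cluster.
        try (RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")))) {
            // Attach a caller-chosen identifier; per this commit it is surfaced
            // as a label on the corresponding top-N query record.
            RequestOptions options = RequestOptions.DEFAULT.toBuilder()
                .addHeader("X-Opaque-Id", "billing-dashboard")
                .build();
            SearchResponse response = client.search(new SearchRequest("my-index"), options);
            System.out.println(response.status());
        }
    }
}
```

Requests sent without the header simply produce records with no X-Opaque-Id label, as the null check in the listener change below shows.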
@@ -71,7 +71,7 @@ public Collection<Object> createComponents(
) {
// create top n queries service
final QueryInsightsService queryInsightsService = new QueryInsightsService(clusterService.getClusterSettings(), threadPool, client);
return List.of(queryInsightsService, new QueryInsightsListener(threadPool, clusterService, queryInsightsService));
return List.of(queryInsightsService, new QueryInsightsListener(clusterService, queryInsightsService));
}

@Override
@@ -21,9 +21,7 @@
import org.opensearch.plugin.insights.rules.model.Attribute;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
import org.opensearch.search.labels.RequestLabelingService;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;

import java.util.Collections;
import java.util.HashMap;
@@ -48,21 +46,15 @@ public final class QueryInsightsListener extends SearchRequestOperationsListener
private static final Logger log = LogManager.getLogger(QueryInsightsListener.class);

private final QueryInsightsService queryInsightsService;
private final ThreadPool threadPool;

/**
* Constructor for QueryInsightsListener
*
* @param threadPool the OpenSearch internal threadPool
* @param clusterService The Node's cluster service.
* @param queryInsightsService The topQueriesByLatencyService associated with this listener
*/
@Inject
public QueryInsightsListener(
final ThreadPool threadPool,
final ClusterService clusterService,
final QueryInsightsService queryInsightsService
) {
public QueryInsightsListener(final ClusterService clusterService, final QueryInsightsService queryInsightsService) {
this.queryInsightsService = queryInsightsService;
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(TOP_N_LATENCY_QUERIES_ENABLED, v -> this.setEnableTopQueries(MetricType.LATENCY, v));
@@ -83,7 +75,6 @@ public QueryInsightsListener(
.setTopNSize(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_SIZE));
this.queryInsightsService.getTopQueriesService(MetricType.LATENCY)
.setWindowSize(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_WINDOW_SIZE));
this.threadPool = threadPool;
}

/**
@@ -149,18 +140,12 @@ public void onRequestEnd(final SearchPhaseContext context, final SearchRequestCo
attributes.put(Attribute.INDICES, request.indices());
attributes.put(Attribute.PHASE_LATENCY_MAP, searchRequestContext.phaseTookMap());

// Get internal computed and user provided labels
Map<String, Object> labels = new HashMap<>();
// Retrieve user provided label if exists
String userProvidedLabel = RequestLabelingService.getUserProvidedTag(threadPool);
String userProvidedLabel = context.getTask().getHeader(Task.X_OPAQUE_ID);
if (userProvidedLabel != null) {
labels.put(Task.X_OPAQUE_ID, userProvidedLabel);
}
// Retrieve computed labels if exists
Map<String, Object> computedLabels = RequestLabelingService.getRuleBasedLabels(threadPool);
if (computedLabels != null) {
labels.putAll(computedLabels);
}
attributes.put(Attribute.LABELS, labels);
// construct SearchQueryRecord from attributes and measurements
SearchQueryRecord record = new SearchQueryRecord(request.getOrCreateAbsoluteStartMillis(), measurements, attributes);
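The hunk above contains the substance of the change: instead of asking RequestLabelingService (via the thread pool's thread context) for user-provided and rule-based labels, the listener now reads the X-Opaque-Id header directly off the search task. A standalone sketch of that extraction step follows, mirroring the new listener logic; the helper class is hypothetical, since the commit keeps this code inline in QueryInsightsListener#onRequestEnd.

```java
import java.util.HashMap;
import java.util.Map;

import org.opensearch.tasks.Task;

// Hypothetical helper for illustration only; not part of the commit.
final class OpaqueIdLabelExtractor {

    static Map<String, Object> extractLabels(final Task task) {
        final Map<String, Object> labels = new HashMap<>();
        // The coordinator copies the X-Opaque-Id request header onto the task,
        // so the listener can read it back without touching the ThreadPool.
        final String userProvidedLabel = task.getHeader(Task.X_OPAQUE_ID);
        if (userProvidedLabel != null) {
            labels.put(Task.X_OPAQUE_ID, userProvidedLabel);
        }
        return labels;
    }
}
```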
@@ -11,6 +11,7 @@
import org.opensearch.action.search.SearchPhaseContext;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchRequestContext;
import org.opensearch.action.search.SearchTask;
import org.opensearch.action.search.SearchType;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.collect.Tuple;
@@ -26,7 +27,6 @@
import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.opensearch.search.aggregations.support.ValueType;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.labels.RequestLabelingService;
import org.opensearch.tasks.Task;
import org.opensearch.test.ClusterServiceUtils;
import org.opensearch.test.OpenSearchTestCase;
@@ -76,7 +76,6 @@ public void setup() {

ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
threadContext.setHeaders(new Tuple<>(Collections.singletonMap(Task.X_OPAQUE_ID, "userLabel"), new HashMap<>()));
threadContext.putTransient(RequestLabelingService.RULE_BASED_LABELS, Map.of("labelKey", "labelValue"));
when(threadPool.getThreadContext()).thenReturn(threadContext);
}

@@ -88,6 +87,7 @@ public void testOnRequestEnd() throws InterruptedException {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.aggregation(new TermsAggregationBuilder("agg1").userValueTypeHint(ValueType.STRING).field("type.keyword"));
searchSourceBuilder.size(0);
SearchTask task = new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.singletonMap(Task.X_OPAQUE_ID, "userLabel"));

String[] indices = new String[] { "index-1", "index-2" };

@@ -98,7 +98,7 @@

int numberOfShards = 10;

QueryInsightsListener queryInsightsListener = new QueryInsightsListener(threadPool, clusterService, queryInsightsService);
QueryInsightsListener queryInsightsListener = new QueryInsightsListener(clusterService, queryInsightsService);

when(searchRequest.getOrCreateAbsoluteStartMillis()).thenReturn(timestamp);
when(searchRequest.searchType()).thenReturn(searchType);
@@ -107,6 +107,7 @@
when(searchRequestContext.phaseTookMap()).thenReturn(phaseLatencyMap);
when(searchPhaseContext.getRequest()).thenReturn(searchRequest);
when(searchPhaseContext.getNumShards()).thenReturn(numberOfShards);
when(searchPhaseContext.getTask()).thenReturn(task);
ArgumentCaptor<SearchQueryRecord> captor = ArgumentCaptor.forClass(SearchQueryRecord.class);

queryInsightsListener.onRequestEnd(searchPhaseContext, searchRequestContext);
@@ -118,7 +119,6 @@
assertEquals(searchType.toString().toLowerCase(Locale.ROOT), generatedRecord.getAttributes().get(Attribute.SEARCH_TYPE));
assertEquals(searchSourceBuilder.toString(), generatedRecord.getAttributes().get(Attribute.SOURCE));
Map<String, String> labels = (Map<String, String>) generatedRecord.getAttributes().get(Attribute.LABELS);
assertEquals("labelValue", labels.get("labelKey"));
assertEquals("userLabel", labels.get(Task.X_OPAQUE_ID));
}

@@ -129,6 +129,7 @@ public void testConcurrentOnRequestEnd() throws InterruptedException {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.aggregation(new TermsAggregationBuilder("agg1").userValueTypeHint(ValueType.STRING).field("type.keyword"));
searchSourceBuilder.size(0);
SearchTask task = new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.singletonMap(Task.X_OPAQUE_ID, "userLabel"));

String[] indices = new String[] { "index-1", "index-2" };

@@ -148,14 +149,15 @@
when(searchRequestContext.phaseTookMap()).thenReturn(phaseLatencyMap);
when(searchPhaseContext.getRequest()).thenReturn(searchRequest);
when(searchPhaseContext.getNumShards()).thenReturn(numberOfShards);
when(searchPhaseContext.getTask()).thenReturn(task);

int numRequests = 50;
Thread[] threads = new Thread[numRequests];
Phaser phaser = new Phaser(numRequests + 1);
CountDownLatch countDownLatch = new CountDownLatch(numRequests);

for (int i = 0; i < numRequests; i++) {
searchListenersList.add(new QueryInsightsListener(threadPool, clusterService, queryInsightsService));
searchListenersList.add(new QueryInsightsListener(clusterService, queryInsightsService));
}

for (int i = 0; i < numRequests; i++) {
@@ -176,7 +178,7 @@

public void testSetEnabled() {
when(queryInsightsService.isCollectionEnabled(MetricType.LATENCY)).thenReturn(true);
QueryInsightsListener queryInsightsListener = new QueryInsightsListener(threadPool, clusterService, queryInsightsService);
QueryInsightsListener queryInsightsListener = new QueryInsightsListener(clusterService, queryInsightsService);
queryInsightsListener.setEnableTopQueries(MetricType.LATENCY, true);
assertTrue(queryInsightsListener.isEnabled());

11 changes: 1 addition & 10 deletions server/src/main/java/org/opensearch/node/Node.java
@@ -227,9 +227,6 @@
import org.opensearch.search.backpressure.SearchBackpressureService;
import org.opensearch.search.backpressure.settings.SearchBackpressureSettings;
import org.opensearch.search.fetch.FetchPhase;
import org.opensearch.search.labels.RequestLabelingService;
import org.opensearch.search.labels.SearchRequestLabelingListener;
import org.opensearch.search.labels.rules.Rule;
import org.opensearch.search.pipeline.SearchPipelineService;
import org.opensearch.search.query.QueryPhase;
import org.opensearch.snapshots.InternalSnapshotsInfoService;
@@ -965,17 +962,11 @@ protected Node(
// Add the telemetryAwarePlugin components to the existing pluginComponents collection.
pluginComponents.addAll(telemetryAwarePluginComponents);

final SearchRequestLabelingListener searchRequestLabelingListener = new SearchRequestLabelingListener(
new RequestLabelingService(
threadPool,
pluginComponents.stream().filter(p -> p instanceof Rule).map(p -> (Rule) p).collect(toList())
)
);
// register all standard SearchRequestOperationsCompositeListenerFactory to the SearchRequestOperationsCompositeListenerFactory
final SearchRequestOperationsCompositeListenerFactory searchRequestOperationsCompositeListenerFactory =
new SearchRequestOperationsCompositeListenerFactory(
Stream.concat(
Stream.of(searchRequestStats, searchRequestSlowLog, searchRequestLabelingListener),
Stream.of(searchRequestStats, searchRequestSlowLog),
pluginComponents.stream()
.filter(p -> p instanceof SearchRequestOperationsListener)
.map(p -> (SearchRequestOperationsListener) p)

This file was deleted.

This file was deleted.

This file was deleted.

27 changes: 0 additions & 27 deletions server/src/main/java/org/opensearch/search/labels/rules/Rule.java

This file was deleted.

This file was deleted.
