diff --git a/CHANGELOG.md b/CHANGELOG.md index 50a1a6ea99243..5afdf6b707d1e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -223,6 +223,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Extract cluster management for integration tests into JUnit test rule out of OpenSearchIntegTestCase ([#11877](https://github.com/opensearch-project/OpenSearch/pull/11877)), ([#12000](https://github.com/opensearch-project/OpenSearch/pull/12000)) - Workaround for https://bugs.openjdk.org/browse/JDK-8323659 regression, introduced in JDK-21.0.2 ([#11968](https://github.com/opensearch-project/OpenSearch/pull/11968)) - Updates IpField to be searchable when only `doc_values` are enabled ([#11508](https://github.com/opensearch-project/OpenSearch/pull/11508)) +- [Query Insights] Query Insights Framework which currently supports retrieving the most time-consuming queries within the last configured time window ([#11903](https://github.com/opensearch-project/OpenSearch/pull/11903)) ### Deprecated diff --git a/plugins/query-insights/build.gradle b/plugins/query-insights/build.gradle new file mode 100644 index 0000000000000..eabbd395bd3bd --- /dev/null +++ b/plugins/query-insights/build.gradle @@ -0,0 +1,18 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +opensearchplugin { + description 'OpenSearch Query Insights Plugin.' + classname 'org.opensearch.plugin.insights.QueryInsightsPlugin' +} + +dependencies { +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java new file mode 100644 index 0000000000000..0acd7e2dcac0d --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java @@ -0,0 +1,112 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights; + +import org.opensearch.action.ActionRequest; +import org.opensearch.client.Client; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.IndexScopedSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.settings.SettingsFilter; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.concurrent.OpenSearchExecutors; +import org.opensearch.core.action.ActionResponse; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.env.Environment; +import org.opensearch.env.NodeEnvironment; +import org.opensearch.plugin.insights.core.service.QueryInsightsService; +import org.opensearch.plugin.insights.settings.QueryInsightsSettings; +import org.opensearch.plugins.ActionPlugin; +import org.opensearch.plugins.Plugin; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.rest.RestController; +import org.opensearch.rest.RestHandler; +import org.opensearch.script.ScriptService; +import org.opensearch.threadpool.ExecutorBuilder; +import org.opensearch.threadpool.ScalingExecutorBuilder; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.watcher.ResourceWatcherService; + +import java.util.Collection; +import java.util.List; +import java.util.function.Supplier; + +/** + * Plugin class for Query Insights. + */ +public class QueryInsightsPlugin extends Plugin implements ActionPlugin { + /** + * Default constructor + */ + public QueryInsightsPlugin() {} + + @Override + public Collection createComponents( + final Client client, + final ClusterService clusterService, + final ThreadPool threadPool, + final ResourceWatcherService resourceWatcherService, + final ScriptService scriptService, + final NamedXContentRegistry xContentRegistry, + final Environment environment, + final NodeEnvironment nodeEnvironment, + final NamedWriteableRegistry namedWriteableRegistry, + final IndexNameExpressionResolver indexNameExpressionResolver, + final Supplier repositoriesServiceSupplier + ) { + // create top n queries service + final QueryInsightsService queryInsightsService = new QueryInsightsService(threadPool); + return List.of(queryInsightsService); + } + + @Override + public List> getExecutorBuilders(final Settings settings) { + return List.of( + new ScalingExecutorBuilder( + QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR, + 1, + Math.min((OpenSearchExecutors.allocatedProcessors(settings) + 1) / 2, QueryInsightsSettings.MAX_THREAD_COUNT), + TimeValue.timeValueMinutes(5) + ) + ); + } + + @Override + public List getRestHandlers( + final Settings settings, + final RestController restController, + final ClusterSettings clusterSettings, + final IndexScopedSettings indexScopedSettings, + final SettingsFilter settingsFilter, + final IndexNameExpressionResolver indexNameExpressionResolver, + final Supplier nodesInCluster + ) { + return List.of(); + } + + @Override + public List> getActions() { + return List.of(); + } + + @Override + public List> getSettings() { + return List.of( + // Settings for top N queries + QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED, + QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE, + QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE + ); + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java new file mode 100644 index 0000000000000..525ca0d4a3d33 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java @@ -0,0 +1,180 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.core.service; + +import org.opensearch.common.inject.Inject; +import org.opensearch.common.lifecycle.AbstractLifecycleComponent; +import org.opensearch.plugin.insights.rules.model.MetricType; +import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; +import org.opensearch.plugin.insights.settings.QueryInsightsSettings; +import org.opensearch.threadpool.Scheduler; +import org.opensearch.threadpool.ThreadPool; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * Service responsible for gathering, analyzing, storing and exporting + * information related to search queries + * + * @opensearch.internal + */ +public class QueryInsightsService extends AbstractLifecycleComponent { + /** + * The internal OpenSearch thread pool that execute async processing and exporting tasks + */ + private final ThreadPool threadPool; + + /** + * Services to capture top n queries for different metric types + */ + private final Map topQueriesServices; + + /** + * Flags for enabling insight data collection for different metric types + */ + private final Map enableCollect; + + /** + * The internal thread-safe queue to ingest the search query data and subsequently forward to processors + */ + private final LinkedBlockingQueue queryRecordsQueue; + + /** + * Holds a reference to delayed operation {@link Scheduler.Cancellable} so it can be cancelled when + * the service closed concurrently. + */ + protected volatile Scheduler.Cancellable scheduledFuture; + + /** + * Constructor of the QueryInsightsService + * + * @param threadPool The OpenSearch thread pool to run async tasks + */ + @Inject + public QueryInsightsService(final ThreadPool threadPool) { + enableCollect = new HashMap<>(); + queryRecordsQueue = new LinkedBlockingQueue<>(QueryInsightsSettings.QUERY_RECORD_QUEUE_CAPACITY); + topQueriesServices = new HashMap<>(); + for (MetricType metricType : MetricType.allMetricTypes()) { + enableCollect.put(metricType, false); + topQueriesServices.put(metricType, new TopQueriesService(metricType)); + } + this.threadPool = threadPool; + } + + /** + * Ingest the query data into in-memory stores + * + * @param record the record to ingest + */ + public boolean addRecord(final SearchQueryRecord record) { + boolean shouldAdd = false; + for (Map.Entry entry : topQueriesServices.entrySet()) { + if (!enableCollect.get(entry.getKey())) { + continue; + } + List currentSnapshot = entry.getValue().getTopQueriesCurrentSnapshot(); + // skip add to top N queries store if the incoming record is smaller than the Nth record + if (currentSnapshot.size() < entry.getValue().getTopNSize() + || SearchQueryRecord.compare(record, currentSnapshot.get(0), entry.getKey()) > 0) { + shouldAdd = true; + break; + } + } + if (shouldAdd) { + return queryRecordsQueue.offer(record); + } + return false; + } + + /** + * Drain the queryRecordsQueue into internal stores and services + */ + public void drainRecords() { + final List records = new ArrayList<>(); + queryRecordsQueue.drainTo(records); + records.sort(Comparator.comparingLong(SearchQueryRecord::getTimestamp)); + for (MetricType metricType : MetricType.allMetricTypes()) { + if (enableCollect.get(metricType)) { + // ingest the records into topQueriesService + topQueriesServices.get(metricType).consumeRecords(records); + } + } + } + + /** + * Get the top queries service based on metricType + * @param metricType {@link MetricType} + * @return {@link TopQueriesService} + */ + public TopQueriesService getTopQueriesService(final MetricType metricType) { + return topQueriesServices.get(metricType); + } + + /** + * Set flag to enable or disable Query Insights data collection + * + * @param metricType {@link MetricType} + * @param enable Flag to enable or disable Query Insights data collection + */ + public void enableCollection(final MetricType metricType, final boolean enable) { + this.enableCollect.put(metricType, enable); + this.topQueriesServices.get(metricType).setEnabled(enable); + } + + /** + * Get if the Query Insights data collection is enabled for a MetricType + * + * @param metricType {@link MetricType} + * @return if the Query Insights data collection is enabled + */ + public boolean isCollectionEnabled(final MetricType metricType) { + return this.enableCollect.get(metricType); + } + + /** + * Check if query insights service is enabled + * + * @return if query insights service is enabled + */ + public boolean isEnabled() { + for (MetricType t : MetricType.allMetricTypes()) { + if (isCollectionEnabled(t)) { + return true; + } + } + return false; + } + + @Override + protected void doStart() { + if (isEnabled()) { + scheduledFuture = threadPool.scheduleWithFixedDelay( + this::drainRecords, + QueryInsightsSettings.QUERY_RECORD_QUEUE_DRAIN_INTERVAL, + QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR + ); + } + } + + @Override + protected void doStop() { + if (scheduledFuture != null) { + scheduledFuture.cancel(); + } + } + + @Override + protected void doClose() {} +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java new file mode 100644 index 0000000000000..d2c30cbdf98e7 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java @@ -0,0 +1,282 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.core.service; + +import org.opensearch.common.unit.TimeValue; +import org.opensearch.plugin.insights.rules.model.MetricType; +import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; +import org.opensearch.plugin.insights.settings.QueryInsightsSettings; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZoneOffset; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Locale; +import java.util.PriorityQueue; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Service responsible for gathering and storing top N queries + * with high latency or resource usage + * + * @opensearch.internal + */ +public class TopQueriesService { + private boolean enabled; + /** + * The metric type to measure top n queries + */ + private final MetricType metricType; + private int topNSize; + /** + * The window size to keep the top n queries + */ + private TimeValue windowSize; + /** + * The current window start timestamp + */ + private long windowStart; + /** + * The internal thread-safe store that holds the top n queries insight data + */ + private final PriorityQueue topQueriesStore; + + /** + * The AtomicReference of a snapshot of the current window top queries for getters to consume + */ + private final AtomicReference> topQueriesCurrentSnapshot; + + /** + * The AtomicReference of a snapshot of the last window top queries for getters to consume + */ + private final AtomicReference> topQueriesHistorySnapshot; + + TopQueriesService(final MetricType metricType) { + this.enabled = false; + this.metricType = metricType; + this.topNSize = QueryInsightsSettings.DEFAULT_TOP_N_SIZE; + this.windowSize = QueryInsightsSettings.DEFAULT_WINDOW_SIZE; + this.windowStart = -1L; + topQueriesStore = new PriorityQueue<>(topNSize, (a, b) -> SearchQueryRecord.compare(a, b, metricType)); + topQueriesCurrentSnapshot = new AtomicReference<>(new ArrayList<>()); + topQueriesHistorySnapshot = new AtomicReference<>(new ArrayList<>()); + } + + /** + * Set the top N size for TopQueriesService service. + * + * @param topNSize the top N size to set + */ + public void setTopNSize(final int topNSize) { + this.topNSize = topNSize; + } + + /** + * Get the current configured top n size + * + * @return top n size + */ + public int getTopNSize() { + return topNSize; + } + + /** + * Validate the top N size based on the internal constrains + * + * @param size the wanted top N size + */ + public void validateTopNSize(final int size) { + if (size > QueryInsightsSettings.MAX_N_SIZE) { + throw new IllegalArgumentException( + "Top N size setting for [" + + metricType + + "]" + + " should be smaller than max top N size [" + + QueryInsightsSettings.MAX_N_SIZE + + "was (" + + size + + " > " + + QueryInsightsSettings.MAX_N_SIZE + + ")" + ); + } + } + + /** + * Set enable flag for the service + * @param enabled boolean + */ + public void setEnabled(final boolean enabled) { + this.enabled = enabled; + } + + /** + * Set the window size for top N queries service + * + * @param windowSize window size to set + */ + public void setWindowSize(final TimeValue windowSize) { + this.windowSize = windowSize; + // reset the window start time since the window size has changed + this.windowStart = -1L; + } + + /** + * Validate if the window size is valid, based on internal constrains. + * + * @param windowSize the window size to validate + */ + public void validateWindowSize(final TimeValue windowSize) { + if (windowSize.compareTo(QueryInsightsSettings.MAX_WINDOW_SIZE) > 0 + || windowSize.compareTo(QueryInsightsSettings.MIN_WINDOW_SIZE) < 0) { + throw new IllegalArgumentException( + "Window size setting for [" + + metricType + + "]" + + " should be between [" + + QueryInsightsSettings.MIN_WINDOW_SIZE + + "," + + QueryInsightsSettings.MAX_WINDOW_SIZE + + "]" + + "was (" + + windowSize + + ")" + ); + } + if (!(QueryInsightsSettings.VALID_WINDOW_SIZES_IN_MINUTES.contains(windowSize) || windowSize.getMinutes() % 60 == 0)) { + throw new IllegalArgumentException( + "Window size setting for [" + + metricType + + "]" + + " should be multiple of 1 hour, or one of " + + QueryInsightsSettings.VALID_WINDOW_SIZES_IN_MINUTES + + ", was (" + + windowSize + + ")" + ); + } + } + + /** + * Get all top queries records that are in the current top n queries store + * Optionally include top N records from the last window. + * + * By default, return the records in sorted order. + * + * @param includeLastWindow if the top N queries from the last window should be included + * @return List of the records that are in the query insight store + * @throws IllegalArgumentException if query insight is disabled in the cluster + */ + public List getTopQueriesRecords(final boolean includeLastWindow) throws IllegalArgumentException { + if (!enabled) { + throw new IllegalArgumentException( + String.format(Locale.ROOT, "Cannot get top n queries for [%s] when it is not enabled.", metricType.toString()) + ); + } + // read from window snapshots + final List queries = new ArrayList<>(topQueriesCurrentSnapshot.get()); + if (includeLastWindow) { + queries.addAll(topQueriesHistorySnapshot.get()); + } + return Stream.of(queries) + .flatMap(Collection::stream) + .sorted((a, b) -> SearchQueryRecord.compare(a, b, metricType) * -1) + .collect(Collectors.toList()); + } + + /** + * Consume records to top queries stores + * + * @param records a list of {@link SearchQueryRecord} + */ + void consumeRecords(final List records) { + final long currentWindowStart = calculateWindowStart(System.currentTimeMillis()); + List recordsInLastWindow = new ArrayList<>(); + List recordsInThisWindow = new ArrayList<>(); + for (SearchQueryRecord record : records) { + // skip the records that does not have the corresponding measurement + if (!record.getMeasurements().containsKey(metricType)) { + continue; + } + if (record.getTimestamp() < currentWindowStart) { + recordsInLastWindow.add(record); + } else { + recordsInThisWindow.add(record); + } + } + // add records in last window, if there are any, to the top n store + addToTopNStore(recordsInLastWindow); + // rotate window and reset window start if necessary + rotateWindowIfNecessary(currentWindowStart); + // add records in current window, if there are any, to the top n store + addToTopNStore(recordsInThisWindow); + // update the current window snapshot for getters to consume + final List newSnapShot = new ArrayList<>(topQueriesStore); + newSnapShot.sort((a, b) -> SearchQueryRecord.compare(a, b, metricType)); + topQueriesCurrentSnapshot.set(newSnapShot); + } + + private void addToTopNStore(final List records) { + topQueriesStore.addAll(records); + // remove top elements for fix sizing priority queue + while (topQueriesStore.size() > topNSize) { + topQueriesStore.poll(); + } + } + + /** + * Reset the current window and rotate the data to history snapshot for top n queries, + * This function would be invoked zero time or only once in each consumeRecords call + * + * @param newWindowStart the new windowStart to set to + */ + private void rotateWindowIfNecessary(final long newWindowStart) { + // reset window if the current window is outdated + if (windowStart < newWindowStart) { + final List history = new ArrayList<>(); + // rotate the current window to history store only if the data belongs to the last window + if (windowStart == newWindowStart - windowSize.getMillis()) { + history.addAll(topQueriesStore); + } + topQueriesHistorySnapshot.set(history); + topQueriesStore.clear(); + topQueriesCurrentSnapshot.set(new ArrayList<>()); + windowStart = newWindowStart; + } + } + + /** + * Calculate the window start for the given timestamp + * + * @param timestamp the given timestamp to calculate window start + */ + private long calculateWindowStart(final long timestamp) { + final LocalDateTime currentTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.of("UTC")); + LocalDateTime windowStartTime = currentTime.truncatedTo(ChronoUnit.HOURS); + while (!windowStartTime.plusMinutes(windowSize.getMinutes()).isAfter(currentTime)) { + windowStartTime = windowStartTime.plusMinutes(windowSize.getMinutes()); + } + return windowStartTime.toInstant(ZoneOffset.UTC).getEpochSecond() * 1000; + } + + /** + * Get the current top queries snapshot from the AtomicReference. + * + * @return a list of {@link SearchQueryRecord} + */ + public List getTopQueriesCurrentSnapshot() { + return topQueriesCurrentSnapshot.get(); + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/package-info.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/package-info.java new file mode 100644 index 0000000000000..5068f28234f6d --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Service Classes for Query Insights + */ +package org.opensearch.plugin.insights.core.service; diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/package-info.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/package-info.java new file mode 100644 index 0000000000000..04d1f9bfff7e1 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Base Package of Query Insights + */ +package org.opensearch.plugin.insights; diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java new file mode 100644 index 0000000000000..c1d17edf9ff14 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java @@ -0,0 +1,74 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.model; + +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.Locale; + +/** + * Valid attributes for a search query record + * + * @opensearch.internal + */ +public enum Attribute { + /** + * The search query type + */ + SEARCH_TYPE, + /** + * The search query source + */ + SOURCE, + /** + * Total shards queried + */ + TOTAL_SHARDS, + /** + * The indices involved + */ + INDICES, + /** + * The per phase level latency map for a search query + */ + PHASE_LATENCY_MAP, + /** + * The node id for this request + */ + NODE_ID; + + /** + * Read an Attribute from a StreamInput + * + * @param in the StreamInput to read from + * @return Attribute + * @throws IOException IOException + */ + static Attribute readFromStream(final StreamInput in) throws IOException { + return Attribute.valueOf(in.readString().toUpperCase(Locale.ROOT)); + } + + /** + * Write Attribute to a StreamOutput + * + * @param out the StreamOutput to write + * @param attribute the Attribute to write + * @throws IOException IOException + */ + static void writeTo(final StreamOutput out, final Attribute attribute) throws IOException { + out.writeString(attribute.toString()); + } + + @Override + public String toString() { + return this.name().toLowerCase(Locale.ROOT); + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/MetricType.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/MetricType.java new file mode 100644 index 0000000000000..cdd090fbf4804 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/MetricType.java @@ -0,0 +1,121 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.model; + +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Comparator; +import java.util.Locale; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Valid metric types for a search query record + * + * @opensearch.internal + */ +public enum MetricType implements Comparator { + /** + * Latency metric type + */ + LATENCY, + /** + * CPU usage metric type + */ + CPU, + /** + * JVM heap usage metric type + */ + JVM; + + /** + * Read a MetricType from a StreamInput + * + * @param in the StreamInput to read from + * @return MetricType + * @throws IOException IOException + */ + public static MetricType readFromStream(final StreamInput in) throws IOException { + return fromString(in.readString()); + } + + /** + * Create MetricType from String + * + * @param metricType the String representation of MetricType + * @return MetricType + */ + public static MetricType fromString(final String metricType) { + return MetricType.valueOf(metricType.toUpperCase(Locale.ROOT)); + } + + /** + * Write MetricType to a StreamOutput + * + * @param out the StreamOutput to write + * @param metricType the MetricType to write + * @throws IOException IOException + */ + static void writeTo(final StreamOutput out, final MetricType metricType) throws IOException { + out.writeString(metricType.toString()); + } + + @Override + public String toString() { + return this.name().toLowerCase(Locale.ROOT); + } + + /** + * Get all valid metrics + * + * @return A set of String that contains all valid metrics + */ + public static Set allMetricTypes() { + return Arrays.stream(values()).collect(Collectors.toSet()); + } + + /** + * Compare two numbers based on the metric type + * + * @param a the first Number to be compared. + * @param b the second Number to be compared. + * @return a negative integer, zero, or a positive integer as the first argument is less than, equal to, or greater than the second + */ + public int compare(final Number a, final Number b) { + switch (this) { + case LATENCY: + return Long.compare(a.longValue(), b.longValue()); + case JVM: + case CPU: + return Double.compare(a.doubleValue(), b.doubleValue()); + } + return -1; + } + + /** + * Parse a value with the correct type based on MetricType + * + * @param o the generic object to parse + * @return {@link Number} + */ + Number parseValue(final Object o) { + switch (this) { + case LATENCY: + return (Long) o; + case JVM: + case CPU: + return (Double) o; + default: + return (Number) o; + } + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java new file mode 100644 index 0000000000000..060711edb5580 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java @@ -0,0 +1,176 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.model; + +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +/** + * SearchQueryRecord represents a minimal atomic record stored in the Query Insight Framework, + * which contains extensive information related to a search query. + * + * @opensearch.internal + */ +public class SearchQueryRecord implements ToXContentObject, Writeable { + private final long timestamp; + private final Map measurements; + private final Map attributes; + + /** + * Constructor of SearchQueryRecord + * + * @param in the StreamInput to read the SearchQueryRecord from + * @throws IOException IOException + * @throws ClassCastException ClassCastException + */ + public SearchQueryRecord(final StreamInput in) throws IOException, ClassCastException { + this.timestamp = in.readLong(); + measurements = new HashMap<>(); + in.readMap(MetricType::readFromStream, StreamInput::readGenericValue) + .forEach(((metricType, o) -> measurements.put(metricType, metricType.parseValue(o)))); + this.attributes = in.readMap(Attribute::readFromStream, StreamInput::readGenericValue); + } + + /** + * Constructor of SearchQueryRecord + * + * @param timestamp The timestamp of the query. + * @param measurements A list of Measurement associated with this query + * @param attributes A list of Attributes associated with this query + */ + public SearchQueryRecord(final long timestamp, Map measurements, final Map attributes) { + if (measurements == null) { + throw new IllegalArgumentException("Measurements cannot be null"); + } + this.measurements = measurements; + this.attributes = attributes; + this.timestamp = timestamp; + } + + /** + * Returns the observation time of the metric. + * + * @return the observation time in milliseconds + */ + public long getTimestamp() { + return timestamp; + } + + /** + * Returns the measurement associated with the specified name. + * + * @param name the name of the measurement + * @return the measurement object, or null if not found + */ + public Number getMeasurement(final MetricType name) { + return measurements.get(name); + } + + /** + * Returns a map of all the measurements associated with the metric. + * + * @return a map of measurement names to measurement objects + */ + public Map getMeasurements() { + return measurements; + } + + /** + * Returns a map of the attributes associated with the metric. + * + * @return a map of attribute keys to attribute values + */ + public Map getAttributes() { + return attributes; + } + + /** + * Add an attribute to this record + * + * @param attribute attribute to add + * @param value the value associated with the attribute + */ + public void addAttribute(final Attribute attribute, final Object value) { + attributes.put(attribute, value); + } + + @Override + public XContentBuilder toXContent(final XContentBuilder builder, final ToXContent.Params params) throws IOException { + builder.startObject(); + builder.field("timestamp", timestamp); + for (Map.Entry entry : attributes.entrySet()) { + builder.field(entry.getKey().toString(), entry.getValue()); + } + for (Map.Entry entry : measurements.entrySet()) { + builder.field(entry.getKey().toString(), entry.getValue()); + } + return builder.endObject(); + } + + /** + * Write a SearchQueryRecord to a StreamOutput + * + * @param out the StreamOutput to write + * @throws IOException IOException + */ + @Override + public void writeTo(final StreamOutput out) throws IOException { + out.writeLong(timestamp); + out.writeMap(measurements, (stream, metricType) -> MetricType.writeTo(out, metricType), StreamOutput::writeGenericValue); + out.writeMap(attributes, (stream, attribute) -> Attribute.writeTo(out, attribute), StreamOutput::writeGenericValue); + } + + /** + * Compare two SearchQueryRecord, based on the given MetricType + * + * @param a the first SearchQueryRecord to compare + * @param b the second SearchQueryRecord to compare + * @param metricType the MetricType to compare on + * @return 0 if the first SearchQueryRecord is numerically equal to the second SearchQueryRecord; + * -1 if the first SearchQueryRecord is numerically less than the second SearchQueryRecord; + * 1 if the first SearchQueryRecord is numerically greater than the second SearchQueryRecord. + */ + public static int compare(final SearchQueryRecord a, final SearchQueryRecord b, final MetricType metricType) { + return metricType.compare(a.getMeasurement(metricType), b.getMeasurement(metricType)); + } + + /** + * Check if a SearchQueryRecord is deep equal to another record + * + * @param o the other SearchQueryRecord record + * @return true if two records are deep equal, false otherwise. + */ + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (!(o instanceof SearchQueryRecord)) { + return false; + } + final SearchQueryRecord other = (SearchQueryRecord) o; + return timestamp == other.getTimestamp() + && measurements.equals(other.getMeasurements()) + && attributes.size() == other.getAttributes().size(); + } + + @Override + public int hashCode() { + return Objects.hash(timestamp, measurements, attributes); + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/package-info.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/package-info.java new file mode 100644 index 0000000000000..c59ec1550f54b --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Data Models for Query Insight Records + */ +package org.opensearch.plugin.insights.rules.model; diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java new file mode 100644 index 0000000000000..52cc1fbde790f --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java @@ -0,0 +1,116 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.settings; + +import org.opensearch.common.settings.Setting; +import org.opensearch.common.unit.TimeValue; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +/** + * Settings for Query Insights Plugin + * + * @opensearch.api + * @opensearch.experimental + */ +public class QueryInsightsSettings { + /** + * Executors settings + */ + public static final String QUERY_INSIGHTS_EXECUTOR = "query_insights_executor"; + /** + * Max number of thread + */ + public static final int MAX_THREAD_COUNT = 5; + /** + * Max number of requests for the consumer to collect at one time + */ + public static final int QUERY_RECORD_QUEUE_CAPACITY = 1000; + /** + * Time interval for record queue consumer to run + */ + public static final TimeValue QUERY_RECORD_QUEUE_DRAIN_INTERVAL = new TimeValue(5, TimeUnit.SECONDS); + /** + * Default Values and Settings + */ + public static final TimeValue MAX_WINDOW_SIZE = new TimeValue(1, TimeUnit.DAYS); + /** + * Minimal window size + */ + public static final TimeValue MIN_WINDOW_SIZE = new TimeValue(1, TimeUnit.MINUTES); + /** + * Valid window sizes + */ + public static final Set VALID_WINDOW_SIZES_IN_MINUTES = new HashSet<>( + Arrays.asList( + new TimeValue(1, TimeUnit.MINUTES), + new TimeValue(5, TimeUnit.MINUTES), + new TimeValue(10, TimeUnit.MINUTES), + new TimeValue(30, TimeUnit.MINUTES) + ) + ); + + /** Default N size for top N queries */ + public static final int MAX_N_SIZE = 100; + /** Default window size in seconds to keep the top N queries with latency data in query insight store */ + public static final TimeValue DEFAULT_WINDOW_SIZE = new TimeValue(60, TimeUnit.SECONDS); + /** Default top N size to keep the data in query insight store */ + public static final int DEFAULT_TOP_N_SIZE = 3; + /** + * Query Insights base uri + */ + public static final String PLUGINS_BASE_URI = "/_insights"; + + /** + * Settings for Top Queries + * + */ + public static final String TOP_QUERIES_BASE_URI = PLUGINS_BASE_URI + "/top_queries"; + /** Default prefix for top N queries feature */ + public static final String TOP_N_QUERIES_SETTING_PREFIX = "search.insights.top_queries"; + /** Default prefix for top N queries by latency feature */ + public static final String TOP_N_LATENCY_QUERIES_PREFIX = TOP_N_QUERIES_SETTING_PREFIX + ".latency"; + /** + * Boolean setting for enabling top queries by latency. + */ + public static final Setting TOP_N_LATENCY_QUERIES_ENABLED = Setting.boolSetting( + TOP_N_LATENCY_QUERIES_PREFIX + ".enabled", + false, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Int setting to define the top n size for top queries by latency. + */ + public static final Setting TOP_N_LATENCY_QUERIES_SIZE = Setting.intSetting( + TOP_N_LATENCY_QUERIES_PREFIX + ".top_n_size", + DEFAULT_TOP_N_SIZE, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Time setting to define the window size in seconds for top queries by latency. + */ + public static final Setting TOP_N_LATENCY_QUERIES_WINDOW_SIZE = Setting.positiveTimeSetting( + TOP_N_LATENCY_QUERIES_PREFIX + ".window_size", + DEFAULT_WINDOW_SIZE, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + /** + * Default constructor + */ + public QueryInsightsSettings() {} +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/package-info.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/package-info.java new file mode 100644 index 0000000000000..f3152bbf966cb --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Settings for Query Insights Plugin + */ +package org.opensearch.plugin.insights.settings; diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsPluginTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsPluginTests.java new file mode 100644 index 0000000000000..f6d95450855de --- /dev/null +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsPluginTests.java @@ -0,0 +1,91 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights; + +import org.opensearch.action.ActionRequest; +import org.opensearch.client.Client; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.action.ActionResponse; +import org.opensearch.plugin.insights.core.service.QueryInsightsService; +import org.opensearch.plugin.insights.settings.QueryInsightsSettings; +import org.opensearch.plugins.ActionPlugin; +import org.opensearch.rest.RestHandler; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.ThreadPool; +import org.junit.Before; + +import java.util.Arrays; +import java.util.List; + +import static org.mockito.Mockito.mock; + +public class QueryInsightsPluginTests extends OpenSearchTestCase { + + private QueryInsightsPlugin queryInsightsPlugin; + + private final Client client = mock(Client.class); + private ClusterService clusterService; + private final ThreadPool threadPool = mock(ThreadPool.class); + + @Before + public void setup() { + queryInsightsPlugin = new QueryInsightsPlugin(); + Settings.Builder settingsBuilder = Settings.builder(); + Settings settings = settingsBuilder.build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE); + clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE); + + clusterService = new ClusterService(settings, clusterSettings, threadPool); + + } + + public void testGetSettings() { + assertEquals( + Arrays.asList( + QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED, + QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE, + QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE + ), + queryInsightsPlugin.getSettings() + ); + } + + public void testCreateComponent() { + List components = (List) queryInsightsPlugin.createComponents( + client, + clusterService, + threadPool, + null, + null, + null, + null, + null, + null, + null, + null + ); + assertEquals(1, components.size()); + assertTrue(components.get(0) instanceof QueryInsightsService); + } + + public void testGetRestHandlers() { + List components = queryInsightsPlugin.getRestHandlers(Settings.EMPTY, null, null, null, null, null, null); + assertEquals(0, components.size()); + } + + public void testGetActions() { + List> components = queryInsightsPlugin.getActions(); + assertEquals(0, components.size()); + } + +} diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java new file mode 100644 index 0000000000000..04f49d5fbc7dd --- /dev/null +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java @@ -0,0 +1,155 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights; + +import org.opensearch.action.search.SearchType; +import org.opensearch.common.util.Maps; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.plugin.insights.rules.model.Attribute; +import org.opensearch.plugin.insights.rules.model.MetricType; +import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; + +import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.opensearch.test.OpenSearchTestCase.randomAlphaOfLengthBetween; +import static org.opensearch.test.OpenSearchTestCase.randomArray; +import static org.opensearch.test.OpenSearchTestCase.randomDouble; +import static org.opensearch.test.OpenSearchTestCase.randomIntBetween; +import static org.opensearch.test.OpenSearchTestCase.randomLong; +import static org.opensearch.test.OpenSearchTestCase.randomLongBetween; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +final public class QueryInsightsTestUtils { + + public QueryInsightsTestUtils() {} + + public static List generateQueryInsightRecords(int count) { + return generateQueryInsightRecords(count, count, System.currentTimeMillis(), 0); + } + + /** + * Creates a List of random Query Insight Records for testing purpose + */ + public static List generateQueryInsightRecords(int lower, int upper, long startTimeStamp, long interval) { + List records = new ArrayList<>(); + int countOfRecords = randomIntBetween(lower, upper); + long timestamp = startTimeStamp; + for (int i = 0; i < countOfRecords; ++i) { + Map measurements = Map.of( + MetricType.LATENCY, + randomLongBetween(1000, 10000), + MetricType.CPU, + randomDouble(), + MetricType.JVM, + randomDouble() + ); + + Map phaseLatencyMap = new HashMap<>(); + int countOfPhases = randomIntBetween(2, 5); + for (int j = 0; j < countOfPhases; ++j) { + phaseLatencyMap.put(randomAlphaOfLengthBetween(5, 10), randomLong()); + } + Map attributes = new HashMap<>(); + attributes.put(Attribute.SEARCH_TYPE, SearchType.QUERY_THEN_FETCH.toString().toLowerCase(Locale.ROOT)); + attributes.put(Attribute.SOURCE, "{\"size\":20}"); + attributes.put(Attribute.TOTAL_SHARDS, randomIntBetween(1, 100)); + attributes.put(Attribute.INDICES, randomArray(1, 3, Object[]::new, () -> randomAlphaOfLengthBetween(5, 10))); + attributes.put(Attribute.PHASE_LATENCY_MAP, phaseLatencyMap); + + records.add(new SearchQueryRecord(timestamp, measurements, attributes)); + timestamp += interval; + } + return records; + } + + public static SearchQueryRecord createFixedSearchQueryRecord() { + long timestamp = 1706574180000L; + Map measurements = Map.of(MetricType.LATENCY, 1L); + + Map phaseLatencyMap = new HashMap<>(); + Map attributes = new HashMap<>(); + attributes.put(Attribute.SEARCH_TYPE, SearchType.QUERY_THEN_FETCH.toString().toLowerCase(Locale.ROOT)); + + return new SearchQueryRecord(timestamp, measurements, attributes); + } + + public static void compareJson(ToXContent param1, ToXContent param2) throws IOException { + if (param1 == null || param2 == null) { + assertNull(param1); + assertNull(param2); + return; + } + + ToXContent.Params params = ToXContent.EMPTY_PARAMS; + XContentBuilder param1Builder = jsonBuilder(); + param1.toXContent(param1Builder, params); + + XContentBuilder param2Builder = jsonBuilder(); + param2.toXContent(param2Builder, params); + + assertEquals(param1Builder.toString(), param2Builder.toString()); + } + + @SuppressWarnings("unchecked") + public static boolean checkRecordsEquals(List records1, List records2) { + if (records1.size() != records2.size()) { + return false; + } + for (int i = 0; i < records1.size(); i++) { + if (!records1.get(i).equals(records2.get(i))) { + return false; + } + Map attributes1 = records1.get(i).getAttributes(); + Map attributes2 = records2.get(i).getAttributes(); + for (Map.Entry entry : attributes1.entrySet()) { + Attribute attribute = entry.getKey(); + Object value = entry.getValue(); + if (!attributes2.containsKey(attribute)) { + return false; + } + if (value instanceof Object[] && !Arrays.deepEquals((Object[]) value, (Object[]) attributes2.get(attribute))) { + return false; + } else if (value instanceof Map + && !Maps.deepEquals((Map) value, (Map) attributes2.get(attribute))) { + return false; + } + } + } + return true; + } + + public static boolean checkRecordsEqualsWithoutOrder( + List records1, + List records2, + MetricType metricType + ) { + Set set2 = new TreeSet<>((a, b) -> SearchQueryRecord.compare(a, b, metricType)); + set2.addAll(records2); + if (records1.size() != records2.size()) { + return false; + } + for (int i = 0; i < records1.size(); i++) { + if (!set2.contains(records1.get(i))) { + return false; + } + } + return true; + } +} diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java new file mode 100644 index 0000000000000..c29b48b9690d1 --- /dev/null +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java @@ -0,0 +1,49 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.core.service; + +import org.opensearch.plugin.insights.QueryInsightsTestUtils; +import org.opensearch.plugin.insights.rules.model.MetricType; +import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; +import org.opensearch.plugin.insights.settings.QueryInsightsSettings; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.ThreadPool; +import org.junit.Before; + +import static org.mockito.Mockito.mock; + +/** + * Unit Tests for {@link QueryInsightsService}. + */ +public class QueryInsightsServiceTests extends OpenSearchTestCase { + private final ThreadPool threadPool = mock(ThreadPool.class); + private QueryInsightsService queryInsightsService; + + @Before + public void setup() { + queryInsightsService = new QueryInsightsService(threadPool); + queryInsightsService.enableCollection(MetricType.LATENCY, true); + queryInsightsService.enableCollection(MetricType.CPU, true); + queryInsightsService.enableCollection(MetricType.JVM, true); + } + + public void testAddRecordToLimitAndDrain() { + SearchQueryRecord record = QueryInsightsTestUtils.generateQueryInsightRecords(1, 1, System.currentTimeMillis(), 0).get(0); + for (int i = 0; i < QueryInsightsSettings.QUERY_RECORD_QUEUE_CAPACITY; i++) { + assertTrue(queryInsightsService.addRecord(record)); + } + // exceed capacity + assertFalse(queryInsightsService.addRecord(record)); + queryInsightsService.drainRecords(); + assertEquals( + QueryInsightsSettings.DEFAULT_TOP_N_SIZE, + queryInsightsService.getTopQueriesService(MetricType.LATENCY).getTopQueriesRecords(false).size() + ); + } +} diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java new file mode 100644 index 0000000000000..060df84a89485 --- /dev/null +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java @@ -0,0 +1,102 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.core.service; + +import org.opensearch.cluster.coordination.DeterministicTaskQueue; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.plugin.insights.QueryInsightsTestUtils; +import org.opensearch.plugin.insights.rules.model.MetricType; +import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; +import org.opensearch.plugin.insights.settings.QueryInsightsSettings; +import org.opensearch.test.OpenSearchTestCase; +import org.junit.Before; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * Unit Tests for {@link QueryInsightsService}. + */ +public class TopQueriesServiceTests extends OpenSearchTestCase { + private TopQueriesService topQueriesService; + + @Before + public void setup() { + topQueriesService = new TopQueriesService(MetricType.LATENCY); + topQueriesService.setTopNSize(Integer.MAX_VALUE); + topQueriesService.setWindowSize(new TimeValue(Long.MAX_VALUE)); + topQueriesService.setEnabled(true); + } + + public void testIngestQueryDataWithLargeWindow() { + final List records = QueryInsightsTestUtils.generateQueryInsightRecords(10); + topQueriesService.consumeRecords(records); + assertTrue( + QueryInsightsTestUtils.checkRecordsEqualsWithoutOrder( + topQueriesService.getTopQueriesRecords(false), + records, + MetricType.LATENCY + ) + ); + } + + public void testRollingWindows() { + List records; + // Create 5 records at Now - 10 minutes to make sure they belong to the last window + records = QueryInsightsTestUtils.generateQueryInsightRecords(5, 5, System.currentTimeMillis() - 1000 * 60 * 10, 0); + topQueriesService.setWindowSize(TimeValue.timeValueMinutes(10)); + topQueriesService.consumeRecords(records); + assertEquals(0, topQueriesService.getTopQueriesRecords(true).size()); + + // Create 10 records at now + 1 minute, to make sure they belong to the current window + records = QueryInsightsTestUtils.generateQueryInsightRecords(10, 10, System.currentTimeMillis() + 1000 * 60, 0); + topQueriesService.setWindowSize(TimeValue.timeValueMinutes(10)); + topQueriesService.consumeRecords(records); + assertEquals(10, topQueriesService.getTopQueriesRecords(true).size()); + } + + public void testSmallNSize() { + final List records = QueryInsightsTestUtils.generateQueryInsightRecords(10); + topQueriesService.setTopNSize(1); + topQueriesService.consumeRecords(records); + assertEquals(1, topQueriesService.getTopQueriesRecords(false).size()); + } + + public void testValidateTopNSize() { + assertThrows(IllegalArgumentException.class, () -> { topQueriesService.validateTopNSize(QueryInsightsSettings.MAX_N_SIZE + 1); }); + } + + public void testGetTopQueriesWhenNotEnabled() { + topQueriesService.setEnabled(false); + assertThrows(IllegalArgumentException.class, () -> { topQueriesService.getTopQueriesRecords(false); }); + } + + public void testValidateWindowSize() { + assertThrows(IllegalArgumentException.class, () -> { + topQueriesService.validateWindowSize(new TimeValue(QueryInsightsSettings.MAX_WINDOW_SIZE.getSeconds() + 1, TimeUnit.SECONDS)); + }); + assertThrows(IllegalArgumentException.class, () -> { + topQueriesService.validateWindowSize(new TimeValue(QueryInsightsSettings.MIN_WINDOW_SIZE.getSeconds() - 1, TimeUnit.SECONDS)); + }); + assertThrows(IllegalArgumentException.class, () -> { topQueriesService.validateWindowSize(new TimeValue(2, TimeUnit.DAYS)); }); + assertThrows(IllegalArgumentException.class, () -> { topQueriesService.validateWindowSize(new TimeValue(7, TimeUnit.MINUTES)); }); + } + + private static void runUntilTimeoutOrFinish(DeterministicTaskQueue deterministicTaskQueue, long duration) { + final long endTime = deterministicTaskQueue.getCurrentTimeMillis() + duration; + while (deterministicTaskQueue.getCurrentTimeMillis() < endTime + && (deterministicTaskQueue.hasRunnableTasks() || deterministicTaskQueue.hasDeferredTasks())) { + if (deterministicTaskQueue.hasDeferredTasks() && randomBoolean()) { + deterministicTaskQueue.advanceTime(); + } else if (deterministicTaskQueue.hasRunnableTasks()) { + deterministicTaskQueue.runRandomTask(); + } + } + } +} diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecordTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecordTests.java new file mode 100644 index 0000000000000..793d5878e2300 --- /dev/null +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecordTests.java @@ -0,0 +1,71 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.model; + +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.plugin.insights.QueryInsightsTestUtils; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * Granular tests for the {@link SearchQueryRecord} class. + */ +public class SearchQueryRecordTests extends OpenSearchTestCase { + + /** + * Check that if the serialization, deserialization and equals functions are working as expected + */ + public void testSerializationAndEquals() throws Exception { + List records = QueryInsightsTestUtils.generateQueryInsightRecords(10); + List copiedRecords = new ArrayList<>(); + for (SearchQueryRecord record : records) { + copiedRecords.add(roundTripRecord(record)); + } + assertTrue(QueryInsightsTestUtils.checkRecordsEquals(records, copiedRecords)); + + } + + public void testAllMetricTypes() { + Set allMetrics = MetricType.allMetricTypes(); + Set expected = new HashSet<>(Arrays.asList(MetricType.LATENCY, MetricType.CPU, MetricType.JVM)); + assertEquals(expected, allMetrics); + } + + public void testCompare() { + SearchQueryRecord record1 = QueryInsightsTestUtils.createFixedSearchQueryRecord(); + SearchQueryRecord record2 = QueryInsightsTestUtils.createFixedSearchQueryRecord(); + assertEquals(0, SearchQueryRecord.compare(record1, record2, MetricType.LATENCY)); + } + + public void testEqual() { + SearchQueryRecord record1 = QueryInsightsTestUtils.createFixedSearchQueryRecord(); + SearchQueryRecord record2 = QueryInsightsTestUtils.createFixedSearchQueryRecord(); + assertEquals(record1, record2); + } + + /** + * Serialize and deserialize a SearchQueryRecord. + * @param record A SearchQueryRecord to serialize. + * @return The deserialized, "round-tripped" record. + */ + private static SearchQueryRecord roundTripRecord(SearchQueryRecord record) throws Exception { + try (BytesStreamOutput out = new BytesStreamOutput()) { + record.writeTo(out); + try (StreamInput in = out.bytes().streamInput()) { + return new SearchQueryRecord(in); + } + } + } +} diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequest.java b/server/src/main/java/org/opensearch/action/search/SearchRequest.java index 96cea17ff4972..f738c182c06da 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequest.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequest.java @@ -359,7 +359,7 @@ boolean isFinalReduce() { * request. When created through {@link #subSearchRequest(SearchRequest, String[], String, long, boolean)}, this method returns * the provided current time, otherwise it will return {@link System#currentTimeMillis()}. */ - long getOrCreateAbsoluteStartMillis() { + public long getOrCreateAbsoluteStartMillis() { return absoluteStartMillis == DEFAULT_ABSOLUTE_START_MILLIS ? System.currentTimeMillis() : absoluteStartMillis; } diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java b/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java index eceac7204b196..383d9b5e82fe2 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java @@ -22,7 +22,7 @@ * @opensearch.internal */ @InternalApi -class SearchRequestContext { +public class SearchRequestContext { private final SearchRequestOperationsListener searchRequestOperationsListener; private long absoluteStartNanos; private final Map phaseTookMap; @@ -47,7 +47,7 @@ void updatePhaseTookMap(String phaseName, Long tookTime) { this.phaseTookMap.put(phaseName, tookTime); } - Map phaseTookMap() { + public Map phaseTookMap() { return phaseTookMap; } @@ -70,7 +70,7 @@ void setAbsoluteStartNanos(long absoluteStartNanos) { /** * Request start time in nanos */ - long getAbsoluteStartNanos() { + public long getAbsoluteStartNanos() { return absoluteStartNanos; } diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java b/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java index 2a09cc084f79f..2acb35af667f0 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java @@ -31,21 +31,21 @@ protected SearchRequestOperationsListener(final boolean enabled) { this.enabled = enabled; } - abstract void onPhaseStart(SearchPhaseContext context); + protected abstract void onPhaseStart(SearchPhaseContext context); - abstract void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext); + protected abstract void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext); - abstract void onPhaseFailure(SearchPhaseContext context); + protected abstract void onPhaseFailure(SearchPhaseContext context); - void onRequestStart(SearchRequestContext searchRequestContext) {} + protected void onRequestStart(SearchRequestContext searchRequestContext) {} - void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} + protected void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} - boolean isEnabled(SearchRequest searchRequest) { + protected boolean isEnabled(SearchRequest searchRequest) { return isEnabled(); } - boolean isEnabled() { + protected boolean isEnabled() { return enabled; } @@ -69,7 +69,7 @@ static final class CompositeListener extends SearchRequestOperationsListener { } @Override - void onPhaseStart(SearchPhaseContext context) { + protected void onPhaseStart(SearchPhaseContext context) { for (SearchRequestOperationsListener listener : listeners) { try { listener.onPhaseStart(context); @@ -80,7 +80,7 @@ void onPhaseStart(SearchPhaseContext context) { } @Override - void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) { + protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) { for (SearchRequestOperationsListener listener : listeners) { try { listener.onPhaseEnd(context, searchRequestContext); @@ -91,7 +91,7 @@ void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestCo } @Override - void onPhaseFailure(SearchPhaseContext context) { + protected void onPhaseFailure(SearchPhaseContext context) { for (SearchRequestOperationsListener listener : listeners) { try { listener.onPhaseFailure(context); @@ -102,7 +102,7 @@ void onPhaseFailure(SearchPhaseContext context) { } @Override - void onRequestStart(SearchRequestContext searchRequestContext) { + protected void onRequestStart(SearchRequestContext searchRequestContext) { for (SearchRequestOperationsListener listener : listeners) { try { listener.onRequestStart(searchRequestContext); diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestSlowLog.java b/server/src/main/java/org/opensearch/action/search/SearchRequestSlowLog.java index 7f25f9026f215..74e04d976cb1c 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestSlowLog.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestSlowLog.java @@ -134,19 +134,19 @@ public SearchRequestSlowLog(ClusterService clusterService) { } @Override - void onPhaseStart(SearchPhaseContext context) {} + protected void onPhaseStart(SearchPhaseContext context) {} @Override - void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} + protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} @Override - void onPhaseFailure(SearchPhaseContext context) {} + protected void onPhaseFailure(SearchPhaseContext context) {} @Override - void onRequestStart(SearchRequestContext searchRequestContext) {} + protected void onRequestStart(SearchRequestContext searchRequestContext) {} @Override - void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) { + protected void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) { long tookInNanos = System.nanoTime() - searchRequestContext.getAbsoluteStartNanos(); if (warnThreshold >= 0 && tookInNanos > warnThreshold && level.isLevelEnabledFor(SlowLogLevel.WARN)) { diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java b/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java index 88d599a0dcdaa..ac32b08afb7f6 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java @@ -58,12 +58,12 @@ public long getPhaseMetric(SearchPhaseName searchPhaseName) { } @Override - void onPhaseStart(SearchPhaseContext context) { + protected void onPhaseStart(SearchPhaseContext context) { phaseStatsMap.get(context.getCurrentPhase().getSearchPhaseName()).current.inc(); } @Override - void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) { + protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) { StatsHolder phaseStats = phaseStatsMap.get(context.getCurrentPhase().getSearchPhaseName()); phaseStats.current.dec(); phaseStats.total.inc(); @@ -71,7 +71,7 @@ void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestCo } @Override - void onPhaseFailure(SearchPhaseContext context) { + protected void onPhaseFailure(SearchPhaseContext context) { phaseStatsMap.get(context.getCurrentPhase().getSearchPhaseName()).current.dec(); } diff --git a/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsCompositeListenerFactoryTests.java b/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsCompositeListenerFactoryTests.java index 78c5ba4412c68..1cb336e18b12c 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsCompositeListenerFactoryTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsCompositeListenerFactoryTests.java @@ -119,13 +119,13 @@ public void testStandardListenerAndPerRequestListenerDisabled() { public SearchRequestOperationsListener createTestSearchRequestOperationsListener() { return new SearchRequestOperationsListener() { @Override - void onPhaseStart(SearchPhaseContext context) {} + protected void onPhaseStart(SearchPhaseContext context) {} @Override - void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} + protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} @Override - void onPhaseFailure(SearchPhaseContext context) {} + protected void onPhaseFailure(SearchPhaseContext context) {} }; } }