Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move query categorization changes to plugin #16

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,17 @@
import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesAction;
import org.opensearch.plugin.insights.rules.resthandler.top_queries.RestTopQueriesAction;
import org.opensearch.plugin.insights.rules.transport.top_queries.TransportTopQueriesAction;
import org.opensearch.plugin.insights.settings.QueryCategorizationSettings;
import org.opensearch.plugin.insights.settings.QueryInsightsSettings;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.TelemetryAwarePlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.rest.RestController;
import org.opensearch.rest.RestHandler;
import org.opensearch.script.ScriptService;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.ScalingExecutorBuilder;
import org.opensearch.threadpool.ThreadPool;
Expand All @@ -49,7 +53,7 @@
/**
* Plugin class for Query Insights.
*/
public class QueryInsightsPlugin extends Plugin implements ActionPlugin {
public class QueryInsightsPlugin extends Plugin implements ActionPlugin, TelemetryAwarePlugin {
/**
* Default constructor
*/
Expand All @@ -67,10 +71,17 @@ public Collection<Object> createComponents(
final NodeEnvironment nodeEnvironment,
final NamedWriteableRegistry namedWriteableRegistry,
final IndexNameExpressionResolver indexNameExpressionResolver,
final Supplier<RepositoriesService> repositoriesServiceSupplier
final Supplier<RepositoriesService> repositoriesServiceSupplier,
final Tracer tracer,
final MetricsRegistry metricsRegistry
) {
// create top n queries service
final QueryInsightsService queryInsightsService = new QueryInsightsService(clusterService.getClusterSettings(), threadPool, client);
final QueryInsightsService queryInsightsService = new QueryInsightsService(
clusterService.getClusterSettings(),
threadPool,
client,
metricsRegistry
);
return List.of(queryInsightsService, new QueryInsightsListener(clusterService, queryInsightsService));
}

Expand Down Expand Up @@ -119,7 +130,8 @@ public List<Setting<?>> getSettings() {
QueryInsightsSettings.TOP_N_MEMORY_QUERIES_ENABLED,
QueryInsightsSettings.TOP_N_MEMORY_QUERIES_SIZE,
QueryInsightsSettings.TOP_N_MEMORY_QUERIES_WINDOW_SIZE,
QueryInsightsSettings.TOP_N_MEMORY_EXPORTER_SETTINGS
QueryInsightsSettings.TOP_N_MEMORY_EXPORTER_SETTINGS,
QueryCategorizationSettings.SEARCH_QUERY_METRICS_ENABLED_SETTING
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,26 +91,27 @@ public QueryInsightsListener(final ClusterService clusterService, final QueryIns
* and query insights services.
*
* @param metricType {@link MetricType}
* @param enabled boolean
* @param isCurrentMetricEnabled boolean
*/
public void setEnableTopQueries(final MetricType metricType, final boolean enabled) {
boolean isAllMetricsDisabled = !queryInsightsService.isEnabled();
this.queryInsightsService.enableCollection(metricType, enabled);
deshsidd marked this conversation as resolved.
Show resolved Hide resolved
if (!enabled) {
// disable QueryInsightsListener only if all metrics collections are disabled now.
if (!queryInsightsService.isEnabled()) {
public void setEnableTopQueries(final MetricType metricType, final boolean isCurrentMetricEnabled) {
boolean isTopNFeatureDisabled = !queryInsightsService.isTopNFeatureEnabled();
this.queryInsightsService.enableCollection(metricType, isCurrentMetricEnabled);

if (!isCurrentMetricEnabled) {
// disable QueryInsightsListener only if all metrics collections are disabled now
// and search query metrics is disabled.
if (isTopNFeatureDisabled) {
deshsidd marked this conversation as resolved.
Show resolved Hide resolved
super.setEnabled(false);
this.queryInsightsService.stop();
queryInsightsService.checkAndStopQueryInsights();
}
} else {
super.setEnabled(true);
// restart QueryInsightsListener only if none of metrics collections is enabled before.
if (isAllMetricsDisabled) {
this.queryInsightsService.stop();
this.queryInsightsService.start();
// restart QueryInsightsListener only if none of metrics collections is enabled before and
// search query metrics is disabled before.
if (isTopNFeatureDisabled) {
queryInsightsService.checkAndRestartQueryInsights();
}
}

}

@Override
Expand Down Expand Up @@ -176,7 +177,7 @@ private void constructSearchQueryRecord(final SearchPhaseContext context, final
}
Map<Attribute, Object> attributes = new HashMap<>();
attributes.put(Attribute.SEARCH_TYPE, request.searchType().toString().toLowerCase(Locale.ROOT));
attributes.put(Attribute.SOURCE, request.source().toString(FORMAT_PARAMS));
attributes.put(Attribute.SOURCE, request.source());
attributes.put(Attribute.TOTAL_SHARDS, context.getNumShards());
attributes.put(Attribute.INDICES, request.indices());
attributes.put(Attribute.PHASE_LATENCY_MAP, searchRequestContext.phaseTookMap());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,20 @@

package org.opensearch.plugin.insights.core.service;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.client.Client;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterFactory;
import org.opensearch.plugin.insights.core.service.categorizer.SearchQueryCategorizer;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
import org.opensearch.plugin.insights.settings.QueryInsightsSettings;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;

Expand All @@ -29,13 +33,17 @@
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

import static org.opensearch.plugin.insights.settings.QueryCategorizationSettings.SEARCH_QUERY_METRICS_ENABLED_SETTING;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.getExporterSettings;

/**
* Service responsible for gathering, analyzing, storing and exporting
* information related to search queries
*/
public class QueryInsightsService extends AbstractLifecycleComponent {

private static final Logger logger = LogManager.getLogger(QueryInsightsService.class);

/**
* The internal OpenSearch thread pool that execute async processing and exporting tasks
*/
Expand Down Expand Up @@ -67,15 +75,25 @@ public class QueryInsightsService extends AbstractLifecycleComponent {
*/
final QueryInsightsExporterFactory queryInsightsExporterFactory;

private volatile boolean searchQueryMetricsEnabled;

private SearchQueryCategorizer searchQueryCategorizer;

/**
* Constructor of the QueryInsightsService
*
* @param clusterSettings OpenSearch cluster level settings
* @param threadPool The OpenSearch thread pool to run async tasks
* @param client OS client
* @param metricsRegistry Opentelemetry Metrics registry
*/
@Inject
public QueryInsightsService(final ClusterSettings clusterSettings, final ThreadPool threadPool, final Client client) {
public QueryInsightsService(
final ClusterSettings clusterSettings,
final ThreadPool threadPool,
final Client client,
final MetricsRegistry metricsRegistry
) {
enableCollect = new HashMap<>();
queryRecordsQueue = new LinkedBlockingQueue<>(QueryInsightsSettings.QUERY_RECORD_QUEUE_CAPACITY);
this.threadPool = threadPool;
Expand All @@ -93,6 +111,10 @@ public QueryInsightsService(final ClusterSettings clusterSettings, final ThreadP
(settings -> validateExporterConfig(type, settings))
);
}

this.searchQueryMetricsEnabled = clusterSettings.get(SEARCH_QUERY_METRICS_ENABLED_SETTING);
this.searchQueryCategorizer = SearchQueryCategorizer.getInstance(metricsRegistry);
clusterSettings.addSettingsUpdateConsumer(SEARCH_QUERY_METRICS_ENABLED_SETTING, this::setSearchQueryMetricsEnabled);
}

/**
Expand Down Expand Up @@ -133,6 +155,14 @@ public void drainRecords() {
topQueriesServices.get(metricType).consumeRecords(records);
}
}

if (searchQueryMetricsEnabled) {
try {
searchQueryCategorizer.consumeRecords(records);
} catch (Exception e) {
logger.error("Error while trying to categorize the queries.", e);
}
}
}

/**
Expand Down Expand Up @@ -166,11 +196,20 @@ public boolean isCollectionEnabled(final MetricType metricType) {
}

/**
* Check if query insights service is enabled
* Check if any feature of Query Insights service is enabled, right now includes Top N and Categorization.
*
* @return if query insights service is enabled
*/
public boolean isEnabled() {
public boolean isAnyFeatureEnabled() {
return isTopNFeatureEnabled() || isSearchQueryMetricsFeatureEnabled();
}

/**
* Check if top N enabled for any metric type
*
* @return if top N feature is enabled
*/
public boolean isTopNFeatureEnabled() {
for (MetricType t : MetricType.allMetricTypes()) {
if (isCollectionEnabled(t)) {
return true;
Expand All @@ -179,6 +218,33 @@ public boolean isEnabled() {
return false;
}

/**
* Is search query metrics feature enabled.
* @return boolean flag
*/
public boolean isSearchQueryMetricsFeatureEnabled() {
return this.searchQueryMetricsEnabled;
}

/**
* Stops query insights service if no features enabled
*/
public void checkAndStopQueryInsights() {
if (!isAnyFeatureEnabled()) {
this.stop();
}
}

/**
* Restarts query insights service if any feature enabled
*/
public void checkAndRestartQueryInsights() {
if (isAnyFeatureEnabled()) {
this.stop();
this.start();
}
}

/**
* Validate the window size config for a metricType
*
Expand Down Expand Up @@ -239,6 +305,32 @@ public void setExporter(final MetricType type, final Settings settings) {
}
}

/**
* Set search query metrics enabled to enable collection of search query categorization metrics
* @param searchQueryMetricsEnabled boolean flag
*/
public void setSearchQueryMetricsEnabled(boolean searchQueryMetricsEnabled) {
boolean oldsearchQueryMetricsEnabled = isSearchQueryMetricsFeatureEnabled();
this.searchQueryMetricsEnabled = searchQueryMetricsEnabled;
deshsidd marked this conversation as resolved.
Show resolved Hide resolved
if (searchQueryMetricsEnabled) {
if (!oldsearchQueryMetricsEnabled) {
checkAndRestartQueryInsights();
}
} else {
if (oldsearchQueryMetricsEnabled) {
checkAndStopQueryInsights();
}
}
}

/**
* Get search query categorizer object
* @return SearchQueryCategorizer object
*/
public SearchQueryCategorizer getSearchQueryCategorizer() {
return this.searchQueryCategorizer;
}

/**
* Validate the exporter config for a metricType
*
Expand All @@ -253,7 +345,7 @@ public void validateExporterConfig(final MetricType type, final Settings setting

@Override
protected void doStart() {
if (isEnabled()) {
if (isAnyFeatureEnabled()) {
scheduledFuture = threadPool.scheduleWithFixedDelay(
this::drainRecords,
QueryInsightsSettings.QUERY_RECORD_QUEUE_DRAIN_INTERVAL,
Expand Down
Loading
Loading