Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] add request failure listener to track failures at query group level (#15527) #15539

Merged
merged 2 commits into from
Aug 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add runAs to Subject interface and introduce IdentityAwarePlugin extension point ([#14630](https://github.com/opensearch-project/OpenSearch/pull/14630))
- [Workload Management] Add rejection logic for co-ordinator and shard level requests ([#15428](https://github.com/opensearch-project/OpenSearch/pull/15428)))
- Adding translog durability validation in index templates ([#15494](https://github.com/opensearch-project/OpenSearch/pull/15494))
- [Workload Management] Add query group level failure tracking ([#15227](https://github.com/opensearch-project/OpenSearch/pull/15527))
- [Reader Writer Separation] Add searchOnly replica routing configuration ([#15410](https://github.com/opensearch-project/OpenSearch/pull/15410))

### Dependencies
Expand Down
15 changes: 8 additions & 7 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@
import org.opensearch.watcher.ResourceWatcherService;
import org.opensearch.wlm.QueryGroupService;
import org.opensearch.wlm.WorkloadManagementTransportInterceptor;
import org.opensearch.wlm.listeners.QueryGroupRequestRejectionOperationListener;
import org.opensearch.wlm.listeners.QueryGroupRequestOperationListener;

import javax.net.ssl.SNIHostName;

Expand Down Expand Up @@ -998,11 +998,12 @@ protected Node(
List<IdentityAwarePlugin> identityAwarePlugins = pluginsService.filterPlugins(IdentityAwarePlugin.class);
identityService.initializeIdentityAwarePlugins(identityAwarePlugins);

final QueryGroupRequestRejectionOperationListener queryGroupRequestRejectionListener =
new QueryGroupRequestRejectionOperationListener(
new QueryGroupService(), // We will need to replace this with actual instance of the queryGroupService
threadPool
);
final QueryGroupService queryGroupService = new QueryGroupService(); // We will need to replace this with actual instance of the
// queryGroupService
final QueryGroupRequestOperationListener queryGroupRequestOperationListener = new QueryGroupRequestOperationListener(
queryGroupService,
threadPool
);

// register all standard SearchRequestOperationsCompositeListenerFactory to the SearchRequestOperationsCompositeListenerFactory
final SearchRequestOperationsCompositeListenerFactory searchRequestOperationsCompositeListenerFactory =
Expand All @@ -1012,7 +1013,7 @@ protected Node(
searchRequestStats,
searchRequestSlowLog,
searchTaskRequestOperationsListener,
queryGroupRequestRejectionListener
queryGroupRequestOperationListener
),
pluginComponents.stream()
.filter(p -> p instanceof SearchRequestOperationsListener)
Expand Down
50 changes: 49 additions & 1 deletion server/src/main/java/org/opensearch/wlm/QueryGroupService.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,59 @@
package org.opensearch.wlm;

import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.wlm.stats.QueryGroupState;
import org.opensearch.wlm.stats.QueryGroupStats;
import org.opensearch.wlm.stats.QueryGroupStats.QueryGroupStatsHolder;

import java.util.HashMap;
import java.util.Map;

/**
* This is stub at this point in time and will be replace by an acutal one in couple of days
* As of now this is a stub and main implementation PR will be raised soon.Coming PR will collate these changes with core QueryGroupService changes
*/
public class QueryGroupService {
// This map does not need to be concurrent since we will process the cluster state change serially and update
// this map with new additions and deletions of entries. QueryGroupState is thread safe
private final Map<String, QueryGroupState> queryGroupStateMap;

public QueryGroupService() {
this(new HashMap<>());
}

public QueryGroupService(Map<String, QueryGroupState> queryGroupStateMap) {
this.queryGroupStateMap = queryGroupStateMap;
}

/**
* updates the failure stats for the query group
* @param queryGroupId query group identifier
*/
public void incrementFailuresFor(final String queryGroupId) {
QueryGroupState queryGroupState = queryGroupStateMap.get(queryGroupId);
// This can happen if the request failed for a deleted query group
// or new queryGroup is being created and has not been acknowledged yet
if (queryGroupState == null) {
return;
}
queryGroupState.failures.inc();
}

/**
*
* @return node level query group stats
*/
public QueryGroupStats nodeStats() {
final Map<String, QueryGroupStatsHolder> statsHolderMap = new HashMap<>();
for (Map.Entry<String, QueryGroupState> queryGroupsState : queryGroupStateMap.entrySet()) {
final String queryGroupId = queryGroupsState.getKey();
final QueryGroupState currentState = queryGroupsState.getValue();

statsHolderMap.put(queryGroupId, QueryGroupStatsHolder.from(currentState));
}

return new QueryGroupStats(statsHolderMap);
}

/**
*
* @param queryGroupId query group identifier
Expand Down
7 changes: 7 additions & 0 deletions server/src/main/java/org/opensearch/wlm/ResourceType.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.tasks.Task;

import java.io.IOException;
import java.util.List;
import java.util.function.Function;

/**
Expand All @@ -31,6 +32,8 @@ public enum ResourceType {
private final Function<Task, Long> getResourceUsage;
private final boolean statsEnabled;

private static List<ResourceType> sortedValues = List.of(CPU, MEMORY);

ResourceType(String name, Function<Task, Long> getResourceUsage, boolean statsEnabled) {
this.name = name;
this.getResourceUsage = getResourceUsage;
Expand Down Expand Up @@ -72,4 +75,8 @@ public long getResourceUsage(Task task) {
public boolean hasStatsEnabled() {
return statsEnabled;
}

public static List<ResourceType> getSortedValues() {
return sortedValues;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,22 @@

package org.opensearch.wlm.listeners;

import org.opensearch.action.search.SearchPhaseContext;
import org.opensearch.action.search.SearchRequestContext;
import org.opensearch.action.search.SearchRequestOperationsListener;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.wlm.QueryGroupService;
import org.opensearch.wlm.QueryGroupTask;

/**
* This listener is used to perform the rejections for incoming requests into a queryGroup
* This listener is used to listen for request lifecycle events for a queryGroup
*/
public class QueryGroupRequestRejectionOperationListener extends SearchRequestOperationsListener {
public class QueryGroupRequestOperationListener extends SearchRequestOperationsListener {

private final QueryGroupService queryGroupService;
private final ThreadPool threadPool;

public QueryGroupRequestRejectionOperationListener(QueryGroupService queryGroupService, ThreadPool threadPool) {
public QueryGroupRequestOperationListener(QueryGroupService queryGroupService, ThreadPool threadPool) {
this.queryGroupService = queryGroupService;
this.threadPool = threadPool;
}
Expand All @@ -36,4 +37,10 @@ protected void onRequestStart(SearchRequestContext searchRequestContext) {
final String queryGroupId = threadPool.getThreadContext().getHeader(QueryGroupTask.QUERY_GROUP_ID_HEADER);
queryGroupService.rejectIfNeeded(queryGroupId);
}

@Override
protected void onRequestFailure(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
final String queryGroupId = threadPool.getThreadContext().getHeader(QueryGroupTask.QUERY_GROUP_ID_HEADER);
queryGroupService.incrementFailuresFor(queryGroupId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
/**
* this will track the cumulative failures in a query group
*/
final CounterMetric failures = new CounterMetric();
public final CounterMetric failures = new CounterMetric();

/**
* This will track total number of cancellations in the query group due to all resource type breaches
Expand Down Expand Up @@ -95,9 +95,18 @@
final ResourceType resourceType;
final CounterMetric cancellations = new CounterMetric();
final CounterMetric rejections = new CounterMetric();
private double lastRecordedUsage = 0;

public ResourceTypeState(ResourceType resourceType) {
this.resourceType = resourceType;
}

public void setLastRecordedUsage(double recordedUsage) {
lastRecordedUsage = recordedUsage;
}

Check warning on line 106 in server/src/main/java/org/opensearch/wlm/stats/QueryGroupState.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/wlm/stats/QueryGroupState.java#L105-L106

Added lines #L105 - L106 were not covered by tests

public double getLastRecordedUsage() {
return lastRecordedUsage;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.wlm.ResourceType;
import org.opensearch.wlm.stats.QueryGroupState.ResourceTypeState;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

Expand Down Expand Up @@ -52,7 +56,11 @@ public void writeTo(StreamOutput out) throws IOException {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("query_groups");
for (Map.Entry<String, QueryGroupStatsHolder> queryGroupStats : stats.entrySet()) {
// to keep the toXContent consistent
List<Map.Entry<String, QueryGroupStatsHolder>> entryList = new ArrayList<>(stats.entrySet());
entryList.sort((k1, k2) -> k1.getKey().compareTo(k2.getKey()));

for (Map.Entry<String, QueryGroupStatsHolder> queryGroupStats : entryList) {
builder.startObject(queryGroupStats.getKey());
queryGroupStats.getValue().toXContent(builder, params);
builder.endObject();
Expand Down Expand Up @@ -83,11 +91,14 @@ public static class QueryGroupStatsHolder implements ToXContentObject, Writeable
public static final String REJECTIONS = "rejections";
public static final String TOTAL_CANCELLATIONS = "total_cancellations";
public static final String FAILURES = "failures";
private final long completions;
private final long rejections;
private final long failures;
private final long totalCancellations;
private final Map<ResourceType, ResourceStats> resourceStats;
private long completions;
private long rejections;
private long failures;
private long totalCancellations;
private Map<ResourceType, ResourceStats> resourceStats;

// this is needed to support the factory method
public QueryGroupStatsHolder() {}

public QueryGroupStatsHolder(
long completions,
Expand All @@ -111,6 +122,28 @@ public QueryGroupStatsHolder(StreamInput in) throws IOException {
this.resourceStats = in.readMap((i) -> ResourceType.fromName(i.readString()), ResourceStats::new);
}

/**
* static factory method to convert {@link QueryGroupState} into {@link QueryGroupStatsHolder}
* @param queryGroupState which needs to be converted
* @return QueryGroupStatsHolder object
*/
public static QueryGroupStatsHolder from(QueryGroupState queryGroupState) {
final QueryGroupStatsHolder statsHolder = new QueryGroupStatsHolder();

Map<ResourceType, ResourceStats> resourceStatsMap = new HashMap<>();

for (Map.Entry<ResourceType, ResourceTypeState> resourceTypeStateEntry : queryGroupState.getResourceState().entrySet()) {
resourceStatsMap.put(resourceTypeStateEntry.getKey(), ResourceStats.from(resourceTypeStateEntry.getValue()));
}

statsHolder.completions = queryGroupState.getCompletions();
statsHolder.rejections = queryGroupState.getTotalRejections();
statsHolder.failures = queryGroupState.getFailures();
statsHolder.totalCancellations = queryGroupState.getTotalCancellations();
statsHolder.resourceStats = resourceStatsMap;
return statsHolder;
}

/**
* Writes the {@param statsHolder} to {@param out}
* @param out StreamOutput
Expand All @@ -136,9 +169,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(REJECTIONS, rejections);
builder.field(FAILURES, failures);
builder.field(TOTAL_CANCELLATIONS, totalCancellations);
for (Map.Entry<ResourceType, ResourceStats> resourceStat : resourceStats.entrySet()) {
ResourceType resourceType = resourceStat.getKey();
ResourceStats resourceStats1 = resourceStat.getValue();

for (ResourceType resourceType : ResourceType.getSortedValues()) {
ResourceStats resourceStats1 = resourceStats.get(resourceType);
if (resourceStats1 == null) continue;
builder.startObject(resourceType.getName());
resourceStats1.toXContent(builder, params);
builder.endObject();
Expand Down Expand Up @@ -187,6 +221,19 @@ public ResourceStats(StreamInput in) throws IOException {
this.rejections = in.readVLong();
}

/**
* static factory method to convert {@link ResourceTypeState} into {@link ResourceStats}
* @param resourceTypeState which needs to be converted
* @return QueryGroupStatsHolder object
*/
public static ResourceStats from(ResourceTypeState resourceTypeState) {
return new ResourceStats(
resourceTypeState.getLastRecordedUsage(),
resourceTypeState.cancellations.count(),
resourceTypeState.rejections.count()
);
}

/**
* Writes the {@param stats} to {@param out}
* @param out StreamOutput
Expand Down
Loading
Loading