Skip to content

Commit

Permalink
Add success and failure count OTel metrics for async shard fetch (ope…
Browse files Browse the repository at this point in the history
…nsearch-project#15976)

Signed-off-by: Rahul Karajgikar <karajgik@amazon.com>
  • Loading branch information
rahulkarajgikar authored Sep 30, 2024
1 parent 7ba8b78 commit 8ddb3ee
Show file tree
Hide file tree
Showing 12 changed files with 344 additions and 44 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Remove identity-related feature flagged code from the RestController ([#15430](https://github.com/opensearch-project/OpenSearch/pull/15430))
- Add support for msearch API to pass search pipeline name - ([#15923](https://github.com/opensearch-project/OpenSearch/pull/15923))
- Add _list/indices API as paginated alternate to _cat/indices ([#14718](https://github.com/opensearch-project/OpenSearch/pull/14718))
- Add success and failure metrics for async shard fetch ([#15976](https://github.com/opensearch-project/OpenSearch/pull/15976))

### Dependencies
- Bump `com.azure:azure-identity` from 1.13.0 to 1.13.2 ([#15578](https://github.com/opensearch-project/OpenSearch/pull/15578))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.opensearch.action.FailedNodeException;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeReadAction;
import org.opensearch.cluster.ClusterManagerMetrics;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.block.ClusterBlockLevel;
Expand Down Expand Up @@ -88,6 +89,7 @@ public class TransportIndicesShardStoresAction extends TransportClusterManagerNo
private static final Logger logger = LogManager.getLogger(TransportIndicesShardStoresAction.class);

private final TransportNodesListGatewayStartedShards listShardStoresInfo;
private final ClusterManagerMetrics clusterManagerMetrics;

@Inject
public TransportIndicesShardStoresAction(
Expand All @@ -96,7 +98,8 @@ public TransportIndicesShardStoresAction(
ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
TransportNodesListGatewayStartedShards listShardStoresInfo
TransportNodesListGatewayStartedShards listShardStoresInfo,
ClusterManagerMetrics clusterManagerMetrics
) {
super(
IndicesShardStoresAction.NAME,
Expand All @@ -109,6 +112,7 @@ public TransportIndicesShardStoresAction(
true
);
this.listShardStoresInfo = listShardStoresInfo;
this.clusterManagerMetrics = clusterManagerMetrics;
}

@Override
Expand Down Expand Up @@ -154,7 +158,7 @@ protected void clusterManagerOperation(
// we could fetch all shard store info from every node once (nNodes requests)
// we have to implement a TransportNodesAction instead of using TransportNodesListGatewayStartedShards
// for fetching shard stores info, that operates on a list of shards instead of a single shard
new AsyncShardStoresInfoFetches(state.nodes(), routingNodes, shardsToFetch, listener).start();
new AsyncShardStoresInfoFetches(state.nodes(), routingNodes, shardsToFetch, listener, clusterManagerMetrics).start();
}

@Override
Expand All @@ -175,27 +179,37 @@ private class AsyncShardStoresInfoFetches {
private final ActionListener<IndicesShardStoresResponse> listener;
private CountDown expectedOps;
private final Queue<InternalAsyncFetch.Response> fetchResponses;
private final ClusterManagerMetrics clusterManagerMetrics;

AsyncShardStoresInfoFetches(
DiscoveryNodes nodes,
RoutingNodes routingNodes,
Set<Tuple<ShardId, String>> shards,
ActionListener<IndicesShardStoresResponse> listener
ActionListener<IndicesShardStoresResponse> listener,
ClusterManagerMetrics clusterManagerMetrics
) {
this.nodes = nodes;
this.routingNodes = routingNodes;
this.shards = shards;
this.listener = listener;
this.fetchResponses = new ConcurrentLinkedQueue<>();
this.expectedOps = new CountDown(shards.size());
this.clusterManagerMetrics = clusterManagerMetrics;
}

void start() {
if (shards.isEmpty()) {
listener.onResponse(new IndicesShardStoresResponse());
} else {
for (Tuple<ShardId, String> shard : shards) {
InternalAsyncFetch fetch = new InternalAsyncFetch(logger, "shard_stores", shard.v1(), shard.v2(), listShardStoresInfo);
InternalAsyncFetch fetch = new InternalAsyncFetch(
logger,
"shard_stores",
shard.v1(),
shard.v2(),
listShardStoresInfo,
clusterManagerMetrics
);
fetch.fetchData(nodes, Collections.emptyMap());
}
}
Expand All @@ -213,9 +227,10 @@ private class InternalAsyncFetch extends AsyncShardFetch<NodeGatewayStartedShard
String type,
ShardId shardId,
String customDataPath,
TransportNodesListGatewayStartedShards action
TransportNodesListGatewayStartedShards action,
ClusterManagerMetrics clusterManagerMetrics
) {
super(logger, type, shardId, customDataPath, action);
super(logger, type, shardId, customDataPath, action, clusterManagerMetrics);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ public final class ClusterManagerMetrics {

public final Counter leaderCheckFailureCounter;
public final Counter followerChecksFailureCounter;
public final Counter asyncFetchFailureCounter;
public final Counter asyncFetchSuccessCounter;

public ClusterManagerMetrics(MetricsRegistry metricsRegistry) {
clusterStateAppliersHistogram = metricsRegistry.createHistogram(
Expand Down Expand Up @@ -71,6 +73,17 @@ public ClusterManagerMetrics(MetricsRegistry metricsRegistry) {
"Counter for number of failed leader checks",
COUNTER_METRICS_UNIT
);
asyncFetchFailureCounter = metricsRegistry.createCounter(
"async.fetch.failure.count",
"Counter for number of failed async fetches",
COUNTER_METRICS_UNIT
);
asyncFetchSuccessCounter = metricsRegistry.createCounter(
"async.fetch.success.count",
"Counter for number of successful async fetches",
COUNTER_METRICS_UNIT
);

}

public void recordLatency(Histogram histogram, Double value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ public class ClusterModule extends AbstractModule {
// pkg private for tests
final Collection<AllocationDecider> deciderList;
final ShardsAllocator shardsAllocator;
private final ClusterManagerMetrics clusterManagerMetrics;

public ClusterModule(
Settings settings,
Expand All @@ -166,6 +167,7 @@ public ClusterModule(
settings,
clusterManagerMetrics
);
this.clusterManagerMetrics = clusterManagerMetrics;
}

public static List<Entry> getNamedWriteables() {
Expand Down Expand Up @@ -456,6 +458,7 @@ protected void configure() {
bind(TaskResultsService.class).asEagerSingleton();
bind(AllocationDeciders.class).toInstance(allocationDeciders);
bind(ShardsAllocator.class).toInstance(shardsAllocator);
bind(ClusterManagerMetrics.class).toInstance(clusterManagerMetrics);
}

public void setExistingShardsAllocators(GatewayAllocator gatewayAllocator, ShardsBatchGatewayAllocator shardsBatchGatewayAllocator) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.action.support.nodes.BaseNodeResponse;
import org.opensearch.action.support.nodes.BaseNodesResponse;
import org.opensearch.cluster.ClusterManagerMetrics;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.logging.Loggers;
import org.opensearch.core.index.shard.ShardId;
Expand Down Expand Up @@ -48,7 +49,8 @@ public abstract class AsyncShardBatchFetch<T extends BaseNodeResponse, V> extend
Class<V> clazz,
V emptyShardResponse,
Predicate<V> emptyShardResponsePredicate,
ShardBatchResponseFactory<T, V> responseFactory
ShardBatchResponseFactory<T, V> responseFactory,
ClusterManagerMetrics clusterManagerMetrics
) {
super(
logger,
Expand All @@ -64,7 +66,8 @@ public abstract class AsyncShardBatchFetch<T extends BaseNodeResponse, V> extend
clazz,
emptyShardResponse,
emptyShardResponsePredicate,
responseFactory
responseFactory,
clusterManagerMetrics
)
);
}
Expand Down Expand Up @@ -116,9 +119,10 @@ public ShardBatchCache(
Class<V> clazz,
V emptyResponse,
Predicate<V> emptyShardResponsePredicate,
ShardBatchResponseFactory<T, V> responseFactory
ShardBatchResponseFactory<T, V> responseFactory,
ClusterManagerMetrics clusterManagerMetrics
) {
super(Loggers.getLogger(logger, "_" + logKey), type);
super(Loggers.getLogger(logger, "_" + logKey), type, clusterManagerMetrics);
this.batchSize = shardAttributesMap.size();
this.emptyShardResponsePredicate = emptyShardResponsePredicate;
cache = new HashMap<>();
Expand Down
10 changes: 6 additions & 4 deletions server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.opensearch.action.FailedNodeException;
import org.opensearch.action.support.nodes.BaseNodeResponse;
import org.opensearch.action.support.nodes.BaseNodesResponse;
import org.opensearch.cluster.ClusterManagerMetrics;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
Expand Down Expand Up @@ -94,15 +95,16 @@ protected AsyncShardFetch(
String type,
ShardId shardId,
String customDataPath,
Lister<? extends BaseNodesResponse<T>, T> action
Lister<? extends BaseNodesResponse<T>, T> action,
ClusterManagerMetrics clusterManagerMetrics
) {
this.logger = logger;
this.type = type;
shardAttributesMap = new HashMap<>();
shardAttributesMap.put(shardId, new ShardAttributes(customDataPath));
this.action = (Lister<BaseNodesResponse<T>, T>) action;
this.reroutingKey = "ShardId=[" + shardId.toString() + "]";
cache = new ShardCache<>(logger, reroutingKey, type);
cache = new ShardCache<>(logger, reroutingKey, type, clusterManagerMetrics);
}

/**
Expand Down Expand Up @@ -284,8 +286,8 @@ static class ShardCache<K extends BaseNodeResponse> extends AsyncShardFetchCache

private final Map<String, NodeEntry<K>> cache;

public ShardCache(Logger logger, String logKey, String type) {
super(Loggers.getLogger(logger, "_" + logKey), type);
public ShardCache(Logger logger, String logKey, String type, ClusterManagerMetrics clusterManagerMetrics) {
super(Loggers.getLogger(logger, "_" + logKey), type, clusterManagerMetrics);
cache = new HashMap<>();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.OpenSearchTimeoutException;
import org.opensearch.action.FailedNodeException;
import org.opensearch.action.support.nodes.BaseNodeResponse;
import org.opensearch.cluster.ClusterManagerMetrics;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
Expand Down Expand Up @@ -51,10 +52,12 @@ public abstract class AsyncShardFetchCache<K extends BaseNodeResponse> {

private final Logger logger;
private final String type;
private final ClusterManagerMetrics clusterManagerMetrics;

protected AsyncShardFetchCache(Logger logger, String type) {
protected AsyncShardFetchCache(Logger logger, String type, ClusterManagerMetrics clusterManagerMetrics) {
this.logger = logger;
this.type = type;
this.clusterManagerMetrics = clusterManagerMetrics;
}

abstract void initData(DiscoveryNode node);
Expand Down Expand Up @@ -162,6 +165,7 @@ Map<DiscoveryNode, K> getCacheData(DiscoveryNodes nodes, Set<String> failedNodes
}

void processResponses(List<K> responses, long fetchingRound) {
clusterManagerMetrics.incrementCounter(clusterManagerMetrics.asyncFetchSuccessCounter, Double.valueOf(responses.size()));
for (K response : responses) {
BaseNodeEntry nodeEntry = getCache().get(response.getNode().getId());
if (nodeEntry != null) {
Expand Down Expand Up @@ -222,6 +226,7 @@ boolean retryableException(Throwable unwrappedCause) {
}

void processFailures(List<FailedNodeException> failures, long fetchingRound) {
clusterManagerMetrics.incrementCounter(clusterManagerMetrics.asyncFetchFailureCounter, Double.valueOf(failures.size()));
for (FailedNodeException failure : failures) {
logger.trace("processing failure {} for [{}]", failure, type);
BaseNodeEntry nodeEntry = getCache().get(failure.nodeId());
Expand Down
27 changes: 18 additions & 9 deletions server/src/main/java/org/opensearch/gateway/GatewayAllocator.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.action.support.nodes.BaseNodeResponse;
import org.opensearch.action.support.nodes.BaseNodesResponse;
import org.opensearch.cluster.ClusterManagerMetrics;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
Expand Down Expand Up @@ -92,11 +93,12 @@ public class GatewayAllocator implements ExistingShardsAllocator {
public GatewayAllocator(
RerouteService rerouteService,
TransportNodesListGatewayStartedShards startedAction,
TransportNodesListShardStoreMetadata storeAction
TransportNodesListShardStoreMetadata storeAction,
ClusterManagerMetrics clusterManagerMetrics
) {
this.rerouteService = rerouteService;
this.primaryShardAllocator = new InternalPrimaryShardAllocator(startedAction);
this.replicaShardAllocator = new InternalReplicaShardAllocator(storeAction);
this.primaryShardAllocator = new InternalPrimaryShardAllocator(startedAction, clusterManagerMetrics);
this.replicaShardAllocator = new InternalReplicaShardAllocator(storeAction, clusterManagerMetrics);
}

@Override
Expand Down Expand Up @@ -251,9 +253,10 @@ class InternalAsyncFetch<T extends BaseNodeResponse> extends AsyncShardFetch<T>
String type,
ShardId shardId,
String customDataPath,
Lister<? extends BaseNodesResponse<T>, T> action
Lister<? extends BaseNodesResponse<T>, T> action,
ClusterManagerMetrics clusterManagerMetrics
) {
super(logger, type, shardId, customDataPath, action);
super(logger, type, shardId, customDataPath, action, clusterManagerMetrics);
}

@Override
Expand All @@ -274,9 +277,11 @@ protected void reroute(String reroutingKey, String reason) {
class InternalPrimaryShardAllocator extends PrimaryShardAllocator {

private final TransportNodesListGatewayStartedShards startedAction;
private final ClusterManagerMetrics clusterManagerMetrics;

InternalPrimaryShardAllocator(TransportNodesListGatewayStartedShards startedAction) {
InternalPrimaryShardAllocator(TransportNodesListGatewayStartedShards startedAction, ClusterManagerMetrics clusterManagerMetrics) {
this.startedAction = startedAction;
this.clusterManagerMetrics = clusterManagerMetrics;
}

@Override
Expand All @@ -291,7 +296,8 @@ protected AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.Nod
"shard_started",
shardId,
IndexMetadata.INDEX_DATA_PATH_SETTING.get(allocation.metadata().index(shard.index()).getSettings()),
startedAction
startedAction,
clusterManagerMetrics
)
);
AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> shardState = fetch.fetchData(
Expand All @@ -313,9 +319,11 @@ protected AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.Nod
class InternalReplicaShardAllocator extends ReplicaShardAllocator {

private final TransportNodesListShardStoreMetadata storeAction;
private final ClusterManagerMetrics clusterManagerMetrics;

InternalReplicaShardAllocator(TransportNodesListShardStoreMetadata storeAction) {
InternalReplicaShardAllocator(TransportNodesListShardStoreMetadata storeAction, ClusterManagerMetrics clusterManagerMetrics) {
this.storeAction = storeAction;
this.clusterManagerMetrics = clusterManagerMetrics;
}

@Override
Expand All @@ -330,7 +338,8 @@ protected AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetadata.NodeS
"shard_store",
shard.shardId(),
IndexMetadata.INDEX_DATA_PATH_SETTING.get(allocation.metadata().index(shard.index()).getSettings()),
storeAction
storeAction,
clusterManagerMetrics
)
);
AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetadata.NodeStoreFilesMetadata> shardStores = fetch.fetchData(
Expand Down
Loading

0 comments on commit 8ddb3ee

Please sign in to comment.