Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a counter to node stat api to track shard going from idle to non-idle #12768

Merged
merged 6 commits into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Tiered caching] Add Stale keys Management and CacheCleaner to IndicesRequestCache ([#12625](https://github.com/opensearch-project/OpenSearch/pull/12625))
- [Tiered caching] Add serializer integration to allow ehcache disk cache to use non-primitive values ([#12709](https://github.com/opensearch-project/OpenSearch/pull/12709))
- [Admission Control] Integrated IO Based AdmissionController to AdmissionControl Framework ([#12583](https://github.com/opensearch-project/OpenSearch/pull/12583))
- Add a counter to node stat api to track shard going from idle to non-idle ([#12768](https://github.com/opensearch-project/OpenSearch/pull/12768))

### Dependencies
- Bump `peter-evans/find-comment` from 2 to 3 ([#12288](https://github.com/opensearch-project/OpenSearch/pull/12288))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ public static class Stats implements Writeable, ToXContentFragment {
private long pitTimeInMillis;
private long pitCurrent;

private long searchIdleWakenUpCount;
ruai0511 marked this conversation as resolved.
Show resolved Hide resolved

@Nullable
private RequestStatsLongHolder requestStatsLongHolder;

Expand Down Expand Up @@ -193,7 +195,8 @@ public Stats(
long pitCurrent,
long suggestCount,
long suggestTimeInMillis,
long suggestCurrent
long suggestCurrent,
long searchIdleWakenUpCount
) {
this.requestStatsLongHolder = new RequestStatsLongHolder();
this.queryCount = queryCount;
Expand All @@ -220,6 +223,8 @@ public Stats(
this.pitCount = pitCount;
this.pitTimeInMillis = pitTimeInMillis;
this.pitCurrent = pitCurrent;

this.searchIdleWakenUpCount = searchIdleWakenUpCount;
}

private Stats(StreamInput in) throws IOException {
Expand Down Expand Up @@ -255,6 +260,10 @@ private Stats(StreamInput in) throws IOException {
concurrentQueryCurrent = in.readVLong();
queryConcurrency = in.readVLong();
}

if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
searchIdleWakenUpCount = in.readVLong();
}
}

public void add(Stats stats) {
Expand Down Expand Up @@ -282,6 +291,8 @@ public void add(Stats stats) {
pitCount += stats.pitCount;
pitTimeInMillis += stats.pitTimeInMillis;
pitCurrent += stats.pitCurrent;

searchIdleWakenUpCount += stats.searchIdleWakenUpCount;
}

public void addForClosingShard(Stats stats) {
Expand All @@ -306,6 +317,8 @@ public void addForClosingShard(Stats stats) {
pitTimeInMillis += stats.pitTimeInMillis;
pitCurrent += stats.pitCurrent;
queryConcurrency += stats.queryConcurrency;

searchIdleWakenUpCount += stats.searchIdleWakenUpCount;
}

public long getQueryCount() {
Expand Down Expand Up @@ -412,6 +425,10 @@ public long getSuggestCurrent() {
return suggestCurrent;
}

public long getSearchIdleWakenUpCount() {
return searchIdleWakenUpCount;
}

public static Stats readStats(StreamInput in) throws IOException {
return new Stats(in);
}
Expand Down Expand Up @@ -457,6 +474,10 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(concurrentQueryCurrent);
out.writeVLong(queryConcurrency);
}

if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeVLong(searchIdleWakenUpCount);
}
}

@Override
Expand Down Expand Up @@ -486,6 +507,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.humanReadableField(Fields.SUGGEST_TIME_IN_MILLIS, Fields.SUGGEST_TIME, getSuggestTime());
builder.field(Fields.SUGGEST_CURRENT, suggestCurrent);

builder.field(Fields.SEARCH_IDLE_WAKEN_UP_TOTAL, searchIdleWakenUpCount);

if (requestStatsLongHolder != null) {
builder.startObject(Fields.REQUEST);

Expand Down Expand Up @@ -654,6 +677,7 @@ static final class Fields {
static final String TIME = "time";
static final String CURRENT = "current";
static final String TOTAL = "total";
static final String SEARCH_IDLE_WAKEN_UP_TOTAL = "search_idle_waken_up_total";
ruai0511 marked this conversation as resolved.
Show resolved Hide resolved

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,11 @@ public void onFreePitContext(ReaderContext readerContext) {
totalStats.pitMetric.inc(TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - readerContext.getStartTimeInNano()));
}

@Override
public void onNewSearchIdleWakenUp() {
ruai0511 marked this conversation as resolved.
Show resolved Hide resolved
totalStats.searchIdleMetric.inc();
}

/**
* Holder of statistics values
*
Expand All @@ -239,6 +244,7 @@ static final class StatsHolder {
final CounterMetric scrollCurrent = new CounterMetric();
final CounterMetric pitCurrent = new CounterMetric();
final CounterMetric suggestCurrent = new CounterMetric();
final CounterMetric searchIdleMetric = new CounterMetric();

SearchStats.Stats stats() {
return new SearchStats.Stats(
Expand All @@ -260,7 +266,8 @@ SearchStats.Stats stats() {
pitCurrent.count(),
suggestMetric.count(),
TimeUnit.NANOSECONDS.toMillis(suggestMetric.sum()),
suggestCurrent.count()
suggestCurrent.count(),
searchIdleMetric.count()
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1859,6 +1859,10 @@ public Engine.Searcher acquireSearcher(String source) {
}

private void markSearcherAccessed() {
if (isSearchIdle()) {
ruai0511 marked this conversation as resolved.
Show resolved Hide resolved
SearchOperationListener searchOperationListener = getSearchOperationListener();
searchOperationListener.onNewSearchIdleWakenUp();
}
lastSearcherAccess.lazySet(threadPool.relativeTimeInMillis());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,11 @@
*/
default void onFreePitContext(ReaderContext readerContext) {}

/**
* Executed when a shard goes from idle to non-idle state
*/
default void onNewSearchIdleWakenUp() {}

Check warning on line 151 in server/src/main/java/org/opensearch/index/shard/SearchOperationListener.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/shard/SearchOperationListener.java#L151

Added line #L151 was not covered by tests

/**
* A Composite listener that multiplexes calls to each of the listeners methods.
*/
Expand Down Expand Up @@ -310,5 +315,16 @@
}
}
}

@Override
public void onNewSearchIdleWakenUp() {
for (SearchOperationListener listener : listeners) {
try {
listener.onNewSearchIdleWakenUp();
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("onNewSearchIdleWakenUp listener [{}] failed", listener), e);
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ public void testShardLevelSearchGroupStats() throws Exception {
// let's create two dummy search stats with groups
Map<String, Stats> groupStats1 = new HashMap<>();
Map<String, Stats> groupStats2 = new HashMap<>();
groupStats2.put("group1", new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1));
SearchStats searchStats1 = new SearchStats(new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1), 0, groupStats1);
SearchStats searchStats2 = new SearchStats(new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1), 0, groupStats2);
groupStats2.put("group1", new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1));
SearchStats searchStats1 = new SearchStats(new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1), 0, groupStats1);
SearchStats searchStats2 = new SearchStats(new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1), 0, groupStats2);

// adding these two search stats and checking group stats are correct
searchStats1.add(searchStats2);
Expand Down Expand Up @@ -128,6 +128,7 @@ private static void assertStats(Stats stats, long equalTo) {
assertEquals(equalTo, stats.getSuggestCount());
assertEquals(equalTo, stats.getSuggestTimeInMillis());
assertEquals(equalTo, stats.getSuggestCurrent());
assertEquals(equalTo, stats.getSearchIdleWakenUpCount());
// avg_concurrency is not summed up across stats
assertEquals(1, stats.getConcurrentAvgSliceCount(), 0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public void testListenersAreExecuted() {
AtomicInteger newScrollContext = new AtomicInteger();
AtomicInteger freeScrollContext = new AtomicInteger();
AtomicInteger validateSearchContext = new AtomicInteger();
AtomicInteger searchIdleWakenUp = new AtomicInteger();
AtomicInteger timeInNanos = new AtomicInteger(randomIntBetween(0, 10));
SearchOperationListener listener = new SearchOperationListener() {
@Override
Expand Down Expand Up @@ -133,6 +134,11 @@ public void validateReaderContext(ReaderContext readerContext, TransportRequest
assertNotNull(readerContext);
validateSearchContext.incrementAndGet();
}

@Override
public void onNewSearchIdleWakenUp() {
searchIdleWakenUp.incrementAndGet();
}
};

SearchOperationListener throwingListener = (SearchOperationListener) Proxy.newProxyInstance(
Expand Down Expand Up @@ -169,6 +175,7 @@ public void validateReaderContext(ReaderContext readerContext, TransportRequest
assertEquals(0, newScrollContext.get());
assertEquals(0, freeContext.get());
assertEquals(0, freeScrollContext.get());
assertEquals(0, searchIdleWakenUp.get());
assertEquals(0, validateSearchContext.get());

compositeListener.onFetchPhase(ctx, timeInNanos.get());
Expand All @@ -182,6 +189,7 @@ public void validateReaderContext(ReaderContext readerContext, TransportRequest
assertEquals(0, newScrollContext.get());
assertEquals(0, freeContext.get());
assertEquals(0, freeScrollContext.get());
assertEquals(0, searchIdleWakenUp.get());
assertEquals(0, validateSearchContext.get());

compositeListener.onPreQueryPhase(ctx);
Expand All @@ -195,6 +203,7 @@ public void validateReaderContext(ReaderContext readerContext, TransportRequest
assertEquals(0, newScrollContext.get());
assertEquals(0, freeContext.get());
assertEquals(0, freeScrollContext.get());
assertEquals(0, searchIdleWakenUp.get());
assertEquals(0, validateSearchContext.get());

compositeListener.onPreFetchPhase(ctx);
Expand All @@ -208,6 +217,7 @@ public void validateReaderContext(ReaderContext readerContext, TransportRequest
assertEquals(0, newScrollContext.get());
assertEquals(0, freeContext.get());
assertEquals(0, freeScrollContext.get());
assertEquals(0, searchIdleWakenUp.get());
assertEquals(0, validateSearchContext.get());

compositeListener.onFailedFetchPhase(ctx);
Expand All @@ -221,6 +231,7 @@ public void validateReaderContext(ReaderContext readerContext, TransportRequest
assertEquals(0, newScrollContext.get());
assertEquals(0, freeContext.get());
assertEquals(0, freeScrollContext.get());
assertEquals(0, searchIdleWakenUp.get());
assertEquals(0, validateSearchContext.get());

compositeListener.onFailedQueryPhase(ctx);
Expand All @@ -234,6 +245,7 @@ public void validateReaderContext(ReaderContext readerContext, TransportRequest
assertEquals(0, newScrollContext.get());
assertEquals(0, freeContext.get());
assertEquals(0, freeScrollContext.get());
assertEquals(0, searchIdleWakenUp.get());
assertEquals(0, validateSearchContext.get());

compositeListener.onNewReaderContext(mock(ReaderContext.class));
Expand All @@ -247,6 +259,7 @@ public void validateReaderContext(ReaderContext readerContext, TransportRequest
assertEquals(0, newScrollContext.get());
assertEquals(0, freeContext.get());
assertEquals(0, freeScrollContext.get());
assertEquals(0, searchIdleWakenUp.get());
assertEquals(0, validateSearchContext.get());

compositeListener.onNewScrollContext(mock(ReaderContext.class));
Expand All @@ -260,6 +273,7 @@ public void validateReaderContext(ReaderContext readerContext, TransportRequest
assertEquals(2, newScrollContext.get());
assertEquals(0, freeContext.get());
assertEquals(0, freeScrollContext.get());
assertEquals(0, searchIdleWakenUp.get());
assertEquals(0, validateSearchContext.get());

compositeListener.onFreeReaderContext(mock(ReaderContext.class));
Expand All @@ -273,6 +287,7 @@ public void validateReaderContext(ReaderContext readerContext, TransportRequest
assertEquals(2, newScrollContext.get());
assertEquals(2, freeContext.get());
assertEquals(0, freeScrollContext.get());
assertEquals(0, searchIdleWakenUp.get());
assertEquals(0, validateSearchContext.get());

compositeListener.onFreeScrollContext(mock(ReaderContext.class));
Expand All @@ -286,6 +301,21 @@ public void validateReaderContext(ReaderContext readerContext, TransportRequest
assertEquals(2, newScrollContext.get());
assertEquals(2, freeContext.get());
assertEquals(2, freeScrollContext.get());
assertEquals(0, searchIdleWakenUp.get());
assertEquals(0, validateSearchContext.get());

compositeListener.onNewSearchIdleWakenUp();
assertEquals(2, preFetch.get());
assertEquals(2, preQuery.get());
assertEquals(2, failedFetch.get());
assertEquals(2, failedQuery.get());
assertEquals(2, onQuery.get());
assertEquals(2, onFetch.get());
assertEquals(2, newContext.get());
assertEquals(2, newScrollContext.get());
assertEquals(2, freeContext.get());
assertEquals(2, freeScrollContext.get());
assertEquals(2, searchIdleWakenUp.get());
ruai0511 marked this conversation as resolved.
Show resolved Hide resolved
assertEquals(0, validateSearchContext.get());

if (throwingListeners == 0) {
Expand All @@ -311,6 +341,7 @@ public void validateReaderContext(ReaderContext readerContext, TransportRequest
assertEquals(2, newScrollContext.get());
assertEquals(2, freeContext.get());
assertEquals(2, freeScrollContext.get());
assertEquals(2, searchIdleWakenUp.get());
assertEquals(2, validateSearchContext.get());
}
}
Loading