Skip to content

Commit

Permalink
add metrics to tack idle shard waking up
Browse files Browse the repository at this point in the history
Signed-off-by: Ruirui Zhang <mariazrr@amazon.com>
  • Loading branch information
ruai0511 committed Mar 19, 2024
1 parent d4e1ab1 commit 3a13164
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ public static class Stats implements Writeable, ToXContentFragment {
private long pitTimeInMillis;
private long pitCurrent;

private long searchIdleWakenUpCount;

@Nullable
private RequestStatsLongHolder requestStatsLongHolder;

Expand Down Expand Up @@ -193,7 +195,8 @@ public Stats(
long pitCurrent,
long suggestCount,
long suggestTimeInMillis,
long suggestCurrent
long suggestCurrent,
long searchIdleWakenUpCount
) {
this.requestStatsLongHolder = new RequestStatsLongHolder();
this.queryCount = queryCount;
Expand All @@ -220,6 +223,8 @@ public Stats(
this.pitCount = pitCount;
this.pitTimeInMillis = pitTimeInMillis;
this.pitCurrent = pitCurrent;

this.searchIdleWakenUpCount = searchIdleWakenUpCount;
}

private Stats(StreamInput in) throws IOException {
Expand Down Expand Up @@ -255,6 +260,10 @@ private Stats(StreamInput in) throws IOException {
concurrentQueryCurrent = in.readVLong();
queryConcurrency = in.readVLong();
}

if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
searchIdleWakenUpCount = in.readVLong();
}
}

public void add(Stats stats) {
Expand Down Expand Up @@ -282,6 +291,8 @@ public void add(Stats stats) {
pitCount += stats.pitCount;
pitTimeInMillis += stats.pitTimeInMillis;
pitCurrent += stats.pitCurrent;

searchIdleWakenUpCount += stats.searchIdleWakenUpCount;
}

public void addForClosingShard(Stats stats) {
Expand All @@ -306,6 +317,8 @@ public void addForClosingShard(Stats stats) {
pitTimeInMillis += stats.pitTimeInMillis;
pitCurrent += stats.pitCurrent;
queryConcurrency += stats.queryConcurrency;

searchIdleWakenUpCount += stats.searchIdleWakenUpCount;
}

public long getQueryCount() {
Expand Down Expand Up @@ -412,6 +425,10 @@ public long getSuggestCurrent() {
return suggestCurrent;
}

public long getSearchIdleWakenUpCount() {
return searchIdleWakenUpCount;
}

public static Stats readStats(StreamInput in) throws IOException {
return new Stats(in);
}
Expand Down Expand Up @@ -457,6 +474,10 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(concurrentQueryCurrent);
out.writeVLong(queryConcurrency);
}

if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeVLong(searchIdleWakenUpCount);
}
}

@Override
Expand Down Expand Up @@ -486,6 +507,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.humanReadableField(Fields.SUGGEST_TIME_IN_MILLIS, Fields.SUGGEST_TIME, getSuggestTime());
builder.field(Fields.SUGGEST_CURRENT, suggestCurrent);

builder.field(Fields.SEARCH_IDLE_WAKEN_UP_TOTAL, searchIdleWakenUpCount);

if (requestStatsLongHolder != null) {
builder.startObject(Fields.REQUEST);

Expand Down Expand Up @@ -654,6 +677,7 @@ static final class Fields {
static final String TIME = "time";
static final String CURRENT = "current";
static final String TOTAL = "total";
static final String SEARCH_IDLE_WAKEN_UP_TOTAL = "search_idle_waken_up_total";

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,11 @@ public void onFreePitContext(ReaderContext readerContext) {
totalStats.pitMetric.inc(TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - readerContext.getStartTimeInNano()));
}

@Override
public void onNewSearchIdleWakenUp() {
totalStats.searchIdleMetric.inc();
}

/**
* Holder of statistics values
*
Expand All @@ -239,6 +244,7 @@ static final class StatsHolder {
final CounterMetric scrollCurrent = new CounterMetric();
final CounterMetric pitCurrent = new CounterMetric();
final CounterMetric suggestCurrent = new CounterMetric();
final CounterMetric searchIdleMetric = new CounterMetric();

SearchStats.Stats stats() {
return new SearchStats.Stats(
Expand All @@ -260,7 +266,8 @@ SearchStats.Stats stats() {
pitCurrent.count(),
suggestMetric.count(),
TimeUnit.NANOSECONDS.toMillis(suggestMetric.sum()),
suggestCurrent.count()
suggestCurrent.count(),
searchIdleMetric.count()
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1859,6 +1859,10 @@ public Engine.Searcher acquireSearcher(String source) {
}

private void markSearcherAccessed() {
if (isSearchIdle()) {
SearchOperationListener searchOperationListener = getSearchOperationListener();
searchOperationListener.onNewSearchIdleWakenUp();
}
lastSearcherAccess.lazySet(threadPool.relativeTimeInMillis());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,11 @@ default void onNewPitContext(ReaderContext readerContext) {}
*/
default void onFreePitContext(ReaderContext readerContext) {}

/**
* Executed when a shard goes from idle to non-idle state
*/
default void onNewSearchIdleWakenUp() {}

/**
* A Composite listener that multiplexes calls to each of the listeners methods.
*/
Expand Down Expand Up @@ -310,5 +315,16 @@ public void onFreePitContext(ReaderContext readerContext) {
}
}
}

@Override
public void onNewSearchIdleWakenUp() {
for (SearchOperationListener listener : listeners) {
try {
listener.onNewSearchIdleWakenUp();
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("onNewSearchIdleWakenUp listener [{}] failed", listener), e);
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ public void testShardLevelSearchGroupStats() throws Exception {
// let's create two dummy search stats with groups
Map<String, Stats> groupStats1 = new HashMap<>();
Map<String, Stats> groupStats2 = new HashMap<>();
groupStats2.put("group1", new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1));
SearchStats searchStats1 = new SearchStats(new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1), 0, groupStats1);
SearchStats searchStats2 = new SearchStats(new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1), 0, groupStats2);
groupStats2.put("group1", new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1));
SearchStats searchStats1 = new SearchStats(new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1), 0, groupStats1);
SearchStats searchStats2 = new SearchStats(new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1), 0, groupStats2);

// adding these two search stats and checking group stats are correct
searchStats1.add(searchStats2);
Expand Down Expand Up @@ -128,6 +128,7 @@ private static void assertStats(Stats stats, long equalTo) {
assertEquals(equalTo, stats.getSuggestCount());
assertEquals(equalTo, stats.getSuggestTimeInMillis());
assertEquals(equalTo, stats.getSuggestCurrent());
assertEquals(equalTo, stats.getSearchIdleWakenUpCount());
// avg_concurrency is not summed up across stats
assertEquals(1, stats.getConcurrentAvgSliceCount(), 0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public void testListenersAreExecuted() {
AtomicInteger newScrollContext = new AtomicInteger();
AtomicInteger freeScrollContext = new AtomicInteger();
AtomicInteger validateSearchContext = new AtomicInteger();
AtomicInteger searchIdleWakenUp = new AtomicInteger();
AtomicInteger timeInNanos = new AtomicInteger(randomIntBetween(0, 10));
SearchOperationListener listener = new SearchOperationListener() {
@Override
Expand Down Expand Up @@ -133,6 +134,11 @@ public void validateReaderContext(ReaderContext readerContext, TransportRequest
assertNotNull(readerContext);
validateSearchContext.incrementAndGet();
}

@Override
public void onNewSearchIdleWakenUp() {
searchIdleWakenUp.incrementAndGet();
}
};

SearchOperationListener throwingListener = (SearchOperationListener) Proxy.newProxyInstance(
Expand Down Expand Up @@ -169,6 +175,7 @@ public void validateReaderContext(ReaderContext readerContext, TransportRequest
assertEquals(0, newScrollContext.get());
assertEquals(0, freeContext.get());
assertEquals(0, freeScrollContext.get());
assertEquals(0, searchIdleWakenUp.get());
assertEquals(0, validateSearchContext.get());

compositeListener.onFetchPhase(ctx, timeInNanos.get());
Expand All @@ -182,6 +189,7 @@ public void validateReaderContext(ReaderContext readerContext, TransportRequest
assertEquals(0, newScrollContext.get());
assertEquals(0, freeContext.get());
assertEquals(0, freeScrollContext.get());
assertEquals(0, searchIdleWakenUp.get());
assertEquals(0, validateSearchContext.get());

compositeListener.onPreQueryPhase(ctx);
Expand All @@ -195,6 +203,7 @@ public void validateReaderContext(ReaderContext readerContext, TransportRequest
assertEquals(0, newScrollContext.get());
assertEquals(0, freeContext.get());
assertEquals(0, freeScrollContext.get());
assertEquals(0, searchIdleWakenUp.get());
assertEquals(0, validateSearchContext.get());

compositeListener.onPreFetchPhase(ctx);
Expand All @@ -208,6 +217,7 @@ public void validateReaderContext(ReaderContext readerContext, TransportRequest
assertEquals(0, newScrollContext.get());
assertEquals(0, freeContext.get());
assertEquals(0, freeScrollContext.get());
assertEquals(0, searchIdleWakenUp.get());
assertEquals(0, validateSearchContext.get());

compositeListener.onFailedFetchPhase(ctx);
Expand All @@ -221,6 +231,7 @@ public void validateReaderContext(ReaderContext readerContext, TransportRequest
assertEquals(0, newScrollContext.get());
assertEquals(0, freeContext.get());
assertEquals(0, freeScrollContext.get());
assertEquals(0, searchIdleWakenUp.get());
assertEquals(0, validateSearchContext.get());

compositeListener.onFailedQueryPhase(ctx);
Expand All @@ -234,6 +245,7 @@ public void validateReaderContext(ReaderContext readerContext, TransportRequest
assertEquals(0, newScrollContext.get());
assertEquals(0, freeContext.get());
assertEquals(0, freeScrollContext.get());
assertEquals(0, searchIdleWakenUp.get());
assertEquals(0, validateSearchContext.get());

compositeListener.onNewReaderContext(mock(ReaderContext.class));
Expand All @@ -247,6 +259,7 @@ public void validateReaderContext(ReaderContext readerContext, TransportRequest
assertEquals(0, newScrollContext.get());
assertEquals(0, freeContext.get());
assertEquals(0, freeScrollContext.get());
assertEquals(0, searchIdleWakenUp.get());
assertEquals(0, validateSearchContext.get());

compositeListener.onNewScrollContext(mock(ReaderContext.class));
Expand All @@ -260,6 +273,7 @@ public void validateReaderContext(ReaderContext readerContext, TransportRequest
assertEquals(2, newScrollContext.get());
assertEquals(0, freeContext.get());
assertEquals(0, freeScrollContext.get());
assertEquals(0, searchIdleWakenUp.get());
assertEquals(0, validateSearchContext.get());

compositeListener.onFreeReaderContext(mock(ReaderContext.class));
Expand All @@ -273,6 +287,7 @@ public void validateReaderContext(ReaderContext readerContext, TransportRequest
assertEquals(2, newScrollContext.get());
assertEquals(2, freeContext.get());
assertEquals(0, freeScrollContext.get());
assertEquals(0, searchIdleWakenUp.get());
assertEquals(0, validateSearchContext.get());

compositeListener.onFreeScrollContext(mock(ReaderContext.class));
Expand All @@ -286,6 +301,21 @@ public void validateReaderContext(ReaderContext readerContext, TransportRequest
assertEquals(2, newScrollContext.get());
assertEquals(2, freeContext.get());
assertEquals(2, freeScrollContext.get());
assertEquals(0, searchIdleWakenUp.get());
assertEquals(0, validateSearchContext.get());

compositeListener.onNewSearchIdleWakenUp();
assertEquals(2, preFetch.get());
assertEquals(2, preQuery.get());
assertEquals(2, failedFetch.get());
assertEquals(2, failedQuery.get());
assertEquals(2, onQuery.get());
assertEquals(2, onFetch.get());
assertEquals(2, newContext.get());
assertEquals(2, newScrollContext.get());
assertEquals(2, freeContext.get());
assertEquals(2, freeScrollContext.get());
assertEquals(2, searchIdleWakenUp.get());
assertEquals(0, validateSearchContext.get());

if (throwingListeners == 0) {
Expand All @@ -311,6 +341,7 @@ public void validateReaderContext(ReaderContext readerContext, TransportRequest
assertEquals(2, newScrollContext.get());
assertEquals(2, freeContext.get());
assertEquals(2, freeScrollContext.get());
assertEquals(2, searchIdleWakenUp.get());
assertEquals(2, validateSearchContext.get());
}
}

0 comments on commit 3a13164

Please sign in to comment.