Skip to content

Commit

Permalink
Fix false positive query timeouts due to using cached time
Browse files Browse the repository at this point in the history
Signed-off-by: Ahmad AbuKhalil <abukhali@amazon.com>
  • Loading branch information
aabukhalil committed May 25, 2022
1 parent 5320b68 commit f604902
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 9 deletions.
37 changes: 28 additions & 9 deletions server/src/main/java/org/opensearch/search/query/QueryPhase.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.lucene.queries.MinDocQuery;
import org.opensearch.lucene.queries.SearchAfterSortedDocQuery;
import org.apache.lucene.search.BooleanClause;
Expand Down Expand Up @@ -74,6 +75,7 @@
import java.util.LinkedList;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.function.LongSupplier;

import static org.opensearch.search.query.QueryCollectorContext.createEarlyTerminationCollectorContext;
import static org.opensearch.search.query.QueryCollectorContext.createFilteredCollectorContext;
Expand Down Expand Up @@ -257,15 +259,9 @@ static boolean executeInternal(SearchContext searchContext, QueryPhaseSearcher q

final Runnable timeoutRunnable;
if (timeoutSet) {
final long startTime = searchContext.getRelativeTimeInMillis();
final long timeout = searchContext.timeout().millis();
final long maxTime = startTime + timeout;
timeoutRunnable = searcher.addQueryCancellation(() -> {
final long time = searchContext.getRelativeTimeInMillis();
if (time > maxTime) {
throw new TimeExceededException();
}
});
timeoutRunnable = searcher.addQueryCancellation(
createQueryTimeoutChecker(searchContext::getRelativeTimeInMillis, searchContext.timeout().millis())
);
} else {
timeoutRunnable = null;
}
Expand Down Expand Up @@ -309,6 +305,29 @@ static boolean executeInternal(SearchContext searchContext, QueryPhaseSearcher q
}
}

/**
* Create runnable which throws {@link TimeExceededException} when the runnable is called after timeout + runnable creation time
* exceeds currentTime
* @param relativeMsTimeSupplier supplier of currentTime in milliseconds which is okay to be cached
* @param timeout the max time duration in milliseconds before throwing {@link TimeExceededException}
* @return the created runnable
*/
static Runnable createQueryTimeoutChecker(LongSupplier relativeMsTimeSupplier, final long timeout) {
/* for startTime, relative non-cached time must be used to prevent false positive timeouts.
* Using cached time for startTime will fail and produce false positive timeouts when maxTime = (startTime + timeout) falls in
* next time cache slot(s) AND time caching lifespan > passed timeout */
final long startTime = TimeValue.nsecToMSec(System.nanoTime());
final long maxTime = startTime + timeout;
return () -> {
/* As long as startTime is non cached time, using cached time here might only produce false negative timeouts within the time
* cache life span which is acceptable */
final long time = relativeMsTimeSupplier.getAsLong();
if (time > maxTime) {
throw new TimeExceededException();
}
};
}

private static boolean searchWithCollector(
SearchContext searchContext,
ContextIndexSearcher searcher,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import org.apache.lucene.util.FixedBitSet;
import org.opensearch.action.search.SearchShardTask;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.mapper.DateFieldMapper;
import org.opensearch.index.mapper.MappedFieldType;
import org.opensearch.index.mapper.MapperService;
Expand All @@ -105,6 +106,7 @@
import org.opensearch.search.sort.SortAndFormats;
import org.opensearch.tasks.TaskCancelledException;
import org.opensearch.test.TestSearchContext;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -1079,6 +1081,50 @@ public void testCancellationDuringPreprocess() throws IOException {
}
}

public void testQueryTimeoutChecker() throws Exception {
long timeCacheLifespan = ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING.get(Settings.EMPTY).millis();
long timeTolerance = timeCacheLifespan / 20;

// should throw time exceed exception for sure after timeCacheLifespan*2+timeTolerance (next's next cached time is available)
assertThrows(
QueryPhase.TimeExceededException.class,
() -> createTimeoutCheckerThenWaitThenRun(timeCacheLifespan, timeCacheLifespan * 2 + timeTolerance, true, false)
);

// should not throw time exceed exception after timeCacheLifespan+timeTolerance because new cached time - init time < timeout
createTimeoutCheckerThenWaitThenRun(timeCacheLifespan, timeCacheLifespan + timeTolerance, true, false);

// should not throw time exceed exception after timeout < timeCacheLifespan when cached time didn't change
createTimeoutCheckerThenWaitThenRun(timeCacheLifespan / 2, timeCacheLifespan / 2 + timeTolerance, false, true);
createTimeoutCheckerThenWaitThenRun(timeCacheLifespan / 4, timeCacheLifespan / 2 + timeTolerance, false, true);
}

private void createTimeoutCheckerThenWaitThenRun(
long timeout,
long sleepAfterCreation,
boolean checkCachedTimeChanged,
boolean checkCachedTimeHasNotChanged
) throws Exception {
long timeCacheLifespan = ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING.get(Settings.EMPTY).millis();
long timeTolerance = timeCacheLifespan / 20;
long currentTimeDiffWithCachedTime = TimeValue.nsecToMSec(System.nanoTime()) - threadPool.relativeTimeInMillis();
// need to run this test approximately at the start of cached time window
long timeToAlignTimeWithCachedTimeOffset = timeCacheLifespan - currentTimeDiffWithCachedTime + timeTolerance;
Thread.sleep(timeToAlignTimeWithCachedTimeOffset);

long initialRelativeCachedTime = threadPool.relativeTimeInMillis();
Runnable queryTimeoutChecker = QueryPhase.createQueryTimeoutChecker(() -> threadPool.relativeTimeInMillis(), timeout);
// make sure next time slot become available
Thread.sleep(sleepAfterCreation);
if (checkCachedTimeChanged) {
assertNotEquals(initialRelativeCachedTime, threadPool.relativeTimeInMillis());
}
if (checkCachedTimeHasNotChanged) {
assertEquals(initialRelativeCachedTime, threadPool.relativeTimeInMillis());
}
queryTimeoutChecker.run();
}

private static class TestSearchContextWithRewriteAndCancellation extends TestSearchContext {

private TestSearchContextWithRewriteAndCancellation(
Expand Down

0 comments on commit f604902

Please sign in to comment.