Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Use collector manager for search when necessary #45829

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.lucene.search.CollectionStatistics;
import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.CollectorManager;
import org.apache.lucene.search.ConjunctionDISI;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.Explanation;
Expand All @@ -54,6 +55,7 @@
import org.elasticsearch.search.profile.query.QueryTimingType;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -138,8 +140,19 @@ private void checkCancelled() {
}
}

/**
 * Searches the given leaves with per-leaf collectors created by the {@link CollectorManager},
 * then reduces all collectors into the manager's final result.
 * Each leaf gets its own collector from {@code manager.newCollector()}; after every leaf has
 * been searched, {@code manager.reduce(...)} merges the per-leaf results.
 *
 * @param leaves  the leaf reader contexts to search, in the order they should be visited
 * @param weight  the weight to produce scorers from
 * @param manager creates one collector per leaf and reduces them when the search completes
 * @throws IOException if a low-level I/O error occurs while searching a leaf
 */
public <C extends Collector> void search(List<LeafReaderContext> leaves, Weight weight, CollectorManager<C, ?> manager) throws IOException {
final List<C> collectors = new ArrayList<>(leaves.size());
for (LeafReaderContext ctx : leaves) {
final C collector = manager.newCollector();
// TODO: propagate setMinCompetitiveScore between collectors so later leaves can skip
// non-competitive hits collected by earlier leaves
searchLeaf(ctx, weight, collector);
collectors.add(collector);
}
manager.reduce(collectors);
}

@Override
protected void search(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException {
public void search(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to change the visibility, we can also make searchLeaf private since it is only used internally

Copy link
Contributor Author

@mayya-sharipova mayya-sharipova Aug 28, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to change the visibility

Because there is already a method with the same signature in its superclass IndexSearcher that is protected, if I don't make this method protected or public, Java will complain with a message "attempting to assign weaker access privileges". I will make this method protected

also make searchLeaf private

I did. I also have to modify SearchCancellationTests::testCancellableCollector as it was not able to use searchLeaf anymore.

for (LeafReaderContext ctx : leaves) { // search each subreader
searchLeaf(ctx, weight, collector);
}
Expand Down Expand Up @@ -228,6 +241,7 @@ public BulkScorer bulkScorer(LeafReaderContext context) throws IOException {
}
}


private static BitSet getSparseBitSetOrNull(Bits liveDocs) {
if (liveDocs instanceof SparseFixedBitSet) {
return (BitSet) liveDocs;
Expand Down
179 changes: 121 additions & 58 deletions server/src/main/java/org/elasticsearch/search/query/QueryPhase.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,18 @@
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.CollectorManager;
import org.apache.lucene.search.ConstantScoreQuery;
import org.apache.lucene.search.DocValuesFieldExistsQuery;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopFieldCollector;
import org.apache.lucene.search.TopFieldDocs;
import org.apache.lucene.search.TotalHits;
import org.apache.lucene.search.Weight;
Expand Down Expand Up @@ -71,6 +74,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedList;
Expand Down Expand Up @@ -230,15 +234,10 @@ static boolean executeInternal(SearchContext searchContext) throws QueryPhaseExe
// modify sorts: add sort on _score as 1st sort, and move the sort on the original field as the 2nd sort
SortField[] oldSortFields = searchContext.sort().sort.getSort();
DocValueFormat[] oldFormats = searchContext.sort().formats;
SortField[] newSortFields = new SortField[oldSortFields.length + 2];
DocValueFormat[] newFormats = new DocValueFormat[oldSortFields.length + 2];
SortField[] newSortFields = new SortField[oldSortFields.length + 1];
DocValueFormat[] newFormats = new DocValueFormat[oldSortFields.length + 1];
newSortFields[0] = SortField.FIELD_SCORE;
newFormats[0] = DocValueFormat.RAW;
// Add a tiebreak on _doc in order to be able to search
// the leaves in any order. This is needed since we reorder
// the leaves based on the minimum value in each segment.
newSortFields[newSortFields.length-1] = SortField.FIELD_DOC;
newFormats[newSortFields.length-1] = DocValueFormat.RAW;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we still need to tiebreak by global doc id because we reorder the leaves in searchWithCollectorManager ?

System.arraycopy(oldSortFields, 0, newSortFields, 1, oldSortFields.length);
System.arraycopy(oldFormats, 0, newFormats, 1, oldFormats.length);
sortAndFormatsForRewrittenNumericSort = searchContext.sort(); // stash SortAndFormats to restore it later
Expand Down Expand Up @@ -286,61 +285,20 @@ static boolean executeInternal(SearchContext searchContext) throws QueryPhaseExe
} else {
checkCancelled = null;
}

searcher.setCheckCancelled(checkCancelled);

final boolean doProfile = searchContext.getProfilers() != null;
// create the top docs collector last when the other collectors are known
final TopDocsCollectorContext topDocsFactory = createTopDocsCollectorContext(searchContext, hasFilterCollector);
// add the top docs collector, the first collector context in the chain
collectors.addFirst(topDocsFactory);

final Collector queryCollector;
if (doProfile) {
InternalProfileCollector profileCollector = QueryCollectorContext.createQueryCollectorWithProfiler(collectors);
searchContext.getProfilers().getCurrentQueryProfiler().setCollector(profileCollector);
queryCollector = profileCollector;
boolean shouldRescore;
// if we are optimizing sort and there are no other collectors
if (sortAndFormatsForRewrittenNumericSort != null && collectors.size() == 0 && searchContext.getProfilers() == null) {
shouldRescore = searchWithCollectorManager(searchContext, searcher, query, leafSorter, timeoutSet);
} else {
queryCollector = QueryCollectorContext.createQueryCollector(collectors);
}

try {
Weight weight = searcher.createWeight(searcher.rewrite(query), queryCollector.scoreMode(), 1f);
// We search the leaves in a different order when the numeric sort optimization is
// activated. Collectors expect leaves in order when searching but this is fine in this
// case since we only have a TopFieldCollector and we force the tiebreak on _doc.
List<LeafReaderContext> leaves = new ArrayList<>(searcher.getIndexReader().leaves());
leafSorter.accept(leaves);
for (LeafReaderContext ctx : leaves) {
searcher.searchLeaf(ctx, weight, queryCollector);
}
} catch (EarlyTerminatingCollector.EarlyTerminationException e) {
queryResult.terminatedEarly(true);
} catch (TimeExceededException e) {
assert timeoutSet : "TimeExceededException thrown even though timeout wasn't set";

if (searchContext.request().allowPartialSearchResults() == false) {
// Can't rethrow TimeExceededException because not serializable
throw new QueryPhaseExecutionException(searchContext, "Time exceeded");
}
queryResult.searchTimedOut(true);
} finally {
searchContext.clearReleasables(SearchContext.Lifetime.COLLECTION);
}
if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER
&& queryResult.terminatedEarly() == null) {
queryResult.terminatedEarly(false);
}

final QuerySearchResult result = searchContext.queryResult();
for (QueryCollectorContext ctx : collectors) {
ctx.postProcess(result);
shouldRescore = searchWithCollector(searchContext, searcher, query, collectors, hasFilterCollector, timeoutSet);
}

// if we rewrote numeric long or date sort, restore fieldDocs based on the original sort
if (sortAndFormatsForRewrittenNumericSort != null) {
searchContext.sort(sortAndFormatsForRewrittenNumericSort); // restore SortAndFormats
restoreTopFieldDocs(result, sortAndFormatsForRewrittenNumericSort);
restoreTopFieldDocs(queryResult, sortAndFormatsForRewrittenNumericSort);
}

ExecutorService executor = searchContext.indexShard().getThreadPool().executor(ThreadPool.Names.SEARCH);
Expand All @@ -351,14 +309,119 @@ static boolean executeInternal(SearchContext searchContext) throws QueryPhaseExe
}
if (searchContext.getProfilers() != null) {
ProfileShardResult shardResults = SearchProfileShardResults.buildShardResults(searchContext.getProfilers());
result.profileResults(shardResults);
queryResult.profileResults(shardResults);
}
return topDocsFactory.shouldRescore();
return shouldRescore;
} catch (Exception e) {
throw new QueryPhaseExecutionException(searchContext, "Failed to execute main query", e);
}
}

private static boolean searchWithCollector(SearchContext searchContext, ContextIndexSearcher searcher, Query query,
LinkedList<QueryCollectorContext> collectors, boolean hasFilterCollector, boolean timeoutSet) throws IOException {
// create the top docs collector last when the other collectors are known
final TopDocsCollectorContext topDocsFactory = createTopDocsCollectorContext(searchContext, hasFilterCollector);
// add the top docs collector, the first collector context in the chain
collectors.addFirst(topDocsFactory);

final Collector queryCollector;
if ( searchContext.getProfilers() != null) {
InternalProfileCollector profileCollector = QueryCollectorContext.createQueryCollectorWithProfiler(collectors);
searchContext.getProfilers().getCurrentQueryProfiler().setCollector(profileCollector);
queryCollector = profileCollector;
} else {
queryCollector = QueryCollectorContext.createQueryCollector(collectors);
}
QuerySearchResult queryResult = searchContext.queryResult();
try {
Weight weight = searcher.createWeight(searcher.rewrite(query), queryCollector.scoreMode(), 1f);
searcher.search(searcher.getIndexReader().leaves(), weight, queryCollector);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can replace with searcher.search(query, queryCollector); ?

} catch (EarlyTerminatingCollector.EarlyTerminationException e) {
queryResult.terminatedEarly(true);
} catch (TimeExceededException e) {
assert timeoutSet : "TimeExceededException thrown even though timeout wasn't set";
if (searchContext.request().allowPartialSearchResults() == false) {
// Can't rethrow TimeExceededException because not serializable
throw new QueryPhaseExecutionException(searchContext, "Time exceeded");
}
queryResult.searchTimedOut(true);
} finally {
searchContext.clearReleasables(SearchContext.Lifetime.COLLECTION);
}
if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER && queryResult.terminatedEarly() == null) {
queryResult.terminatedEarly(false);
}
for (QueryCollectorContext ctx : collectors) {
ctx.postProcess(queryResult);
}
return topDocsFactory.shouldRescore();
}

// we use collectorManager during sort optimization
// for the sort optimization, we have already checked that there are no other collectors, no filters,
// no search after, no scroll, no collapse, no track scores
// this means we can use TopFieldCollector directly
private static boolean searchWithCollectorManager(SearchContext searchContext, ContextIndexSearcher searcher, Query query,
CheckedConsumer<List<LeafReaderContext>, IOException> leafSorter, boolean timeoutSet) throws IOException {
final IndexReader reader = searchContext.searcher().getIndexReader();
final int numHits = Math.min(searchContext.from() + searchContext.size(), Math.max(1, reader.numDocs()));
final SortAndFormats sortAndFormats = searchContext.sort();

ScoreMode scoreMode = ScoreMode.TOP_SCORES;
int hitCount = 0;
if (searchContext.trackTotalHitsUpTo() != SearchContext.TRACK_TOTAL_HITS_DISABLED) {
hitCount = shortcutTotalHitCount(reader, query);
if (searchContext.trackTotalHitsUpTo() == Integer.MAX_VALUE) {
scoreMode = ScoreMode.COMPLETE; //TODO: not sure if scoreMode should always be TOP_SCORES
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we already check this here so I don't think it is needed. We shouldn't run with the optim if the score mode is COMPLETE.

}
}
final int totalHitsThreshold = hitCount == -1 ? searchContext.trackTotalHitsUpTo() : 1;
final TotalHits totalHits = hitCount == -1 ? null : new TotalHits(hitCount, TotalHits.Relation.EQUAL_TO);

// Manager that creates one TopFieldCollector per leaf (no global bottom value is shared,
// hence the null after-doc argument) and, on reduce, merges the per-leaf top docs into a
// single sorted result that is written directly into the query result.
CollectorManager<TopFieldCollector, Void> manager = new CollectorManager<>() {
@Override
public TopFieldCollector newCollector() throws IOException {
// numHits and totalHitsThreshold come from the enclosing method; threshold is 1 when
// the total hit count was already computed via shortcutTotalHitCount
return TopFieldCollector.create(sortAndFormats.sort, numHits, null, totalHitsThreshold);
}
@Override
public Void reduce(Collection<TopFieldCollector> collectors) throws IOException {
// gather the per-leaf top docs so they can be merged by sort order
TopFieldDocs[] topDocsArr = new TopFieldDocs[collectors.size()];
int i = 0;
for (TopFieldCollector collector : collectors) {
topDocsArr[i++] = collector.topDocs();
}
// we have to set setShardIndex to true, as Lucene can't have ScoreDocs without shardIndex set
TopFieldDocs mergedTopDocs = TopDocs.merge(sortAndFormats.sort, 0, numHits, topDocsArr, true);
// reset shard index for all topDocs; ES will set shard index later during reduce stage
for (ScoreDoc scoreDoc : mergedTopDocs.scoreDocs) {
scoreDoc.shardIndex = -1;
}
if (totalHits != null) { // we have already precalculated totalHits for the whole index
mergedTopDocs = new TopFieldDocs(totalHits, mergedTopDocs.scoreDocs, mergedTopDocs.fields);
}
// maxScore is NaN because track_scores is disabled on this optimized path
searchContext.queryResult().topDocs(new TopDocsAndMaxScore(mergedTopDocs, Float.NaN), sortAndFormats.formats);
return null;
}
};

List<LeafReaderContext> leaves = new ArrayList<>(searcher.getIndexReader().leaves());
leafSorter.accept(leaves);
try {
Weight weight = searcher.createWeight(searcher.rewrite(query), scoreMode, 1f);
searcher.search(leaves, weight, manager);
} catch (TimeExceededException e) {
assert timeoutSet : "TimeExceededException thrown even though timeout wasn't set";
if (searchContext.request().allowPartialSearchResults() == false) {
// Can't rethrow TimeExceededException because not serializable
throw new QueryPhaseExecutionException(searchContext, "Time exceeded");
}
searchContext.queryResult().searchTimedOut(true);
} finally {
searchContext.clearReleasables(SearchContext.Lifetime.COLLECTION);
}
return false; // no rescoring when sorting by field
}

private static Query tryRewriteLongSort(SearchContext searchContext, IndexReader reader,
Query query, boolean hasFilterCollector) throws IOException {
if (searchContext.searchAfter() != null) return null;
Expand Down Expand Up @@ -399,7 +462,7 @@ private static Query tryRewriteLongSort(SearchContext searchContext, IndexReader
if (missingValuesAccordingToSort == false) return null;

int docCount = PointValues.getDocCount(reader, fieldName);
// is not worth to run optimization on small index
// is not worth to run optimization on small index
if (docCount <= 512) return null;

// check for multiple values
Expand Down Expand Up @@ -470,7 +533,7 @@ static void restoreTopFieldDocs(QuerySearchResult result, SortAndFormats origina
TopDocs topDocs = result.topDocs().topDocs;
for (ScoreDoc scoreDoc : topDocs.scoreDocs) {
FieldDoc fieldDoc = (FieldDoc) scoreDoc;
fieldDoc.fields = Arrays.copyOfRange(fieldDoc.fields, 1, fieldDoc.fields.length-1);
fieldDoc.fields = Arrays.copyOfRange(fieldDoc.fields, 1, fieldDoc.fields.length);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we still need to tiebreak by global doc id because we reorder the leaves in searchWithCollectorManager ?

}
TopFieldDocs newTopDocs = new TopFieldDocs(topDocs.totalHits, topDocs.scoreDocs, originalSortAndFormats.sort.getSort());
result.topDocs(new TopDocsAndMaxScore(newTopDocs, Float.NaN), originalSortAndFormats.formats);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -886,8 +886,9 @@ private static ContextIndexSearcher newEarlyTerminationContextSearcher(IndexRead
IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy()) {

@Override
protected void search(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException {
throw new AssertionError();
public void search(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException {
final Collector in = new AssertingEarlyTerminationFilterCollector(collector, size);
super.search(leaves, weight, in);
}

@Override
Expand Down