Skip to content

Commit

Permalink
Add sort and collapse info to SearchHits transport serialization (ela…
Browse files Browse the repository at this point in the history
…stic#36555)

In order for CCS alternate execution mode (see elastic#32125) to be able to do the final reduction step on the CCS coordinating node, we need to serialize additional info in the transport layer as part of the `SearchHits`, specifically:

- lucene `SortField[]` which contains info about the fields that sorting was performed on and their type, which depends on mappings (that the CCS node does not know about)
- collapse field (`String`) that field collapsing was executed on, if requested
- collapse values (`Object[]`) that field collapsing was based on, if requested

This info is needed to be able to reconstruct the `TopFieldDocs` or `CollapseFieldTopDocs` in the CCS coordinating node to feed the `mergeTopDocs` method and reduce multiple search responses received (one per cluster) into one.

This commit adds such information to the `SearchHits` class. It's nullable info that is not serialized through the REST layer. `SearchPhaseController` sets such info at the end of the hits reduction phase.
  • Loading branch information
javanna committed Dec 18, 2018
1 parent e52941c commit 0156d94
Show file tree
Hide file tree
Showing 12 changed files with 561 additions and 147 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,9 @@ private void innerRun() throws IOException {
// query AND fetch optimization
finishPhase.run();
} else {
final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(numShards, reducedQueryPhase.scoreDocs);
if (reducedQueryPhase.scoreDocs.length == 0) { // no docs to fetch -- sidestep everything and return
ScoreDoc[] scoreDocs = reducedQueryPhase.sortedTopDocs.scoreDocs;
final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(numShards, scoreDocs);
if (scoreDocs.length == 0) { // no docs to fetch -- sidestep everything and return
phaseResults.stream()
.map(SearchPhaseResult::queryResult)
.forEach(this::releaseIrrelevantSearchContext); // we have to release contexts here to free up resources
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,18 +207,23 @@ static SortedTopDocs sortDocs(boolean ignoreFrom, Collection<? extends SearchPha
}
}
}
final boolean isSortedByField;
final SortField[] sortFields;
boolean isSortedByField = false;
SortField[] sortFields = null;
String collapseField = null;
Object[] collapseValues = null;
if (mergedTopDocs instanceof TopFieldDocs) {
TopFieldDocs fieldDocs = (TopFieldDocs) mergedTopDocs;
isSortedByField = (fieldDocs instanceof CollapseTopFieldDocs &&
fieldDocs.fields.length == 1 && fieldDocs.fields[0].getType() == SortField.Type.SCORE) == false;
sortFields = fieldDocs.fields;
} else {
isSortedByField = false;
sortFields = null;
if (fieldDocs instanceof CollapseTopFieldDocs) {
isSortedByField = (fieldDocs.fields.length == 1 && fieldDocs.fields[0].getType() == SortField.Type.SCORE) == false;
CollapseTopFieldDocs collapseTopFieldDocs = (CollapseTopFieldDocs) fieldDocs;
collapseField = collapseTopFieldDocs.field;
collapseValues = collapseTopFieldDocs.collapseValues;
} else {
isSortedByField = true;
}
}
return new SortedTopDocs(scoreDocs, isSortedByField, sortFields);
return new SortedTopDocs(scoreDocs, isSortedByField, sortFields, collapseField, collapseValues);
} else {
// no relevant docs
return SortedTopDocs.EMPTY;
Expand Down Expand Up @@ -262,7 +267,7 @@ private static void setShardIndex(TopDocs topDocs, int shardIndex) {
public ScoreDoc[] getLastEmittedDocPerShard(ReducedQueryPhase reducedQueryPhase, int numShards) {
final ScoreDoc[] lastEmittedDocPerShard = new ScoreDoc[numShards];
if (reducedQueryPhase.isEmptyResult == false) {
final ScoreDoc[] sortedScoreDocs = reducedQueryPhase.scoreDocs;
final ScoreDoc[] sortedScoreDocs = reducedQueryPhase.sortedTopDocs.scoreDocs;
// from is always zero as when we use scroll, we ignore from
long size = Math.min(reducedQueryPhase.fetchHits, reducedQueryPhase.size);
// with collapsing we can have more hits than sorted docs
Expand Down Expand Up @@ -303,7 +308,7 @@ public InternalSearchResponse merge(boolean ignoreFrom, ReducedQueryPhase reduce
if (reducedQueryPhase.isEmptyResult) {
return InternalSearchResponse.empty();
}
ScoreDoc[] sortedDocs = reducedQueryPhase.scoreDocs;
ScoreDoc[] sortedDocs = reducedQueryPhase.sortedTopDocs.scoreDocs;
SearchHits hits = getHits(reducedQueryPhase, ignoreFrom, fetchResults, resultsLookup);
if (reducedQueryPhase.suggest != null) {
if (!fetchResults.isEmpty()) {
Expand Down Expand Up @@ -341,12 +346,12 @@ public InternalSearchResponse merge(boolean ignoreFrom, ReducedQueryPhase reduce

private SearchHits getHits(ReducedQueryPhase reducedQueryPhase, boolean ignoreFrom,
Collection<? extends SearchPhaseResult> fetchResults, IntFunction<SearchPhaseResult> resultsLookup) {
final boolean sorted = reducedQueryPhase.isSortedByField;
ScoreDoc[] sortedDocs = reducedQueryPhase.scoreDocs;
SortedTopDocs sortedTopDocs = reducedQueryPhase.sortedTopDocs;
int sortScoreIndex = -1;
if (sorted) {
for (int i = 0; i < reducedQueryPhase.sortField.length; i++) {
if (reducedQueryPhase.sortField[i].getType() == SortField.Type.SCORE) {
if (sortedTopDocs.isSortedByField) {
SortField[] sortFields = sortedTopDocs.sortFields;
for (int i = 0; i < sortFields.length; i++) {
if (sortFields[i].getType() == SortField.Type.SCORE) {
sortScoreIndex = i;
}
}
Expand All @@ -358,12 +363,12 @@ private SearchHits getHits(ReducedQueryPhase reducedQueryPhase, boolean ignoreFr
int from = ignoreFrom ? 0 : reducedQueryPhase.from;
int numSearchHits = (int) Math.min(reducedQueryPhase.fetchHits - from, reducedQueryPhase.size);
// with collapsing we can have more fetch hits than sorted docs
numSearchHits = Math.min(sortedDocs.length, numSearchHits);
numSearchHits = Math.min(sortedTopDocs.scoreDocs.length, numSearchHits);
// merge hits
List<SearchHit> hits = new ArrayList<>();
if (!fetchResults.isEmpty()) {
for (int i = 0; i < numSearchHits; i++) {
ScoreDoc shardDoc = sortedDocs[i];
ScoreDoc shardDoc = sortedTopDocs.scoreDocs[i];
SearchPhaseResult fetchResultProvider = resultsLookup.apply(shardDoc.shardIndex);
if (fetchResultProvider == null) {
// this can happen if we are hitting a shard failure during the fetch phase
Expand All @@ -379,7 +384,7 @@ private SearchHits getHits(ReducedQueryPhase reducedQueryPhase, boolean ignoreFr
SearchHit searchHit = fetchResult.hits().getHits()[index];
searchHit.score(shardDoc.score);
searchHit.shard(fetchResult.getSearchShardTarget());
if (sorted) {
if (sortedTopDocs.isSortedByField) {
FieldDoc fieldDoc = (FieldDoc) shardDoc;
searchHit.sortValues(fieldDoc.fields, reducedQueryPhase.sortValueFormats);
if (sortScoreIndex != -1) {
Expand All @@ -389,7 +394,8 @@ private SearchHits getHits(ReducedQueryPhase reducedQueryPhase, boolean ignoreFr
hits.add(searchHit);
}
}
return new SearchHits(hits.toArray(new SearchHit[0]), reducedQueryPhase.totalHits, reducedQueryPhase.maxScore);
return new SearchHits(hits.toArray(new SearchHit[0]), reducedQueryPhase.totalHits,
reducedQueryPhase.maxScore, sortedTopDocs.sortFields, sortedTopDocs.collapseField, sortedTopDocs.collapseValues);
}

/**
Expand Down Expand Up @@ -429,7 +435,7 @@ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResu
Boolean terminatedEarly = null;
if (queryResults.isEmpty()) { // early terminate we have nothing to reduce
return new ReducedQueryPhase(topDocsStats.totalHits, topDocsStats.fetchHits, topDocsStats.maxScore,
timedOut, terminatedEarly, null, null, null, EMPTY_DOCS, null, null, numReducePhases, false, 0, 0, true);
timedOut, terminatedEarly, null, null, null, SortedTopDocs.EMPTY, null, numReducePhases, 0, 0, true);
}
final QuerySearchResult firstResult = queryResults.stream().findFirst().get().queryResult();
final boolean hasSuggest = firstResult.suggest() != null;
Expand Down Expand Up @@ -491,10 +497,10 @@ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResu
final InternalAggregations aggregations = aggregationsList.isEmpty() ? null : reduceAggs(aggregationsList,
firstResult.pipelineAggregators(), reduceContext);
final SearchProfileShardResults shardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
final SortedTopDocs scoreDocs = sortDocs(isScrollRequest, queryResults, bufferedTopDocs, topDocsStats, from, size);
final SortedTopDocs sortedTopDocs = sortDocs(isScrollRequest, queryResults, bufferedTopDocs, topDocsStats, from, size);
return new ReducedQueryPhase(topDocsStats.totalHits, topDocsStats.fetchHits, topDocsStats.maxScore,
timedOut, terminatedEarly, suggest, aggregations, shardResults, scoreDocs.scoreDocs, scoreDocs.sortFields,
firstResult.sortValueFormats(), numReducePhases, scoreDocs.isSortedByField, size, from, false);
timedOut, terminatedEarly, suggest, aggregations, shardResults, sortedTopDocs,
firstResult.sortValueFormats(), numReducePhases, size, from, firstResult == null);
}

/**
Expand Down Expand Up @@ -542,12 +548,8 @@ public static final class ReducedQueryPhase {
final SearchProfileShardResults shardResults;
// the number of reduces phases
final int numReducePhases;
// the searches merged top docs
final ScoreDoc[] scoreDocs;
// the top docs sort fields used to sort the score docs, <code>null</code> if the results are not sorted
final SortField[] sortField;
// <code>true</code> iff the result score docs is sorted by a field (not score), this implies that <code>sortField</code> is set.
final boolean isSortedByField;
//encloses info about the merged top docs, the sort fields used to sort the score docs etc.
final SortedTopDocs sortedTopDocs;
// the size of the top hits to return
final int size;
// <code>true</code> iff the query phase had no results. Otherwise <code>false</code>
Expand All @@ -558,9 +560,8 @@ public static final class ReducedQueryPhase {
final DocValueFormat[] sortValueFormats;

ReducedQueryPhase(long totalHits, long fetchHits, float maxScore, boolean timedOut, Boolean terminatedEarly, Suggest suggest,
InternalAggregations aggregations, SearchProfileShardResults shardResults, ScoreDoc[] scoreDocs,
SortField[] sortFields, DocValueFormat[] sortValueFormats, int numReducePhases, boolean isSortedByField, int size,
int from, boolean isEmptyResult) {
InternalAggregations aggregations, SearchProfileShardResults shardResults, SortedTopDocs sortedTopDocs,
DocValueFormat[] sortValueFormats, int numReducePhases, int size, int from, boolean isEmptyResult) {
if (numReducePhases <= 0) {
throw new IllegalArgumentException("at least one reduce phase must have been applied but was: " + numReducePhases);
}
Expand All @@ -577,9 +578,7 @@ public static final class ReducedQueryPhase {
this.aggregations = aggregations;
this.shardResults = shardResults;
this.numReducePhases = numReducePhases;
this.scoreDocs = scoreDocs;
this.sortField = sortFields;
this.isSortedByField = isSortedByField;
this.sortedTopDocs = sortedTopDocs;
this.size = size;
this.from = from;
this.isEmptyResult = isEmptyResult;
Expand Down Expand Up @@ -719,7 +718,7 @@ InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> newSearchPhaseResu
}
return new InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult>(numShards) {
@Override
public ReducedQueryPhase reduce() {
ReducedQueryPhase reduce() {
return reducedQueryPhase(results.asList(), isScrollRequest, trackTotalHits);
}
};
Expand Down Expand Up @@ -752,15 +751,23 @@ void add(TopDocs topDocs) {
}

static final class SortedTopDocs {
static final SortedTopDocs EMPTY = new SortedTopDocs(EMPTY_DOCS, false, null);
static final SortedTopDocs EMPTY = new SortedTopDocs(EMPTY_DOCS, false, null, null, null);
// the searches merged top docs
final ScoreDoc[] scoreDocs;
// <code>true</code> iff the result score docs is sorted by a field (not score), this implies that <code>sortField</code> is set.
final boolean isSortedByField;
// the top docs sort fields used to sort the score docs, <code>null</code> if the results are not sorted
final SortField[] sortFields;
final String collapseField;
final Object[] collapseValues;

SortedTopDocs(ScoreDoc[] scoreDocs, boolean isSortedByField, SortField[] sortFields) {
SortedTopDocs(ScoreDoc[] scoreDocs, boolean isSortedByField, SortField[] sortFields,
String collapseField, Object[] collapseValues) {
this.scoreDocs = scoreDocs;
this.isSortedByField = isSortedByField;
this.sortFields = sortFields;
this.collapseField = collapseField;
this.collapseValues = collapseValues;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.elasticsearch.search.query.ScrollQuerySearchResult;
import org.elasticsearch.transport.Transport;

import java.io.IOException;
import java.util.function.BiFunction;

final class SearchScrollQueryThenFetchAsyncAction extends SearchScrollAsyncAction<ScrollQuerySearchResult> {
Expand Down Expand Up @@ -68,16 +67,16 @@ protected void executeInitialPhase(Transport.Connection connection, InternalScro
protected SearchPhase moveToNextPhase(BiFunction<String, String, DiscoveryNode> clusterNodeLookup) {
return new SearchPhase("fetch") {
@Override
public void run() throws IOException {
public void run() {
final SearchPhaseController.ReducedQueryPhase reducedQueryPhase = searchPhaseController.reducedScrollQueryPhase(
queryResults.asList());
if (reducedQueryPhase.scoreDocs.length == 0) {
ScoreDoc[] scoreDocs = reducedQueryPhase.sortedTopDocs.scoreDocs;
if (scoreDocs.length == 0) {
sendResponse(reducedQueryPhase, fetchResults);
return;
}

final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(queryResults.length(),
reducedQueryPhase.scoreDocs);
final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(queryResults.length(), scoreDocs);
final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(reducedQueryPhase,
queryResults.length());
final CountDown counter = new CountDown(docIdsToLoad.length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -785,22 +785,36 @@ public <T> void writeArray(final Writer<T> writer, final T[] array) throws IOExc
}
}

public <T extends Writeable> void writeArray(T[] array) throws IOException {
writeVInt(array.length);
for (T value: array) {
value.writeTo(this);
}
}

public <T extends Writeable> void writeOptionalArray(@Nullable T[] array) throws IOException {
/**
* Same as {@link #writeArray(Writer, Object[])} but the provided array may be null. An additional boolean value is
* serialized to indicate whether the array was null or not.
*/
public <T> void writeOptionalArray(final Writer<T> writer, final @Nullable T[] array) throws IOException {
if (array == null) {
writeBoolean(false);
} else {
writeBoolean(true);
writeArray(array);
writeArray(writer, array);
}
}

/**
* Writes the specified array of {@link Writeable}s. This method can be seen as
* writer version of {@link StreamInput#readArray(Writeable.Reader, IntFunction)}. The length of array encoded as a variable-length
* integer is first written to the stream, and then the elements of the array are written to the stream.
*/
public <T extends Writeable> void writeArray(T[] array) throws IOException {
writeArray((out, value) -> value.writeTo(out), array);
}

/**
* Same as {@link #writeArray(Writeable[])} but the provided array may be null. An additional boolean value is
* serialized to indicate whether the array was null or not.
*/
public <T extends Writeable> void writeOptionalArray(@Nullable T[] array) throws IOException {
writeOptionalArray((out, value) -> value.writeTo(out), array);
}

/**
* Serializes a potential null value.
*/
Expand Down
Loading

0 comments on commit 0156d94

Please sign in to comment.