
Address review comments
Signed-off-by: Sorabh Hamirwasia <sohami.apache@gmail.com>
sohami committed Jul 12, 2023
1 parent 4e9d0ba commit b623bc4
Showing 16 changed files with 191 additions and 66 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -173,6 +173,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add safeguard limits for file cache during node level allocation ([#8208](https://github.com/opensearch-project/OpenSearch/pull/8208))
- Move span actions to Scope ([#8411](https://github.com/opensearch-project/OpenSearch/pull/8411))
- Add wrapper tracer implementation ([#8565](https://github.com/opensearch-project/OpenSearch/pull/8565))
- Perform aggregation postCollection in ContextIndexSearcher after searching leaves ([#8303](https://github.com/opensearch-project/OpenSearch/pull/8303))

### Deprecated


DefaultSearchContext.java

@@ -65,6 +65,7 @@
import org.opensearch.index.search.NestedHelper;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.search.aggregations.BucketCollectorProcessor;
import org.opensearch.search.aggregations.InternalAggregation;
import org.opensearch.search.aggregations.SearchContextAggregations;
import org.opensearch.search.builder.SearchSourceBuilder;
@@ -176,7 +177,7 @@ final class DefaultSearchContext extends SearchContext {
private SuggestionSearchContext suggest;
private List<RescoreContext> rescore;
private Profilers profilers;

private BucketCollectorProcessor bucketCollectorProcessor = NO_OP_BUCKET_COLLECTOR_PROCESSOR;
private final Map<String, SearchExtBuilder> searchExtBuilders = new HashMap<>();
private final Map<Class<?>, CollectorManager<? extends Collector, ReduceableSearchResult>> queryCollectorManagers = new HashMap<>();
private final QueryShardContext queryShardContext;
@@ -919,4 +920,14 @@ public ReaderContext readerContext() {
public InternalAggregation.ReduceContext partial() {
return requestToAggReduceContextBuilder.apply(request.source()).forPartialReduction();
}

@Override
public void setBucketCollectorProcessor(BucketCollectorProcessor bucketCollectorProcessor) {
this.bucketCollectorProcessor = bucketCollectorProcessor;
}

@Override
public BucketCollectorProcessor bucketCollectorProcessor() {
return bucketCollectorProcessor;
}
}

AggregationCollectorManager.java

@@ -17,11 +17,8 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;

/**
@@ -53,26 +50,7 @@ public Collector newCollector() throws IOException {

@Override
public ReduceableSearchResult reduce(Collection<Collector> collectors) throws IOException {
List<Aggregator> aggregators = new ArrayList<>();

final Deque<Collector> allCollectors = new LinkedList<>(collectors);
while (!allCollectors.isEmpty()) {
final Collector currentCollector = allCollectors.pop();
if (currentCollector instanceof Aggregator) {
aggregators.add((Aggregator) currentCollector);
} else if (currentCollector instanceof InternalProfileCollector) {
if (((InternalProfileCollector) currentCollector).getCollector() instanceof Aggregator) {
aggregators.add((Aggregator) ((InternalProfileCollector) currentCollector).getCollector());
} else if (((InternalProfileCollector) currentCollector).getCollector() instanceof MultiBucketCollector) {
allCollectors.addAll(
Arrays.asList(((MultiBucketCollector) ((InternalProfileCollector) currentCollector).getCollector()).getCollectors())
);
}
} else if (currentCollector instanceof MultiBucketCollector) {
allCollectors.addAll(Arrays.asList(((MultiBucketCollector) currentCollector).getCollectors()));
}
}

final List<Aggregator> aggregators = context.bucketCollectorProcessor().toAggregators(collectors);
final List<InternalAggregation> internals = new ArrayList<>(aggregators.size());
context.aggregations().resetBucketMultiConsumer();
for (Aggregator aggregator : aggregators) {

BucketCollectorProcessor.java (new file)
@@ -0,0 +1,105 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.aggregations;

import org.apache.lucene.search.Collector;
import org.apache.lucene.search.MultiCollector;
import org.opensearch.common.lucene.MinimumScoreCollector;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.profile.query.InternalProfileCollector;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

/**
* Processor to perform collector-level processing specific to {@link BucketCollector} at different stages: a) postCollection,
* after the search on each leaf is completed, and b) unwrapping the collectors to perform the reduce after collection is completed
*/
public class BucketCollectorProcessor {

/**
* Performs {@link BucketCollector#postCollection()} on all the {@link BucketCollector}s in the given {@link Collector} tree
* after the collection of documents on a leaf is completed. In the concurrent segment search case, this method is called by each
* slice thread on its own collector tree instance, so that postCollection happens on the same slice thread which initialized and
* performed the collection of documents for a leaf segment. In the sequential search case, a single search thread performs both
* collection and postCollection on the {@link BucketCollector}.
* <p>
* This was originally done in {@link org.opensearch.search.aggregations.AggregationProcessor#postProcess(SearchContext)}, but with
* the concurrent segment search path it needs to be performed here. Lucene has AssertingCodecs which validate that the DocValues
* created for a field are always accessed by the same thread for a request. With concurrent segment search, the DocValues get
* initialized on different threads for different segments (or slices), whereas postProcess happens as part of the reduce phase and
* is performed on a separate thread from the search threadpool rather than the slice threadpool. Two different threads would thus
* access the DocValues, causing the AssertingCodec to fail. From a functionality perspective there is no issue, as the DocValues
* for each segment are always accessed by a single thread at a time, but those threads may be different (e.g. a slice thread
* during collection and then a search thread during reduce)
* </p>
* <p>
* NOTE: We can evaluate and deprecate this postCollection processing once lucene releases the changes described in
* <a href="https://github.com/apache/lucene/issues/12375">issue-12375</a>. With that change we should be able to implement the
* {@link BucketCollector#postCollection()} functionality using the lucene interface directly, such that postCollection gets called
* from the slice thread by lucene itself
* </p>
* @param collectorTree the collector tree used by the calling thread
*/
public void processPostCollection(Collector collectorTree) throws IOException {
final Queue<Collector> collectors = new LinkedList<>();
collectors.offer(collectorTree);
while (!collectors.isEmpty()) {
Collector currentCollector = collectors.poll();
if (currentCollector instanceof InternalProfileCollector) {
collectors.offer(((InternalProfileCollector) currentCollector).getCollector());
} else if (currentCollector instanceof MinimumScoreCollector) {
collectors.offer(((MinimumScoreCollector) currentCollector).getCollector());
} else if (currentCollector instanceof MultiCollector) {
for (Collector innerCollector : ((MultiCollector) currentCollector).getCollectors()) {
collectors.offer(innerCollector);
}
} else if (currentCollector instanceof BucketCollector) {
((BucketCollector) currentCollector).postCollection();
}
}
}

/**
* Unwraps the input collection of {@link Collector}s to get the list of {@link Aggregator}s used by the different slice threads.
* The input is expected to contain only the collectors related to aggregations, as that is what is passed to
* {@link AggregationCollectorManager} during the reduce phase. The resulting list of {@link Aggregator}s is used to create the
* {@link InternalAggregation}s and optionally perform a shard-level reduce before returning the response to the coordinator
* @param collectors collection of aggregation collectors to reduce
* @return list of unwrapped {@link Aggregator}
*/
public List<Aggregator> toAggregators(Collection<Collector> collectors) {
List<Aggregator> aggregators = new ArrayList<>();

final Deque<Collector> allCollectors = new LinkedList<>(collectors);
while (!allCollectors.isEmpty()) {
final Collector currentCollector = allCollectors.pop();
if (currentCollector instanceof Aggregator) {
aggregators.add((Aggregator) currentCollector);
} else if (currentCollector instanceof InternalProfileCollector) {
if (((InternalProfileCollector) currentCollector).getCollector() instanceof Aggregator) {
aggregators.add((Aggregator) ((InternalProfileCollector) currentCollector).getCollector());
} else if (((InternalProfileCollector) currentCollector).getCollector() instanceof MultiBucketCollector) {
allCollectors.addAll(
Arrays.asList(((MultiBucketCollector) ((InternalProfileCollector) currentCollector).getCollector()).getCollectors())
);
}
} else if (currentCollector instanceof MultiBucketCollector) {
allCollectors.addAll(Arrays.asList(((MultiBucketCollector) currentCollector).getCollectors()));
}
}
return aggregators;
}
}
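
To make the two traversals concrete, here is a small illustrative sketch (not part of this commit; postCollectAndUnwrap is a hypothetical driver, and the aggregator argument stands in for any initialized OpenSearch Aggregator). It wraps an aggregator together with a standard Lucene hits collector, post-collects the tree, then unwraps the aggregators as the reduce phase would:

import org.apache.lucene.search.Collector;
import org.apache.lucene.search.MultiCollector;
import org.apache.lucene.search.TopScoreDocCollector;
import org.opensearch.search.aggregations.Aggregator;
import org.opensearch.search.aggregations.BucketCollectorProcessor;

import java.io.IOException;
import java.util.List;

class BucketCollectorProcessorSketch {
    // Hypothetical driver, for illustration only.
    static List<Aggregator> postCollectAndUnwrap(Aggregator aggregator) throws IOException {
        BucketCollectorProcessor processor = new BucketCollectorProcessor();

        // A typical per-slice tree: a MultiCollector combining hits collection with aggregation.
        Collector tree = MultiCollector.wrap(TopScoreDocCollector.create(10, 1000), aggregator);

        // Walks the tree and calls postCollection() on every BucketCollector it finds;
        // Aggregator extends BucketCollector, so it is reached inside the MultiCollector.
        processor.processPostCollection(tree);

        // During reduce, only the aggregation collectors are handed over; toAggregators()
        // unwraps them so InternalAggregations can be built from each one.
        List<Collector> aggregationCollectors = List.of(aggregator);
        return processor.toAggregators(aggregationCollectors);
    }
}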

ConcurrentAggregationProcessor.java

@@ -28,12 +28,16 @@
* avoid the increase in aggregation result sets returned by each shard to the coordinator, where the final reduce happens for results
* received from all the shards
*/
public class ConcurrentAggregationProcessor extends DefaultAggregationProcessor {
public class ConcurrentAggregationProcessor implements AggregationProcessor {

private final BucketCollectorProcessor bucketCollectorProcessor = new BucketCollectorProcessor();

@Override
public void preProcess(SearchContext context) {
try {
if (context.aggregations() != null) {
// update the bucket collector processor as there are aggregations in the request
context.setBucketCollectorProcessor(bucketCollectorProcessor);
if (context.aggregations().factories().hasNonGlobalAggregator()) {
context.queryCollectorManagers().put(NonGlobalAggCollectorManager.class, new NonGlobalAggCollectorManager(context));
}

DefaultAggregationProcessor.java

@@ -24,10 +24,14 @@
*/
public class DefaultAggregationProcessor implements AggregationProcessor {

private final BucketCollectorProcessor bucketCollectorProcessor = new BucketCollectorProcessor();

@Override
public void preProcess(SearchContext context) {
try {
if (context.aggregations() != null) {
// update the bucket collector processor as there are aggregations in the request
context.setBucketCollectorProcessor(bucketCollectorProcessor);
if (context.aggregations().factories().hasNonGlobalAggregator()) {
context.queryCollectorManagers()
.put(NonGlobalAggCollectorManager.class, new NonGlobalAggCollectorManagerWithSingleCollector(context));

ContextIndexSearcher.java

@@ -46,7 +46,6 @@
import org.apache.lucene.search.Explanation;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.search.MultiCollector;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.QueryCache;
import org.apache.lucene.search.QueryCachingPolicy;
@@ -64,16 +63,13 @@
import org.apache.lucene.util.CombinedBitSet;
import org.apache.lucene.util.SparseFixedBitSet;
import org.opensearch.cluster.metadata.DataStream;
import org.opensearch.common.lucene.MinimumScoreCollector;
import org.opensearch.common.lucene.search.TopDocsAndMaxScore;
import org.opensearch.common.lease.Releasable;
import org.opensearch.search.DocValueFormat;
import org.opensearch.search.SearchService;
import org.opensearch.search.aggregations.BucketCollector;
import org.opensearch.search.dfs.AggregatedDfs;
import org.opensearch.search.profile.ContextualProfileBreakdown;
import org.opensearch.search.profile.Timer;
import org.opensearch.search.profile.query.InternalProfileCollector;
import org.opensearch.search.profile.query.ProfileWeight;
import org.opensearch.search.profile.query.QueryProfiler;
import org.opensearch.search.profile.query.QueryTimingType;
@@ -86,10 +82,8 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Executor;

@@ -282,38 +276,7 @@ protected void search(List<LeafReaderContext> leaves, Weight weight, Collector c
searchLeaf(leaves.get(i), weight, collector);
}
}
postCollection(collector);
}

/**
* Performs the postCollection on the {@link BucketCollector} which is used to collect documents for aggregation. This was originally
* done in {@link org.opensearch.search.aggregations.AggregationProcessor#postProcess(SearchContext)}, but with the concurrent segment
* search path it needs to be performed here. Lucene has AssertingCodecs which validate that the DocValues created for a field are
* always accessed by the same thread for a request. With concurrent segment search, the DocValues get initialized on different
* threads for different segments (or slices), whereas postProcess happens as part of the reduce phase and is performed on a separate
* thread from the search threadpool rather than the slice threadpool. Two different threads would thus access the DocValues, causing
* the AssertingCodec to fail. From a functionality perspective there is no issue, as the DocValues for each segment are always
* accessed by a single thread at a time, but those threads may be different (e.g. a slice thread during collection and then a search
* thread during reduce)
* @param collector input collector tree used for search
*/
private void postCollection(Collector collector) throws IOException {
final Queue<Collector> collectors = new LinkedList<>();
collectors.offer(collector);
while (!collectors.isEmpty()) {
Collector currentCollector = collectors.poll();
if (currentCollector instanceof InternalProfileCollector) {
collectors.offer(((InternalProfileCollector) currentCollector).getCollector());
} else if (currentCollector instanceof MinimumScoreCollector) {
collectors.offer(((MinimumScoreCollector) currentCollector).getCollector());
} else if (currentCollector instanceof MultiCollector) {
for (Collector innerCollector : ((MultiCollector) currentCollector).getCollectors()) {
collectors.offer(innerCollector);
}
} else if (currentCollector instanceof BucketCollector) {
((BucketCollector) currentCollector).postCollection();
}
}
searchContext.bucketCollectorProcessor().processPostCollection(collector);
}

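A minimal sketch of the concurrent flow this enables (illustrative only; runSlices, sliceExecutor, and sliceCount are hypothetical names, and the per-slice leaf collection is elided): each slice task collects and post-collects on the same thread, while the search thread only performs the reduce.

import org.apache.lucene.search.Collector;
import org.apache.lucene.search.CollectorManager;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.query.ReduceableSearchResult;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

class SliceThreadAffinitySketch {
    static ReduceableSearchResult runSlices(
        ExecutorService sliceExecutor,
        SearchContext searchContext,
        CollectorManager<Collector, ReduceableSearchResult> manager,
        int sliceCount
    ) throws Exception {
        List<Future<Collector>> futures = new ArrayList<>();
        for (int i = 0; i < sliceCount; i++) {
            Callable<Collector> sliceTask = () -> {
                Collector tree = manager.newCollector();
                // (leaf collection for this slice would happen here, on this thread)
                // postCollection then runs on the same thread that collected the slice,
                // which is what satisfies the AssertingCodec's single-thread DocValues check.
                searchContext.bucketCollectorProcessor().processPostCollection(tree);
                return tree;
            };
            futures.add(sliceExecutor.submit(sliceTask));
        }
        List<Collector> collected = new ArrayList<>();
        for (Future<Collector> future : futures) {
            collected.add(future.get());
        }
        // The shard-level reduce still runs on the calling (search) thread, but it no
        // longer performs postCollection, so it never touches slice-local DocValues.
        return manager.reduce(collected);
    }
}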

FilteredSearchContext.java

@@ -50,6 +50,7 @@
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.search.SearchExtBuilder;
import org.opensearch.search.SearchShardTarget;
import org.opensearch.search.aggregations.BucketCollectorProcessor;
import org.opensearch.search.aggregations.InternalAggregation;
import org.opensearch.search.aggregations.SearchContextAggregations;
import org.opensearch.search.collapse.CollapseContext;
@@ -548,4 +549,14 @@ public ReaderContext readerContext() {
public InternalAggregation.ReduceContext partial() {
return in.partial();
}

@Override
public void setBucketCollectorProcessor(BucketCollectorProcessor bucketCollectorProcessor) {
in.setBucketCollectorProcessor(bucketCollectorProcessor);
}

@Override
public BucketCollectorProcessor bucketCollectorProcessor() {
return in.bucketCollectorProcessor();
}
}

SearchContext.java

@@ -53,6 +53,8 @@
import org.opensearch.search.RescoreDocIds;
import org.opensearch.search.SearchExtBuilder;
import org.opensearch.search.SearchShardTarget;
import org.opensearch.search.aggregations.Aggregator;
import org.opensearch.search.aggregations.BucketCollectorProcessor;
import org.opensearch.search.aggregations.InternalAggregation;
import org.opensearch.search.aggregations.SearchContextAggregations;
import org.opensearch.search.collapse.CollapseContext;
@@ -73,6 +75,7 @@
import org.opensearch.search.sort.SortAndFormats;
import org.opensearch.search.suggest.SuggestionSearchContext;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -94,6 +97,20 @@ public abstract class SearchContext implements Releasable {
public static final int TRACK_TOTAL_HITS_DISABLED = -1;
public static final int DEFAULT_TRACK_TOTAL_HITS_UP_TO = 10000;

// no-op bucket collector processor
public static final BucketCollectorProcessor NO_OP_BUCKET_COLLECTOR_PROCESSOR = new BucketCollectorProcessor() {
@Override
public void processPostCollection(Collector collectorTree) {
// do nothing as there is no aggregation collector
}

@Override
public List<Aggregator> toAggregators(Collection<Collector> collectors) {
// should not be called when there is no aggregation collector
throw new IllegalStateException("Unexpected toAggregators call on NO_OP_BUCKET_COLLECTOR_PROCESSOR");
}
};

private final List<Releasable> releasables = new CopyOnWriteArrayList<>();
private final AtomicBoolean closed = new AtomicBoolean(false);
private InnerHitsContext innerHitsContext;
@@ -449,4 +466,9 @@ public String toString() {
public abstract ReaderContext readerContext();

public abstract InternalAggregation.ReduceContext partial();

// processor used for bucket collectors
public abstract void setBucketCollectorProcessor(BucketCollectorProcessor bucketCollectorProcessor);

public abstract BucketCollectorProcessor bucketCollectorProcessor();
}
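
A short usage note (illustrative, not from this commit): a context that never sees an aggregation keeps the no-op processor, so the postCollection walk costs nothing, while an unexpected call to toAggregators surfaces a wiring bug immediately.

import org.apache.lucene.search.Collector;
import org.opensearch.search.aggregations.BucketCollectorProcessor;
import org.opensearch.search.internal.SearchContext;

import java.io.IOException;
import java.util.List;

class NoOpBucketCollectorProcessorSketch {
    // Hypothetical demonstration of the NO_OP contract.
    static void demo(Collector anyCollectorTree) throws IOException {
        BucketCollectorProcessor noOp = SearchContext.NO_OP_BUCKET_COLLECTOR_PROCESSOR;
        noOp.processPostCollection(anyCollectorTree); // safe no-op: nothing to post-collect
        try {
            noOp.toAggregators(List.of(anyCollectorTree));
        } catch (IllegalStateException expected) {
            // reduce should never reach a context that has no aggregation collectors
        }
    }
}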