Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move Aggregator#buildTopLevel() to search worker thread. #98715

Merged
merged 20 commits into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
*/
package org.elasticsearch.search.aggregations;

import org.apache.lucene.search.Collector;
import org.apache.lucene.search.CollectorManager;
import org.elasticsearch.action.search.SearchShardTask;
import org.elasticsearch.search.aggregations.support.TimeSeriesIndexSearcher;
Expand All @@ -31,36 +30,54 @@ public static void preProcess(SearchContext context) {
if (context.aggregations() == null) {
return;
}
final Supplier<Collector> collectorSupplier;
final Supplier<AggregatorCollector> collectorSupplier;
if (context.aggregations().isInSortOrderExecutionRequired()) {
executeInSortOrder(context, newBucketCollector(context));
collectorSupplier = () -> BucketCollector.NO_OP_COLLECTOR;
AggregatorCollector collector = newAggregatorCollector(context);
executeInSortOrder(context, collector.bucketCollector);
collectorSupplier = () -> new AggregatorCollector(collector.aggregators, BucketCollector.NO_OP_BUCKET_COLLECTOR);
} else {
collectorSupplier = () -> newBucketCollector(context).asCollector();
collectorSupplier = () -> newAggregatorCollector(context);
}
context.aggregations().registerAggsCollectorManager(new CollectorManager<>() {
@Override
public Collector newCollector() {
public AggregatorCollector newCollector() {
return collectorSupplier.get();
}

@Override
public Void reduce(Collection<Collector> collectors) {
// we cannot run post-collection method here because we need to do it after the optional timeout
// has been removed from the index searcher. Therefore, we delay this processing to the
// AggregationPhase#execute method.
public Void reduce(Collection<AggregatorCollector> collectors) {
if (context.queryResult().hasAggs()) {
// no need to compute the aggs twice, they should be computed on a per context basis
return null;
}
if (collectors.size() > 1) {
// we execute this search using more than one slice. In order to keep memory requirements
// low, we do a partial reduction here.
final List<InternalAggregations> internalAggregations = new ArrayList<>(collectors.size());
collectors.forEach(c -> internalAggregations.add(InternalAggregations.from(c.internalAggregations)));
context.queryResult()
.aggregations(
InternalAggregations.topLevelReduce(
internalAggregations,
context.aggregations().getAggregationReduceContextBuilder().forPartialReduction()
)
);
} else if (collectors.size() == 1) {
context.queryResult().aggregations(InternalAggregations.from(collectors.iterator().next().internalAggregations));
}
// disable aggregations so that they don't run on next pages in case of scrolling
context.aggregations(null);
return null;
}
});
}

private static BucketCollector newBucketCollector(SearchContext context) {
private static AggregatorCollector newAggregatorCollector(SearchContext context) {
try {
Aggregator[] aggregators = context.aggregations().factories().createTopLevelAggregators();
context.aggregations().aggregators(aggregators);
BucketCollector bucketCollector = MultiBucketCollector.wrap(true, List.of(aggregators));
bucketCollector.preCollection();
return bucketCollector;
return new AggregatorCollector(aggregators, bucketCollector);
} catch (IOException e) {
throw new AggregationInitializationException("Could not initialize aggregators", e);
}
Expand Down Expand Up @@ -97,48 +114,4 @@ private static List<Runnable> getCancellationChecks(SearchContext context) {

return cancellationChecks;
}

public static void execute(SearchContext context) {
if (context.aggregations() == null) {
context.queryResult().aggregations(null);
return;
}

if (context.queryResult().hasAggs()) {
// no need to compute the aggs twice, they should be computed on a per context basis
return;
}

final List<InternalAggregations> internalAggregations = new ArrayList<>(context.aggregations().aggregators().size());
for (Aggregator[] aggregators : context.aggregations().aggregators()) {
final List<InternalAggregation> aggregations = new ArrayList<>(aggregators.length);
for (Aggregator aggregator : aggregators) {
try {
aggregations.add(aggregator.buildTopLevel());
} catch (IOException e) {
throw new AggregationExecutionException("Failed to build aggregation [" + aggregator.name() + "]", e);
}
// release the aggregator to claim the used bytes as we don't need it anymore
aggregator.releaseAggregations();
}
internalAggregations.add(InternalAggregations.from(aggregations));
}

if (internalAggregations.size() > 1) {
// we execute this search using more than one slice. In order to keep memory requirements
// low, we do a partial reduction here.
context.queryResult()
.aggregations(
InternalAggregations.topLevelReduce(
internalAggregations,
context.aggregations().getAggregationReduceContextBuilder().forPartialReduction()
)
);
} else {
context.queryResult().aggregations(internalAggregations.get(0));
}

// disable aggregations so that they don't run on next pages in case of scrolling
context.aggregations(null);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.search.aggregations;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.search.ScoreMode;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Collector that controls the life cycle of an aggregation document collection. */
public class AggregatorCollector implements Collector {
final Aggregator[] aggregators;
final BucketCollector bucketCollector;
final List<InternalAggregation> internalAggregations;

public AggregatorCollector(Aggregator[] aggregators, BucketCollector bucketCollector) {
this.aggregators = aggregators;
this.bucketCollector = bucketCollector;
this.internalAggregations = new ArrayList<>(aggregators.length);
}

@Override
public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException {
return bucketCollector.getLeafCollector(new AggregationExecutionContext(context, null, null, null));
}

@Override
public ScoreMode scoreMode() {
return bucketCollector.scoreMode();
}

/** Should be call after collecting the documents. It generates the internal aggregations which are
* stored on {@code internalAggregations} */
public void finish() throws IOException {
bucketCollector.postCollection();
for (Aggregator aggregator : aggregators) {
internalAggregations.add(aggregator.buildTopLevel());
// release the aggregator to claim the used bytes as we don't need it anymore
aggregator.releaseAggregations();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,8 @@
*/
package org.elasticsearch.search.aggregations;

import org.apache.lucene.search.Collector;
import org.apache.lucene.search.CollectorManager;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/**
Expand All @@ -21,8 +18,7 @@ public class SearchContextAggregations {

private final AggregatorFactories factories;
private final Supplier<AggregationReduceContext.Builder> toAggregationReduceContextBuilder;
private final List<Aggregator[]> aggregators;
private CollectorManager<Collector, Void> aggCollectorManager;
private CollectorManager<AggregatorCollector, Void> aggCollectorManager;

/**
* Creates a new aggregation context with the parsed aggregator factories
Expand All @@ -33,37 +29,23 @@ public SearchContextAggregations(
) {
this.factories = factories;
this.toAggregationReduceContextBuilder = toAggregationReduceContextBuilder;
this.aggregators = new ArrayList<>();
}

public AggregatorFactories factories() {
return factories;
}

public List<Aggregator[]> aggregators() {
return aggregators;
}

/**
* Registers all the created aggregators (top level aggregators) for the search execution context.
*
* @param aggregators The top level aggregators of the search execution.
*/
public void aggregators(Aggregator[] aggregators) {
this.aggregators.add(aggregators);
}

/**
* Registers the collector to be run for the aggregations phase
*/
public void registerAggsCollectorManager(CollectorManager<Collector, Void> aggCollectorManager) {
public void registerAggsCollectorManager(CollectorManager<AggregatorCollector, Void> aggCollectorManager) {
this.aggCollectorManager = aggCollectorManager;
}

/**
* Returns the collector to be run for the aggregations phase
*/
public CollectorManager<Collector, Void> getAggsCollectorManager() {
public CollectorManager<AggregatorCollector, Void> getAggsCollectorManager() {
return aggCollectorManager;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

import org.apache.lucene.sandbox.search.ProfilerCollector;
import org.apache.lucene.search.Collector;
import org.elasticsearch.search.aggregations.BucketCollector;
import org.elasticsearch.search.aggregations.AggregatorCollector;
import org.elasticsearch.search.query.QueryPhaseCollector;

import java.io.IOException;
Expand Down Expand Up @@ -83,8 +83,8 @@ public void doPostCollection() throws IOException {
profileCollector.doPostCollection();
} else if (wrappedCollector instanceof QueryPhaseCollector queryPhaseCollector) {
queryPhaseCollector.doPostCollection();
} else if (wrappedCollector instanceof BucketCollector.BucketCollectorWrapper aggsCollector) {
aggsCollector.bucketCollector().postCollection();
} else if (wrappedCollector instanceof AggregatorCollector aggsCollector) {
aggsCollector.finish();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ static void executeQuery(SearchContext searchContext) throws QueryPhaseExecution

RescorePhase.execute(searchContext);
SuggestPhase.execute(searchContext);
AggregationPhase.execute(searchContext);

if (searchContext.getProfilers() != null) {
searchContext.queryResult().profileResults(searchContext.getProfilers().buildQueryPhaseResults());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.Bits;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.search.aggregations.BucketCollector;
import org.elasticsearch.search.aggregations.AggregatorCollector;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.profile.query.InternalProfileCollector;

import java.io.IOException;
Expand All @@ -42,6 +43,8 @@
*/
public final class QueryPhaseCollector implements Collector {
private final Collector aggsCollector;
// populated during post collection phase
private InternalAggregations internalAggregations;
private final Collector topDocsCollector;
private final TerminateAfterChecker terminateAfterChecker;
private final Weight postFilterWeight;
Expand Down Expand Up @@ -375,8 +378,8 @@ boolean incrementHitCountAndCheckThreshold() {
};

public void doPostCollection() throws IOException {
if (aggsCollector instanceof BucketCollector.BucketCollectorWrapper bucketCollectorWrapper) {
bucketCollectorWrapper.bucketCollector().postCollection();
if (aggsCollector instanceof AggregatorCollector aggregatorCollector) {
aggregatorCollector.finish();
} else if (aggsCollector instanceof InternalProfileCollector profileCollector) {
profileCollector.doPostCollection();
}
Expand Down
Loading