Speed up aggs with sub-aggregations
This allows many of the optimizations added in elastic#63643 and elastic#68871 to run
on aggregations with sub-aggregations. This should:
* Speed up `terms` aggregations on fields with fewer than 1000 values that
  also have sub-aggregations. Locally I see 2-second searches run in 1.2
  seconds.
* Apply that same speedup to `range` and `date_histogram` aggregations,
  though it feels less impressive there because the point range queries are
  a little slower to get up and go.
* Massively speed up `filters` aggregations with sub-aggregations that
  don't have a `parent` aggregation or collect "other" buckets. Also
  save a ton of memory while collecting them.
nik9000 committed Mar 2, 2021
1 parent cb25ae0 commit 250e7e5
Showing 6 changed files with 375 additions and 53 deletions.
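For context on what "filter-by-filter" execution means here: when every bucket of an aggregation can be described by a Lucene query and there is nothing to feed per document, each bucket's doc count can come from a cheap per-filter count instead of per-document collection. Below is a minimal, self-contained sketch of that idea against plain Lucene; it is not Elasticsearch code, and the class and method names are illustrative.

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.store.Directory;

import java.io.IOException;
import java.util.List;

class FilterByFilterSketch {
    /** One doc count per filter, in filter order; no per-document collection. */
    static long[] countPerFilter(Directory dir, List<Query> filters) throws IOException {
        try (DirectoryReader reader = DirectoryReader.open(dir)) {
            IndexSearcher searcher = new IndexSearcher(reader);
            long[] counts = new long[filters.size()];
            for (int i = 0; i < filters.size(); i++) {
                // IndexSearcher#count can often answer from index statistics
                // without visiting every matching document.
                counts[i] = searcher.count(filters.get(i));
            }
            return counts;
        }
    }
}

The changes below extend this strategy so it still applies when sub-aggregations are present, by collecting matches filter by filter instead of only counting them.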
@@ -135,7 +135,7 @@ public static FiltersAggregator build(
CardinalityUpperBound cardinality,
Map<String, Object> metadata
) throws IOException {
- if (canUseFilterByFilter(parent, factories, otherBucketKey)) {
if (canUseFilterByFilter(parent, otherBucketKey)) {
return buildFilterByFilter(name, factories, filters, keyed, otherBucketKey, context, parent, cardinality, metadata);
}
return new FiltersAggregator.Compatible(
@@ -155,8 +155,8 @@ public static FiltersAggregator build(
* Can this aggregation be executed using the {@link FilterByFilter}? That
* aggregator is much faster than the fallback {@link Compatible} aggregator.
*/
- public static boolean canUseFilterByFilter(Aggregator parent, AggregatorFactories factories, String otherBucketKey) {
- return parent == null && factories.countAggregators() == 0 && otherBucketKey == null;
public static boolean canUseFilterByFilter(Aggregator parent, String otherBucketKey) {
return parent == null && otherBucketKey == null;
}

/**
@@ -177,7 +177,7 @@ public static FilterByFilter buildFilterByFilter(
CardinalityUpperBound cardinality,
Map<String, Object> metadata
) throws IOException {
- if (false == canUseFilterByFilter(parent, factories, otherBucketKey)) {
if (false == canUseFilterByFilter(parent, otherBucketKey)) {
throw new IllegalStateException("Can't execute filter-by-filter");
}
List<QueryToFilterAdapter<?>> filtersWithTopLevel = new ArrayList<>(filters.size());
@@ -186,6 +186,7 @@ public static FilterByFilter buildFilterByFilter(
}
return new FiltersAggregator.FilterByFilter(
name,
factories,
filtersWithTopLevel,
keyed,
context,
@@ -274,17 +275,20 @@ public static class FilterByFilter extends FiltersAggregator {
* field.
*/
private int segmentsWithDocCountField;
private int segmentsCollected;
private int segmentsCounted;

private FilterByFilter(
String name,
AggregatorFactories factories,
List<QueryToFilterAdapter<?>> filters,
boolean keyed,
AggregationContext context,
Aggregator parent,
CardinalityUpperBound cardinality,
Map<String, Object> metadata
) throws IOException {
- super(name, AggregatorFactories.EMPTY, filters, keyed, null, context, parent, cardinality, metadata);
super(name, factories, filters, keyed, null, context, parent, cardinality, metadata);
this.profiling = context.profiling();
}

@@ -294,6 +298,7 @@ private FilterByFilter(
*/
@SuppressWarnings("resource") // We're not in charge of anything Closeable
public long estimateCost(long maxCost) throws IOException {
// TODO if we have children we should use a different cost estimate
this.maxCost = maxCost;
if (estimatedCost != -1) {
return estimatedCost;
@@ -303,7 +308,9 @@ public long estimateCost(long maxCost) throws IOException {
for (LeafReaderContext ctx : searcher().getIndexReader().leaves()) {
CheckedSupplier<Boolean, IOException> canUseMetadata = canUseMetadata(ctx);
for (QueryToFilterAdapter<?> filter : filters()) {
- estimatedCost += filter.estimateCountCost(ctx, canUseMetadata);
estimatedCost += subAggregators().length > 0
? filter.estimateCollectCost(ctx)
: filter.estimateCountCost(ctx, canUseMetadata);
if (estimatedCost < 0) {
// We've overflowed so we cap out and stop counting.
estimatedCost = Long.MAX_VALUE;
@@ -345,20 +352,82 @@ public long estimateCost(long maxCost) throws IOException {
@Override
protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
Bits live = ctx.reader().getLiveDocs();
- Counter counter = new Counter(docCountProvider);
if (false == docCountProvider.alwaysOne()) {
segmentsWithDocCountField++;
}
- for (int filterOrd = 0; filterOrd < filters().size(); filterOrd++) {
- incrementBucketDocCount(filterOrd, filters().get(filterOrd).count(ctx, counter, live));
if (subAggregators.length == 0) {
// TODO we'd be better off if we could do sub.isNoop() or something.
/*
* Without sub.isNoop we always end up in the `collectXXX` modes even if
* the sub-aggregators opt out of traditional collection.
*/
collectCount(ctx, live);
} else {
collectSubs(ctx, live, sub);
}
// Throwing this exception is how we communicate to the collection mechanism that we don't need the segment.
throw new CollectionTerminatedException();
}

/**
* Gather a count of the number of documents that match each filter
* without sending any documents to a sub-aggregator. This yields
* the correct response when there aren't any sub-aggregators or they
* all opt out of needing any sort of collection.
*/
private void collectCount(LeafReaderContext ctx, Bits live) throws IOException {
Counter counter = new Counter(docCountProvider);
for (int filterOrd = 0; filterOrd < filters().size(); filterOrd++) {
incrementBucketDocCount(filterOrd, filters().get(filterOrd).count(ctx, counter, live));
}
}

/**
* Collect all documents that match each filter and send them to
* the sub-aggregators. This method is only required when there are
* sub-aggregators that haven't opted out of being collected.
* <p>
* This collects each filter one at a time, resetting the
* sub-aggregators between each filter as though they were hitting
* a fresh segment.
* <p>
* It's <strong>very</strong> tempting to try to collect the
* filters into blocks of matches and then replay the whole block
* in ascending order without the resetting. That'd probably
* work better if the disk were very, very slow and we didn't have
* any kind of disk caching. But with disk caching it's about twice
* as fast to collect each filter one by one like this. And it uses
* less memory because there isn't a need to buffer a block of matches.
* And it's a hell of a lot less code.
*/
private void collectSubs(LeafReaderContext ctx, Bits live, LeafBucketCollector sub) throws IOException {
class MatchCollector implements LeafCollector {
LeafBucketCollector subCollector = sub;
int filterOrd;

@Override
public void collect(int docId) throws IOException {
collectBucket(subCollector, docId, filterOrd);
}

@Override
public void setScorer(Scorable scorer) throws IOException {
}
}
MatchCollector collector = new MatchCollector();
filters().get(0).collect(ctx, collector, live);
for (int filterOrd = 1; filterOrd < filters().size(); filterOrd++) {
collector.subCollector = collectableSubAggregators.getLeafCollector(ctx);
collector.filterOrd = filterOrd;
filters().get(filterOrd).collect(ctx, collector, live);
}
}

@Override
public void collectDebugInfo(BiConsumer<String, Object> add) {
super.collectDebugInfo(add);
add.accept("segments_counted", segmentsCounted);
add.accept("segments_collected", segmentsCollected);
add.accept("segments_with_deleted_docs", segmentsWithDeletedDocs);
add.accept("segments_with_doc_count_field", segmentsWithDocCountField);
if (estimatedCost != -1) {
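To make the collectSubs path above concrete, here is a rough standalone sketch of the same pattern against plain Lucene: score one filter at a time over a segment and tag every hit with its filter ordinal. The BucketConsumer callback stands in for collectBucket and is an assumption of this sketch, not an Elasticsearch or Lucene API.

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.BulkScorer;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Scorable;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Weight;

import java.io.IOException;
import java.util.List;

class CollectSubsSketch {
    /** Stand-in for collectBucket(sub, docId, filterOrd); hypothetical, for illustration only. */
    interface BucketConsumer {
        void accept(int filterOrd, int docId) throws IOException;
    }

    static void collectFilterByFilter(IndexSearcher searcher, LeafReaderContext ctx,
                                      List<Query> filters, BucketConsumer buckets) throws IOException {
        for (int filterOrd = 0; filterOrd < filters.size(); filterOrd++) {
            final int ord = filterOrd;
            Weight weight = searcher.createWeight(searcher.rewrite(filters.get(ord)), ScoreMode.COMPLETE_NO_SCORES, 1f);
            BulkScorer scorer = weight.bulkScorer(ctx);
            if (scorer == null) {
                continue; // no matches for this filter in this segment
            }
            // Each filter's matches arrive in ascending doc id order, so the
            // consumer sees one ordered stream per filter rather than one
            // interleaved stream for the whole segment.
            scorer.score(new LeafCollector() {
                @Override
                public void setScorer(Scorable scorable) {}

                @Override
                public void collect(int docId) throws IOException {
                    buckets.accept(ord, docId);
                }
            }, ctx.reader().getLiveDocs());
        }
    }
}

As the comment in the diff explains, buffering every filter's matches and replaying them in one ascending pass is tempting, but with disk caching the per-filter passes measured faster, and they need no match buffer.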
@@ -17,6 +17,7 @@
import org.apache.lucene.search.IndexOrDocValuesQuery;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.IndexSortSortedNumericDocValuesRangeQuery;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.MatchNoDocsQuery;
import org.apache.lucene.search.PointRangeQuery;
@@ -216,12 +217,31 @@ long count(LeafReaderContext ctx, FiltersAggregator.Counter counter, Bits live)
* Estimate the cost of calling {@code #count} in a leaf.
*/
long estimateCountCost(LeafReaderContext ctx, CheckedSupplier<Boolean, IOException> canUseMetadata) throws IOException {
return estimateCollectCost(ctx);
}

/**
* Collect all documents that match this filter in this leaf.
*/
void collect(LeafReaderContext ctx, LeafCollector collector, Bits live) throws IOException {
BulkScorer scorer = bulkScorer(ctx, () -> {});
if (scorer == null) {
// No hits in this segment.
return;
}
scorer.score(collector, live);
}

/**
* Estimate the cost of calling {@code #collect} in a leaf.
*/
long estimateCollectCost(LeafReaderContext ctx) throws IOException {
BulkScorer scorer = bulkScorer(ctx, () -> scorersPreparedWhileEstimatingCost++);
if (scorer == null) {
// There aren't any matches for this filter in this leaf
return 0;
}
- return scorer.cost(); // TODO in another PR (please) change this to ScorerSupplier.cost
return scorer.cost();
}

/**
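A hedged sketch of where estimateCollectCost's number comes from: Lucene's BulkScorer#cost, an upper bound on how many documents the scorer will visit in the segment. It assumes, as the diff does, that a null bulk scorer means the filter cannot match anything in this leaf; the class name is illustrative.

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.BulkScorer;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Weight;

import java.io.IOException;

class CollectCostSketch {
    static long estimateCollectCost(IndexSearcher searcher, LeafReaderContext ctx, Query filter) throws IOException {
        Weight weight = searcher.createWeight(searcher.rewrite(filter), ScoreMode.COMPLETE_NO_SCORES, 1f);
        BulkScorer scorer = weight.bulkScorer(ctx);
        // BulkScorer#cost estimates how many documents score() will visit.
        return scorer == null ? 0 : scorer.cost();
    }
}

Summed across filters and segments this estimate can overflow a long, which is why estimateCost in the first file caps the running total at Long.MAX_VALUE as soon as it goes negative.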
@@ -350,7 +350,7 @@ public static FromFilters<?> adaptIntoFiltersOrNull(
// We don't generate sensible Queries for nanoseconds.
return null;
}
- if (false == FiltersAggregator.canUseFilterByFilter(parent, factories, null)) {
if (false == FiltersAggregator.canUseFilterByFilter(parent, null)) {
return null;
}
boolean wholeNumbersOnly = false == ((ValuesSource.Numeric) valuesSourceConfig.getValuesSource()).isFloatingPoint();
@@ -68,7 +68,7 @@ static StringTermsAggregatorFromFilters adaptIntoFiltersOrNull(
if (false == valuesSourceConfig.alignesWithSearchIndex()) {
return null;
}
- if (false == FiltersAggregator.canUseFilterByFilter(parent, factories, null)) {
if (false == FiltersAggregator.canUseFilterByFilter(parent, null)) {
return null;
}
List<QueryToFilterAdapter<?>> filters = new ArrayList<>();