Histogram aggs: add empty buckets only in the final reduce step (#35921)
Empty buckets don't need to be added when performing an incremental reduction step; they can be added later, in the final reduction step. This will allow us to later remove the max buckets limit when performing a non-final reduction.
javanna authored Nov 30, 2018
1 parent d765296 commit 0ebc177
Showing 3 changed files with 56 additions and 52 deletions.
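
To make the commit message concrete, here is a small, self-contained sketch of the pattern the change applies, written with illustrative types rather than the actual Elasticsearch classes (DeferredEmptyBucketsExample and its reduce helper are hypothetical): incremental reductions only merge the buckets received from shards, and the gaps between keys are filled with zero-count buckets once, during the final reduction.

import java.util.List;
import java.util.TreeMap;

// Illustration only (hypothetical types, not Elasticsearch code): a bucket is a (key, docCount) entry.
public class DeferredEmptyBucketsExample {

    // Merge shard-level buckets; only a final reduction materializes empty buckets for the gaps.
    static TreeMap<Long, Long> reduce(List<TreeMap<Long, Long>> shardBuckets, long interval, boolean finalReduce) {
        TreeMap<Long, Long> merged = new TreeMap<>();
        for (TreeMap<Long, Long> shard : shardBuckets) {
            shard.forEach((key, docCount) -> merged.merge(key, docCount, Long::sum));
        }
        if (finalReduce && merged.isEmpty() == false) {
            // the analogue of addEmptyBuckets: fill every missing interval with a zero-count bucket
            for (long key = merged.firstKey(); key <= merged.lastKey(); key += interval) {
                merged.putIfAbsent(key, 0L);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        TreeMap<Long, Long> shard1 = new TreeMap<>(java.util.Map.of(0L, 3L, 20L, 1L));
        TreeMap<Long, Long> shard2 = new TreeMap<>(java.util.Map.of(40L, 2L));
        List<TreeMap<Long, Long>> shards = List.of(shard1, shard2);

        // Incremental reduce: no empty buckets are created -> {0=3, 20=1, 40=2}
        System.out.println(reduce(shards, 10L, false));
        // Final reduce: gaps are filled once -> {0=3, 10=0, 20=1, 30=0, 40=2}
        System.out.println(reduce(shards, 10L, true));
    }
}

Deferring the gap filling keeps intermediate results small, which is what later makes it possible to drop the max-buckets check for non-final reductions, as the commit message notes.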

Changed file 1 of 3 (InternalDateHistogram.java):
@@ -444,26 +444,22 @@ private void addEmptyBuckets(List<Bucket> list, ReduceContext reduceContext) {
     @Override
     public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
         List<Bucket> reducedBuckets = reduceBuckets(aggregations, reduceContext);
-
-        // adding empty buckets if needed
-        if (minDocCount == 0) {
-            addEmptyBuckets(reducedBuckets, reduceContext);
-        }
-
-        if (InternalOrder.isKeyAsc(order) || reduceContext.isFinalReduce() == false) {
-            // nothing to do, data are already sorted since shards return
-            // sorted buckets and the merge-sort performed by reduceBuckets
-            // maintains order
-        } else if (InternalOrder.isKeyDesc(order)) {
-            // we just need to reverse here...
-            List<Bucket> reverse = new ArrayList<>(reducedBuckets);
-            Collections.reverse(reverse);
-            reducedBuckets = reverse;
-        } else {
-            // sorted by compound order or sub-aggregation, need to fall back to a costly n*log(n) sort
-            CollectionUtil.introSort(reducedBuckets, order.comparator(null));
+        if (reduceContext.isFinalReduce()) {
+            if (minDocCount == 0) {
+                addEmptyBuckets(reducedBuckets, reduceContext);
+            }
+            if (InternalOrder.isKeyDesc(order)) {
+                // we just need to reverse here...
+                List<Bucket> reverse = new ArrayList<>(reducedBuckets);
+                Collections.reverse(reverse);
+                reducedBuckets = reverse;
+            } else if (InternalOrder.isKeyAsc(order) == false){
+                // nothing to do when sorting by key ascending, as data is already sorted since shards return
+                // sorted buckets and the merge-sort performed by reduceBuckets maintains order.
+                // otherwise, sorted by compound order or sub-aggregation, we need to fall back to a costly n*log(n) sort
+                CollectionUtil.introSort(reducedBuckets, order.comparator(null));
+            }
         }
-
         return new InternalDateHistogram(getName(), reducedBuckets, order, minDocCount, offset, emptyBucketInfo,
                 format, keyed, pipelineAggregators(), getMetaData());
     }

Changed file 2 of 3 (InternalHistogram.java):
@@ -421,26 +421,22 @@ private void addEmptyBuckets(List<Bucket> list, ReduceContext reduceContext) {
     @Override
     public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
         List<Bucket> reducedBuckets = reduceBuckets(aggregations, reduceContext);
-
-        // adding empty buckets if needed
-        if (minDocCount == 0) {
-            addEmptyBuckets(reducedBuckets, reduceContext);
-        }
-
-        if (InternalOrder.isKeyAsc(order) || reduceContext.isFinalReduce() == false) {
-            // nothing to do, data are already sorted since shards return
-            // sorted buckets and the merge-sort performed by reduceBuckets
-            // maintains order
-        } else if (InternalOrder.isKeyDesc(order)) {
-            // we just need to reverse here...
-            List<Bucket> reverse = new ArrayList<>(reducedBuckets);
-            Collections.reverse(reverse);
-            reducedBuckets = reverse;
-        } else {
-            // sorted by compound order or sub-aggregation, need to fall back to a costly n*log(n) sort
-            CollectionUtil.introSort(reducedBuckets, order.comparator(null));
+        if (reduceContext.isFinalReduce()) {
+            if (minDocCount == 0) {
+                addEmptyBuckets(reducedBuckets, reduceContext);
+            }
+            if (InternalOrder.isKeyDesc(order)) {
+                // we just need to reverse here...
+                List<Bucket> reverse = new ArrayList<>(reducedBuckets);
+                Collections.reverse(reverse);
+                reducedBuckets = reverse;
+            } else if (InternalOrder.isKeyAsc(order) == false){
+                // nothing to do when sorting by key ascending, as data is already sorted since shards return
+                // sorted buckets and the merge-sort performed by reduceBuckets maintains order.
+                // otherwise, sorted by compound order or sub-aggregation, we need to fall back to a costly n*log(n) sort
+                CollectionUtil.introSort(reducedBuckets, order.comparator(null));
+            }
         }
-
         return new InternalHistogram(getName(), reducedBuckets, order, minDocCount, emptyBucketInfo, format, keyed, pipelineAggregators(),
                 getMetaData());
     }

Changed file 3 of 3 (InternalAggregationTestCase.java):
@@ -119,19 +119,19 @@
 import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder;
 import org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder;
 import org.elasticsearch.search.aggregations.metrics.ValueCountAggregationBuilder;
-import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
-import org.elasticsearch.search.aggregations.pipeline.ParsedSimpleValue;
-import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
+import org.elasticsearch.search.aggregations.pipeline.DerivativePipelineAggregationBuilder;
+import org.elasticsearch.search.aggregations.pipeline.ExtendedStatsBucketPipelineAggregationBuilder;
 import org.elasticsearch.search.aggregations.pipeline.InternalBucketMetricValue;
+import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
 import org.elasticsearch.search.aggregations.pipeline.ParsedBucketMetricValue;
+import org.elasticsearch.search.aggregations.pipeline.ParsedDerivative;
+import org.elasticsearch.search.aggregations.pipeline.ParsedExtendedStatsBucket;
 import org.elasticsearch.search.aggregations.pipeline.ParsedPercentilesBucket;
-import org.elasticsearch.search.aggregations.pipeline.PercentilesBucketPipelineAggregationBuilder;
+import org.elasticsearch.search.aggregations.pipeline.ParsedSimpleValue;
 import org.elasticsearch.search.aggregations.pipeline.ParsedStatsBucket;
+import org.elasticsearch.search.aggregations.pipeline.PercentilesBucketPipelineAggregationBuilder;
+import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
 import org.elasticsearch.search.aggregations.pipeline.StatsBucketPipelineAggregationBuilder;
-import org.elasticsearch.search.aggregations.pipeline.ExtendedStatsBucketPipelineAggregationBuilder;
-import org.elasticsearch.search.aggregations.pipeline.ParsedExtendedStatsBucket;
-import org.elasticsearch.search.aggregations.pipeline.DerivativePipelineAggregationBuilder;
-import org.elasticsearch.search.aggregations.pipeline.ParsedDerivative;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -151,6 +151,7 @@
 import static org.elasticsearch.test.XContentTestUtils.insertRandomFields;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertToXContentEquivalent;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
 
 public abstract class InternalAggregationTestCase<T extends InternalAggregation> extends AbstractWireSerializingTestCase<T> {
     public static final int DEFAULT_MAX_BUCKETS = 100000;
@@ -267,7 +268,14 @@ public void testReduceRandom() {
                     new InternalAggregation.ReduceContext(bigArrays, mockScriptService, bucketConsumer,false);
             @SuppressWarnings("unchecked")
             T reduced = (T) inputs.get(0).reduce(internalAggregations, context);
-            assertMultiBucketConsumer(reduced, bucketConsumer);
+            int initialBucketCount = 0;
+            for (InternalAggregation internalAggregation : internalAggregations) {
+                initialBucketCount += countInnerBucket(internalAggregation);
+            }
+            int reducedBucketCount = countInnerBucket(reduced);
+            //check that non final reduction never adds buckets
+            assertThat(reducedBucketCount, lessThanOrEqualTo(initialBucketCount));
+            assertMultiBucketConsumer(reducedBucketCount, bucketConsumer);
             toReduce = new ArrayList<>(toReduce.subList(r, toReduceSize));
             toReduce.add(reduced);
         }
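
As a side note on the hunk above: the only thing that distinguishes an incremental reduce from a final one in this test is the last constructor argument of the reduce context (the four-argument constructor call is visible above; the variable names below are simply reused from the test for illustration):

// Incremental (non-final) reduce: with this change, histogram aggs add no empty
// buckets here, so the reduced bucket count can only stay equal or shrink.
InternalAggregation.ReduceContext partialContext =
        new InternalAggregation.ReduceContext(bigArrays, mockScriptService, bucketConsumer, false);

// Final reduce: empty buckets are added (when minDocCount == 0) and the requested
// bucket order is applied before the response is built.
InternalAggregation.ReduceContext finalContext =
        new InternalAggregation.ReduceContext(bigArrays, mockScriptService, bucketConsumer, true);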
@@ -332,14 +340,14 @@ protected NamedXContentRegistry xContentRegistry() {
 
     public final void testFromXContent() throws IOException {
         final T aggregation = createTestInstance();
-        final Aggregation parsedAggregation = parseAndAssert(aggregation, randomBoolean(), false);
-        assertFromXContent(aggregation, (ParsedAggregation) parsedAggregation);
+        final ParsedAggregation parsedAggregation = parseAndAssert(aggregation, randomBoolean(), false);
+        assertFromXContent(aggregation, parsedAggregation);
     }
 
     public final void testFromXContentWithRandomFields() throws IOException {
         final T aggregation = createTestInstance();
-        final Aggregation parsedAggregation = parseAndAssert(aggregation, randomBoolean(), true);
-        assertFromXContent(aggregation, (ParsedAggregation) parsedAggregation);
+        final ParsedAggregation parsedAggregation = parseAndAssert(aggregation, randomBoolean(), true);
+        assertFromXContent(aggregation, parsedAggregation);
     }
 
     protected abstract void assertFromXContent(T aggregation, ParsedAggregation parsedAggregation) throws IOException;
@@ -423,6 +431,10 @@ protected static DocValueFormat randomNumericDocValueFormat() {
     }
 
     public static void assertMultiBucketConsumer(Aggregation agg, MultiBucketConsumer bucketConsumer) {
-        assertThat(bucketConsumer.getCount(), equalTo(countInnerBucket(agg)));
+        assertMultiBucketConsumer(countInnerBucket(agg), bucketConsumer);
+    }
+
+    private static void assertMultiBucketConsumer(int innerBucketCount, MultiBucketConsumer bucketConsumer) {
+        assertThat(bucketConsumer.getCount(), equalTo(innerBucketCount));
     }
 }
