diff --git a/docs/changelog/101012.yaml b/docs/changelog/101012.yaml new file mode 100644 index 0000000000000..1d5f62bdddba7 --- /dev/null +++ b/docs/changelog/101012.yaml @@ -0,0 +1,5 @@ +pr: 101012 +summary: Adjust `DateHistogram's` bucket accounting to be iteratively +area: Aggregations +type: bug +issues: [] diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java index 82716dca7311c..4eaec7034b7f4 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java @@ -310,6 +310,7 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent reducedBuckets = new ArrayList<>(); if (pq.size() > 0) { // list of buckets coming from different shards that have the same key @@ -323,6 +324,10 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent= minDocCount || reduceContext.isFinalReduce() == false) { + if (consumeBucketCount++ >= REPORT_EMPTY_EVERY) { + reduceContext.consumeBucketsAndMaybeBreak(consumeBucketCount); + consumeBucketCount = 0; + } reducedBuckets.add(reduced); } currentBuckets.clear(); @@ -344,10 +349,14 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent= minDocCount || reduceContext.isFinalReduce() == false) { reducedBuckets.add(reduced); + if (consumeBucketCount++ >= REPORT_EMPTY_EVERY) { + reduceContext.consumeBucketsAndMaybeBreak(consumeBucketCount); + consumeBucketCount = 0; + } } } } - + reduceContext.consumeBucketsAndMaybeBreak(consumeBucketCount); return reducedBuckets; } @@ -387,7 +396,7 @@ private void addEmptyBuckets(List list, AggregationReduceContext reduceC * consumeBucketsAndMaybeBreak. */ class Counter implements LongConsumer { - private int size = list.size(); + private int size; @Override public void accept(long key) { @@ -490,11 +499,9 @@ private void iterateEmptyBuckets(List list, ListIterator iter, L @Override public InternalAggregation reduce(List aggregations, AggregationReduceContext reduceContext) { List reducedBuckets = reduceBuckets(aggregations, reduceContext); - boolean alreadyAccountedForBuckets = false; if (reduceContext.isFinalReduce()) { if (minDocCount == 0) { addEmptyBuckets(reducedBuckets, reduceContext); - alreadyAccountedForBuckets = true; } if (InternalOrder.isKeyDesc(order)) { // we just need to reverse here... @@ -508,9 +515,6 @@ public InternalAggregation reduce(List aggregations, Aggreg CollectionUtil.introSort(reducedBuckets, order.comparator()); } } - if (false == alreadyAccountedForBuckets) { - reduceContext.consumeBucketsAndMaybeBreak(reducedBuckets.size()); - } return new InternalDateHistogram( getName(), reducedBuckets,