From 1e62b3a25a9969d6c5bb609f5f4cf5643c6ae50c Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 18 Oct 2023 14:36:32 +0200 Subject: [PATCH] Adjust DateHistogram's bucket accounting to be iteratively (#101012) Adjust DateHistogram's consumeBucketsAndMaybeBreak to be iteratively during reduce instead accounting all buckets at the end of the reduce. In case of many non-empty buckets accounting the number of buckets at the end of the reduce may be too late. Elasticsearch may already have failed with an OOME. This change changes the accounting to happen iteratively during the reduce for non-empty bucket. Note that for empty buckets accounting of the number of buckets already happens iteratively. --- docs/changelog/101012.yaml | 5 +++++ .../histogram/InternalDateHistogram.java | 18 +++++++++++------- 2 files changed, 16 insertions(+), 7 deletions(-) create mode 100644 docs/changelog/101012.yaml diff --git a/docs/changelog/101012.yaml b/docs/changelog/101012.yaml new file mode 100644 index 0000000000000..1d5f62bdddba7 --- /dev/null +++ b/docs/changelog/101012.yaml @@ -0,0 +1,5 @@ +pr: 101012 +summary: Adjust `DateHistogram's` bucket accounting to be iteratively +area: Aggregations +type: bug +issues: [] diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java index 82716dca7311c..4eaec7034b7f4 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java @@ -310,6 +310,7 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent reducedBuckets = new ArrayList<>(); if (pq.size() > 0) { // list of buckets coming from different shards that have the same key @@ -323,6 +324,10 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent= minDocCount || reduceContext.isFinalReduce() == false) { + if (consumeBucketCount++ >= REPORT_EMPTY_EVERY) { + reduceContext.consumeBucketsAndMaybeBreak(consumeBucketCount); + consumeBucketCount = 0; + } reducedBuckets.add(reduced); } currentBuckets.clear(); @@ -344,10 +349,14 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent= minDocCount || reduceContext.isFinalReduce() == false) { reducedBuckets.add(reduced); + if (consumeBucketCount++ >= REPORT_EMPTY_EVERY) { + reduceContext.consumeBucketsAndMaybeBreak(consumeBucketCount); + consumeBucketCount = 0; + } } } } - + reduceContext.consumeBucketsAndMaybeBreak(consumeBucketCount); return reducedBuckets; } @@ -387,7 +396,7 @@ private void addEmptyBuckets(List list, AggregationReduceContext reduceC * consumeBucketsAndMaybeBreak. */ class Counter implements LongConsumer { - private int size = list.size(); + private int size; @Override public void accept(long key) { @@ -490,11 +499,9 @@ private void iterateEmptyBuckets(List list, ListIterator iter, L @Override public InternalAggregation reduce(List aggregations, AggregationReduceContext reduceContext) { List reducedBuckets = reduceBuckets(aggregations, reduceContext); - boolean alreadyAccountedForBuckets = false; if (reduceContext.isFinalReduce()) { if (minDocCount == 0) { addEmptyBuckets(reducedBuckets, reduceContext); - alreadyAccountedForBuckets = true; } if (InternalOrder.isKeyDesc(order)) { // we just need to reverse here... @@ -508,9 +515,6 @@ public InternalAggregation reduce(List aggregations, Aggreg CollectionUtil.introSort(reducedBuckets, order.comparator()); } } - if (false == alreadyAccountedForBuckets) { - reduceContext.consumeBucketsAndMaybeBreak(reducedBuckets.size()); - } return new InternalDateHistogram( getName(), reducedBuckets,