Increase search.max_buckets to 65,535 #57042

Merged · 13 commits · Jun 3, 2020
docs/reference/aggregations/bucket.asciidoc — 2 changes: 1 addition & 1 deletion

@@ -15,7 +15,7 @@ define fixed number of multiple buckets, and others dynamically create the buckets

 NOTE: The maximum number of buckets allowed in a single response is limited by a
 dynamic cluster setting named
-<<search-settings-max-buckets,`search.max_buckets`>>. It defaults to 10,000,
+<<search-settings-max-buckets,`search.max_buckets`>>. It defaults to 65,535,
 requests that try to return more than the limit will fail with an exception.

 include::bucket/adjacency-matrix-aggregation.asciidoc[]
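Since `search.max_buckets` is dynamic (note `Setting.Property.Dynamic` below), it can be changed at runtime without a node restart. A minimal sketch of raising it from the 7.x Java high-level REST client — the client wiring and the target value of 100,000 are illustrative assumptions, not part of this PR:

```java
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.settings.Settings;

import java.io.IOException;

public class MaxBucketsSettingExample {
    // Persistently raise search.max_buckets for clusters whose aggregations
    // legitimately need more buckets than the new 65,535 default allows.
    static void raiseMaxBuckets(RestHighLevelClient client) throws IOException {
        ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest();
        request.persistentSettings(
            Settings.builder().put("search.max_buckets", 100_000).build());
        client.cluster().putSettings(request, RequestOptions.DEFAULT);
    }
}
```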
MultiBucketConsumerService.java

@@ -35,10 +35,10 @@
  * An aggregation service that creates instances of {@link MultiBucketConsumer}.
  * The consumer is used by {@link BucketsAggregator} and {@link InternalMultiBucketAggregation} to limit the number of buckets created
  * in {@link Aggregator#buildAggregations} and {@link InternalAggregation#reduce}.
- * The limit can be set by changing the `search.max_buckets` cluster setting and defaults to 10000.
+ * The limit can be set by changing the `search.max_buckets` cluster setting and defaults to 65535.
  */
 public class MultiBucketConsumerService {
-    public static final int DEFAULT_MAX_BUCKETS = 10000;
+    public static final int DEFAULT_MAX_BUCKETS = 65535;
     public static final Setting<Integer> MAX_BUCKET_SETTING =
         Setting.intSetting("search.max_buckets", DEFAULT_MAX_BUCKETS, 0, Setting.Property.NodeScope, Setting.Property.Dynamic);

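`Setting.Property.Dynamic` above is what lets the limit change at runtime; a consumer of such a setting typically registers for updates along these lines (a sketch of the usual wiring with a hypothetical holder class, not the actual body of `MultiBucketConsumerService`):

```java
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;

// Hypothetical holder demonstrating how a dynamic, node-scoped setting is consumed.
public class MaxBucketsHolder {
    private volatile int maxBuckets;

    public MaxBucketsHolder(Settings settings, ClusterSettings clusterSettings) {
        // Initial value from node settings, falling back to DEFAULT_MAX_BUCKETS (65535).
        this.maxBuckets = MultiBucketConsumerService.MAX_BUCKET_SETTING.get(settings);
        // Invoked whenever search.max_buckets is updated via the cluster settings API.
        clusterSettings.addSettingsUpdateConsumer(
            MultiBucketConsumerService.MAX_BUCKET_SETTING, v -> this.maxBuckets = v);
    }

    public int maxBuckets() {
        return maxBuckets;
    }
}
```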
@@ -102,6 +102,7 @@ public static class MultiBucketConsumer implements IntConsumer {

         // aggregations execute in a single thread so no atomic here
         private int count;
+        private int callCount = 0;

         public MultiBucketConsumer(int limit, CircuitBreaker breaker) {
             this.limit = limit;
@@ -110,15 +111,17 @@ public MultiBucketConsumer(int limit, CircuitBreaker breaker) {

     @Override
     public void accept(int value) {
-        count += value;
-        if (count > limit) {
-            throw new TooManyBucketsException("Trying to create too many buckets. Must be less than or equal to: [" + limit
-                + "] but was [" + count + "]. This limit can be set by changing the [" +
-                MAX_BUCKET_SETTING.getKey() + "] cluster level setting.", limit);
-        }
-
-        // check parent circuit breaker every 1024 buckets
-        if (value > 0 && (count & 0x3FF) == 0) {
+        if (value != 0) {
+            count += value;
+            if (count > limit) {
+                throw new TooManyBucketsException("Trying to create too many buckets. Must be less than or equal to: [" + limit
+                    + "] but was [" + count + "]. This limit can be set by changing the [" +
+                    MAX_BUCKET_SETTING.getKey() + "] cluster level setting.", limit);
+            }
+        }
+
+        // check parent circuit breaker every 1024 calls
+        callCount++;
+        if ((callCount & 0x3FF) == 0) {
Review comment (Contributor): 👍 for fixing this behavior

             breaker.addEstimateBytesAndMaybeBreak(0, "allocated_buckets");
         }
     }
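The switch from counting buckets to counting calls matters because of the `accept(0)` keep-alive pattern introduced in `BucketsAggregator.collectExistingBucket` further down: under the old `value > 0` guard, a zero-value call could never reach the parent circuit breaker. A self-contained sketch contrasting the two trigger conditions (hypothetical demo class, not part of this PR):

```java
// Demonstrates why the breaker check was moved from "every 1024 buckets"
// to "every 1024 calls". Hypothetical demo, not Elasticsearch code.
public final class PeriodicCheckDemo {

    // Old condition: only fires when a positive increment lands count
    // exactly on a multiple of 1024.
    static int oldStyleChecks(int[] increments) {
        int count = 0;
        int checks = 0;
        for (int value : increments) {
            count += value;
            if (value > 0 && (count & 0x3FF) == 0) {
                checks++;
            }
        }
        return checks;
    }

    // New condition: fires on every 1024th invocation, regardless of value.
    static int newStyleChecks(int[] increments) {
        int callCount = 0;
        int checks = 0;
        for (int value : increments) {
            callCount++;
            if ((callCount & 0x3FF) == 0) {
                checks++;
            }
        }
        return checks;
    }

    public static void main(String[] args) {
        // The accept(0) keep-alive pattern used by collectExistingBucket:
        // 4096 calls that add no buckets at all.
        int[] zeroIncrements = new int[4096];
        System.out.println(oldStyleChecks(zeroIncrements)); // 0 -- breaker never consulted
        System.out.println(newStyleChecks(zeroIncrements)); // 4 -- consulted every 1024 calls
    }
}
```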
BucketsAggregator.java

@@ -57,12 +57,12 @@ public BucketsAggregator(String name, AggregatorFactories factories, SearchContext context, Aggregator parent,
                              Map<String, Object> metadata) throws IOException {
         super(name, factories, context, parent, metadata);
         bigArrays = context.bigArrays();
-        docCounts = bigArrays.newIntArray(1, true);
         if (context.aggregations() != null) {
             multiBucketConsumer = context.aggregations().multiBucketConsumer();
         } else {
             multiBucketConsumer = (count) -> {};
         }
+        docCounts = bigArrays.newIntArray(1, true);
     }

     /**

@@ -91,7 +91,12 @@ public final void collectBucket(LeafBucketCollector subCollector, int doc, long bucketOrd) throws IOException {
      * Same as {@link #collectBucket(LeafBucketCollector, int, long)}, but doesn't check if the docCounts needs to be re-sized.
      */
     public final void collectExistingBucket(LeafBucketCollector subCollector, int doc, long bucketOrd) throws IOException {
-        docCounts.increment(bucketOrd, 1);
+        if (docCounts.increment(bucketOrd, 1) == 1) {
+            // We calculate the final number of buckets only during the reduce phase. But we still need to
+            // trigger bucket consumer from time to time in order to give it a chance to check available memory and break
+            // the execution if we are running out. To achieve that we are passing 0 as a bucket count.
+            multiBucketConsumer.accept(0);
Review comment (Contributor): Let's add a comment here why we're using 0, just in case a lost soul stumbles over this and is confused :)
+        }
         subCollector.collect(doc, bucketOrd);
     }

@@ -137,14 +142,6 @@ public final int bucketDocCount(long bucketOrd) {
         }
     }

-    /**
-     * Adds {@code count} buckets to the global count for the request and fails if this number is greater than
-     * the maximum number of buckets allowed in a response
-     */
-    protected final void consumeBucketsAndMaybeBreak(int count) {
-        multiBucketConsumer.accept(count);
-    }
-
     /**
      * Hook to allow taking an action before building buckets.
      */
@@ -186,7 +183,7 @@ public InternalAggregation get(int index) {
             public int size() {
                 return aggregations.length;
             }
-        });
+        });
(whitespace-only change)
         }
         return result;
     }
@@ -267,7 +264,6 @@ protected final <B> void buildSubAggsForBuckets(List<B> buckets,
     protected final <B> InternalAggregation[] buildAggregationsForFixedBucketCount(long[] owningBucketOrds, int bucketsPerOwningBucketOrd,
             BucketBuilderForFixedCount<B> bucketBuilder, Function<List<B>, InternalAggregation> resultBuilder) throws IOException {
         int totalBuckets = owningBucketOrds.length * bucketsPerOwningBucketOrd;
-        consumeBucketsAndMaybeBreak(totalBuckets);
         long[] bucketOrdsToCollect = new long[totalBuckets];
         int bucketOrdIdx = 0;
         for (long owningBucketOrd : owningBucketOrds) {
@@ -299,7 +295,7 @@ protected interface BucketBuilderForFixedCount<B> {
      * @param owningBucketOrds owning bucket ordinals for which to build the results
      * @param resultBuilder how to build a result from the sub aggregation results
      */
-    protected final InternalAggregation[] buildAggregationsForSingleBucket(long[] owningBucketOrds,
+    protected final InternalAggregation[] buildAggregationsForSingleBucket(long[] owningBucketOrds,
(whitespace-only change)
                                                                            SingleBucketResultBuilder resultBuilder) throws IOException {
         /*
          * It'd be entirely reasonable to call
@@ -328,7 +324,6 @@ protected interface SingleBucketResultBuilder {
     protected final <B> InternalAggregation[] buildAggregationsForVariableBuckets(long[] owningBucketOrds, LongHash bucketOrds,
             BucketBuilderForVariable<B> bucketBuilder, Function<List<B>, InternalAggregation> resultBuilder) throws IOException {
         assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0;
-        consumeBucketsAndMaybeBreak((int) bucketOrds.size());
         long[] bucketOrdsToCollect = new long[(int) bucketOrds.size()];
         for (int bucketOrd = 0; bucketOrd < bucketOrds.size(); bucketOrd++) {
             bucketOrdsToCollect[bucketOrd] = bucketOrd;
@@ -360,7 +355,6 @@ protected final <B> InternalAggregation[] buildAggregationsForVariableBuckets(long[] owningBucketOrds,
             throw new AggregationExecutionException("Can't collect more than [" + Integer.MAX_VALUE
                 + "] buckets but attempted [" + totalOrdsToCollect + "]");
         }
-        consumeBucketsAndMaybeBreak((int) totalOrdsToCollect);
         long[] bucketOrdsToCollect = new long[(int) totalOrdsToCollect];
         int b = 0;
         for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
AdjacencyMatrixAggregator.java

@@ -184,7 +184,6 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
                 totalBucketsToBuild++;
             }
         }
-        consumeBucketsAndMaybeBreak(totalBucketsToBuild);
         long[] bucketOrdsToBuild = new long[totalBucketsToBuild];
         int builtBucketIndex = 0;
         for (int ord = 0; ord < maxOrd; ord++) {
InternalAdjacencyMatrix.java

@@ -196,12 +196,10 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
         for (List<InternalBucket> sameRangeList : bucketsMap.values()) {
             InternalBucket reducedBucket = reduceBucket(sameRangeList, reduceContext);
             if(reducedBucket.docCount >= 1){
-                reduceContext.consumeBucketsAndMaybeBreak(1);
                 reducedBuckets.add(reducedBucket);
-            } else {
-                reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(reducedBucket));
             }
         }
+        reduceContext.consumeBucketsAndMaybeBreak(reducedBuckets.size());
         Collections.sort(reducedBuckets, Comparator.comparing(InternalBucket::getKey));

         InternalAdjacencyMatrix reduced = new InternalAdjacencyMatrix(name, reducedBuckets, getMetadata());
CompositeAggregator.java

@@ -136,7 +136,6 @@ protected void doPostCollection() throws IOException {
     public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
         // Composite aggregator must be at the top of the aggregation tree
         assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0L;
-        consumeBucketsAndMaybeBreak(queue.size());
         if (deferredCollectors != NO_OP_COLLECTOR) {
             // Replay all documents that contain at least one top bucket (collected during the first pass).
             runDeferredCollections();
InternalComposite.java

@@ -178,7 +178,6 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
             if (lastBucket != null && bucketIt.current.compareKey(lastBucket) != 0) {
                 InternalBucket reduceBucket = reduceBucket(buckets, reduceContext);
                 buckets.clear();
-                reduceContext.consumeBucketsAndMaybeBreak(1);
                 result.add(reduceBucket);
                 if (result.size() >= size) {
                     break;

@@ -192,7 +191,6 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
             }
             if (buckets.size() > 0) {
                 InternalBucket reduceBucket = reduceBucket(buckets, reduceContext);
-                reduceContext.consumeBucketsAndMaybeBreak(1);
                 result.add(reduceBucket);
             }

@@ -205,6 +203,7 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
             reducedFormats = lastBucket.formats;
             lastKey = lastBucket.getRawKey();
         }
+        reduceContext.consumeBucketsAndMaybeBreak(result.size());
         return new InternalComposite(name, size, sourceNames, reducedFormats, result, lastKey, reverseMuls,
             earlyTerminated, metadata);
     }
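The `InternalComposite` change above shows the pattern this PR applies to every `reduce()` it touches: instead of consuming intermediate buckets one at a time (and issuing negative corrections such as `-countInnerBucket(...)` when buckets merge away), the final bucket count is reported to the consumer once, after merging. A minimal sketch of that accounting pattern with stand-in types (not Elasticsearch classes):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.IntConsumer;

// Stand-in for the reduce-phase accounting pattern this PR adopts.
final class ReduceAccountingSketch {
    // Merges same-keyed shard buckets, then reports the final count once.
    static List<Map.Entry<String, Long>> reduce(List<Map.Entry<String, Long>> shardBuckets,
                                                IntConsumer bucketConsumer) {
        Map<String, Long> merged = new LinkedHashMap<>();
        for (Map.Entry<String, Long> bucket : shardBuckets) {
            merged.merge(bucket.getKey(), bucket.getValue(), Long::sum);
        }
        // One accurate call after merging, instead of incremental +1/-n calls
        // while the intermediate bucket count is still in flux.
        bucketConsumer.accept(merged.size());
        return new ArrayList<>(merged.entrySet());
    }
}
```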
GeoGridAggregator.java

@@ -109,24 +109,23 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
         InternalGeoGridBucket[][] topBucketsPerOrd = new InternalGeoGridBucket[owningBucketOrds.length][];
         for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
             int size = (int) Math.min(bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]), shardSize);
-            consumeBucketsAndMaybeBreak(size);

             BucketPriorityQueue<InternalGeoGridBucket> ordered = new BucketPriorityQueue<>(size);
             InternalGeoGridBucket spare = null;
             LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
             while (ordsEnum.next()) {
                 if (spare == null) {
                     spare = newEmptyBucket();
                 }

                 // need a special function to keep the source bucket
                 // up-to-date so it can get the appropriate key
                 spare.hashAsLong = ordsEnum.value();
                 spare.docCount = bucketDocCount(ordsEnum.ord());
                 spare.bucketOrd = ordsEnum.ord();
                 spare = ordered.insertWithOverflow(spare);
             }

             topBucketsPerOrd[ordIdx] = new InternalGeoGridBucket[ordered.size()];
             for (int i = ordered.size() - 1; i >= 0; --i) {
                 topBucketsPerOrd[ordIdx][i] = ordered.pop();
InternalGeoGrid.java

@@ -100,18 +100,14 @@ public InternalGeoGrid reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
         BucketPriorityQueue<InternalGeoGridBucket> ordered = new BucketPriorityQueue<>(size);
         for (LongObjectPagedHashMap.Cursor<List<InternalGeoGridBucket>> cursor : buckets) {
             List<InternalGeoGridBucket> sameCellBuckets = cursor.value;
-            InternalGeoGridBucket removed = ordered.insertWithOverflow(reduceBucket(sameCellBuckets, reduceContext));
-            if (removed != null) {
-                reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(removed));
-            } else {
-                reduceContext.consumeBucketsAndMaybeBreak(1);
-            }
+            ordered.insertWithOverflow(reduceBucket(sameCellBuckets, reduceContext));
         }
         buckets.close();
         InternalGeoGridBucket[] list = new InternalGeoGridBucket[ordered.size()];
         for (int i = ordered.size() - 1; i >= 0; i--) {
             list[i] = ordered.pop();
         }
+        reduceContext.consumeBucketsAndMaybeBreak(list.length);
         return create(getName(), requiredSize, Arrays.asList(list), getMetadata());
     }

InternalAutoDateHistogram.java

@@ -328,7 +328,6 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent b) {
             if (reduceRounding.round(top.current.key) != key) {
                 // the key changes, reduce what we already buffered and reset the buffer for current buckets
                 final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
-                reduceContext.consumeBucketsAndMaybeBreak(1);
                 reducedBuckets.add(reduced);
                 currentBuckets.clear();
                 key = reduceRounding.round(top.current.key);

@@ -348,7 +347,6 @@

         if (currentBuckets.isEmpty() == false) {
             final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
-            reduceContext.consumeBucketsAndMaybeBreak(1);
             reducedBuckets.add(reduced);
         }
     }

@@ -376,22 +374,17 @@ private List<Bucket> mergeBuckets(List<Bucket> reducedBuckets, Rounding reduceRounding, ReduceContext reduceContext) {
             long roundedBucketKey = reduceRounding.round(bucket.key);
             if (Double.isNaN(key)) {
                 key = roundedBucketKey;
-                reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket) - 1);
                 sameKeyedBuckets.add(createBucket(key, bucket.docCount, bucket.aggregations));
             } else if (roundedBucketKey == key) {
-                reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket) - 1);
                 sameKeyedBuckets.add(createBucket(key, bucket.docCount, bucket.aggregations));
             } else {
-                reduceContext.consumeBucketsAndMaybeBreak(1);
                 mergedBuckets.add(reduceBucket(sameKeyedBuckets, reduceContext));
                 sameKeyedBuckets.clear();
                 key = roundedBucketKey;
-                reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket) - 1);
                 sameKeyedBuckets.add(createBucket(key, bucket.docCount, bucket.aggregations));
             }
         }
         if (sameKeyedBuckets.isEmpty() == false) {
-            reduceContext.consumeBucketsAndMaybeBreak(1);
             mergedBuckets.add(reduceBucket(sameKeyedBuckets, reduceContext));
         }
         reducedBuckets = mergedBuckets;

@@ -449,7 +442,6 @@ private BucketReduceResult addEmptyBuckets(BucketReduceResult currentResult, ReduceContext reduceContext) {
             if (lastBucket != null) {
                 long key = rounding.nextRoundingValue(lastBucket.key);
                 while (key < nextBucket.key) {
-                    reduceContext.consumeBucketsAndMaybeBreak(1);
                     iter.add(new InternalAutoDateHistogram.Bucket(key, 0, format, reducedEmptySubAggs));
                     key = rounding.nextRoundingValue(key);
                 }

@@ -515,7 +507,7 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
             // Now finally see if we need to merge consecutive buckets together to make a coarser interval at the same rounding
             reducedBucketsResult = maybeMergeConsecutiveBuckets(reducedBucketsResult, reduceContext);
         }
-
+        reduceContext.consumeBucketsAndMaybeBreak(reducedBucketsResult.buckets.size());
         BucketInfo bucketInfo = new BucketInfo(this.bucketInfo.roundingInfos, reducedBucketsResult.roundingIdx,
             this.bucketInfo.emptySubAggregations);

@@ -551,16 +543,13 @@ private BucketReduceResult mergeConsecutiveBuckets(List<Bucket> reducedBuckets,
         for (int i = 0; i < reducedBuckets.size(); i++) {
             Bucket bucket = reducedBuckets.get(i);
             if (i % mergeInterval == 0 && sameKeyedBuckets.isEmpty() == false) {
-                reduceContext.consumeBucketsAndMaybeBreak(1);
                 mergedBuckets.add(reduceBucket(sameKeyedBuckets, reduceContext));
                 sameKeyedBuckets.clear();
                 key = roundingInfo.rounding.round(bucket.key);
             }
-            reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket) - 1);
             sameKeyedBuckets.add(new Bucket(Math.round(key), bucket.docCount, format, bucket.aggregations));
         }
         if (sameKeyedBuckets.isEmpty() == false) {
-            reduceContext.consumeBucketsAndMaybeBreak(1);
             mergedBuckets.add(reduceBucket(sameKeyedBuckets, reduceContext));
         }
         return new BucketReduceResult(mergedBuckets, roundingInfo, roundingIdx, mergeInterval);