[Backport 2.x] Improving the performance of date histogram aggregation (without any … #11390

Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -65,6 +65,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Disallow removing some metadata fields by remove ingest processor ([#10895](https://github.com/opensearch-project/OpenSearch/pull/10895))
- Performance improvement for MultiTerm Queries on Keyword fields ([#7057](https://github.com/opensearch-project/OpenSearch/issues/7057))
- Refactor common parts from the Rounding class into a separate 'round' package ([#11023](https://github.com/opensearch-project/OpenSearch/issues/11023))
- Performance improvement for date histogram aggregations without sub-aggregations ([#11083](https://github.com/opensearch-project/OpenSearch/pull/11083))
- Disable concurrent aggs for Diversified Sampler and Sampler aggs ([#11087](https://github.com/opensearch-project/OpenSearch/issues/11087))
- Made leader/follower check timeout setting dynamic ([#10528](https://github.com/opensearch-project/OpenSearch/pull/10528))
- Change error message when per shard document limit is breached ([#11312](https://github.com/opensearch-project/OpenSearch/pull/11312))
@@ -598,7 +598,6 @@ setup:
- match: { aggregations.histo.buckets.0.doc_count: 2 }
- match: { profile.shards.0.aggregations.0.type: DateHistogramAggregator }
- match: { profile.shards.0.aggregations.0.description: histo }
- match: { profile.shards.0.aggregations.0.breakdown.collect_count: 4 }
- match: { profile.shards.0.aggregations.0.debug.total_buckets: 3 }

---
38 changes: 27 additions & 11 deletions server/src/main/java/org/opensearch/common/Rounding.java
@@ -99,7 +99,7 @@ long roundFloor(long utcMillis) {
}

@Override
long extraLocalOffsetLookup() {
public long extraLocalOffsetLookup() {
return extraLocalOffsetLookup;
}
},
@@ -110,7 +110,7 @@ long roundFloor(long utcMillis) {
return DateUtils.roundYear(utcMillis);
}

long extraLocalOffsetLookup() {
public long extraLocalOffsetLookup() {
return extraLocalOffsetLookup;
}
},
@@ -121,7 +121,7 @@ long roundFloor(long utcMillis) {
return DateUtils.roundQuarterOfYear(utcMillis);
}

long extraLocalOffsetLookup() {
public long extraLocalOffsetLookup() {
return extraLocalOffsetLookup;
}
},
@@ -132,7 +132,7 @@ long roundFloor(long utcMillis) {
return DateUtils.roundMonthOfYear(utcMillis);
}

long extraLocalOffsetLookup() {
public long extraLocalOffsetLookup() {
return extraLocalOffsetLookup;
}
},
@@ -141,7 +141,7 @@ long roundFloor(long utcMillis) {
return DateUtils.roundFloor(utcMillis, this.ratio);
}

long extraLocalOffsetLookup() {
public long extraLocalOffsetLookup() {
return ratio;
}
},
@@ -150,7 +150,7 @@ long roundFloor(long utcMillis) {
return DateUtils.roundFloor(utcMillis, ratio);
}

long extraLocalOffsetLookup() {
public long extraLocalOffsetLookup() {
return ratio;
}
},
@@ -165,7 +165,7 @@ long roundFloor(long utcMillis) {
return DateUtils.roundFloor(utcMillis, ratio);
}

long extraLocalOffsetLookup() {
public long extraLocalOffsetLookup() {
return ratio;
}
},
@@ -180,7 +180,7 @@ long roundFloor(long utcMillis) {
return DateUtils.roundFloor(utcMillis, ratio);
}

long extraLocalOffsetLookup() {
public long extraLocalOffsetLookup() {
return ratio;
}
};
@@ -217,7 +217,7 @@ long extraLocalOffsetLookup() {
* look up so that we can see transitions that we might have rounded
* down beyond.
*/
abstract long extraLocalOffsetLookup();
public abstract long extraLocalOffsetLookup();

public byte getId() {
return id;
@@ -488,7 +488,7 @@ public double roundingSize(long utcMillis, DateTimeUnit timeUnit) {
*
* @opensearch.internal
*/
static class TimeUnitRounding extends Rounding {
public static class TimeUnitRounding extends Rounding {
static final byte ID = 1;

private final DateTimeUnit unit;
@@ -523,6 +523,14 @@ public byte id() {
return ID;
}

public DateTimeUnit getUnit() {
return this.unit;
}

public ZoneId getTimeZone() {
return this.timeZone;
}

private LocalDateTime truncateLocalDateTime(LocalDateTime localDateTime) {
switch (unit) {
case SECOND_OF_MINUTE:
@@ -953,7 +961,7 @@ public final long nextRoundingValue(long utcMillis) {
*
* @opensearch.internal
*/
static class TimeIntervalRounding extends Rounding {
public static class TimeIntervalRounding extends Rounding {
static final byte ID = 2;

private final long interval;
@@ -984,6 +992,14 @@ public byte id() {
return ID;
}

public long getInterval() {
return this.interval;
}

public ZoneId getTimeZone() {
return this.timeZone;
}

@Override
public Prepared prepare(long minUtcMillis, long maxUtcMillis) {
long minLookup = minUtcMillis - interval;
@@ -414,6 +414,16 @@ public long parse(String value) {
return resolution.convert(DateFormatters.from(dateTimeFormatter().parse(value), dateTimeFormatter().locale()).toInstant());
}

public long convertNanosToMillis(long nanoSecondsSinceEpoch) {
if (resolution.numericType.equals(NumericType.DATE_NANOSECONDS)) return DateUtils.toMilliSeconds(nanoSecondsSinceEpoch);
return nanoSecondsSinceEpoch;
}

public long convertRoundedMillisToNanos(long milliSecondsSinceEpoch) {
if (resolution.numericType.equals(NumericType.DATE_NANOSECONDS)) return DateUtils.toNanoSeconds(milliSecondsSinceEpoch);
return milliSecondsSinceEpoch;
}

@Override
public ValueFetcher valueFetcher(QueryShardContext context, SearchLookup searchLookup, String format) {
DateFormatter defaultFormatter = dateTimeFormatter();
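
Note on the two conversion helpers added in this hunk: for nanosecond-resolution date fields they appear to translate between the stored nanosecond values and the milliseconds that rounding operates on, and for millisecond fields they pass the value through unchanged. A minimal standalone sketch of that behaviour follows. `Resolution`, `toMillis`, and `roundedMillisToStored` are hypothetical names, and plain division and multiplication by 1,000,000 stand in for `DateUtils.toMilliSeconds` and `DateUtils.toNanoSeconds`, whose exact range checks are not shown in this diff.

```java
public class ResolutionConversionSketch {
    /** Hypothetical stand-in for the field's resolution (not the real OpenSearch type). */
    public enum Resolution { MILLISECONDS, NANOSECONDS }

    private static final long NANOS_PER_MILLI = 1_000_000L;

    /** Stored value -> epoch millis. Assumes non-negative epoch values. */
    public static long toMillis(Resolution resolution, long storedValue) {
        if (resolution == Resolution.NANOSECONDS) {
            return storedValue / NANOS_PER_MILLI; // sub-millisecond part is dropped
        }
        return storedValue; // already milliseconds
    }

    /** Rounded epoch millis -> the unit the field stores. Throws ArithmeticException on overflow. */
    public static long roundedMillisToStored(Resolution resolution, long roundedMillis) {
        if (resolution == Resolution.NANOSECONDS) {
            return Math.multiplyExact(roundedMillis, NANOS_PER_MILLI);
        }
        return roundedMillis;
    }

    public static void main(String[] args) {
        long nanos = 1_700_000_000_123_456_789L;
        long millis = toMillis(Resolution.NANOSECONDS, nanos);
        System.out.println(millis);
        System.out.println(roundedMillisToStored(Resolution.NANOSECONDS, millis));
    }
}
```

The round trip is lossy by design in this sketch: converting to milliseconds and back yields a value aligned to a millisecond boundary, which is what a rounded bucket key needs.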
@@ -34,13 +34,15 @@
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.CollectionUtil;
import org.opensearch.common.Rounding;
import org.opensearch.common.Rounding.Prepared;
import org.opensearch.common.lease.Releasables;
import org.opensearch.common.util.IntArray;
import org.opensearch.common.util.LongArray;
import org.opensearch.core.common.util.ByteArray;
import org.opensearch.index.mapper.DateFieldMapper;
import org.opensearch.search.DocValueFormat;
import org.opensearch.search.aggregations.Aggregator;
import org.opensearch.search.aggregations.AggregatorFactories;
@@ -125,9 +127,13 @@
* {@link MergingBucketsDeferringCollector#mergeBuckets(long[])}.
*/
private MergingBucketsDeferringCollector deferringCollector;
private final Weight[] filters;
private final DateFieldMapper.DateFieldType fieldType;

protected final RoundingInfo[] roundingInfos;
protected final int targetBuckets;
protected int roundingIdx;
protected Rounding.Prepared preparedRounding;

private AutoDateHistogramAggregator(
String name,
@@ -148,8 +154,51 @@
this.formatter = valuesSourceConfig.format();
this.roundingInfos = roundingInfos;
this.roundingPreparer = roundingPreparer;
this.preparedRounding = prepareRounding(0);

FilterRewriteHelper.FilterContext filterContext = FilterRewriteHelper.buildFastFilterContext(
parent(),
subAggregators.length,
context,
b -> getMinimumRounding(b[0], b[1]),
// Passing prepared rounding as supplier to ensure the correct prepared
// rounding is set as it is done during getMinimumRounding
() -> preparedRounding,
valuesSourceConfig,
fc -> FilterRewriteHelper.getAggregationBounds(context, fc.field())
);
if (filterContext != null) {
fieldType = filterContext.fieldType;
filters = filterContext.filters;
} else {
fieldType = null;
filters = null;
}
}

private Rounding getMinimumRounding(final long low, final long high) {
// max - min / targetBuckets = bestDuration
// find the right innerInterval this bestDuration belongs to
// since we cannot exceed targetBuckets, bestDuration should go up,
// so the right innerInterval should be an upper bound
long bestDuration = (high - low) / targetBuckets;
while (roundingIdx < roundingInfos.length - 1) {
final RoundingInfo curRoundingInfo = roundingInfos[roundingIdx];
final int temp = curRoundingInfo.innerIntervals[curRoundingInfo.innerIntervals.length - 1];
// If the interval duration is covered by the maximum inner interval,
// we can start with this outer interval for creating the buckets
if (bestDuration <= temp * curRoundingInfo.roughEstimateDurationMillis) {
break;
}
roundingIdx++;
}

preparedRounding = prepareRounding(roundingIdx);
return roundingInfos[roundingIdx].rounding;
}

protected abstract LongKeyedBucketOrds getBucketOrds();

@Override
public final ScoreMode scoreMode() {
if (valuesSource != null && valuesSource.needsScores()) {
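
Note on `getMinimumRounding` in the hunk above: it divides the field's value range by `targetBuckets` and then walks the rounding table upward until the largest inner interval of the current entry covers that duration. The sketch below isolates that loop so it can be run standalone. The `RoundingInfo` class here is a hypothetical two-field stand-in, the `EXAMPLE` table values are illustrative only, and the starting index is passed as a parameter rather than kept in a field as the aggregator does.

```java
public class MinimumRoundingSketch {
    /** Hypothetical stand-in: only the two fields the selection loop reads. */
    public static final class RoundingInfo {
        final long roughEstimateDurationMillis;
        final int[] innerIntervals;

        public RoundingInfo(long roughEstimateDurationMillis, int... innerIntervals) {
            this.roughEstimateDurationMillis = roughEstimateDurationMillis;
            this.innerIntervals = innerIntervals;
        }
    }

    /** Illustrative table: seconds, minutes, hours, days (assumed values). */
    public static final RoundingInfo[] EXAMPLE = {
        new RoundingInfo(1_000L, 1, 5, 10, 30),
        new RoundingInfo(60_000L, 1, 5, 10, 30),
        new RoundingInfo(3_600_000L, 1, 3, 12),
        new RoundingInfo(86_400_000L, 1, 7)
    };

    /** Returns the first index at or after startIdx whose largest inner interval covers the best duration. */
    public static int selectRoundingIdx(RoundingInfo[] infos, int startIdx, long low, long high, int targetBuckets) {
        long bestDuration = (high - low) / targetBuckets;
        int idx = startIdx;
        while (idx < infos.length - 1) {
            RoundingInfo cur = infos[idx];
            int maxInner = cur.innerIntervals[cur.innerIntervals.length - 1];
            // int * long widens to long, so the product does not overflow int
            if (bestDuration <= maxInner * cur.roughEstimateDurationMillis) {
                break;
            }
            idx++;
        }
        return idx; // the last entry is the fallback when nothing smaller fits
    }

    public static void main(String[] args) {
        long sixHours = 6L * 3_600_000L;
        // 6h / 10 buckets = 36 min per bucket: too coarse for 30 s or 30 min, covered by 12 h
        System.out.println(selectRoundingIdx(EXAMPLE, 0, 0L, sixHours, 10)); // prints 2
    }
}
```

Because the index only ever moves forward, a caller that has already advanced past an entry never returns to a finer rounding, which matches the `roundingIdx++` behaviour in the diff.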
@@ -176,7 +225,32 @@
if (valuesSource == null) {
return LeafBucketCollector.NO_OP_COLLECTOR;
}
return getLeafCollector(valuesSource.longValues(ctx), sub);

final SortedNumericDocValues values = valuesSource.longValues(ctx);
final LeafBucketCollector iteratingCollector = getLeafCollector(values, sub);

// Need to be declared as final and array for usage within the
// LeafBucketCollectorBase subclass below
final boolean[] useOpt = new boolean[1];
useOpt[0] = filters != null;

return new LeafBucketCollectorBase(sub, values) {
@Override
public void collect(int doc, long owningBucketOrd) throws IOException {
// Try fast filter aggregation if the filters have been created
// Skip if tried before and gave incorrect/incomplete results
if (useOpt[0]) {
useOpt[0] = FilterRewriteHelper.tryFastFilterAggregation(ctx, filters, fieldType, (key, count) -> {

incrementBucketDocCount(
FilterRewriteHelper.getBucketOrd(getBucketOrds().add(owningBucketOrd, preparedRounding.round(key))),
count
);
});
}

iteratingCollector.collect(doc, owningBucketOrd);
}
};
}

protected final InternalAggregation[] buildAggregations(
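
Note on the `useOpt` flag in the hunk above: Java lambdas and anonymous classes can only capture locals that are effectively final, so a one-element `boolean[]` is used as a mutable cell that the collector can flip. The sketch below models only that flag handling, with hypothetical names (`wrap`, `fastPath`, `perDoc`). It assumes a `false` return means "fast path not applicable, do not retry"; how the real `FilterRewriteHelper.tryFastFilterAggregation` signals success is not visible in this diff and is not modelled here.

```java
import java.util.function.BooleanSupplier;
import java.util.function.IntConsumer;

public class TryOnceFastPathSketch {
    /**
     * Wraps a per-document collector so that a fast path is attempted while the flag is set.
     * The flag takes whatever the attempt returns, so a false result disables further attempts.
     */
    public static IntConsumer wrap(boolean fastPathAvailable, BooleanSupplier fastPath, IntConsumer perDoc) {
        // A final one-element array acts as a mutable cell the lambda is allowed to capture.
        final boolean[] useOpt = new boolean[] { fastPathAvailable };
        return doc -> {
            if (useOpt[0]) {
                useOpt[0] = fastPath.getAsBoolean();
            }
            perDoc.accept(doc); // regular per-document collection
        };
    }

    public static void main(String[] args) {
        int[] attempts = { 0 };
        int[] collected = { 0 };
        IntConsumer collector = wrap(true, () -> {
            attempts[0]++;
            return false; // pretend the fast path was not applicable for this segment
        }, doc -> collected[0]++);

        for (int doc = 0; doc < 3; doc++) {
            collector.accept(doc);
        }
        System.out.println(attempts[0] + " attempt(s), " + collected[0] + " doc(s) collected");
    }
}
```

With a fast path that declines, the attempt happens once on the first document and every document still goes through the regular collector.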
@@ -247,8 +321,6 @@
* @opensearch.internal
*/
private static class FromSingle extends AutoDateHistogramAggregator {
private int roundingIdx;
private Rounding.Prepared preparedRounding;
/**
* Map from value to bucket ordinals.
* <p>
@@ -286,10 +358,14 @@
metadata
);

preparedRounding = prepareRounding(0);
bucketOrds = new LongKeyedBucketOrds.FromSingle(context.bigArrays());
}

@Override
protected LongKeyedBucketOrds getBucketOrds() {
return bucketOrds;
}

@Override
protected LeafBucketCollector getLeafCollector(SortedNumericDocValues values, LeafBucketCollector sub) throws IOException {
return new LeafBucketCollectorBase(sub, values) {
@@ -507,6 +583,11 @@
liveBucketCountUnderestimate = context.bigArrays().newIntArray(1, true);
}

@Override
protected LongKeyedBucketOrds getBucketOrds() {
return bucketOrds;

}

@Override
protected LeafBucketCollector getLeafCollector(SortedNumericDocValues values, LeafBucketCollector sub) throws IOException {
return new LeafBucketCollectorBase(sub, values) {