[Backport 2.x] Fix auto date histogram rounding assertion bug (#17175)
* Fix auto date histogram rounding assertion bug (#17023)

* Add explanatory comments for auto date histo increaseRoundingIfNeeded.

Signed-off-by: Finn Carroll <carrofin@amazon.com>

* Add testFilterRewriteWithTZRoundingRangeAssert() to reproduce auto date histo assertion bug per #16932

Signed-off-by: Finn Carroll <carrofin@amazon.com>

* Fix #16932. Ensure optimized path can only increase preparedRounding of agg.

Signed-off-by: Finn Carroll <carrofin@amazon.com>

* Spotless apply

Signed-off-by: Finn Carroll <carrofin@amazon.com>

* Fast fail filter rewrite opt in date histo aggs for non-UTC timezones

Signed-off-by: Finn Carroll <carrofin@amazon.com>

* Remove redundant UTC check from getInterval().

Signed-off-by: Finn Carroll <carrofin@amazon.com>

* Save a call to prepareRounding if roundingIdx is unchanged.

Signed-off-by: Finn Carroll <carrofin@amazon.com>

* Spotless apply

Signed-off-by: Finn Carroll <carrofin@amazon.com>

* Changelog

Signed-off-by: Finn Carroll <carrofin@amazon.com>

* Add ZoneId getter for date histo filter rewrite canOptimize check.

Signed-off-by: Finn Carroll <carrofin@amazon.com>

* Spotless apply

Signed-off-by: Finn Carroll <carrofin@amazon.com>

* Disable ff optimization for composite agg in canOptimize.

Signed-off-by: Finn Carroll <carrofin@amazon.com>

* Spotless apply

Signed-off-by: Finn Carroll <carrofin@amazon.com>

* Handle UTC timezone check

Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>

* Remove redundant timeZone getter.

Signed-off-by: Finn Carroll <carrofin@amazon.com>

* Simplify ff prepared rounding check.

Signed-off-by: Finn Carroll <carrofin@amazon.com>

---------

Signed-off-by: Finn Carroll <carrofin@amazon.com>
Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>
Co-authored-by: bowenlan-amzn <bowenlan23@gmail.com>
(cherry picked from commit de59264)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>

* Remove breaking abstract isUTC() getter from Rounding.java.

Signed-off-by: Finn Carroll <carrofin@amazon.com>

* Remove unused ZoneId getter.

Signed-off-by: Finn Carroll <carrofin@amazon.com>

---------

Signed-off-by: Finn Carroll <carrofin@amazon.com>
Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: bowenlan-amzn <bowenlan23@gmail.com>
(cherry picked from commit a79c6e8)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
github-actions[bot] and bowenlan-amzn committed Jan 30, 2025
1 parent 56b5726 commit 6db810b
Showing 8 changed files with 161 additions and 27 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -117,6 +117,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [WLM] Fix the QueryGroupTask logging bug ([#17169](https://github.com/opensearch-project/OpenSearch/pull/17169))
- Use OpenSearch version to deserialize remote custom metadata([#16494](https://github.com/opensearch-project/OpenSearch/pull/16494))
- Fix the failing CI's with `Failed to load eclipse jdt formatter` error ([#17172](https://github.com/opensearch-project/OpenSearch/pull/17172))
- Fix AutoDateHistogramAggregator rounding assertion failure ([#17023](https://github.com/opensearch-project/OpenSearch/pull/17023))

### Security

[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.18...2.x
39 changes: 23 additions & 16 deletions server/src/main/java/org/opensearch/common/Rounding.java
@@ -275,6 +275,14 @@ public DateTimeUnit unit() {
return null;
}

/**
* Helper function for checking if the time zone requested for date histogram
* aggregation is UTC or not
*/
public boolean isUTC() {
throw new UnsupportedOperationException();
}

/**
* A strategy for rounding milliseconds since epoch.
*
@@ -676,6 +684,11 @@ public String toString() {
return "Rounding[" + unit + " in " + timeZone + "]";
}

@Override
public boolean isUTC() {
return "Z".equals(timeZone.getDisplayName(TextStyle.FULL, Locale.ENGLISH));
}

private abstract class TimeUnitPreparedRounding extends PreparedRounding {
@Override
public double roundingSize(long utcMillis, DateTimeUnit timeUnit) {
@@ -1057,6 +1070,11 @@ public String toString() {
return "Rounding[" + interval + " in " + timeZone + "]";
}

@Override
public boolean isUTC() {
return "Z".equals(timeZone.getDisplayName(TextStyle.FULL, Locale.ENGLISH));
}

private long roundKey(long value, long interval) {
if (value < 0) {
return (value - interval + 1) / interval;
@@ -1379,6 +1397,11 @@ public boolean equals(Object obj) {
public String toString() {
return delegate + " offset by " + offset;
}

@Override
public boolean isUTC() {
return delegate.isUTC();
}
}

public static Rounding read(StreamInput in) throws IOException {
@@ -1406,28 +1429,12 @@ public static OptionalLong getInterval(Rounding rounding) {

if (rounding instanceof TimeUnitRounding) {
interval = (((TimeUnitRounding) rounding).unit).extraLocalOffsetLookup();
if (!isUTCTimeZone(((TimeUnitRounding) rounding).timeZone)) {
// Fast filter aggregation cannot be used if it needs time zone rounding
return OptionalLong.empty();
}
} else if (rounding instanceof TimeIntervalRounding) {
interval = ((TimeIntervalRounding) rounding).interval;
if (!isUTCTimeZone(((TimeIntervalRounding) rounding).timeZone)) {
// Fast filter aggregation cannot be used if it needs time zone rounding
return OptionalLong.empty();
}
} else {
return OptionalLong.empty();
}

return OptionalLong.of(interval);
}

/**
* Helper function for checking if the time zone requested for date histogram
* aggregation is utc or not
*/
private static boolean isUTCTimeZone(final ZoneId zoneId) {
return "Z".equals(zoneId.getDisplayName(TextStyle.FULL, Locale.ENGLISH));
}
}
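
The isUTC() overrides reduce to the same display-name predicate the removed isUTCTimeZone() helper used. A standalone sketch of how that predicate behaves (the class and method names here are illustrative, not part of the change):

import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.TextStyle;
import java.util.Locale;

public class UtcCheckDemo {
    // Same check as the new isUTC() overrides: the full English display
    // name of the UTC offset renders as "Z".
    static boolean isUTC(ZoneId zoneId) {
        return "Z".equals(zoneId.getDisplayName(TextStyle.FULL, Locale.ENGLISH));
    }

    public static void main(String[] args) {
        System.out.println(isUTC(ZoneOffset.UTC));                // true
        System.out.println(isUTC(ZoneId.of("America/New_York"))); // false
    }
}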
CompositeAggregator.java
@@ -182,6 +182,14 @@ protected boolean canOptimize() {
});
}

/**
* The filter rewrite optimized path does not support bucket intervals which are not fixed.
* For this reason we exclude non-UTC timezones.
*/
if (valuesSource.getRounding().isUTC() == false) {
return false;
}

// bucketOrds is used for saving the date histogram results got from the optimization path
bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), CardinalityUpperBound.ONE);
return true;
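
The comment above is the crux of the fast-fail: the optimization assumes a fixed bucket interval, but rounding in a non-UTC zone yields buckets whose widths vary across DST transitions. A small sketch of that variability (dates chosen around the 2023 US fall-back transition; not part of the diff):

import java.time.Duration;
import java.time.ZoneId;
import java.time.ZonedDateTime;

public class DstBucketWidthDemo {
    public static void main(String[] args) {
        ZoneId ny = ZoneId.of("America/New_York");
        // The local "day" containing the 2023-11-05 fall-back transition is 25 hours long...
        ZonedDateTime dstDay = ZonedDateTime.of(2023, 11, 5, 0, 0, 0, 0, ny);
        System.out.println(Duration.between(dstDay, dstDay.plusDays(1)));     // PT25H
        // ...while an ordinary local day is 24 hours, so day buckets are not fixed-width.
        ZonedDateTime plainDay = ZonedDateTime.of(2023, 11, 10, 0, 0, 0, 0, ny);
        System.out.println(Duration.between(plainDay, plainDay.plusDays(1))); // PT24H
    }
}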
DateHistogramAggregatorBridge.java
@@ -34,7 +34,15 @@ public abstract class DateHistogramAggregatorBridge extends AggregatorBridge {

int maxRewriteFilters;

protected boolean canOptimize(ValuesSourceConfig config) {
protected boolean canOptimize(ValuesSourceConfig config, Rounding rounding) {
/**
* The filter rewrite optimized path does not support bucket intervals which are not fixed.
* For this reason we exclude non-UTC timezones.
*/
if (rounding.isUTC() == false) {
return false;
}

if (config.script() == null && config.missing() == null) {
MappedFieldType fieldType = config.fieldType();
if (fieldType instanceof DateFieldMapper.DateFieldType) {
AutoDateHistogramAggregator.java
@@ -149,7 +149,6 @@ private AutoDateHistogramAggregator(
Aggregator parent,
Map<String, Object> metadata
) throws IOException {

super(name, factories, aggregationContext, parent, metadata);
this.targetBuckets = targetBuckets;
// TODO: Remove null usage here, by using a different aggregator for create
@@ -162,22 +161,34 @@ private AutoDateHistogramAggregator(
DateHistogramAggregatorBridge bridge = new DateHistogramAggregatorBridge() {
@Override
protected boolean canOptimize() {
return canOptimize(valuesSourceConfig);
return canOptimize(valuesSourceConfig, roundingInfos[0].rounding);
}

@Override
protected void prepare() throws IOException {
buildRanges(context);
}

/**
* The filter rewrite optimization uses this method to pre-emptively update the preparedRounding
* when considering the optimized path for a single segment. This is necessary since the optimized path
* skips doc collection entirely, which is where the preparedRounding is normally updated.
*
* @param low lower bound of rounding to prepare
* @param high upper bound of rounding to prepare
* @return the selected rounding, which satisfies the conditions:
* 1. Is at least as large as our previously prepared rounding
* 2. Must span a range of [low, high] with buckets <= targetBuckets
*/
@Override
protected Rounding getRounding(final long low, final long high) {
// max - min / targetBuckets = bestDuration
// find the right innerInterval this bestDuration belongs to
// since we cannot exceed targetBuckets, bestDuration should go up,
// so the right innerInterval should be an upper bound
long bestDuration = (high - low) / targetBuckets;
// reset so this function is idempotent

int prevRoundingIdx = roundingIdx;
roundingIdx = 0;
while (roundingIdx < roundingInfos.length - 1) {
final RoundingInfo curRoundingInfo = roundingInfos[roundingIdx];
Expand All @@ -190,7 +201,11 @@ protected Rounding getRounding(final long low, final long high) {
roundingIdx++;
}

preparedRounding = prepareRounding(roundingIdx);
// Ensure preparedRounding never shrinks
if (roundingIdx > prevRoundingIdx) {
preparedRounding = prepareRounding(roundingIdx);
}

return roundingInfos[roundingIdx].rounding;
}
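
This prevRoundingIdx guard is the core of the #16932 fix: per-segment recomputation may lower roundingIdx, but preparedRounding must never move back to a finer rounding. A standalone model with hypothetical index values (prepareRounding here is a stand-in for the aggregator's real method):

public class MonotonicRoundingDemo {
    // Stand-in: the real method builds a PreparedRounding for the given index.
    static int prepareRounding(int idx) {
        return idx;
    }

    public static void main(String[] args) {
        int roundingIdx = 3;                 // chosen while collecting a wide earlier segment
        int preparedRounding = prepareRounding(roundingIdx);

        int prevRoundingIdx = roundingIdx;
        roundingIdx = 1;                     // recomputed from scratch for a narrow later segment
        if (roundingIdx > prevRoundingIdx) { // false here, so preparedRounding is untouched
            preparedRounding = prepareRounding(roundingIdx);
        }
        // preparedRounding still reflects level 3, so values collected under the
        // wider rounding stay inside its valid input range and the assertion holds.
        System.out.println("roundingIdx=" + roundingIdx + ", prepared=" + preparedRounding);
    }
}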

@@ -403,12 +418,39 @@ private void collectValue(int doc, long rounded) throws IOException {
increaseRoundingIfNeeded(rounded);
}

/**
* Examine our current bucket count and the most recently added bucket to determine if an update to
* preparedRounding is required to keep total bucket count in compliance with targetBuckets.
*
* @param rounded the most recently collected value, after rounding
*/
private void increaseRoundingIfNeeded(long rounded) {
// If we are already using the rounding with the largest interval nothing can be done
if (roundingIdx >= roundingInfos.length - 1) {
return;
}

// Recalculate the max and min values we expect to bucket according to the most recently rounded value
min = Math.min(min, rounded);
max = Math.max(max, rounded);

/**
* Quick explanation of the two below conditions:
*
* 1. [targetBuckets * roundingInfos[roundingIdx].getMaximumInnerInterval()]
* Represents the total bucket count possible before we will exceed targetBuckets
* even if we use the maximum inner interval of our current rounding. For example, consider the
* DAYS_OF_MONTH rounding where the maximum inner interval is 7 days (i.e. 1 week buckets).
* targetBuckets * roundingInfos[roundingIdx].getMaximumInnerInterval() is then the number of
* 1-day buckets beyond which re-bucketing into 1-week buckets would still exceed the targetBuckets
* limit. If the current count of buckets exceeds this limit we must update
* our rounding.
*
* 2. [targetBuckets * roundingInfos[roundingIdx].getMaximumRoughEstimateDurationMillis()]
* The total duration of ms covered by our current rounding. In the case of MINUTES_OF_HOUR rounding
* getMaximumRoughEstimateDurationMillis is 60000. If our current total range in millis (max - min)
* exceeds this range we must update our rounding.
*/
if (bucketOrds.size() <= targetBuckets * roundingInfos[roundingIdx].getMaximumInnerInterval()
&& max - min <= targetBuckets * roundingInfos[roundingIdx].getMaximumRoughEstimateDurationMillis()) {
return;
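
Plugging numbers into condition 1 makes the first guard concrete (the 7-day maximum inner interval matches the DAYS_OF_MONTH example above; the targetBuckets value is hypothetical):

public class BucketLimitDemo {
    public static void main(String[] args) {
        int targetBuckets = 10;       // hypothetical bucket ceiling for the request
        int maximumInnerInterval = 7; // DAYS_OF_MONTH allows up to 1-week inner buckets
        long limit = (long) targetBuckets * maximumInnerInterval;
        // Beyond 70 one-day buckets, even re-bucketing into 1-week buckets would
        // exceed the 10-bucket target, so the rounding itself must increase.
        System.out.println("rounding must increase once bucket count exceeds " + limit);
    }
}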
DateHistogramAggregator.java
@@ -144,7 +144,7 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAggregator {
DateHistogramAggregatorBridge bridge = new DateHistogramAggregatorBridge() {
@Override
protected boolean canOptimize() {
return canOptimize(valuesSourceConfig);
return canOptimize(valuesSourceConfig, rounding);
}

@Override
AutoDateHistogramAggregatorTests.java
@@ -38,7 +38,9 @@
import org.apache.lucene.document.SortedSetDocValuesField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.MatchNoDocsQuery;
@@ -72,6 +74,7 @@
import java.time.Instant;
import java.time.LocalDate;
import java.time.YearMonth;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
@@ -912,6 +915,58 @@ public void testWithPipelineReductions() throws IOException {
);
}

// Bugfix: https://github.com/opensearch-project/OpenSearch/issues/16932
public void testFilterRewriteWithTZRoundingRangeAssert() throws IOException {
/*
multiBucketIndexData must overlap with DST to produce a 'LinkedListLookup' prepared rounding.
This lookup rounding style maintains a strict max/min input range and will assert each value is in range.
*/
final List<ZonedDateTime> multiBucketIndexData = Arrays.asList(
ZonedDateTime.of(2023, 10, 10, 0, 0, 0, 0, ZoneOffset.UTC),
ZonedDateTime.of(2023, 11, 11, 0, 0, 0, 0, ZoneOffset.UTC)
);

final List<ZonedDateTime> singleBucketIndexData = Arrays.asList(ZonedDateTime.of(2023, 12, 27, 0, 0, 0, 0, ZoneOffset.UTC));

try (Directory directory = newDirectory()) {
/*
Ensure we produce two segments on one shard such that the documents in seg 1 will be out of range of the
prepared rounding produced by the filter rewrite optimization when it considers seg 2 for the optimized path.
*/
IndexWriterConfig c = newIndexWriterConfig().setMergePolicy(NoMergePolicy.INSTANCE);
try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory, c)) {
indexSampleData(multiBucketIndexData, indexWriter);
indexWriter.flush();
indexSampleData(singleBucketIndexData, indexWriter);
}

try (IndexReader indexReader = DirectoryReader.open(directory)) {
final IndexSearcher indexSearcher = newSearcher(indexReader, true, true);

// Force agg to update rounding when it begins collecting from the second segment.
final AutoDateHistogramAggregationBuilder aggregationBuilder = new AutoDateHistogramAggregationBuilder("_name");
aggregationBuilder.setNumBuckets(3).field(DATE_FIELD).timeZone(ZoneId.of("America/New_York"));

Map<String, Integer> expectedDocCount = new TreeMap<>();
expectedDocCount.put("2023-10-01T00:00:00.000-04:00", 1);
expectedDocCount.put("2023-11-01T00:00:00.000-04:00", 1);
expectedDocCount.put("2023-12-01T00:00:00.000-05:00", 1);

final InternalAutoDateHistogram histogram = searchAndReduce(
indexSearcher,
DEFAULT_QUERY,
aggregationBuilder,
false,
new DateFieldMapper.DateFieldType(aggregationBuilder.field()),
new NumberFieldMapper.NumberFieldType(INSTANT_FIELD, NumberFieldMapper.NumberType.LONG),
new NumberFieldMapper.NumberFieldType(NUMERIC_FIELD, NumberFieldMapper.NumberType.LONG)
);

assertThat(bucketCountsAsMap(histogram), equalTo(expectedDocCount));
}
}
}

@Override
protected IndexSettings createIndexSettings() {
final Settings nodeSettings = Settings.builder().put("search.max_buckets", 25000).build();
AggregatorTestCase.java
@@ -607,9 +607,19 @@ protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(
IndexSearcher searcher,
Query query,
AggregationBuilder builder,
boolean shardFanOut,
MappedFieldType... fieldTypes
) throws IOException {
return searchAndReduce(createIndexSettings(), searcher, query, builder, DEFAULT_MAX_BUCKETS, fieldTypes);
return searchAndReduce(createIndexSettings(), searcher, query, builder, DEFAULT_MAX_BUCKETS, shardFanOut, fieldTypes);
}

protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(
IndexSearcher searcher,
Query query,
AggregationBuilder builder,
MappedFieldType... fieldTypes
) throws IOException {
return searchAndReduce(createIndexSettings(), searcher, query, builder, DEFAULT_MAX_BUCKETS, randomBoolean(), fieldTypes);
}

protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(
Expand All @@ -619,7 +629,7 @@ protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduc
AggregationBuilder builder,
MappedFieldType... fieldTypes
) throws IOException {
return searchAndReduce(indexSettings, searcher, query, builder, DEFAULT_MAX_BUCKETS, fieldTypes);
return searchAndReduce(indexSettings, searcher, query, builder, DEFAULT_MAX_BUCKETS, randomBoolean(), fieldTypes);
}

protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(
Expand All @@ -629,7 +639,7 @@ protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduc
int maxBucket,
MappedFieldType... fieldTypes
) throws IOException {
return searchAndReduce(createIndexSettings(), searcher, query, builder, maxBucket, fieldTypes);
return searchAndReduce(createIndexSettings(), searcher, query, builder, maxBucket, randomBoolean(), fieldTypes);
}

protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(
Expand All @@ -638,9 +648,10 @@ protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduc
Query query,
AggregationBuilder builder,
int maxBucket,
boolean shardFanOut,
MappedFieldType... fieldTypes
) throws IOException {
return searchAndReduce(indexSettings, searcher, query, builder, maxBucket, false, fieldTypes);
return searchAndReduce(indexSettings, searcher, query, builder, maxBucket, false, shardFanOut, fieldTypes);
}

/**
Expand All @@ -658,6 +669,7 @@ protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduc
AggregationBuilder builder,
int maxBucket,
boolean hasNested,
boolean shardFanOut,
MappedFieldType... fieldTypes
) throws IOException {
final IndexReaderContext ctx = searcher.getTopReaderContext();
Expand All @@ -673,7 +685,7 @@ protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduc
);
C root = createAggregator(query, builder, searcher, bucketConsumer, fieldTypes);

if (randomBoolean() && searcher.getIndexReader().leaves().size() > 0) {
if (shardFanOut && searcher.getIndexReader().leaves().size() > 0) {
assertThat(ctx, instanceOf(CompositeReaderContext.class));
final CompositeReaderContext compCTX = (CompositeReaderContext) ctx;
final int size = compCTX.leaves().size();
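
With shardFanOut threaded through every overload, a test can pin the previously random fan-out; the new assertion-bug test passes false so a single aggregator instance collects both segments. A usage sketch (the fieldTypes variable stands in for the MappedFieldType arguments):

// Deterministic: no per-leaf fan-out, one collection pass over the whole reader.
InternalAutoDateHistogram histogram = searchAndReduce(
    indexSearcher, DEFAULT_QUERY, aggregationBuilder, false, fieldTypes);

// Unchanged behavior: overloads without the flag still choose fan-out at random.
InternalAutoDateHistogram histogram2 = searchAndReduce(
    indexSearcher, DEFAULT_QUERY, aggregationBuilder, fieldTypes);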
