diff --git a/CHANGELOG.md b/CHANGELOG.md
index 111c6279828cf..f3c3ba3fa0875 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -117,6 +117,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - [WLM] Fix the QueryGroupTask logging bug ([#17169](https://github.com/opensearch-project/OpenSearch/pull/17169))
 - Use OpenSearch version to deserialize remote custom metadata([#16494](https://github.com/opensearch-project/OpenSearch/pull/16494))
 - Fix the failing CI's with `Failed to load eclipse jdt formatter` error ([#17172](https://github.com/opensearch-project/OpenSearch/pull/17172))
+- Fix AutoDateHistogramAggregator rounding assertion failure ([#17023](https://github.com/opensearch-project/OpenSearch/pull/17023))
+
 ### Security

 [Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.18...2.x
diff --git a/server/src/main/java/org/opensearch/common/Rounding.java b/server/src/main/java/org/opensearch/common/Rounding.java
index df911ecbbafe8..c12176a18882d 100644
--- a/server/src/main/java/org/opensearch/common/Rounding.java
+++ b/server/src/main/java/org/opensearch/common/Rounding.java
@@ -275,6 +275,14 @@ public DateTimeUnit unit() {
         return null;
     }

+    /**
+     * Helper function for checking if the time zone requested for date histogram
+     * aggregation is UTC or not
+     */
+    public boolean isUTC() {
+        throw new UnsupportedOperationException();
+    }
+
     /**
      * A strategy for rounding milliseconds since epoch.
      *
@@ -676,6 +684,11 @@ public String toString() {
             return "Rounding[" + unit + " in " + timeZone + "]";
         }

+        @Override
+        public boolean isUTC() {
+            return "Z".equals(timeZone.getDisplayName(TextStyle.FULL, Locale.ENGLISH));
+        }
+
         private abstract class TimeUnitPreparedRounding extends PreparedRounding {
             @Override
             public double roundingSize(long utcMillis, DateTimeUnit timeUnit) {
@@ -1057,6 +1070,11 @@ public String toString() {
             return "Rounding[" + interval + " in " + timeZone + "]";
         }

+        @Override
+        public boolean isUTC() {
+            return "Z".equals(timeZone.getDisplayName(TextStyle.FULL, Locale.ENGLISH));
+        }
+
         private long roundKey(long value, long interval) {
             if (value < 0) {
                 return (value - interval + 1) / interval;
@@ -1379,6 +1397,11 @@ public boolean equals(Object obj) {
         public String toString() {
             return delegate + " offset by " + offset;
         }
+
+        @Override
+        public boolean isUTC() {
+            return delegate.isUTC();
+        }
     }

     public static Rounding read(StreamInput in) throws IOException {
@@ -1406,28 +1429,12 @@ public static OptionalLong getInterval(Rounding rounding) {

         if (rounding instanceof TimeUnitRounding) {
             interval = (((TimeUnitRounding) rounding).unit).extraLocalOffsetLookup();
-            if (!isUTCTimeZone(((TimeUnitRounding) rounding).timeZone)) {
-                // Fast filter aggregation cannot be used if it needs time zone rounding
-                return OptionalLong.empty();
-            }
         } else if (rounding instanceof TimeIntervalRounding) {
             interval = ((TimeIntervalRounding) rounding).interval;
-            if (!isUTCTimeZone(((TimeIntervalRounding) rounding).timeZone)) {
-                // Fast filter aggregation cannot be used if it needs time zone rounding
-                return OptionalLong.empty();
-            }
         } else {
             return OptionalLong.empty();
         }

         return OptionalLong.of(interval);
     }
-
-    /**
-     * Helper function for checking if the time zone requested for date histogram
-     * aggregation is utc or not
-     */
-    private static boolean isUTCTimeZone(final ZoneId zoneId) {
-        return "Z".equals(zoneId.getDisplayName(TextStyle.FULL, Locale.ENGLISH));
-    }
 }
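The isUTC() implementations above reuse the display-name check that previously lived in the private isUTCTimeZone helper next to getInterval(). A standalone sketch of what that predicate evaluates to (not part of the patch; the class and method names below are illustrative only):

import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.TextStyle;
import java.util.Locale;

public class UtcCheckSketch {
    // Same predicate the patch exposes as Rounding#isUTC(): a zone counts as UTC
    // only when its full English display name is "Z", which is what ZoneOffset.UTC reports.
    static boolean isUtc(ZoneId zoneId) {
        return "Z".equals(zoneId.getDisplayName(TextStyle.FULL, Locale.ENGLISH));
    }

    public static void main(String[] args) {
        System.out.println(isUtc(ZoneOffset.UTC));                // true
        System.out.println(isUtc(ZoneId.of("America/New_York"))); // false
    }
}

Zones that fail this predicate now cause the aggregators below to skip the filter rewrite path up front, instead of being rejected inside getInterval().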
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java
index 0a200fcbc105b..91f9a43139e10 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java
@@ -182,6 +182,14 @@ protected boolean canOptimize() {
             });
         }

+        /**
+         * The filter rewrite optimized path does not support bucket intervals that are not fixed.
+         * For this reason we exclude non-UTC time zones.
+         */
+        if (valuesSource.getRounding().isUTC() == false) {
+            return false;
+        }
+
         // bucketOrds is used for saving the date histogram results got from the optimization path
         bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), CardinalityUpperBound.ONE);
         return true;
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/DateHistogramAggregatorBridge.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/DateHistogramAggregatorBridge.java
index 8bff3fdc5fefb..c04423c2f7d70 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/DateHistogramAggregatorBridge.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/DateHistogramAggregatorBridge.java
@@ -34,7 +34,15 @@ public abstract class DateHistogramAggregatorBridge extends AggregatorBridge {

     int maxRewriteFilters;

-    protected boolean canOptimize(ValuesSourceConfig config) {
+    protected boolean canOptimize(ValuesSourceConfig config, Rounding rounding) {
+        /**
+         * The filter rewrite optimized path does not support bucket intervals that are not fixed.
+         * For this reason we exclude non-UTC time zones.
+         */
+        if (rounding.isUTC() == false) {
+            return false;
+        }
+
         if (config.script() == null && config.missing() == null) {
             MappedFieldType fieldType = config.fieldType();
             if (fieldType instanceof DateFieldMapper.DateFieldType) {
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java
index f3a36b4882d19..cbeb27e8a3e63 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java
@@ -149,7 +149,6 @@ private AutoDateHistogramAggregator(
         Aggregator parent,
         Map<String, Object> metadata
     ) throws IOException {
-
         super(name, factories, aggregationContext, parent, metadata);
         this.targetBuckets = targetBuckets;
         // TODO: Remove null usage here, by using a different aggregator for create
@@ -162,7 +161,7 @@ private AutoDateHistogramAggregator(
         DateHistogramAggregatorBridge bridge = new DateHistogramAggregatorBridge() {
             @Override
             protected boolean canOptimize() {
-                return canOptimize(valuesSourceConfig);
+                return canOptimize(valuesSourceConfig, roundingInfos[0].rounding);
             }

             @Override
@@ -170,6 +169,17 @@ protected void prepare() throws IOException {
                 buildRanges(context);
             }

+            /**
+             * The filter rewrite optimization uses this method to pre-emptively update the preparedRounding
+             * when considering the optimized path for a single segment. This is necessary since the optimized path
+             * skips doc collection entirely, which is where the preparedRounding is normally updated.
+             *
+             * @param low lower bound of rounding to prepare
+             * @param high upper bound of rounding to prepare
+             * @return the selected rounding, which satisfies the following conditions:
+             *         1. It is at least as large as our previously prepared rounding
+             *         2. It spans the range [low, high] with no more than targetBuckets buckets
+             */
             @Override
             protected Rounding getRounding(final long low, final long high) {
                 // max - min / targetBuckets = bestDuration
@@ -177,7 +187,8 @@ protected Rounding getRounding(final long low, final long high) {
                 // since we cannot exceed targetBuckets, bestDuration should go up,
                 // so the right innerInterval should be an upper bound
                 long bestDuration = (high - low) / targetBuckets;
-                // reset so this function is idempotent
+
+                int prevRoundingIdx = roundingIdx;
                 roundingIdx = 0;
                 while (roundingIdx < roundingInfos.length - 1) {
                     final RoundingInfo curRoundingInfo = roundingInfos[roundingIdx];
@@ -190,7 +201,11 @@
                     roundingIdx++;
                 }

-                preparedRounding = prepareRounding(roundingIdx);
+                // Ensure preparedRounding never shrinks
+                if (roundingIdx > prevRoundingIdx) {
+                    preparedRounding = prepareRounding(roundingIdx);
+                }
+
                 return roundingInfos[roundingIdx].rounding;
             }

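The prevRoundingIdx guard above keeps the prepared rounding from moving back to a finer interval when a later segment spans a narrower [low, high] range. A standalone sketch of that selection behaviour with made-up durations (not part of the patch; the array and method names are illustrative, and the real loop condition uses RoundingInfo's inner intervals and rough estimates):

import java.util.concurrent.TimeUnit;

public class RoundingSelectionSketch {
    // Candidate bucket widths, coarsest last (stand-ins for the RoundingInfo table).
    static final long[] DURATIONS_MILLIS = {
        TimeUnit.SECONDS.toMillis(1),
        TimeUnit.MINUTES.toMillis(1),
        TimeUnit.HOURS.toMillis(1),
        TimeUnit.DAYS.toMillis(1) };

    static int roundingIdx = 0; // mimics the aggregator's current rounding index

    // Pick the smallest width that keeps (high - low) within targetBuckets buckets,
    // but never move to a finer width than one already chosen for an earlier segment.
    static void selectRounding(long low, long high, long targetBuckets) {
        long bestDuration = (high - low) / targetBuckets;
        int idx = 0;
        while (idx < DURATIONS_MILLIS.length - 1 && DURATIONS_MILLIS[idx] < bestDuration) {
            idx++;
        }
        roundingIdx = Math.max(roundingIdx, idx); // "preparedRounding never shrinks"
    }

    public static void main(String[] args) {
        selectRounding(0, TimeUnit.DAYS.toMillis(30), 10);   // wide segment -> daily buckets (index 3)
        selectRounding(0, TimeUnit.MINUTES.toMillis(5), 10);  // narrow segment -> index stays at 3
        System.out.println(roundingIdx);
    }
}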
@@ -403,12 +418,39 @@ private void collectValue(int doc, long rounded) throws IOException {
             increaseRoundingIfNeeded(rounded);
         }

+        /**
+         * Examine our current bucket count and the most recently added bucket to determine if an update to
+         * preparedRounding is required to keep the total bucket count in compliance with targetBuckets.
+         *
+         * @param rounded the most recently collected value, after rounding
+         */
         private void increaseRoundingIfNeeded(long rounded) {
+            // If we are already using the rounding with the largest interval nothing can be done
            if (roundingIdx >= roundingInfos.length - 1) {
                return;
            }
+
+            // Recalculate the max and min values we expect to bucket according to the most recently rounded value
            min = Math.min(min, rounded);
            max = Math.max(max, rounded);
+
+            /**
+             * Quick explanation of the two conditions below:
+             *
+             * 1. [targetBuckets * roundingInfos[roundingIdx].getMaximumInnerInterval()]
+             *    Represents the total bucket count possible before we will exceed targetBuckets
+             *    even if we use the maximum inner interval of our current rounding. For example, consider the
+             *    DAYS_OF_MONTH rounding where the maximum inner interval is 7 days (i.e. 1 week buckets).
+             *    targetBuckets * roundingInfos[roundingIdx].getMaximumInnerInterval() would then be the number of
+             *    1 day buckets possible such that if we re-bucket to 1 week buckets we will have more 1 week buckets
+             *    than our targetBuckets limit. If the current count of buckets exceeds this limit we must update
+             *    our rounding.
+             *
+             * 2. [targetBuckets * roundingInfos[roundingIdx].getMaximumRoughEstimateDurationMillis()]
+             *    The total duration in millis covered by our current rounding. In the case of MINUTES_OF_HOUR rounding
+             *    getMaximumRoughEstimateDurationMillis is 60000. If our current total range in millis (max - min)
+             *    exceeds this range we must update our rounding.
+             */
            if (bucketOrds.size() <= targetBuckets * roundingInfos[roundingIdx].getMaximumInnerInterval()
                && max - min <= targetBuckets * roundingInfos[roundingIdx].getMaximumRoughEstimateDurationMillis()) {
                return;
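The two conditions documented in increaseRoundingIfNeeded decide when the auto date histogram must move to a coarser rounding. A standalone sketch with made-up numbers (not OpenSearch code; targetBuckets, maxInnerInterval and roughDurationMillis stand in for the targetBuckets setting and the RoundingInfo accessors):

public class RoundingGuardSketch {
    public static void main(String[] args) {
        long targetBuckets = 10;
        long maxInnerInterval = 7;              // stand-in for RoundingInfo#getMaximumInnerInterval()
        long roughDurationMillis = 86_400_000L; // stand-in for RoundingInfo#getMaximumRoughEstimateDurationMillis()

        long bucketCount = 65;                              // buckets collected so far at the current rounding
        long minMillis = 0L, maxMillis = 75L * 86_400_000L; // observed value range

        // Guard 1: even merging up to maxInnerInterval buckets into one would still exceed targetBuckets.
        boolean tooManyBuckets = bucketCount > targetBuckets * maxInnerInterval;
        // Guard 2: the observed range no longer fits in targetBuckets buckets of this rounding's rough width.
        boolean rangeTooWide = (maxMillis - minMillis) > targetBuckets * roughDurationMillis;

        // 65 <= 70, but 75 days > 10 days, so the aggregator would move to a coarser rounding here.
        System.out.println("increase rounding? " + (tooManyBuckets || rangeTooWide));
    }
}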
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java
index 0482188a33b14..ce6a2dc8ebe46 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java
@@ -143,7 +143,7 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAg
         DateHistogramAggregatorBridge bridge = new DateHistogramAggregatorBridge() {
             @Override
             protected boolean canOptimize() {
-                return canOptimize(valuesSourceConfig);
+                return canOptimize(valuesSourceConfig, rounding);
             }

             @Override
diff --git a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java
index dda053af78b30..95f56d779b088 100644
--- a/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java
+++ b/server/src/test/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregatorTests.java
@@ -38,7 +38,9 @@
 import org.apache.lucene.document.SortedSetDocValuesField;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.index.IndexableField;
+import org.apache.lucene.index.NoMergePolicy;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.search.MatchNoDocsQuery;
@@ -72,6 +74,7 @@
 import java.time.Instant;
 import java.time.LocalDate;
 import java.time.YearMonth;
+import java.time.ZoneId;
 import java.time.ZoneOffset;
 import java.time.ZonedDateTime;
 import java.util.ArrayList;
@@ -912,6 +915,58 @@ public void testWithPipelineReductions() throws IOException {
         );
     }

+    // Bugfix: https://github.com/opensearch-project/OpenSearch/issues/16932
+    public void testFilterRewriteWithTZRoundingRangeAssert() throws IOException {
+        /*
+         multiBucketIndexData must overlap with DST to produce a 'LinkedListLookup' prepared rounding.
+         This lookup rounding style maintains a strict max/min input range and asserts that each value is in range.
+         */
+        final List<ZonedDateTime> multiBucketIndexData = Arrays.asList(
+            ZonedDateTime.of(2023, 10, 10, 0, 0, 0, 0, ZoneOffset.UTC),
+            ZonedDateTime.of(2023, 11, 11, 0, 0, 0, 0, ZoneOffset.UTC)
+        );
+
+        final List<ZonedDateTime> singleBucketIndexData = Arrays.asList(ZonedDateTime.of(2023, 12, 27, 0, 0, 0, 0, ZoneOffset.UTC));
+
+        try (Directory directory = newDirectory()) {
+            /*
+             Ensure we produce two segments on one shard such that the documents in seg 1 will be out of range of the
+             prepared rounding produced when the filter rewrite optimization considers seg 2 for the optimized path.
+             */
+            IndexWriterConfig c = newIndexWriterConfig().setMergePolicy(NoMergePolicy.INSTANCE);
+            try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory, c)) {
+                indexSampleData(multiBucketIndexData, indexWriter);
+                indexWriter.flush();
+                indexSampleData(singleBucketIndexData, indexWriter);
+            }
+
+            try (IndexReader indexReader = DirectoryReader.open(directory)) {
+                final IndexSearcher indexSearcher = newSearcher(indexReader, true, true);
+
+                // Force agg to update rounding when it begins collecting from the second segment.
+                final AutoDateHistogramAggregationBuilder aggregationBuilder = new AutoDateHistogramAggregationBuilder("_name");
+                aggregationBuilder.setNumBuckets(3).field(DATE_FIELD).timeZone(ZoneId.of("America/New_York"));
+
+                Map<String, Integer> expectedDocCount = new TreeMap<>();
+                expectedDocCount.put("2023-10-01T00:00:00.000-04:00", 1);
+                expectedDocCount.put("2023-11-01T00:00:00.000-04:00", 1);
+                expectedDocCount.put("2023-12-01T00:00:00.000-05:00", 1);
+
+                final InternalAutoDateHistogram histogram = searchAndReduce(
+                    indexSearcher,
+                    DEFAULT_QUERY,
+                    aggregationBuilder,
+                    false,
+                    new DateFieldMapper.DateFieldType(aggregationBuilder.field()),
+                    new NumberFieldMapper.NumberFieldType(INSTANT_FIELD, NumberFieldMapper.NumberType.LONG),
+                    new NumberFieldMapper.NumberFieldType(NUMERIC_FIELD, NumberFieldMapper.NumberType.LONG)
+                );
+
+                assertThat(bucketCountsAsMap(histogram), equalTo(expectedDocCount));
+            }
+        }
+    }
+
     @Override
     protected IndexSettings createIndexSettings() {
         final Settings nodeSettings = Settings.builder().put("search.max_buckets", 25000).build();
diff --git a/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java
index 24cc376e620c8..5d322637dc645 100644
--- a/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java
+++ b/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java
@@ -607,9 +607,19 @@ protected A searchAndReduc
         IndexSearcher searcher,
         Query query,
         AggregationBuilder builder,
+        boolean shardFanOut,
         MappedFieldType... fieldTypes
     ) throws IOException {
-        return searchAndReduce(createIndexSettings(), searcher, query, builder, DEFAULT_MAX_BUCKETS, fieldTypes);
+        return searchAndReduce(createIndexSettings(), searcher, query, builder, DEFAULT_MAX_BUCKETS, shardFanOut, fieldTypes);
     }
+
+    protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(
+        IndexSearcher searcher,
+        Query query,
+        AggregationBuilder builder,
+        MappedFieldType... fieldTypes
+    ) throws IOException {
+        return searchAndReduce(createIndexSettings(), searcher, query, builder, DEFAULT_MAX_BUCKETS, randomBoolean(), fieldTypes);
+    }

     protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(
@@ -619,7 +629,7 @@ protected A searchAndReduc
         AggregationBuilder builder,
         MappedFieldType... fieldTypes
     ) throws IOException {
-        return searchAndReduce(indexSettings, searcher, query, builder, DEFAULT_MAX_BUCKETS, fieldTypes);
+        return searchAndReduce(indexSettings, searcher, query, builder, DEFAULT_MAX_BUCKETS, randomBoolean(), fieldTypes);
     }

     protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(
@@ -629,7 +639,7 @@ protected A searchAndReduc
         int maxBucket,
         MappedFieldType... fieldTypes
     ) throws IOException {
-        return searchAndReduce(createIndexSettings(), searcher, query, builder, maxBucket, fieldTypes);
+        return searchAndReduce(createIndexSettings(), searcher, query, builder, maxBucket, randomBoolean(), fieldTypes);
     }

     protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(
@@ -638,9 +648,10 @@ protected A searchAndReduc
         Query query,
         AggregationBuilder builder,
         int maxBucket,
+        boolean shardFanOut,
         MappedFieldType... fieldTypes
     ) throws IOException {
-        return searchAndReduce(indexSettings, searcher, query, builder, maxBucket, false, fieldTypes);
+        return searchAndReduce(indexSettings, searcher, query, builder, maxBucket, false, shardFanOut, fieldTypes);
     }

     /**
@@ -658,6 +669,7 @@ protected A searchAndReduc
         AggregationBuilder builder,
         int maxBucket,
         boolean hasNested,
+        boolean shardFanOut,
         MappedFieldType... fieldTypes
     ) throws IOException {
         final IndexReaderContext ctx = searcher.getTopReaderContext();
@@ -673,7 +685,7 @@ protected A searchAndReduc
         );
         C root = createAggregator(query, builder, searcher, bucketConsumer, fieldTypes);

-        if (randomBoolean() && searcher.getIndexReader().leaves().size() > 0) {
+        if (shardFanOut && searcher.getIndexReader().leaves().size() > 0) {
             assertThat(ctx, instanceOf(CompositeReaderContext.class));
             final CompositeReaderContext compCTX = (CompositeReaderContext) ctx;
             final int size = compCTX.leaves().size();