Skip to content

Commit

Permalink
Update bucket metric pipeline agg paths to allow intermediate single …
Browse files Browse the repository at this point in the history
…bucket and bucket qualified multi-bucket aggs (elastic#85729)

Bucket metric pipeline aggregation paths currently always expect the sibling aggregation to be a multi-bucket agg. 

This honestly doesn't have to be the case for bucket metric pipeline aggs to work. 

Consider the following path:

```
filter_agg>filters_agg['bucket_foo']>histo>some_metric_agg
```

Since `filter_agg>filters_agg['bucket_foo']` are well defined and are not crossing bucket threasholds, metrics should still be able to be calculated against the bucket values for `histo`

This commit allows any combination of single bucket aggs (e.g. filter) and bucket specific multi-bucket aggs before reaching the desired multi-bucket used for the metric calculation.
  • Loading branch information
benwtrent authored May 25, 2022
1 parent 7d86f14 commit b4668ca
Show file tree
Hide file tree
Showing 14 changed files with 248 additions and 35 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/85729.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 85729
summary: Update bucket metric pipeline agg paths to allow intermediate single bucket
and bucket qualified multi-bucket aggs
area: Aggregations
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -105,20 +106,45 @@ protected void validate(ValidationContext context) {
return;
}
// find the first agg name in the buckets path to check its a multi bucket agg
final String firstAgg = AggregationPath.parse(bucketsPaths[0]).getPathElementsAsStringList().get(0);
List<AggregationPath.PathElement> path = AggregationPath.parse(bucketsPaths[0]).getPathElements();
int pathPos = 0;
AggregationPath.PathElement currentAgg = path.get(pathPos++);
final String aggName = currentAgg.name();
Optional<AggregationBuilder> aggBuilder = context.getSiblingAggregations()
.stream()
.filter(builder -> builder.getName().equals(firstAgg))
.filter(builder -> builder.getName().equals(aggName))
.findAny();
if (aggBuilder.isEmpty()) {
context.addBucketPathValidationError("aggregation does not exist for aggregation [" + name + "]: " + bucketsPaths[0]);
return;
}

// Dig through the aggregation tree to find the first aggregation specified by the path.
// The path may have many single bucket aggs (with sub-aggs) or many multi-bucket aggs specified by bucket keys
while (aggBuilder.isPresent()
&& pathPos < path.size()
&& ((aggBuilder.get().bucketCardinality() == AggregationBuilder.BucketCardinality.MANY
&& AggregationPath.pathElementContainsBucketKey(currentAgg))
|| (aggBuilder.get().bucketCardinality() == AggregationBuilder.BucketCardinality.ONE
&& aggBuilder.get().getSubAggregations().isEmpty() == false))) {
currentAgg = path.get(pathPos++);
final String subAggName = currentAgg.name();
aggBuilder = aggBuilder.get().getSubAggregations().stream().filter(b -> b.getName().equals(subAggName)).findAny();
}
if (aggBuilder.isEmpty()) {
context.addBucketPathValidationError(
"aggregation does not exist for aggregation ["
+ name
+ "]: "
+ AggregationPath.pathElementsAsStringList(path.subList(0, pathPos))
);
return;
}
if (aggBuilder.get().bucketCardinality() != AggregationBuilder.BucketCardinality.MANY) {
context.addValidationError(
"The first aggregation in "
"Unable to find unqualified multi-bucket aggregation in "
+ PipelineAggregator.Parser.BUCKETS_PATH.getPreferredName()
+ " must be a multi-bucket aggregation for aggregation ["
+ ". Path must include a multi-bucket aggregation for aggregation ["
+ name
+ "] found :"
+ aggBuilder.get().getClass().getName()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,20 @@

import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationExecutionException;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.InternalSingleBucketAggregation;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.support.AggregationPath;

import java.util.List;
import java.util.Map;

import static org.elasticsearch.search.aggregations.support.AggregationPath.pathElementContainsBucketKey;

/**
* A class of sibling pipeline aggregations which calculate metrics across the
* buckets of a sibling aggregation
Expand All @@ -44,11 +48,74 @@ public abstract class BucketMetricsPipelineAggregator extends SiblingPipelineAgg
@Override
public final InternalAggregation doReduce(Aggregations aggregations, AggregationReduceContext context) {
preCollection();
List<String> bucketsPath = AggregationPath.parse(bucketsPaths()[0]).getPathElementsAsStringList();
List<AggregationPath.PathElement> parsedPath = AggregationPath.parse(bucketsPaths()[0]).getPathElements();
for (Aggregation aggregation : aggregations) {
if (aggregation.getName().equals(bucketsPath.get(0))) {
List<String> sublistedPath = bucketsPath.subList(1, bucketsPath.size());
InternalMultiBucketAggregation<?, ?> multiBucketsAgg = (InternalMultiBucketAggregation<?, ?>) aggregation;
// Now that we have found the first agg in the path, resolve to the first non-qualified multi-bucket path
if (aggregation.getName().equals(parsedPath.get(0).name())) {
int currElement = 0;
Aggregation currentAgg = aggregation;
while (currElement < parsedPath.size() - 1) {
if (currentAgg == null) {
throw new IllegalArgumentException(
"bucket_path ["
+ bucketsPaths()[0]
+ "] expected aggregation with name ["
+ parsedPath.get(currElement).name()
+ "] but was missing in search response"
);
}
if (currentAgg instanceof InternalSingleBucketAggregation singleBucketAggregation) {
currentAgg = singleBucketAggregation.getAggregations().get(parsedPath.get(++currElement).name());
} else if (pathElementContainsBucketKey(parsedPath.get(currElement))) {
if (currentAgg instanceof InternalMultiBucketAggregation<?, ?> multiBucketAggregation) {
InternalMultiBucketAggregation.InternalBucket bucket =
(InternalMultiBucketAggregation.InternalBucket) multiBucketAggregation.getProperty(
parsedPath.get(currElement).key()
);
if (bucket == null) {
throw new AggregationExecutionException(
"missing bucket ["
+ parsedPath.get(currElement).key()
+ "] for agg ["
+ currentAgg.getName()
+ "] while extracting bucket path ["
+ bucketsPaths()[0]
+ "]"
);
}
if (currElement == parsedPath.size() - 1) {
throw new AggregationExecutionException(
"invalid bucket path ends at [" + parsedPath.get(currElement).key() + "]"
);
}
currentAgg = bucket.getAggregations().get(parsedPath.get(++currElement).name());
} else {
throw new AggregationExecutionException(
"bucket_path ["
+ bucketsPaths()[0]
+ "] indicates bucket_key ["
+ parsedPath.get(currElement).key()
+ "] at position ["
+ currElement
+ "] but encountered on agg ["
+ currentAgg.getName()
+ "] which is not a multi_bucket aggregation"
);
}
} else {
break;
}
}
if (currentAgg instanceof InternalMultiBucketAggregation == false) {
String msg = currentAgg == null
? "did not find multi-bucket aggregation for extraction."
: "did not find multi-bucket aggregation for extraction. Found [" + currentAgg.getName() + "]";
throw new AggregationExecutionException(msg);
}
List<String> sublistedPath = AggregationPath.pathElementsAsStringList(parsedPath.subList(currElement, parsedPath.size()));
// First element is the current agg, so we want the rest of the path
sublistedPath = sublistedPath.subList(1, sublistedPath.size());
InternalMultiBucketAggregation<?, ?> multiBucketsAgg = (InternalMultiBucketAggregation<?, ?>) currentAgg;
List<? extends InternalMultiBucketAggregation.InternalBucket> buckets = multiBucketsAgg.getBuckets();
for (InternalMultiBucketAggregation.InternalBucket bucket : buckets) {
Double bucketValue = BucketHelpers.resolveBucketValue(multiBucketsAgg, bucket, sublistedPath, gapPolicy);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,33 @@ public class AggregationPath {

private static final String AGG_DELIM = ">";

/**
* Indicates if the current path element contains a bucket key.
*
* InternalMultiBucketAggregation#resolvePropertyFromPath supports resolving specific buckets and a bucket is indicated by
* wrapping a key element in quotations. Example `agg['foo']` would get the bucket `foo` in the agg.
*
* @param pathElement The path element to check
* @return Does the path element contain a bucket_key or not
*/
public static boolean pathElementContainsBucketKey(AggregationPath.PathElement pathElement) {
return pathElement != null && pathElement.key() != null && pathElement.key().startsWith("'") && pathElement.key().endsWith("'");
}

public static List<String> pathElementsAsStringList(List<PathElement> pathElements) {
List<String> stringPathElements = new ArrayList<>();
for (PathElement pathElement : pathElements) {
stringPathElements.add(pathElement.name);
if (pathElement.key != null) {
stringPathElements.add(pathElement.key);
}
if (pathElement.metric != null) {
stringPathElements.add(pathElement.metric);
}
}
return stringPathElements;
}

public static AggregationPath parse(String path) {
String[] elements = Strings.tokenizeToStringArray(path, AGG_DELIM);
List<PathElement> tokens = new ArrayList<>(elements.length);
Expand Down Expand Up @@ -181,17 +208,7 @@ public List<PathElement> getPathElements() {
}

public List<String> getPathElementsAsStringList() {
List<String> stringPathElements = new ArrayList<>();
for (PathElement pathElement : this.pathElements) {
stringPathElements.add(pathElement.name);
if (pathElement.key != null) {
stringPathElements.add(pathElement.key);
}
if (pathElement.metric != null) {
stringPathElements.add(pathElement.metric);
}
}
return stringPathElements;
return pathElementsAsStringList(this.pathElements);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,32 @@

import org.apache.lucene.document.Document;
import org.apache.lucene.document.SortedNumericDocValuesField;
import org.apache.lucene.document.SortedSetDocValuesField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.store.Directory;
import org.apache.lucene.tests.index.RandomIndexWriter;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.time.DateFormatters;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.KeywordFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.AggregatorTestCase;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.filter.InternalFilter;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogram;
import org.elasticsearch.search.aggregations.bucket.terms.InternalTerms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.AvgAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.InternalAvg;

Expand Down Expand Up @@ -119,6 +127,95 @@ public void testSameAggNames() throws IOException {
}
}

public void testComplicatedBucketPath() throws IOException {
Query query = new MatchAllDocsQuery();
final String textField = "text";
AvgAggregationBuilder avgBuilder = new AvgAggregationBuilder("foo").field(VALUE_FIELD);
DateHistogramAggregationBuilder histo = new DateHistogramAggregationBuilder("histo").calendarInterval(DateHistogramInterval.YEAR)
.field(DATE_FIELD)
.subAggregation(new AvgAggregationBuilder("foo").field(VALUE_FIELD));
TermsAggregationBuilder termsBuilder = new TermsAggregationBuilder("terms").field(textField).subAggregation(histo);
FilterAggregationBuilder filterAggregationBuilder = new FilterAggregationBuilder("filter", QueryBuilders.matchAllQuery())
.subAggregation(termsBuilder);
AvgBucketPipelineAggregationBuilder avgBucketBuilder = new AvgBucketPipelineAggregationBuilder(
"the_avg_bucket",
"filter>terms['value']>histo>foo"
);

try (Directory directory = newDirectory()) {
try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) {
Document document = new Document();
for (String date : dataset) {
if (frequently()) {
indexWriter.commit();
}

document.add(new SortedNumericDocValuesField(DATE_FIELD, asLong(date)));
document.add(new SortedNumericDocValuesField(VALUE_FIELD, randomInt()));
document.add(new SortedSetDocValuesField(textField, new BytesRef("value")));
document.add(
new KeywordFieldMapper.KeywordField(textField, new BytesRef("value"), KeywordFieldMapper.Defaults.FIELD_TYPE)
);
indexWriter.addDocument(document);
document.clear();
}
}

InternalAvg avgResult;
InternalDateHistogram histogramResult;
InternalFilter filterResult;
InternalTerms<?, ?> internalTerms;
try (IndexReader indexReader = DirectoryReader.open(directory)) {
IndexSearcher indexSearcher = newSearcher(indexReader, true, true);

DateFieldMapper.DateFieldType fieldType = new DateFieldMapper.DateFieldType(DATE_FIELD);
MappedFieldType valueFieldType = new NumberFieldMapper.NumberFieldType(VALUE_FIELD, NumberFieldMapper.NumberType.LONG);
MappedFieldType keywordField = keywordField(textField);

avgResult = searchAndReduce(
indexSearcher,
query,
avgBuilder,
10000,
new MappedFieldType[] { fieldType, valueFieldType, keywordField }
);
histogramResult = searchAndReduce(
indexSearcher,
query,
histo,
10000,
new MappedFieldType[] { fieldType, valueFieldType, keywordField }
);
internalTerms = searchAndReduce(
indexSearcher,
query,
termsBuilder,
10000,
new MappedFieldType[] { fieldType, valueFieldType, keywordField }
);
filterResult = searchAndReduce(
indexSearcher,
query,
filterAggregationBuilder,
10000,
new MappedFieldType[] { fieldType, valueFieldType, keywordField }
);
}

// Finally, reduce the pipeline agg
PipelineAggregator avgBucketAgg = avgBucketBuilder.createInternal(Collections.emptyMap());
List<Aggregation> reducedAggs = new ArrayList<>(4);

reducedAggs.add(filterResult);
reducedAggs.add(internalTerms);
reducedAggs.add(histogramResult);
reducedAggs.add(avgResult);
Aggregations aggregations = new Aggregations(reducedAggs);
InternalAggregation pipelineResult = ((AvgBucketPipelineAggregator) avgBucketAgg).doReduce(aggregations, null);
assertNotNull(pipelineResult);
}
}

private static long asLong(String dateTime) {
return DateFormatters.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(dateTime)).toInstant().toEpochMilli();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ public void testValidate() {
assertThat(
validate(aggBuilders, new AvgBucketPipelineAggregationBuilder("name", "global>metric")),
equalTo(
"Validation Failed: 1: The first aggregation in "
"Validation Failed: 1: Unable to find unqualified multi-bucket aggregation in "
+ PipelineAggregator.Parser.BUCKETS_PATH.getPreferredName()
+ " must be a multi-bucket aggregation for aggregation [name] found :"
+ ". Path must include a multi-bucket aggregation for aggregation [name] found :"
+ GlobalAggregationBuilder.class.getName()
+ " for buckets path: global>metric;"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ public void testValidate() {
assertThat(
validate(aggBuilders, new ExtendedStatsBucketPipelineAggregationBuilder("name", "global>metric")),
equalTo(
"Validation Failed: 1: The first aggregation in "
"Validation Failed: 1: Unable to find unqualified multi-bucket aggregation in "
+ PipelineAggregator.Parser.BUCKETS_PATH.getPreferredName()
+ " must be a multi-bucket aggregation for aggregation [name] found :"
+ ". Path must include a multi-bucket aggregation for aggregation [name] found :"
+ GlobalAggregationBuilder.class.getName()
+ " for buckets path: global>metric;"
)
Expand Down
Loading

0 comments on commit b4668ca

Please sign in to comment.