Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Transform] add support for terms agg in transforms #56696

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/reference/rest-api/common-parms.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,8 @@ are supported:
* <<search-aggregations-metrics-weight-avg-aggregation,Weighted average>>
* <<search-aggregations-metrics-cardinality-aggregation,Cardinality>>
* <<search-aggregations-bucket-filter-aggregation,Filter>>
* <<search-aggregations-bucket-rare-terms-aggregation,Rare terms>>
* <<search-aggregations-bucket-terms-aggregation,Terms>>
* <<search-aggregations-metrics-geobounds-aggregation,Geo bounds>>
* <<search-aggregations-metrics-geocentroid-aggregation,Geo centroid>>
* <<search-aggregations-metrics-max-aggregation,Max>>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,12 +265,12 @@ setup:
"group_by": {
"time": {"date_histogram": {"fixed_interval": "1h", "field": "time"}}},
"aggs": {
"vals": {"terms": {"field":"airline"}}
"vals": {"significant_terms": {"field":"airline"}}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😄

}
}
}
- do:
catch: /Unsupported aggregation type \[terms\]/
catch: /Unsupported aggregation type \[significant_terms\]/
transform.preview_transform:
body: >
{
Expand All @@ -280,7 +280,7 @@ setup:
"group_by": {
"time": {"date_histogram": {"fixed_interval": "1h", "field": "time"}}},
"aggs": {
"vals": {"terms": {"field":"airline"}}
"vals": {"significant_terms": {"field":"airline"}}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,20 @@
import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
Expand Down Expand Up @@ -508,6 +510,97 @@ public void testDateHistogramPivotNanos() throws Exception {
assertDateHistogramPivot(REVIEWS_DATE_NANO_INDEX_NAME);
}

/**
 * Verifies that a pivot transform accepts {@code terms} (with a nested {@code terms} sub-aggregation) and
 * {@code rare_terms} aggregations, and that the resulting buckets are stored in the destination index as
 * objects keyed by bucket key.
 *
 * The transform groups the reviews index by a histogram over {@code stars} (interval 2) and the assertions
 * inspect the document of the {@code every_2 == 2.0} bucket.
 */
@SuppressWarnings("unchecked")
public void testPivotWithTermsAgg() throws Exception {
    String transformId = "simple_terms_agg_pivot";
    String transformIndex = "pivot_reviews_via_histogram_with_terms_agg";
    setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformIndex);

    final Request createTransformRequest = createRequestWithAuth(
        "PUT",
        getTransformEndpoint() + transformId,
        BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS
    );

    String config = "{"
        + " \"source\": {\"index\":\""
        + REVIEWS_INDEX_NAME
        + "\"},"
        + " \"dest\": {\"index\":\""
        + transformIndex
        + "\"},";

    config += " \"pivot\": {"
        + " \"group_by\": {"
        + " \"every_2\": {"
        + " \"histogram\": {"
        + " \"interval\": 2,\"field\":\"stars\""
        + " } } },"
        + " \"aggregations\": {"
        + " \"common_users\": {"
        + " \"terms\": {"
        + " \"field\": \"user_id\","
        + " \"size\": 2"
        + " },"
        + " \"aggs\" : {"
        + " \"common_businesses\": {"
        + " \"terms\": {"
        + " \"field\": \"business_id\","
        + " \"size\": 2"
        + " }}"
        + " } "
        + " },"
        + " \"rare_users\": {"
        + " \"rare_terms\": {"
        + " \"field\": \"user_id\""
        + " } } } }"
        + "}";

    createTransformRequest.setJsonEntity(config);
    Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));
    assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));

    startAndWaitForTransform(transformId, transformIndex);
    assertTrue(indexExists(transformIndex));

    // we expect 3 documents as there shall be 5 unique star values and we are bucketing every 2 starting at 0
    Map<String, Object> indexStats = getAsMap(transformIndex + "/_stats");
    assertEquals(3, XContentMapValues.extractValue("_all.total.docs.count", indexStats));

    // get and check some term results
    Map<String, Object> searchResult = getAsMap(transformIndex + "/_search?q=every_2:2.0");

    assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));

    // The values are typed as Object: common_users holds one nested object per user (the sub-aggregation),
    // while rare_users holds plain document counts.
    Map<String, Object> commonUsers = (Map<String, Object>) ((List<?>) XContentMapValues.extractValue(
        "hits.hits._source.common_users",
        searchResult
    )).get(0);
    Map<String, Object> rareUsers = (Map<String, Object>) ((List<?>) XContentMapValues.extractValue(
        "hits.hits._source.rare_users",
        searchResult
    )).get(0);

    // Expected values are built as plain maps rather than via double-brace initialization,
    // which would create an anonymous HashMap subclass for every literal.
    Map<String, Object> user10Businesses = new HashMap<>();
    user10Businesses.put("business_12", 6);
    user10Businesses.put("business_9", 4);

    Map<String, Object> expectedCommonUsers = new HashMap<>();
    expectedCommonUsers.put("user_10", Collections.singletonMap("common_businesses", user10Businesses));
    expectedCommonUsers.put(
        "user_0",
        Collections.singletonMap("common_businesses", Collections.singletonMap("business_0", 35))
    );

    assertThat(commonUsers, is(not(nullValue())));
    assertThat(commonUsers, equalTo(expectedCommonUsers));

    Map<String, Object> expectedRareUsers = new HashMap<>();
    expectedRareUsers.put("user_5", 1);
    expectedRareUsers.put("user_12", 1);

    assertThat(rareUsers, is(not(nullValue())));
    assertThat(rareUsers, equalTo(expectedRareUsers));
}

private void assertDateHistogramPivot(String indexName) throws Exception {
String transformId = "simple_date_histogram_pivot_" + indexName;
String transformIndex = "pivot_reviews_via_date_histogram_" + indexName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
import org.elasticsearch.search.aggregations.metrics.GeoBounds;
Expand Down Expand Up @@ -52,6 +53,7 @@ public final class AggregationResultUtils {
tempMap.put(GeoBounds.class.getName(), new GeoBoundsAggExtractor());
tempMap.put(Percentiles.class.getName(), new PercentilesAggExtractor());
tempMap.put(SingleBucketAggregation.class.getName(), new SingleBucketAggExtractor());
tempMap.put(MultiBucketsAggregation.class.getName(), new MultiBucketsAggExtractor());
TYPE_VALUE_EXTRACTOR_MAP = Collections.unmodifiableMap(tempMap);
}

Expand Down Expand Up @@ -120,6 +122,8 @@ static AggValueExtractor getExtractor(Aggregation aggregation) {
return TYPE_VALUE_EXTRACTOR_MAP.get(Percentiles.class.getName());
} else if (aggregation instanceof SingleBucketAggregation) {
return TYPE_VALUE_EXTRACTOR_MAP.get(SingleBucketAggregation.class.getName());
} else if (aggregation instanceof MultiBucketsAggregation) {
return TYPE_VALUE_EXTRACTOR_MAP.get(MultiBucketsAggregation.class.getName());
} else {
// Execution should never reach this point!
// Creating transforms with unsupported aggregations shall not be possible
Expand Down Expand Up @@ -246,6 +250,35 @@ public Object value(Aggregation agg, Map<String, String> fieldTypeMap, String lo
}
}

/**
 * Extracts the value of a {@link MultiBucketsAggregation} as a map with one entry per bucket,
 * keyed by the bucket's string key.
 *
 * A bucket without sub-aggregations maps to its document count; a bucket with sub-aggregations
 * maps to a nested map from sub-aggregation name to that sub-aggregation's extracted value.
 */
static class MultiBucketsAggExtractor implements AggValueExtractor {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 this is almost exactly the way I implemented it, too (well, there is probably no other way).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great minds think alike.

/**
 * @param agg the aggregation result; cast to {@link MultiBucketsAggregation} without a type check
 * @param fieldTypeMap passed through unchanged to the extractors of sub-aggregations
 * @param lookupFieldPrefix prefix for nested lookups; this aggregation's name is appended to it
 *                          (or used alone when the prefix is empty) before recursing
 * @return a map from bucket key to either the bucket's doc count or a map of sub-aggregation values
 */
@Override
public Object value(Aggregation agg, Map<String, String> fieldTypeMap, String lookupFieldPrefix) {
MultiBucketsAggregation aggregation = (MultiBucketsAggregation) agg;

HashMap<String, Object> nested = new HashMap<>();

for (MultiBucketsAggregation.Bucket bucket : aggregation.getBuckets()) {
// leaf bucket (no sub-aggregations): the value is simply the document count
if (bucket.getAggregations().iterator().hasNext() == false) {
nested.put(bucket.getKeyAsString(), bucket.getDocCount());
} else {
// bucket with sub-aggregations: collect each sub-aggregation's value under its name
HashMap<String, Object> nestedBucketObject = new HashMap<>();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would be good to cover this branch and have a test with nested terms aggs, like your common user example, broke down by e.g. businesses or filtered by something

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, will do.

for (Aggregation subAgg : bucket.getAggregations()) {
// recurse via the extractor matching the sub-aggregation's type, extending the lookup prefix
// with this aggregation's name
nestedBucketObject.put(
subAgg.getName(),
getExtractor(subAgg).value(
subAgg,
fieldTypeMap,
lookupFieldPrefix.isEmpty() ? agg.getName() : lookupFieldPrefix + "." + agg.getName()
)
);
}
nested.put(bucket.getKeyAsString(), nestedBucketObject);
}
}
return nested;
}
}

static class ScriptedMetricAggExtractor implements AggValueExtractor {
@Override
public Object value(Aggregation agg, Map<String, String> fieldTypeMap, String lookupFieldPrefix) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public final class Aggregations {
private static final String SOURCE = "_source";

public static final String FLOAT = "float";
public static final String FLATTENED = "flattened";
public static final String SCALED_FLOAT = "scaled_float";
public static final String DOUBLE = "double";
public static final String LONG = "long";
Expand Down Expand Up @@ -69,14 +70,12 @@ public final class Aggregations {
"nested",
"percentile_ranks",
"range",
"rare_terms",
"reverse_nested",
"sampler",
"significant_terms", // https://github.com/elastic/elasticsearch/issues/51073
"significant_text",
"stats", // https://github.com/elastic/elasticsearch/issues/51925
"string_stats", // https://github.com/elastic/elasticsearch/issues/51925
"terms", // https://github.com/elastic/elasticsearch/issues/51073
"top_hits",
"top_metrics", // https://github.com/elastic/elasticsearch/issues/52236
"t_test" // https://github.com/elastic/elasticsearch/issues/54503
Expand Down Expand Up @@ -107,7 +106,9 @@ enum AggregationType {
BUCKET_SELECTOR("bucket_selector", DYNAMIC),
BUCKET_SCRIPT("bucket_script", DYNAMIC),
PERCENTILES("percentiles", DOUBLE),
FILTER("filter", LONG);
FILTER("filter", LONG),
TERMS("terms", FLATTENED),
RARE_TERMS("rare_terms", FLATTENED);

private final String aggregationType;
private final String targetMapping;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,16 @@ public void testResolveTargetMapping() {
assertEquals("long", Aggregations.resolveTargetMapping("filter", "long"));
assertEquals("long", Aggregations.resolveTargetMapping("filter", "double"));

// terms
assertEquals("flattened", Aggregations.resolveTargetMapping("terms", null));
assertEquals("flattened", Aggregations.resolveTargetMapping("terms", "keyword"));
assertEquals("flattened", Aggregations.resolveTargetMapping("terms", "text"));

// rare_terms
assertEquals("flattened", Aggregations.resolveTargetMapping("rare_terms", null));
assertEquals("flattened", Aggregations.resolveTargetMapping("rare_terms", "text"));
assertEquals("flattened", Aggregations.resolveTargetMapping("rare_terms", "keyword"));

// corner case: source type null
assertEquals(null, Aggregations.resolveTargetMapping("min", null));
}
Expand Down