Skip to content

Commit

Permalink
Fix scripted metric BWC serialization (elastic#63821)
Browse files Browse the repository at this point in the history
We had and an error when serializing fully reduced scripted metrics.
Small typo and sever lack of tests..... Anyway, this fixed the one
character typo and adds a bunch more tests.
  • Loading branch information
nik9000 authored Oct 19, 2020
1 parent 6ab73cd commit dce5a65
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,7 @@ public class InternalScriptedMetric extends InternalAggregation implements Scrip
final Script reduceScript;
private final List<Object> aggregations;

InternalScriptedMetric(String name, Object aggregation, Script reduceScript, Map<String, Object> metadata) {
this(name, Collections.singletonList(aggregation), reduceScript, metadata);
}


private InternalScriptedMetric(String name, List<Object> aggregations, Script reduceScript, Map<String, Object> metadata) {
InternalScriptedMetric(String name, List<Object> aggregations, Script reduceScript, Map<String, Object> metadata) {
super(name, metadata);
this.aggregations = aggregations;
this.reduceScript = reduceScript;
Expand All @@ -70,11 +65,12 @@ public InternalScriptedMetric(StreamInput in) throws IOException {
protected void doWriteTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(reduceScript);
if (out.getVersion().before(Version.V_7_8_0)) {
if (aggregations.size() > 0) {
if (aggregations.size() > 1) {
/*
* I *believe* that this situation can only happen in cross
* cluster search right now. Thus the message. But computers
* are hard.
* If aggregations has more than one entry we're trying to
* serialize an unreduced aggregation. This *should* only
* happen when we're returning a scripted_metric over cross
* cluster search.
*/
throw new IllegalArgumentException("scripted_metric doesn't support cross cluster search until 7.8.0");
}
Expand All @@ -97,7 +93,7 @@ public Object aggregation() {
return aggregations.get(0);
}

List<Object> getAggregation() {
List<Object> aggregationsList() {
return aggregations;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public static boolean hasValue(InternalMedianAbsoluteDeviation agg) {
public static boolean hasValue(InternalScriptedMetric agg) {
// TODO better way to know if the scripted metric received documents?
// Could check for null too, but a script might return null on purpose...
return agg.getAggregation().size() > 0 ;
return agg.aggregationsList().size() > 0 ;
}

public static boolean hasValue(InternalTDigestPercentileRanks agg) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import java.util.HashMap;
import java.util.Map;

import static java.util.Collections.singletonList;

class ScriptedMetricAggregator extends MetricsAggregator {
/**
* Estimated cost to maintain a bucket. Since this aggregator uses
Expand Down Expand Up @@ -139,7 +141,7 @@ public void collect(int doc, long owningBucketOrd) throws IOException {
public InternalAggregation buildAggregation(long owningBucketOrdinal) {
Object result = aggStateForResult(owningBucketOrdinal).combine();
StreamOutput.checkWriteable(result);
return new InternalScriptedMetric(name, result, reduceScript, metadata());
return new InternalScriptedMetric(name, singletonList(result), reduceScript, metadata());
}

private State aggStateForResult(long owningBucketOrdinal) {
Expand All @@ -157,7 +159,7 @@ private State aggStateForResult(long owningBucketOrdinal) {

@Override
public InternalAggregation buildEmptyAggregation() {
return new InternalScriptedMetric(name, null, reduceScript, metadata());
return new InternalScriptedMetric(name, singletonList(null), reduceScript, metadata());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.search.aggregations.metrics;

import org.elasticsearch.Version;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.script.MockScriptEngine;
Expand All @@ -28,8 +29,12 @@
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.aggregations.Aggregation.CommonFields;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.ParsedAggregation;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree;
import org.elasticsearch.test.InternalAggregationTestCase;
import org.elasticsearch.test.VersionUtils;

import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -41,6 +46,9 @@
import java.util.function.Predicate;
import java.util.function.Supplier;

import static java.util.Collections.singletonList;
import static org.hamcrest.Matchers.equalTo;

public class InternalScriptedMetricTests extends InternalAggregationTestCase<InternalScriptedMetric> {

private static final String REDUCE_SCRIPT_NAME = "reduceScript";
Expand Down Expand Up @@ -79,8 +87,27 @@ protected InternalScriptedMetric createTestInstance(String name, Map<String, Obj
if (hasReduceScript) {
reduceScript = new Script(ScriptType.INLINE, MockScriptEngine.NAME, REDUCE_SCRIPT_NAME, params);
}
Object randomValue = randomValue(valueTypes, 0);
return new InternalScriptedMetric(name, randomValue, reduceScript, metadata);
return new InternalScriptedMetric(name, randomAggregations(), reduceScript, metadata);
}

private List<Object> randomAggregations() {
return randomList(randomBoolean() ? 1 : 5, this::randomAggregation);
}

@SuppressWarnings({ "rawtypes", "unchecked" })
private Object randomAggregation() {
int levels = randomIntBetween(1, 3);
Supplier[] valueTypes = new Supplier[levels];
for (int l = 0; l < levels; l++) {
if (l < levels - 1) {
valueTypes[l] = randomFrom(nestedValueSuppliers);
} else {
// the last one needs to be a leaf value, not map or
// list
valueTypes[l] = randomFrom(leafValueSuppliers);
}
}
return randomValue(valueTypes, 0);
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -124,13 +151,23 @@ protected void assertReduced(InternalScriptedMetric reduced, List<InternalScript
InternalScriptedMetric firstAgg = inputs.get(0);
assertEquals(firstAgg.getName(), reduced.getName());
assertEquals(firstAgg.getMetadata(), reduced.getMetadata());
int size = (int) inputs.stream().mapToLong(i -> i.aggregationsList().size()).sum();
if (hasReduceScript) {
assertEquals(inputs.size(), reduced.aggregation());
assertEquals(size, reduced.aggregation());
} else {
assertEquals(inputs.size(), ((List<?>) reduced.aggregation()).size());
assertEquals(size, ((List<?>) reduced.aggregation()).size());
}
}

@Override
protected InternalScriptedMetric createTestInstanceForXContent() {
InternalScriptedMetric aggregation = createTestInstance();
return (InternalScriptedMetric) aggregation.reduce(
singletonList(aggregation),
ReduceContext.forFinalReduction(null, mockScriptService(), null, PipelineTree.EMPTY)
);
}

@Override
protected void assertFromXContent(InternalScriptedMetric aggregation, ParsedAggregation parsedAggregation) {
assertTrue(parsedAggregation instanceof ParsedScriptedMetric);
Expand Down Expand Up @@ -188,30 +225,15 @@ protected Predicate<String> excludePathsFromXContentInsertion() {
@Override
protected InternalScriptedMetric mutateInstance(InternalScriptedMetric instance) throws IOException {
String name = instance.getName();
Object value = instance.aggregation();
List<Object> aggregationsList = instance.aggregationsList();
Script reduceScript = instance.reduceScript;
Map<String, Object> metadata = instance.getMetadata();
switch (between(0, 3)) {
case 0:
name += randomAlphaOfLength(5);
break;
case 1:
Object newValue = randomValue(valueTypes, 0);
while ((newValue == null && value == null) || (newValue != null && newValue.equals(value))) {
int levels = randomIntBetween(1, 3);
Supplier[] valueTypes = new Supplier[levels];
for (int i = 0; i < levels; i++) {
if (i < levels - 1) {
valueTypes[i] = randomFrom(nestedValueSuppliers);
} else {
// the last one needs to be a leaf value, not map or
// list
valueTypes[i] = randomFrom(leafValueSuppliers);
}
}
newValue = randomValue(valueTypes, 0);
}
value = newValue;
aggregationsList = randomValueOtherThan(aggregationsList, this::randomAggregations);
break;
case 2:
reduceScript = new Script(ScriptType.INLINE, MockScriptEngine.NAME, REDUCE_SCRIPT_NAME + "-mutated", Collections.emptyMap());
Expand All @@ -227,6 +249,28 @@ protected InternalScriptedMetric mutateInstance(InternalScriptedMetric instance)
default:
throw new AssertionError("Illegal randomisation branch");
}
return new InternalScriptedMetric(name, value, reduceScript, metadata);
return new InternalScriptedMetric(name, aggregationsList, reduceScript, metadata);
}

public void testOldSerialization() throws IOException {
// A single element list looks like a fully reduced agg
InternalScriptedMetric original = new InternalScriptedMetric("test", List.of("foo"), new Script("test"), null);
InternalScriptedMetric roundTripped = (InternalScriptedMetric) copyNamedWriteable(
original,
getNamedWriteableRegistry(),
InternalAggregation.class,
VersionUtils.randomVersionBetween(random(), Version.V_7_0_0, VersionUtils.getPreviousVersion(Version.V_7_8_0))
);
assertThat(roundTripped, equalTo(original));

// A multi-element list looks like a non-reduced agg
InternalScriptedMetric unreduced = new InternalScriptedMetric("test", List.of("foo", "bar"), new Script("test"), null);
Exception e = expectThrows(IllegalArgumentException.class, () -> copyNamedWriteable(
unreduced,
getNamedWriteableRegistry(),
InternalAggregation.class,
VersionUtils.randomVersionBetween(random(), Version.V_7_0_0, VersionUtils.getPreviousVersion(Version.V_7_8_0))
));
assertThat(e.getMessage(), equalTo("scripted_metric doesn't support cross cluster search until 7.8.0"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -436,14 +436,18 @@ protected final T createUnmappedInstance(String name) {
return createUnmappedInstance(name, metadata);
}

protected T createTestInstanceForXContent() {
return createTestInstance();
}

public final void testFromXContent() throws IOException {
final T aggregation = createTestInstance();
final T aggregation = createTestInstanceForXContent();
final ParsedAggregation parsedAggregation = parseAndAssert(aggregation, randomBoolean(), false);
assertFromXContent(aggregation, parsedAggregation);
}

public final void testFromXContentWithRandomFields() throws IOException {
final T aggregation = createTestInstance();
final T aggregation = createTestInstanceForXContent();
final ParsedAggregation parsedAggregation = parseAndAssert(aggregation, randomBoolean(), true);
assertFromXContent(aggregation, parsedAggregation);
}
Expand Down

0 comments on commit dce5a65

Please sign in to comment.