Simplify ignore_malformed handling for synthetic source in aggregate_metric_double #109888

Merged
@@ -748,24 +748,4 @@ public static XContentParser mapToXContentParser(XContentParserConfiguration con
throw new ElasticsearchGenerationException("Failed to generate [" + source + "]", e);
}
}

/**
* Drains all data available via this parser into a provided builder.
* Provided parser is closed as a result.
* @param parser
* @param destination
*/
public static void drainAndClose(XContentParser parser, XContentBuilder destination) throws IOException {
if (parser.isClosed()) {
throw new IllegalStateException("Can't drain a parser that is closed");
}

XContentParser.Token token;
do {
destination.copyCurrentStructure(parser);
token = parser.nextToken();
} while (token != null);

parser.close();
}
}
@@ -8,7 +8,6 @@

package org.elasticsearch.common.xcontent.support;

import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.CompressedXContent;
@@ -421,25 +420,4 @@ public void testParseToType() throws IOException {

assertThat(names, equalTo(Set.of("a", "c")));
}

public void testDrainAndClose() throws IOException {
String json = """
{ "a": "b", "c": "d", "e": {"f": "g"}, "h": ["i", "j", {"k": "l"}]}""";
var parser = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, json);
var content = XContentBuilder.builder(XContentType.JSON.xContent());
XContentHelper.drainAndClose(parser, content);

assertEquals(json.replace(" ", ""), Strings.toString(content));
assertTrue(parser.isClosed());
}

public void testDrainAndCloseAlreadyClosed() throws IOException {
var parser = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, "{}");
parser.close();

assertThrows(
IllegalStateException.class,
() -> XContentHelper.drainAndClose(parser, XContentBuilder.builder(XContentType.JSON.xContent()))
);
}
}
@@ -19,7 +19,6 @@
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.time.DateMathParser;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.fielddata.FieldDataContext;
@@ -28,9 +27,10 @@
import org.elasticsearch.index.fielddata.ScriptDocValues.DoublesSupplier;
import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
import org.elasticsearch.index.mapper.CompositeSyntheticFieldLoader;
import org.elasticsearch.index.mapper.DocumentParserContext;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.IgnoredSourceFieldMapper;
import org.elasticsearch.index.mapper.IgnoreMalformedStoredValues;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.Mapper;
import org.elasticsearch.index.mapper.MapperBuilderContext;
@@ -43,7 +43,6 @@
import org.elasticsearch.index.mapper.TimeSeriesParams;
import org.elasticsearch.index.mapper.TimeSeriesParams.MetricType;
import org.elasticsearch.index.mapper.ValueFetcher;
import org.elasticsearch.index.mapper.XContentDataHelper;
import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.script.ScriptCompiler;
@@ -53,6 +52,7 @@
import org.elasticsearch.search.MultiValueMode;
import org.elasticsearch.search.sort.BucketedSort;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xcontent.CopyingXContentParser;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentSubParser;
@@ -592,9 +592,7 @@ protected void parseCreateField(DocumentParserContext context) throws IOExceptio
EnumMap<Metric, Number> metricsParsed = new EnumMap<>(Metric.class);
// Preserves the content of the field in order to be able to construct synthetic source
// if field value is malformed.
XContentBuilder malformedContentForSyntheticSource = context.mappingLookup().isSourceSynthetic() && ignoreMalformed
? XContentBuilder.builder(context.parser().contentType().xContent())
: null;
XContentBuilder malformedDataForSyntheticSource = null;

try {
token = context.parser().currentToken();
@@ -603,11 +601,14 @@ protected void parseCreateField(DocumentParserContext context) throws IOExceptio
return;
}
ensureExpectedToken(XContentParser.Token.START_OBJECT, token, context.parser());
subParser = new XContentSubParser(context.parser());
token = subParser.nextToken();
if (malformedContentForSyntheticSource != null) {
malformedContentForSyntheticSource.startObject();
if (context.mappingLookup().isSourceSynthetic() && ignoreMalformed) {
var copyingParser = new CopyingXContentParser(context.parser());
malformedDataForSyntheticSource = copyingParser.getBuilder();
subParser = new XContentSubParser(copyingParser);
} else {
subParser = new XContentSubParser(context.parser());
}
token = subParser.nextToken();
while (token != XContentParser.Token.END_OBJECT) {
// should be an object sub-field with name a metric name
ensureExpectedToken(XContentParser.Token.FIELD_NAME, token, subParser);
@@ -621,9 +622,6 @@ protected void parseCreateField(DocumentParserContext context) throws IOExceptio
}

token = subParser.nextToken();
if (malformedContentForSyntheticSource != null) {
malformedContentForSyntheticSource.field(fieldName);
}
// Make sure that the value is a number. Probably this will change when
// new aggregate metric types are added (histogram, cardinality etc)
ensureExpectedToken(XContentParser.Token.VALUE_NUMBER, token, subParser);
@@ -632,9 +630,6 @@ protected void parseCreateField(DocumentParserContext context) throws IOExceptio
try {
Number metricValue = delegateFieldMapper.value(context.parser());
metricsParsed.put(metric, metricValue);
if (malformedContentForSyntheticSource != null) {
malformedContentForSyntheticSource.value(metricValue);
}
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("failed to parse [" + metric.name() + "] sub field: " + e.getMessage(), e);
}
@@ -677,24 +672,20 @@ protected void parseCreateField(DocumentParserContext context) throws IOExceptio
}
} catch (Exception e) {
if (ignoreMalformed) {
if (malformedContentForSyntheticSource != null) {
if (subParser != null) {
// Remaining data in parser needs to be stored as is in order to provide it in synthetic source.
XContentHelper.drainAndClose(subParser, malformedContentForSyntheticSource);
} else {
// We don't use DrainingXContentParser since we don't want to go beyond current field
malformedContentForSyntheticSource.copyCurrentStructure(context.parser());
}
;
var nameValue = IgnoredSourceFieldMapper.NameValue.fromContext(
context,
name(),
XContentDataHelper.encodeXContentBuilder(malformedContentForSyntheticSource)
);
context.addIgnoredField(nameValue);
} else if (subParser != null) {
if (subParser != null) {
// close the subParser, so we advance to the end of the object
subParser.close();
} else {
if (context.mappingLookup().isSourceSynthetic()) {
// There is a malformed value, but it is not an object (since subParser is null).
// So we just need to copy this single value.
malformedDataForSyntheticSource = XContentBuilder.builder(context.parser().contentType().xContent())
.copyCurrentStructure(context.parser());
}
}

if (malformedDataForSyntheticSource != null) {
context.doc().add(IgnoreMalformedStoredValues.storedField(name(), malformedDataForSyntheticSource));
}
Contributor:
This block gets repeated, consider moving it to a helper in IgnoreMalformedStoredValues.

Contributor (author):
I'll leave it if you don't mind, I don't think it's too bad of a repetition.


context.addIgnoredField(name());
@@ -724,11 +715,15 @@ protected SyntheticSourceMode syntheticSourceMode() {

@Override
public SourceLoader.SyntheticFieldLoader syntheticFieldLoader() {
// Note that malformed values are handled via `IgnoredSourceFieldMapper` infrastructure
return new AggregateMetricSyntheticFieldLoader(name(), simpleName(), metrics);
return new CompositeSyntheticFieldLoader(
simpleName(),
name(),
new AggregateMetricSyntheticFieldLoader(name(), simpleName(), metrics),
new CompositeSyntheticFieldLoader.MalformedValuesLayer(name())
);
Contributor:
Cute :)

}

public static class AggregateMetricSyntheticFieldLoader implements SourceLoader.SyntheticFieldLoader {
public static class AggregateMetricSyntheticFieldLoader implements CompositeSyntheticFieldLoader.SyntheticFieldLoaderLayer {
private final String name;
private final String simpleName;
private final EnumSet<Metric> metrics;
@@ -746,6 +741,11 @@ public String fieldName() {
return name;
}

@Override
public long valueCount() {
return hasValue() ? 1 : 0;
}

@Override
public Stream<Map.Entry<String, StoredFieldLoader>> storedFieldLoaders() {
return Stream.of();
@@ -779,7 +779,7 @@ public void write(XContentBuilder b) throws IOException {
if (metricHasValue.isEmpty()) {
return;
}
b.startObject(simpleName);
Contributor:
Similar question, why do we need this?

Contributor (author):
See #109882.

b.startObject();
for (Map.Entry<Metric, SortedNumericDocValues> entry : metricDocValues.entrySet()) {
if (metricHasValue.contains(entry.getKey())) {
String metricName = entry.getKey().name();
@@ -8,6 +8,8 @@

import org.apache.lucene.search.FieldExistsQuery;
import org.apache.lucene.search.Query;
import org.elasticsearch.common.Strings;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.DocumentParsingException;
import org.elasticsearch.index.mapper.LuceneDocument;
@@ -20,6 +22,7 @@
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.aggregatemetric.AggregateMetricMapperPlugin;
import org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper.Metric;
import org.hamcrest.Matchers;
@@ -523,6 +526,43 @@ protected IngestScriptSupport ingestScriptSupport() {
throw new AssumptionViolatedException("not supported");
}

public void testArrayValueSyntheticSource() throws Exception {
DocumentMapper mapper = createDocumentMapper(
syntheticSourceFieldMapping(
b -> b.field("type", CONTENT_TYPE)
.array("metrics", "min", "max")
.field("default_metric", "min")
.field("ignore_malformed", "true")
)
);

var randomString = randomAlphaOfLength(10);
CheckedConsumer<XContentBuilder, IOException> arrayValue = b -> {
b.startArray("field");
{
b.startObject().field("max", 100).field("min", 10).endObject();
b.startObject().field("max", 200).field("min", 20).endObject();
b.value(randomString);
}
b.endArray();
};

var expected = JsonXContent.contentBuilder().startObject();
// First value comes from synthetic field loader and so is formatted in a specific format (e.g. min always come first).
// Other values are stored as is as part of ignore_malformed logic for synthetic source.
{
expected.startArray("field");
expected.startObject().field("min", 10.0).field("max", 100.0).endObject();
expected.startObject().field("max", 200).field("min", 20).endObject();
expected.value(randomString);
expected.endArray();
}
expected.endObject();

var syntheticSource = syntheticSource(mapper, arrayValue);
assertEquals(Strings.toString(expected), syntheticSource);
}

protected final class AggregateDoubleMetricSyntheticSourceSupport implements SyntheticSourceSupport {
private final boolean malformedExample;
private final EnumSet<Metric> storedMetrics;
@@ -76,7 +76,6 @@ aggregate_metric_double with ignore_malformed:
index:
index: test
id: "1"
refresh: true
body:
metric:
min: 18.2
@@ -88,11 +87,22 @@
value_count: 50

- do:
search:
index:
index: test
id: "2"
body:
metric: ["hey", {"value_count": 1, "min": 18.2,"max": 100}, [123, 456]]

- do:
indices.refresh: {}

- do:
get:
index: test
id: "1"

- match:
hits.hits.0._source:
_source:
metric:
min: 18.2
max: 100
@@ -102,3 +112,12 @@
field: "field"
value_count: 50

- do:
get:
index: test
id: "2"

- match:
_source:
metric: [{"min": 18.2,"max": 100.0, "value_count": 1}, "hey", 123, 456]
Contributor:
Similar comment about missing the [123, 456] pair.

Contributor (author):
See #109882.