Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify ignore_malformed handling for synthetic source in aggregate_metric_double #109888

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -748,24 +748,4 @@ public static XContentParser mapToXContentParser(XContentParserConfiguration con
throw new ElasticsearchGenerationException("Failed to generate [" + source + "]", e);
}
}

/**
 * Drains all data available via this parser into a provided builder.
 * Provided parser is closed as a result.
 * @param parser the parser to drain; must not already be closed
 * @param destination the builder that receives every remaining structure from the parser
 * @throws IllegalStateException if the parser is already closed
 * @throws IOException if reading from the parser or writing to the builder fails
 */
public static void drainAndClose(XContentParser parser, XContentBuilder destination) throws IOException {
    if (parser.isClosed()) {
        throw new IllegalStateException("Can't drain a parser that is closed");
    }

    // Copy whole structures (objects/arrays/values) until the token stream is exhausted.
    XContentParser.Token token;
    do {
        destination.copyCurrentStructure(parser);
        token = parser.nextToken();
    } while (token != null);

    parser.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.index.mapper;

import org.apache.lucene.index.LeafReader;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

import static java.util.Collections.emptyList;

/**
* A {@link SourceLoader.SyntheticFieldLoader} that uses a set of sub-loaders
* to produce synthetic source for the field.
* Typical use case is to gather field values from doc_values and append malformed values
* stored in a different field in case of ignore_malformed being enabled.
*/
public class CompositeSyntheticFieldLoader implements SourceLoader.SyntheticFieldLoader {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is all from #109882.

private final String fieldName;
private final String fullFieldName;
private final SyntheticFieldLoaderLayer[] parts;
private boolean hasValue;

public CompositeSyntheticFieldLoader(String fieldName, String fullFieldName, SyntheticFieldLoaderLayer... parts) {
this.fieldName = fieldName;
this.fullFieldName = fullFieldName;
this.parts = parts;
this.hasValue = false;
}

@Override
public Stream<Map.Entry<String, StoredFieldLoader>> storedFieldLoaders() {
return Arrays.stream(parts).flatMap(SyntheticFieldLoaderLayer::storedFieldLoaders).map(e -> Map.entry(e.getKey(), values -> {
hasValue = true;
e.getValue().load(values);
}));
}

@Override
public DocValuesLoader docValuesLoader(LeafReader leafReader, int[] docIdsInLeaf) throws IOException {
var loaders = new ArrayList<DocValuesLoader>(parts.length);
for (var part : parts) {
var partLoader = part.docValuesLoader(leafReader, docIdsInLeaf);
if (partLoader != null) {
loaders.add(partLoader);
}
}

if (loaders.isEmpty()) {
return null;
}

return docId -> {
boolean hasDocs = false;
for (var loader : loaders) {
hasDocs |= loader.advanceToDoc(docId);
}

this.hasValue |= hasDocs;
return hasDocs;
};
}

@Override
public boolean hasValue() {
return hasValue;
}

@Override
public void write(XContentBuilder b) throws IOException {
var totalCount = Arrays.stream(parts).mapToLong(SyntheticFieldLoaderLayer::valueCount).sum();

if (totalCount == 0) {
return;
}

if (totalCount == 1) {
b.field(fieldName);
for (var part : parts) {
part.write(b);
}
return;
}

b.startArray(fieldName);
for (var part : parts) {
part.write(b);
}
b.endArray();
}

@Override
public String fieldName() {
return this.fullFieldName;
}

/**
* Represents one layer of loading synthetic source values for a field
* as a part of {@link CompositeSyntheticFieldLoader}.
* <br>
* Note that the contract of {@link SourceLoader.SyntheticFieldLoader#write(XContentBuilder)}
* is slightly different here since it only needs to write field values without encompassing object or array.
*/
public interface SyntheticFieldLoaderLayer extends SourceLoader.SyntheticFieldLoader {
/**
* Number of values that this loader will write.
* @return
*/
long valueCount();
}

/**
* Layer that loads malformed values stored in a dedicated field with a conventional name.
* @see IgnoreMalformedStoredValues
*/
public static class MalformedValuesLayer implements SyntheticFieldLoaderLayer {
private final String fieldName;
private List<Object> values;

public MalformedValuesLayer(String fieldName) {
this.fieldName = fieldName + "._ignore_malformed";
this.values = emptyList();
}

@Override
public long valueCount() {
return values.size();
}

@Override
public Stream<Map.Entry<String, StoredFieldLoader>> storedFieldLoaders() {
return Stream.of(Map.entry(fieldName, values -> this.values = values));
}

@Override
public DocValuesLoader docValuesLoader(LeafReader leafReader, int[] docIdsInLeaf) throws IOException {
return null;
}

@Override
public boolean hasValue() {
return values.isEmpty() == false;
}

@Override
public void write(XContentBuilder b) throws IOException {
for (Object v : values) {
if (v instanceof BytesRef r) {
XContentDataHelper.decodeAndWrite(b, r);
} else {
b.value(v);
}
}
values = emptyList();
}

@Override
public String fieldName() {
return fieldName;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.elasticsearch.common.xcontent.support;

import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.CompressedXContent;
Expand Down Expand Up @@ -421,25 +420,4 @@ public void testParseToType() throws IOException {

assertThat(names, equalTo(Set.of("a", "c")));
}

public void testDrainAndClose() throws IOException {
    // Draining should copy the full structure and close the source parser.
    String json = """
        { "a": "b", "c": "d", "e": {"f": "g"}, "h": ["i", "j", {"k": "l"}]}""";
    var source = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, json);
    var destination = XContentBuilder.builder(XContentType.JSON.xContent());

    XContentHelper.drainAndClose(source, destination);

    assertTrue(source.isClosed());
    assertEquals(json.replace(" ", ""), Strings.toString(destination));
}

public void testDrainAndCloseAlreadyClosed() throws IOException {
    // A parser that is already closed must be rejected.
    var closedParser = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, "{}");
    closedParser.close();

    var destination = XContentBuilder.builder(XContentType.JSON.xContent());
    assertThrows(IllegalStateException.class, () -> XContentHelper.drainAndClose(closedParser, destination));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.time.DateMathParser;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.fielddata.FieldDataContext;
Expand All @@ -28,9 +27,10 @@
import org.elasticsearch.index.fielddata.ScriptDocValues.DoublesSupplier;
import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
import org.elasticsearch.index.mapper.CompositeSyntheticFieldLoader;
import org.elasticsearch.index.mapper.DocumentParserContext;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.IgnoredSourceFieldMapper;
import org.elasticsearch.index.mapper.IgnoreMalformedStoredValues;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.Mapper;
import org.elasticsearch.index.mapper.MapperBuilderContext;
Expand All @@ -43,7 +43,6 @@
import org.elasticsearch.index.mapper.TimeSeriesParams;
import org.elasticsearch.index.mapper.TimeSeriesParams.MetricType;
import org.elasticsearch.index.mapper.ValueFetcher;
import org.elasticsearch.index.mapper.XContentDataHelper;
import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.script.ScriptCompiler;
Expand All @@ -53,6 +52,7 @@
import org.elasticsearch.search.MultiValueMode;
import org.elasticsearch.search.sort.BucketedSort;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xcontent.CopyingXContentParser;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentSubParser;
Expand Down Expand Up @@ -592,9 +592,7 @@ protected void parseCreateField(DocumentParserContext context) throws IOExceptio
EnumMap<Metric, Number> metricsParsed = new EnumMap<>(Metric.class);
// Preserves the content of the field in order to be able to construct synthetic source
// if field value is malformed.
XContentBuilder malformedContentForSyntheticSource = context.mappingLookup().isSourceSynthetic() && ignoreMalformed
? XContentBuilder.builder(context.parser().contentType().xContent())
: null;
XContentBuilder malformedDataForSyntheticSource = null;

try {
token = context.parser().currentToken();
Expand All @@ -603,11 +601,14 @@ protected void parseCreateField(DocumentParserContext context) throws IOExceptio
return;
}
ensureExpectedToken(XContentParser.Token.START_OBJECT, token, context.parser());
subParser = new XContentSubParser(context.parser());
token = subParser.nextToken();
if (malformedContentForSyntheticSource != null) {
malformedContentForSyntheticSource.startObject();
if (context.mappingLookup().isSourceSynthetic() && ignoreMalformed) {
var copyingParser = new CopyingXContentParser(context.parser());
malformedDataForSyntheticSource = copyingParser.getBuilder();
subParser = new XContentSubParser(copyingParser);
} else {
subParser = new XContentSubParser(context.parser());
}
token = subParser.nextToken();
while (token != XContentParser.Token.END_OBJECT) {
// should be an object sub-field with name a metric name
ensureExpectedToken(XContentParser.Token.FIELD_NAME, token, subParser);
Expand All @@ -621,9 +622,6 @@ protected void parseCreateField(DocumentParserContext context) throws IOExceptio
}

token = subParser.nextToken();
if (malformedContentForSyntheticSource != null) {
malformedContentForSyntheticSource.field(fieldName);
}
// Make sure that the value is a number. Probably this will change when
// new aggregate metric types are added (histogram, cardinality etc)
ensureExpectedToken(XContentParser.Token.VALUE_NUMBER, token, subParser);
Expand All @@ -632,9 +630,6 @@ protected void parseCreateField(DocumentParserContext context) throws IOExceptio
try {
Number metricValue = delegateFieldMapper.value(context.parser());
metricsParsed.put(metric, metricValue);
if (malformedContentForSyntheticSource != null) {
malformedContentForSyntheticSource.value(metricValue);
}
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("failed to parse [" + metric.name() + "] sub field: " + e.getMessage(), e);
}
Expand Down Expand Up @@ -677,24 +672,20 @@ protected void parseCreateField(DocumentParserContext context) throws IOExceptio
}
} catch (Exception e) {
if (ignoreMalformed) {
if (malformedContentForSyntheticSource != null) {
if (subParser != null) {
// Remaining data in parser needs to be stored as is in order to provide it in synthetic source.
XContentHelper.drainAndClose(subParser, malformedContentForSyntheticSource);
} else {
// We don't use DrainingXContentParser since we don't want to go beyond current field
malformedContentForSyntheticSource.copyCurrentStructure(context.parser());
}
;
var nameValue = IgnoredSourceFieldMapper.NameValue.fromContext(
context,
name(),
XContentDataHelper.encodeXContentBuilder(malformedContentForSyntheticSource)
);
context.addIgnoredField(nameValue);
} else if (subParser != null) {
if (subParser != null) {
// close the subParser, so we advance to the end of the object
subParser.close();
} else {
if (context.mappingLookup().isSourceSynthetic()) {
// There is a malformed value, but it is not an object (since subParser is null).
// So we just need to copy this single value.
malformedDataForSyntheticSource = XContentBuilder.builder(context.parser().contentType().xContent())
.copyCurrentStructure(context.parser());
}
}

if (malformedDataForSyntheticSource != null) {
context.doc().add(IgnoreMalformedStoredValues.storedField(name(), malformedDataForSyntheticSource));
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This block gets repeated; consider moving it to a helper in IgnoreMalformedStoredValues.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll leave it as is, if you don't mind — I don't think it's too bad of a repetition.


context.addIgnoredField(name());
Expand Down Expand Up @@ -724,11 +715,15 @@ protected SyntheticSourceMode syntheticSourceMode() {

@Override
public SourceLoader.SyntheticFieldLoader syntheticFieldLoader() {
// Note that malformed values are handled via `IgnoredSourceFieldMapper` infrastructure
return new AggregateMetricSyntheticFieldLoader(name(), simpleName(), metrics);
return new CompositeSyntheticFieldLoader(
simpleName(),
name(),
new AggregateMetricSyntheticFieldLoader(name(), simpleName(), metrics),
new CompositeSyntheticFieldLoader.MalformedValuesLayer(name())
);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cute :)

}

public static class AggregateMetricSyntheticFieldLoader implements SourceLoader.SyntheticFieldLoader {
public static class AggregateMetricSyntheticFieldLoader implements CompositeSyntheticFieldLoader.SyntheticFieldLoaderLayer {
private final String name;
private final String simpleName;
private final EnumSet<Metric> metrics;
Expand All @@ -746,6 +741,11 @@ public String fieldName() {
return name;
}

@Override
public long valueCount() {
    // This loader writes the whole metrics object as a single value, if present.
    if (hasValue()) {
        return 1;
    }
    return 0;
}

@Override
public Stream<Map.Entry<String, StoredFieldLoader>> storedFieldLoaders() {
return Stream.of();
Expand Down Expand Up @@ -779,7 +779,7 @@ public void write(XContentBuilder b) throws IOException {
if (metricHasValue.isEmpty()) {
return;
}
b.startObject(simpleName);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar question, why do we need this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See #109882.

b.startObject();
for (Map.Entry<Metric, SortedNumericDocValues> entry : metricDocValues.entrySet()) {
if (metricHasValue.contains(entry.getKey())) {
String metricName = entry.getKey().name();
Expand Down
Loading