Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Add lazy parsing for DatafeedConfig:Aggs,Query #36117

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.CachedSupplier;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParseException;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.AbstractQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
Expand All @@ -31,6 +33,7 @@
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.MlStrings;
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
import org.elasticsearch.xpack.core.ml.utils.XContentObjectTransformer;
import org.elasticsearch.xpack.core.ml.utils.time.TimeUtils;

import java.io.IOException;
Expand All @@ -43,6 +46,7 @@
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;

/**
* Datafeed configuration options. Describes where to proactively pull input
Expand All @@ -60,6 +64,45 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
private static final int TWO_MINS_SECONDS = 2 * SECONDS_IN_MINUTE;
private static final int TWENTY_MINS_SECONDS = 20 * SECONDS_IN_MINUTE;
private static final int HALF_DAY_SECONDS = 12 * 60 * SECONDS_IN_MINUTE;
static final XContentObjectTransformer<QueryBuilder> QUERY_TRANSFORMER = XContentObjectTransformer.queryBuilderTransformer();
private static final BiFunction<Map<String, Object>, String, QueryBuilder> lazyQueryParser = (objectMap, id) -> {
try {
return QUERY_TRANSFORMER.fromMap(objectMap);
} catch (IOException | XContentParseException exception) {
// Certain thrown exceptions wrap up the real Illegal argument making it hard to determine cause for the user
if (exception.getCause() instanceof IllegalArgumentException) {
throw ExceptionsHelper.unprocessableEntityException(
benwtrent marked this conversation as resolved.
Show resolved Hide resolved
Messages.getMessage(Messages.DATAFEED_CONFIG_QUERY_BAD_FORMAT,
id,
exception.getCause().getMessage()),
exception.getCause());
} else {
throw ExceptionsHelper.unprocessableEntityException(
Messages.getMessage(Messages.DATAFEED_CONFIG_QUERY_BAD_FORMAT, exception, id),
exception);
}
}
};

static final XContentObjectTransformer<AggregatorFactories.Builder> AGG_TRANSFORMER = XContentObjectTransformer.aggregatorTransformer();
private static final BiFunction<Map<String, Object>, String, AggregatorFactories.Builder> lazyAggParser = (objectMap, id) -> {
try {
return AGG_TRANSFORMER.fromMap(objectMap);
} catch (IOException | XContentParseException exception) {
// Certain thrown exceptions wrap up the real Illegal argument making it hard to determine cause for the user
if (exception.getCause() instanceof IllegalArgumentException) {
throw ExceptionsHelper.unprocessableEntityException(
Messages.getMessage(Messages.DATAFEED_CONFIG_AGG_BAD_FORMAT,
id,
exception.getCause().getMessage()),
exception.getCause());
} else {
throw ExceptionsHelper.unprocessableEntityException(
Messages.getMessage(Messages.DATAFEED_CONFIG_AGG_BAD_FORMAT, exception.getMessage(), id),
exception);
}
}
};

// Used for QueryPage
public static final ParseField RESULTS_FIELD = new ParseField("datafeeds");
Expand Down Expand Up @@ -102,9 +145,15 @@ private static ObjectParser<Builder, Void> createParser(boolean ignoreUnknownFie
builder.setQueryDelay(TimeValue.parseTimeValue(val, QUERY_DELAY.getPreferredName())), QUERY_DELAY);
parser.declareString((builder, val) ->
builder.setFrequency(TimeValue.parseTimeValue(val, FREQUENCY.getPreferredName())), FREQUENCY);
parser.declareObject(Builder::setQuery, (p, c) -> AbstractQueryBuilder.parseInnerQueryBuilder(p), QUERY);
parser.declareObject(Builder::setAggregations, (p, c) -> AggregatorFactories.parseAggregators(p), AGGREGATIONS);
parser.declareObject(Builder::setAggregations, (p, c) -> AggregatorFactories.parseAggregators(p), AGGS);
if (ignoreUnknownFields) {
parser.declareObject(Builder::setQuery, (p, c) -> p.map(), QUERY);
parser.declareObject(Builder::setAggregations, (p, c) -> p.map(), AGGREGATIONS);
parser.declareObject(Builder::setAggregations, (p, c) -> p.map(), AGGS);
} else {
parser.declareObject(Builder::setParsedQuery, (p, c) -> AbstractQueryBuilder.parseInnerQueryBuilder(p), QUERY);
parser.declareObject(Builder::setParsedAggregations, (p, c) -> AggregatorFactories.parseAggregators(p), AGGREGATIONS);
parser.declareObject(Builder::setParsedAggregations, (p, c) -> AggregatorFactories.parseAggregators(p), AGGS);
}
parser.declareObject(Builder::setScriptFields, (p, c) -> {
List<SearchSourceBuilder.ScriptField> parsedScriptFields = new ArrayList<>();
while (p.nextToken() != XContentParser.Token.END_OBJECT) {
Expand Down Expand Up @@ -146,16 +195,18 @@ private static ObjectParser<Builder, Void> createParser(boolean ignoreUnknownFie

private final List<String> indices;
private final List<String> types;
private final QueryBuilder query;
private final AggregatorFactories.Builder aggregations;
private final Map<String, Object> query;
private final Map<String, Object> aggregations;
private final List<SearchSourceBuilder.ScriptField> scriptFields;
private final Integer scrollSize;
private final ChunkingConfig chunkingConfig;
private final Map<String, String> headers;
private final DelayedDataCheckConfig delayedDataCheckConfig;
private final CachedSupplier<QueryBuilder> querySupplier;
private final CachedSupplier<AggregatorFactories.Builder> aggSupplier;

private DatafeedConfig(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List<String> indices, List<String> types,
QueryBuilder query, AggregatorFactories.Builder aggregations, List<SearchSourceBuilder.ScriptField> scriptFields,
Map<String, Object> query, Map<String, Object> aggregations, List<SearchSourceBuilder.ScriptField> scriptFields,
Integer scrollSize, ChunkingConfig chunkingConfig, Map<String, String> headers,
DelayedDataCheckConfig delayedDataCheckConfig) {
this.id = id;
Expand All @@ -171,6 +222,8 @@ private DatafeedConfig(String id, String jobId, TimeValue queryDelay, TimeValue
this.chunkingConfig = chunkingConfig;
this.headers = Collections.unmodifiableMap(headers);
this.delayedDataCheckConfig = delayedDataCheckConfig;
this.querySupplier = new CachedSupplier<>(() -> lazyQueryParser.apply(query, id));
this.aggSupplier = new CachedSupplier<>(() -> lazyAggParser.apply(aggregations, id));
}

public DatafeedConfig(StreamInput in) throws IOException {
Expand All @@ -188,8 +241,17 @@ public DatafeedConfig(StreamInput in) throws IOException {
} else {
this.types = null;
}
this.query = in.readNamedWriteable(QueryBuilder.class);
this.aggregations = in.readOptionalWriteable(AggregatorFactories.Builder::new);
if (in.getVersion().before(Version.CURRENT)) {
this.query = QUERY_TRANSFORMER.toMap(in.readNamedWriteable(QueryBuilder.class));
this.aggregations = AGG_TRANSFORMER.toMap(in.readOptionalWriteable(AggregatorFactories.Builder::new));
} else {
this.query = in.readMap();
if (in.readBoolean()) {
this.aggregations = in.readMap();
} else {
this.aggregations = null;
}
}
if (in.readBoolean()) {
this.scriptFields = Collections.unmodifiableList(in.readList(SearchSourceBuilder.ScriptField::new));
} else {
Expand All @@ -207,6 +269,8 @@ public DatafeedConfig(StreamInput in) throws IOException {
} else {
delayedDataCheckConfig = DelayedDataCheckConfig.defaultDelayedDataCheckConfig();
}
this.querySupplier = new CachedSupplier<>(() -> lazyQueryParser.apply(query, id));
this.aggSupplier = new CachedSupplier<>(() -> lazyAggParser.apply(aggregations, id));
}

public String getId() {
Expand Down Expand Up @@ -237,26 +301,34 @@ public Integer getScrollSize() {
return scrollSize;
}

public QueryBuilder getQuery() {
public QueryBuilder getParsedQuery() {
return querySupplier.get();
}

public Map<String, Object> getQuery() {
return query;
}

public AggregatorFactories.Builder getAggregations() {
public AggregatorFactories.Builder getParsedAggregations() {
return aggSupplier.get();
}

public Map<String, Object> getAggregations() {
return aggregations;
}

/**
* Returns the histogram's interval as epoch millis.
*/
public long getHistogramIntervalMillis() {
return ExtractorUtils.getHistogramIntervalMillis(aggregations);
return ExtractorUtils.getHistogramIntervalMillis(getParsedAggregations());
}

/**
* @return {@code true} when there are non-empty aggregations, {@code false} otherwise
*/
public boolean hasAggregations() {
return aggregations != null && aggregations.count() > 0;
return aggregations != null && aggregations.size() > 0;
}

public List<SearchSourceBuilder.ScriptField> getScriptFields() {
Expand Down Expand Up @@ -293,8 +365,16 @@ public void writeTo(StreamOutput out) throws IOException {
} else {
out.writeBoolean(false);
}
out.writeNamedWriteable(query);
out.writeOptionalWriteable(aggregations);
if (out.getVersion().before(Version.CURRENT)) {
out.writeNamedWriteable(getParsedQuery());
out.writeOptionalWriteable(getParsedAggregations());
} else {
out.writeMap(query);
out.writeBoolean(aggregations != null);
if (aggregations != null) {
out.writeMap(aggregations);
}
}
if (scriptFields != null) {
out.writeBoolean(true);
out.writeList(scriptFields);
Expand Down Expand Up @@ -454,15 +534,18 @@ public static class Builder {
private TimeValue frequency;
private List<String> indices = Collections.emptyList();
private List<String> types = Collections.emptyList();
private QueryBuilder query = QueryBuilders.matchAllQuery();
private AggregatorFactories.Builder aggregations;
private Map<String, Object> query;
private Map<String, Object> aggregations;
private List<SearchSourceBuilder.ScriptField> scriptFields;
private Integer scrollSize = DEFAULT_SCROLL_SIZE;
private ChunkingConfig chunkingConfig;
private Map<String, String> headers = Collections.emptyMap();
private DelayedDataCheckConfig delayedDataCheckConfig = DelayedDataCheckConfig.defaultDelayedDataCheckConfig();

public Builder() {
try {
this.query = QUERY_TRANSFORMER.toMap(QueryBuilders.matchAllQuery());
} catch (IOException ex) { /*Should never happen*/ }
}

public Builder(String id, String jobId) {
Expand Down Expand Up @@ -517,11 +600,47 @@ public void setFrequency(TimeValue frequency) {
this.frequency = frequency;
}

public void setQuery(QueryBuilder query) {
public void setParsedQuery(QueryBuilder query) {
try {
setQuery(QUERY_TRANSFORMER.toMap(ExceptionsHelper.requireNonNull(query, QUERY.getPreferredName())));
} catch (IOException | XContentParseException exception) {
if (exception.getCause() instanceof IllegalArgumentException) {
// Certain thrown exceptions wrap up the real Illegal argument making it hard to determine cause for the user
throw ExceptionsHelper.unprocessableEntityException(
Messages.getMessage(Messages.DATAFEED_CONFIG_QUERY_BAD_FORMAT,
id,
exception.getCause().getMessage()),
exception.getCause());
} else {
throw ExceptionsHelper.unprocessableEntityException(
Messages.getMessage(Messages.DATAFEED_CONFIG_QUERY_BAD_FORMAT, id, exception.getMessage()), exception);
}
}
}

void setQuery(Map<String, Object> query) {
this.query = ExceptionsHelper.requireNonNull(query, QUERY.getPreferredName());
}

public void setAggregations(AggregatorFactories.Builder aggregations) {
public void setParsedAggregations(AggregatorFactories.Builder aggregations) {
try {
setAggregations(AGG_TRANSFORMER.toMap(aggregations));
} catch (IOException | XContentParseException exception) {
// Certain thrown exceptions wrap up the real Illegal argument making it hard to determine cause for the user
if (exception.getCause() instanceof IllegalArgumentException) {
throw ExceptionsHelper.unprocessableEntityException(
Messages.getMessage(Messages.DATAFEED_CONFIG_AGG_BAD_FORMAT,
id,
exception.getCause().getMessage()),
exception.getCause());
} else {
throw ExceptionsHelper.unprocessableEntityException(
Messages.getMessage(Messages.DATAFEED_CONFIG_AGG_BAD_FORMAT, id, exception.getMessage()), exception);
}
}
}

void setAggregations(Map<String, Object> aggregations) {
this.aggregations = aggregations;
}

Expand Down Expand Up @@ -564,14 +683,16 @@ public DatafeedConfig build() {
throw invalidOptionValue(TYPES.getPreferredName(), types);
}

validateAggregations();
setDefaultChunkingConfig();
AggregatorFactories.Builder parsedAggs = lazyAggParser.apply(aggregations, id);
benwtrent marked this conversation as resolved.
Show resolved Hide resolved
validateAggregations(parsedAggs);
setDefaultChunkingConfig(parsedAggs);

setDefaultQueryDelay();
return new DatafeedConfig(id, jobId, queryDelay, frequency, indices, types, query, aggregations, scriptFields, scrollSize,
chunkingConfig, headers, delayedDataCheckConfig);
}

void validateAggregations() {
void validateAggregations(AggregatorFactories.Builder aggregations) {
if (aggregations == null) {
return;
}
Expand Down Expand Up @@ -625,7 +746,7 @@ private static void checkHistogramIntervalIsPositive(AggregationBuilder histogra
}
}

private void setDefaultChunkingConfig() {
private void setDefaultChunkingConfig(AggregatorFactories.Builder aggregations) {
if (chunkingConfig == null) {
if (aggregations == null) {
chunkingConfig = ChunkingConfig.newAuto();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,10 +295,10 @@ public DatafeedConfig apply(DatafeedConfig datafeedConfig, Map<String, String> h
builder.setTypes(types);
}
if (query != null) {
builder.setQuery(query);
builder.setParsedQuery(query);
}
if (aggregations != null) {
builder.setAggregations(aggregations);
builder.setParsedAggregations(aggregations);
}
if (scriptFields != null) {
builder.setScriptFields(scriptFields);
Expand Down Expand Up @@ -371,9 +371,9 @@ boolean isNoop(DatafeedConfig datafeed) {
&& (queryDelay == null || Objects.equals(queryDelay, datafeed.getQueryDelay()))
&& (indices == null || Objects.equals(indices, datafeed.getIndices()))
&& (types == null || Objects.equals(types, datafeed.getTypes()))
&& (query == null || Objects.equals(query, datafeed.getQuery()))
&& (query == null || Objects.equals(query, datafeed.getParsedQuery()))
&& (scrollSize == null || Objects.equals(scrollSize, datafeed.getQueryDelay()))
&& (aggregations == null || Objects.equals(aggregations, datafeed.getAggregations()))
&& (aggregations == null || Objects.equals(aggregations, datafeed.getParsedAggregations()))
&& (scriptFields == null || Objects.equals(scriptFields, datafeed.getScriptFields()))
&& (delayedDataCheckConfig == null || Objects.equals(delayedDataCheckConfig, datafeed.getDelayedDataCheckConfig()))
&& (chunkingConfig == null || Objects.equals(chunkingConfig, datafeed.getChunkingConfig()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ public final class Messages {
"delayed_data_check_config: check_window [{0}] must be greater than the bucket_span [{1}]";
public static final String DATAFEED_CONFIG_DELAYED_DATA_CHECK_SPANS_TOO_MANY_BUCKETS =
"delayed_data_check_config: check_window [{0}] must be less than 10,000x the bucket_span [{1}]";
public static final String DATAFEED_CONFIG_QUERY_BAD_FORMAT = "Datafeed [{0}] query is not parsable: {1}";
public static final String DATAFEED_CONFIG_AGG_BAD_FORMAT = "Datafeed [{0}] aggregations are not parsable: {1}";

public static final String DATAFEED_DOES_NOT_SUPPORT_JOB_WITH_LATENCY = "A job configured with datafeed cannot support latency";
public static final String DATAFEED_NOT_FOUND = "No datafeed with id [{0}] exists";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@ public static ElasticsearchStatusException badRequestException(String msg, Objec
return new ElasticsearchStatusException(msg, RestStatus.BAD_REQUEST, args);
}

public static ElasticsearchStatusException unprocessableEntityException(String msg, Throwable cause, Object... args) {
return new ElasticsearchStatusException(msg, RestStatus.UNPROCESSABLE_ENTITY, cause, args);
}

public static ElasticsearchStatusException unprocessableEntityException(String msg, Object... args) {
return new ElasticsearchStatusException(msg, RestStatus.UNPROCESSABLE_ENTITY, args);
}

/**
* Creates an error message that explains there are shard failures, displays info
* for the first failure (shard/reason) and kindly asks to see more info in the logs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ public static XContentObjectTransformer<QueryBuilder> queryBuilderTransformer()
}

public T fromMap(Map<String, Object> stringObjectMap) throws IOException {
if (stringObjectMap == null) {
return null;
}
LoggingDeprecationAccumulationHandler deprecationLogger = new LoggingDeprecationAccumulationHandler();
try(XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().map(stringObjectMap);
XContentParser parser = XContentType.JSON
Expand All @@ -74,6 +77,9 @@ public T fromMap(Map<String, Object> stringObjectMap) throws IOException {
}

public Map<String, Object> toMap(T object) throws IOException {
if (object == null) {
return null;
}
try(XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()) {
XContentBuilder content = object.toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS);
return XContentHelper.convertToMap(BytesReference.bytes(content), true, XContentType.JSON).v2();
Expand Down
Loading