Skip to content

Commit

Permalink
[ML] Add lazy parsing for DatafeedConfig:Aggs,Query (#36117)
Browse files Browse the repository at this point in the history
* Lazily parsing aggs and query in DatafeedConfigs

* Adding parser tests

* Fixing exception types && unneccessary checked ex

* Adding semi aggregation parser

* Adding tests, fixing up semi-parser

* Reverting semi-parsing

* Moving agg validations

* Making bad configs throw badRequestException
  • Loading branch information
benwtrent authored Dec 4, 2018
1 parent 6e1ff31 commit 166d9a9
Show file tree
Hide file tree
Showing 20 changed files with 295 additions and 79 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -295,10 +295,11 @@ public DatafeedConfig apply(DatafeedConfig datafeedConfig, Map<String, String> h
builder.setTypes(types);
}
if (query != null) {
builder.setQuery(query);
builder.setParsedQuery(query);
}
if (aggregations != null) {
builder.setAggregations(aggregations);
DatafeedConfig.validateAggregations(aggregations);
builder.setParsedAggregations(aggregations);
}
if (scriptFields != null) {
builder.setScriptFields(scriptFields);
Expand Down Expand Up @@ -371,9 +372,9 @@ boolean isNoop(DatafeedConfig datafeed) {
&& (queryDelay == null || Objects.equals(queryDelay, datafeed.getQueryDelay()))
&& (indices == null || Objects.equals(indices, datafeed.getIndices()))
&& (types == null || Objects.equals(types, datafeed.getTypes()))
&& (query == null || Objects.equals(query, datafeed.getQuery()))
&& (query == null || Objects.equals(query, datafeed.getParsedQuery()))
&& (scrollSize == null || Objects.equals(scrollSize, datafeed.getQueryDelay()))
&& (aggregations == null || Objects.equals(aggregations, datafeed.getAggregations()))
&& (aggregations == null || Objects.equals(aggregations, datafeed.getParsedAggregations()))
&& (scriptFields == null || Objects.equals(scriptFields, datafeed.getScriptFields()))
&& (delayedDataCheckConfig == null || Objects.equals(delayedDataCheckConfig, datafeed.getDelayedDataCheckConfig()))
&& (chunkingConfig == null || Objects.equals(chunkingConfig, datafeed.getChunkingConfig()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ public final class Messages {
"delayed_data_check_config: check_window [{0}] must be greater than the bucket_span [{1}]";
public static final String DATAFEED_CONFIG_DELAYED_DATA_CHECK_SPANS_TOO_MANY_BUCKETS =
"delayed_data_check_config: check_window [{0}] must be less than 10,000x the bucket_span [{1}]";
public static final String DATAFEED_CONFIG_QUERY_BAD_FORMAT = "Datafeed [{0}] query is not parsable: {1}";
public static final String DATAFEED_CONFIG_AGG_BAD_FORMAT = "Datafeed [{0}] aggregations are not parsable: {1}";

public static final String DATAFEED_DOES_NOT_SUPPORT_JOB_WITH_LATENCY = "A job configured with datafeed cannot support latency";
public static final String DATAFEED_NOT_FOUND = "No datafeed with id [{0}] exists";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ public static XContentObjectTransformer<QueryBuilder> queryBuilderTransformer()
}

public T fromMap(Map<String, Object> stringObjectMap) throws IOException {
if (stringObjectMap == null) {
return null;
}
LoggingDeprecationAccumulationHandler deprecationLogger = new LoggingDeprecationAccumulationHandler();
try(XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().map(stringObjectMap);
XContentParser parser = XContentType.JSON
Expand All @@ -74,6 +77,9 @@ public T fromMap(Map<String, Object> stringObjectMap) throws IOException {
}

public Map<String, Object> toMap(T object) throws IOException {
if (object == null) {
return null;
}
try(XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()) {
XContentBuilder content = object.toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS);
return XContentHelper.convertToMap(BytesReference.bytes(content), true, XContentType.JSON).v2();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public static DatafeedConfig createRandomizedDatafeedConfig(String jobId, long b
builder.setIndices(randomStringList(1, 10));
builder.setTypes(randomStringList(0, 10));
if (randomBoolean()) {
builder.setQuery(QueryBuilders.termQuery(randomAlphaOfLength(10), randomAlphaOfLength(10)));
builder.setParsedQuery(QueryBuilders.termQuery(randomAlphaOfLength(10), randomAlphaOfLength(10)));
}
boolean addScriptFields = randomBoolean();
if (addScriptFields) {
Expand All @@ -91,7 +91,7 @@ public static DatafeedConfig createRandomizedDatafeedConfig(String jobId, long b
MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time");
aggs.addAggregator(AggregationBuilders.dateHistogram("buckets")
.interval(aggHistogramInterval).subAggregation(maxTime).field("time"));
builder.setAggregations(aggs);
builder.setParsedAggregations(aggs);
}
if (randomBoolean()) {
builder.setScrollSize(randomIntBetween(0, Integer.MAX_VALUE));
Expand Down Expand Up @@ -155,6 +155,43 @@ protected DatafeedConfig doParseInstance(XContentParser parser) {
" \"scroll_size\": 1234\n" +
"}";

private static final String ANACHRONISTIC_QUERY_DATAFEED = "{\n" +
" \"datafeed_id\": \"farequote-datafeed\",\n" +
" \"job_id\": \"farequote\",\n" +
" \"frequency\": \"1h\",\n" +
" \"indices\": [\"farequote1\", \"farequote2\"],\n" +
//query:match:type stopped being supported in 6.x
" \"query\": {\"match\" : {\"query\":\"fieldName\", \"type\": \"phrase\"}},\n" +
" \"scroll_size\": 1234\n" +
"}";

private static final String ANACHRONISTIC_AGG_DATAFEED = "{\n" +
" \"datafeed_id\": \"farequote-datafeed\",\n" +
" \"job_id\": \"farequote\",\n" +
" \"frequency\": \"1h\",\n" +
" \"indices\": [\"farequote1\", \"farequote2\"],\n" +
" \"aggregations\": {\n" +
" \"buckets\": {\n" +
" \"date_histogram\": {\n" +
" \"field\": \"time\",\n" +
" \"interval\": \"360s\",\n" +
" \"time_zone\": \"UTC\"\n" +
" },\n" +
" \"aggregations\": {\n" +
" \"time\": {\n" +
" \"max\": {\"field\": \"time\"}\n" +
" },\n" +
" \"airline\": {\n" +
" \"terms\": {\n" +
" \"field\": \"airline\",\n" +
" \"size\": 0\n" + //size: 0 stopped being supported in 6.x
" }\n" +
" }\n" +
" }\n" +
" }\n" +
" }\n" +
"}";

public void testFutureConfigParse() throws IOException {
XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, FUTURE_DATAFEED);
Expand All @@ -163,6 +200,44 @@ public void testFutureConfigParse() throws IOException {
assertEquals("[6:5] [datafeed_config] unknown field [tomorrows_technology_today], parser not found", e.getMessage());
}

public void testPastQueryConfigParse() throws IOException {
try(XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, ANACHRONISTIC_QUERY_DATAFEED)) {

DatafeedConfig config = DatafeedConfig.LENIENT_PARSER.apply(parser, null).build();
ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> config.getParsedQuery());
assertEquals("[match] query doesn't support multiple fields, found [query] and [type]", e.getMessage());
}

try(XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, ANACHRONISTIC_QUERY_DATAFEED)) {

XContentParseException e = expectThrows(XContentParseException.class,
() -> DatafeedConfig.STRICT_PARSER.apply(parser, null).build());
assertEquals("[6:25] [datafeed_config] failed to parse field [query]", e.getMessage());
}
}

public void testPastAggConfigParse() throws IOException {
try(XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, ANACHRONISTIC_AGG_DATAFEED)) {

DatafeedConfig.Builder configBuilder = DatafeedConfig.LENIENT_PARSER.apply(parser, null);
ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> configBuilder.build());
assertEquals(
"Datafeed [farequote-datafeed] aggregations are not parsable: [size] must be greater than 0. Found [0] in [airline]",
e.getMessage());
}

try(XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, ANACHRONISTIC_AGG_DATAFEED)) {

XContentParseException e = expectThrows(XContentParseException.class,
() -> DatafeedConfig.STRICT_PARSER.apply(parser, null).build());
assertEquals("[8:25] [datafeed_config] failed to parse field [aggregations]", e.getMessage());
}
}

public void testFutureMetadataParse() throws IOException {
XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, FUTURE_DATAFEED);
Expand Down Expand Up @@ -274,7 +349,7 @@ public void testBuild_GivenScriptFieldsAndAggregations() {
datafeed.setTypes(Collections.singletonList("my_type"));
datafeed.setScriptFields(Collections.singletonList(new SearchSourceBuilder.ScriptField(randomAlphaOfLength(10),
mockScript(randomAlphaOfLength(10)), randomBoolean())));
datafeed.setAggregations(new AggregatorFactories.Builder().addAggregator(AggregationBuilders.avg("foo")));
datafeed.setParsedAggregations(new AggregatorFactories.Builder().addAggregator(AggregationBuilders.avg("foo")));

ElasticsearchException e = expectThrows(ElasticsearchException.class, datafeed::build);

Expand All @@ -295,7 +370,7 @@ public void testHasAggregations_NonEmpty() {
builder.setIndices(Collections.singletonList("myIndex"));
builder.setTypes(Collections.singletonList("myType"));
MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time");
builder.setAggregations(new AggregatorFactories.Builder().addAggregator(
builder.setParsedAggregations(new AggregatorFactories.Builder().addAggregator(
AggregationBuilders.dateHistogram("time").interval(300000).subAggregation(maxTime).field("time")));
DatafeedConfig datafeedConfig = builder.build();

Expand All @@ -306,7 +381,7 @@ public void testBuild_GivenEmptyAggregations() {
DatafeedConfig.Builder builder = new DatafeedConfig.Builder("datafeed1", "job1");
builder.setIndices(Collections.singletonList("myIndex"));
builder.setTypes(Collections.singletonList("myType"));
builder.setAggregations(new AggregatorFactories.Builder());
builder.setParsedAggregations(new AggregatorFactories.Builder());

ElasticsearchException e = expectThrows(ElasticsearchException.class, builder::build);

Expand All @@ -318,13 +393,13 @@ public void testBuild_GivenHistogramWithDefaultInterval() {
builder.setIndices(Collections.singletonList("myIndex"));
builder.setTypes(Collections.singletonList("myType"));
MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time");
builder.setAggregations(new AggregatorFactories.Builder().addAggregator(
builder.setParsedAggregations(new AggregatorFactories.Builder().addAggregator(
AggregationBuilders.histogram("time").subAggregation(maxTime).field("time"))
);

ElasticsearchException e = expectThrows(ElasticsearchException.class, builder::build);

assertThat(e.getMessage(), equalTo("Aggregation interval must be greater than 0"));
assertThat(e.getMessage(), containsString("[interval] must be >0 for histogram aggregation [time]"));
}

public void testBuild_GivenDateHistogramWithInvalidTimeZone() {
Expand All @@ -341,7 +416,7 @@ public void testBuild_GivenDateHistogramWithDefaultInterval() {
ElasticsearchException e = expectThrows(ElasticsearchException.class,
() -> createDatafeedWithDateHistogram((String) null));

assertThat(e.getMessage(), equalTo("Aggregation interval must be greater than 0"));
assertThat(e.getMessage(), containsString("Aggregation interval must be greater than 0"));
}

public void testBuild_GivenValidDateHistogram() {
Expand Down Expand Up @@ -402,9 +477,8 @@ public void testValidateAggregations_GivenMulitpleHistogramAggs() {
TermsAggregationBuilder toplevelTerms = AggregationBuilders.terms("top_level");
toplevelTerms.subAggregation(dateHistogram);

DatafeedConfig.Builder builder = new DatafeedConfig.Builder("foo", "bar");
builder.setAggregations(new AggregatorFactories.Builder().addAggregator(toplevelTerms));
ElasticsearchException e = expectThrows(ElasticsearchException.class, builder::validateAggregations);
ElasticsearchException e = expectThrows(ElasticsearchException.class,
() -> DatafeedConfig.validateAggregations(new AggregatorFactories.Builder().addAggregator(toplevelTerms)));

assertEquals("Aggregations can only have 1 date_histogram or histogram aggregation", e.getMessage());
}
Expand Down Expand Up @@ -520,7 +594,9 @@ private static DatafeedConfig createDatafeedWithDateHistogram(DateHistogramAggre
DatafeedConfig.Builder builder = new DatafeedConfig.Builder("datafeed1", "job1");
builder.setIndices(Collections.singletonList("myIndex"));
builder.setTypes(Collections.singletonList("myType"));
builder.setAggregations(new AggregatorFactories.Builder().addAggregator(dateHistogram));
AggregatorFactories.Builder aggs = new AggregatorFactories.Builder().addAggregator(dateHistogram);
DatafeedConfig.validateAggregations(aggs);
builder.setParsedAggregations(aggs);
return builder.build();
}

Expand Down Expand Up @@ -556,11 +632,11 @@ protected DatafeedConfig mutateInstance(DatafeedConfig instance) throws IOExcept
break;
case 6:
BoolQueryBuilder query = new BoolQueryBuilder();
if (instance.getQuery() != null) {
query.must(instance.getQuery());
if (instance.getParsedQuery() != null) {
query.must(instance.getParsedQuery());
}
query.filter(new TermQueryBuilder(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10)));
builder.setQuery(query);
builder.setParsedQuery(query);
break;
case 7:
if (instance.hasAggregations()) {
Expand All @@ -571,7 +647,7 @@ protected DatafeedConfig mutateInstance(DatafeedConfig instance) throws IOExcept
aggBuilder
.addAggregator(new DateHistogramAggregationBuilder(timeField).field(timeField).interval(between(10000, 3600000))
.subAggregation(new MaxAggregationBuilder(timeField).field(timeField)));
builder.setAggregations(aggBuilder);
builder.setParsedAggregations(aggBuilder);
if (instance.getScriptFields().isEmpty() == false) {
builder.setScriptFields(Collections.emptyList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public void testApply_givenFullUpdateNoAggregations() {
assertThat(updatedDatafeed.getTypes(), equalTo(Collections.singletonList("t_2")));
assertThat(updatedDatafeed.getQueryDelay(), equalTo(TimeValue.timeValueSeconds(42)));
assertThat(updatedDatafeed.getFrequency(), equalTo(TimeValue.timeValueSeconds(142)));
assertThat(updatedDatafeed.getQuery(), equalTo(QueryBuilders.termQuery("a", "b")));
assertThat(updatedDatafeed.getParsedQuery(), equalTo(QueryBuilders.termQuery("a", "b")));
assertThat(updatedDatafeed.hasAggregations(), is(false));
assertThat(updatedDatafeed.getScriptFields(),
equalTo(Collections.singletonList(new SearchSourceBuilder.ScriptField("a", mockScript("b"), false))));
Expand All @@ -192,7 +192,7 @@ public void testApply_givenAggregations() {

assertThat(updatedDatafeed.getIndices(), equalTo(Collections.singletonList("i_1")));
assertThat(updatedDatafeed.getTypes(), equalTo(Collections.singletonList("t_1")));
assertThat(updatedDatafeed.getAggregations(),
assertThat(updatedDatafeed.getParsedAggregations(),
equalTo(new AggregatorFactories.Builder().addAggregator(
AggregationBuilders.histogram("a").interval(300000).field("time").subAggregation(maxTime))));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,13 @@ public void testMissingDataDetectionWithAggregationsAndQuery() throws Exception
DatafeedConfig.Builder datafeedConfigBuilder = createDatafeedBuilder(job.getId() + "-datafeed",
job.getId(),
Collections.singletonList(index));
datafeedConfigBuilder.setAggregations(new AggregatorFactories.Builder().addAggregator(
datafeedConfigBuilder.setParsedAggregations(new AggregatorFactories.Builder().addAggregator(
AggregationBuilders.histogram("time")
.subAggregation(maxTime)
.subAggregation(avgAggregationBuilder)
.field("time")
.interval(TimeValue.timeValueMinutes(5).millis())));
datafeedConfigBuilder.setQuery(new RangeQueryBuilder("value").gte(numDocs/2));
datafeedConfigBuilder.setParsedQuery(new RangeQueryBuilder("value").gte(numDocs/2));
datafeedConfigBuilder.setFrequency(TimeValue.timeValueMinutes(5));
datafeedConfigBuilder.setDelayedDataCheckConfig(DelayedDataCheckConfig.enabledDelayedDataCheckConfig(TimeValue.timeValueHours(12)));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.rollup.action.GetRollupIndexCapsAction;
import org.elasticsearch.xpack.core.rollup.action.RollupSearchAction;
import org.elasticsearch.xpack.core.security.SecurityContext;
Expand Down Expand Up @@ -154,6 +155,7 @@ private void handlePrivsResponse(String username, PutDatafeedAction.Request requ
private void putDatafeed(PutDatafeedAction.Request request, Map<String, String> headers,
ActionListener<PutDatafeedAction.Response> listener) {

DatafeedConfig.validateAggregations(request.getDatafeed().getParsedAggregations());
clusterService.submitStateUpdateTask(
"put-datafeed-" + request.getDatafeed().getId(),
new AckedClusterStateUpdateTask<PutDatafeedAction.Response>(request, listener) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ static void validate(String datafeedId, MlMetadata mlMetadata, PersistentTasksCu
throw ExceptionsHelper.missingJobException(datafeed.getJobId());
}
DatafeedJobValidator.validate(datafeed, job);
DatafeedConfig.validateAggregations(datafeed.getParsedAggregations());
JobState jobState = MlTasks.getJobState(datafeed.getJobId(), tasks);
if (jobState.isAnyOf(JobState.OPENING, JobState.OPENED) == false) {
throw ExceptionsHelper.conflictStatusException("cannot start datafeed [" + datafeedId + "] because job [" + job.getId() +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public static DelayedDataDetector buildDetector(Job job, DatafeedConfig datafeed
window,
job.getId(),
job.getDataDescription().getTimeField(),
datafeedConfig.getQuery(),
datafeedConfig.getParsedQuery(),
datafeedConfig.getIndices().toArray(new String[0]),
client);
} else {
Expand Down
Loading

0 comments on commit 166d9a9

Please sign in to comment.