Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Datafeed config CRUD operations #32854

Merged
merged 5 commits into from
Aug 16, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Expand datafeed Ids
  • Loading branch information
davidkyle committed Aug 15, 2018
commit c989025587746dbe71bde80ac3f3561715ecf564
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,7 @@ public void writeTo(StreamOutput out) throws IOException {

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
    // DatafeedConfig.toXContent writes its own enclosing start/end object,
    // so delegate to it directly. Additionally wrapping the call in
    // startObject()/endObject() and calling doXContentBody (the pre-change
    // code retained by the flattened diff) would serialize the config twice
    // and produce malformed JSON.
    datafeed.toXContent(builder, params);
    return builder;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
public static final String DOC_COUNT = "doc_count";

public static final ParseField ID = new ParseField("datafeed_id");
public static final ParseField CONFIG_TYPE = new ParseField("config_type");
public static final ParseField QUERY_DELAY = new ParseField("query_delay");
public static final ParseField FREQUENCY = new ParseField("frequency");
public static final ParseField INDEXES = new ParseField("indexes");
Expand All @@ -93,6 +94,7 @@ private static ObjectParser<Builder, Void> createParser(boolean ignoreUnknownFie
ObjectParser<Builder, Void> parser = new ObjectParser<>("datafeed_config", ignoreUnknownFields, Builder::new);

parser.declareString(Builder::setId, ID);
parser.declareString((c, s) -> {}, CONFIG_TYPE);
parser.declareString(Builder::setJobId, Job.ID);
parser.declareStringArray(Builder::setIndices, INDEXES);
parser.declareStringArray(Builder::setIndices, INDICES);
Expand Down Expand Up @@ -220,6 +222,10 @@ public String getJobId() {
return jobId;
}

/**
 * Returns the config type discriminator ({@code TYPE}), written as the
 * {@code config_type} field into indexed config documents so datafeed
 * configs can be distinguished from other document types.
 */
public String getConfigType() {
return TYPE;
}

/** Returns the configured query delay for this datafeed. */
public TimeValue getQueryDelay() {
return queryDelay;
}
Expand Down Expand Up @@ -314,14 +320,14 @@ public void writeTo(StreamOutput out) throws IOException {
/**
 * Serializes this config as a complete JSON object; the individual field
 * writing is delegated to {@code doXContentBody}.
 */
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
doXContentBody(builder, params);
builder.endObject();
return builder;
}

public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
builder.field(ID.getPreferredName(), id);
builder.field(Job.ID.getPreferredName(), jobId);
// Config type field is added for the migration to index documents
// and isn't needed in cluster state configs. Not writing the field
// protects against BWC issues.
if (params.paramAsBoolean(ToXContentParams.FOR_CLUSTER_STATE, false) == false) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Originally this ToXContentParams.FOR_CLUSTER_STATE parameter was used to avoid showing the user internal implementation details. Based on the original intent the parameter should probably have been named ToXContentParams.FOR_INTERNAL_STORAGE.

There is also another constant, MlMetaIndex.INCLUDE_TYPE_KEY, which is used to avoid showing the end user the metadata document type in REST responses. I think we should also use that for config documents. So maybe move INCLUDE_TYPE_KEY into ToXContentParams so it can be reused here.

Then the logic here will be to include the type neither for cluster state nor for REST responses.

builder.field(CONFIG_TYPE.getPreferredName(), TYPE);
}
builder.field(QUERY_DELAY.getPreferredName(), queryDelay.getStringRep());
if (frequency != null) {
builder.field(FREQUENCY.getPreferredName(), frequency.getStringRep());
Expand All @@ -346,6 +352,7 @@ public XContentBuilder doXContentBody(XContentBuilder builder, Params params) th
if (headers.isEmpty() == false && params.paramAsBoolean(ToXContentParams.FOR_CLUSTER_STATE, false) == true) {
builder.field(HEADERS.getPreferredName(), headers);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll need to store the headers in the indexed document.

}
builder.endObject();
return builder;
}

Expand Down Expand Up @@ -485,6 +492,10 @@ public void setId(String datafeedId) {
id = ExceptionsHelper.requireNonNull(datafeedId, ID.getPreferredName());
}

/** Returns the datafeed ID currently set on this builder. */
public String getId() {
return id;
}

/**
 * Sets the ID of the job this datafeed is associated with.
 * Rejects a null {@code jobId} via {@code ExceptionsHelper.requireNonNull}.
 */
public void setJobId(String jobId) {
this.jobId = ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,17 @@
import com.carrotsearch.randomizedtesting.generators.CodepointSetGenerator;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParseException;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
Expand All @@ -36,17 +40,22 @@
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ml.datafeed.ChunkingConfig.Mode;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
import org.joda.time.DateTimeZone;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.not;
Expand All @@ -63,6 +72,10 @@ public static DatafeedConfig createRandomizedDatafeedConfig(String jobId) {
}

/**
 * Convenience overload that builds the randomized config produced by
 * {@code createRandomizedDatafeedConfigBuilder}.
 */
public static DatafeedConfig createRandomizedDatafeedConfig(String jobId, long bucketSpanMillis) {
return createRandomizedDatafeedConfigBuilder(jobId, bucketSpanMillis).build();
}

private static DatafeedConfig.Builder createRandomizedDatafeedConfigBuilder(String jobId, long bucketSpanMillis) {
DatafeedConfig.Builder builder = new DatafeedConfig.Builder(randomValidDatafeedId(), jobId);
builder.setIndices(randomStringList(1, 10));
builder.setTypes(randomStringList(0, 10));
Expand Down Expand Up @@ -109,7 +122,7 @@ public static DatafeedConfig createRandomizedDatafeedConfig(String jobId, long b
if (randomBoolean()) {
builder.setChunkingConfig(ChunkingConfigTests.createRandomizedChunk());
}
return builder.build();
return builder;
}

@Override
Expand Down Expand Up @@ -167,6 +180,33 @@ public void testFutureMetadataParse() throws IOException {
assertNotNull(DatafeedConfig.LENIENT_PARSER.apply(parser, null).build());
}

// Verifies that datafeed headers are serialized only when the
// FOR_CLUSTER_STATE param is set, and are omitted otherwise.
// NOTE(review): the review-discussion text interleaved below is scrape
// residue from the pull-request page, not Java source.
public void testToXContentForClusterState() throws IOException {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to add a test to check the datafeed type isn't written when we are writing to cluster state i.e. this piece of code in DatafeedConfig::toXContent

if (params.paramAsBoolean(ToXContentParams.FOR_CLUSTER_STATE, false) == false) {
            builder.field(CONFIG_TYPE.getPreferredName(), TYPE);
        }

But it's hard to test so I added a check for the headers which similarly should only be persisted in certain circumstances

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not the following something similar to the following? It may work to simple directly check the serialized data string.

String stringToCheck = "\""+DatafeedConfig.CONFIG_TYPE.getPreferredName()+"\":\""+DatafeedConfig.TYPE+"\"";
assertThat(forClusterstateXContent.utf8ToString(), containsString(stringToCheck));
...
assertThat(forClusterstateXContent.utf8ToString(), not(containsString(stringToCheck)));

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that works, great idea I never thought of it.

DatafeedConfig.Builder builder = createRandomizedDatafeedConfigBuilder("foo", 300);

// headers are only persisted to cluster state
Map<String, String> headers = new HashMap<>();
headers.put("header-name", "header-value");
builder.setHeaders(headers);
DatafeedConfig config = builder.build();

// Serialize with FOR_CLUSTER_STATE=true: headers must round-trip.
ToXContent.MapParams params = new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.FOR_CLUSTER_STATE, "true"));

BytesReference forClusterstateXContent = XContentHelper.toXContent(config, XContentType.JSON, params, false);
XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(xContentRegistry(), LoggingDeprecationHandler.INSTANCE, forClusterstateXContent.streamInput());

DatafeedConfig parsedConfig = DatafeedConfig.LENIENT_PARSER.apply(parser, null).build();
assertThat(parsedConfig.getHeaders(), hasEntry("header-name", "header-value"));

// headers are not written without the FOR_CLUSTER_STATE param
BytesReference nonClusterstateXContent = XContentHelper.toXContent(config, XContentType.JSON, ToXContent.EMPTY_PARAMS, false);
parser = XContentFactory.xContent(XContentType.JSON)
.createParser(xContentRegistry(), LoggingDeprecationHandler.INSTANCE, nonClusterstateXContent.streamInput());

parsedConfig = DatafeedConfig.LENIENT_PARSER.apply(parser, null).build();
assertThat(parsedConfig.getHeaders().entrySet(), hasSize(0));
}

public void testCopyConstructor() {
for (int i = 0; i < NUMBER_OF_TEST_RUNS; i++) {
DatafeedConfig datafeedConfig = createTestInstance();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@
import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
Expand All @@ -29,15 +33,27 @@
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.query.TermsQueryBuilder;
import org.elasticsearch.index.query.WildcardQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedUpdate;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.job.persistence.ExpandedIdsMatcher;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
Expand Down Expand Up @@ -137,7 +153,7 @@ public void onFailure(Exception e) {
*
* @param datafeedId The Id of the datafeed to update
* @param update The update
* @param headers
* @param headers Datafeed headers applied with the update
* @param updatedConfigListener Updated datafeed config listener
*/
public void updateDatefeedConfig(String datafeedId, DatafeedUpdate update, Map<String, String> headers,
Expand Down Expand Up @@ -200,8 +216,154 @@ public void onFailure(Exception e) {
});
}

private void parseLenientlyFromSource(BytesReference source, ActionListener<DatafeedConfig.Builder> datafeedConfigListener) {
/**
* Expands an expression into the set of matching names. {@code expression}
* may be a wildcard, a datafeed ID or a list of those.
* If {@code expression} == 'ALL', '*' or the empty string then all
* datafeed IDs are returned.
*
* For example, given a set of names ["foo-1", "foo-2", "bar-1", "bar-2"],
* expressions resolve as follows:
* <ul>
* <li>"foo-1" : ["foo-1"]</li>
* <li>"bar-1" : ["bar-1"]</li>
* <li>"foo-1,foo-2" : ["foo-1", "foo-2"]</li>
* <li>"foo-*" : ["foo-1", "foo-2"]</li>
* <li>"*-1" : ["bar-1", "foo-1"]</li>
* <li>"*" : ["bar-1", "bar-2", "foo-1", "foo-2"]</li>
* <li>"_all" : ["bar-1", "bar-2", "foo-1", "foo-2"]</li>
* </ul>
*
* @param expression the expression to resolve
* @param allowNoDatafeeds if {@code false}, an error is thrown when no name matches the {@code expression}.
* This only applies to wild card expressions, if {@code expression} is not a
* wildcard then setting this true will not suppress the exception
* @param listener The expanded datafeed IDs listener
*/
public void expandDatafeedIds(String expression, boolean allowNoDatafeeds, ActionListener<Set<String>> listener) {
String [] tokens = ExpandedIdsMatcher.tokenizeExpression(expression);
// Only the datafeed_id field is fetched since the full config is not
// needed here; results are sorted by ID for a stable, ordered response.
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildQuery(tokens));
sourceBuilder.sort(DatafeedConfig.ID.getPreferredName());
String [] includes = new String[] {DatafeedConfig.ID.getPreferredName()};
sourceBuilder.fetchSource(includes, null);

// lenientExpandOpen so a missing/closed config index does not error out.
SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName())
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setSource(sourceBuilder).request();

// Tracks which requested (non-wildcard) IDs must be matched so a
// missing-datafeed error can be raised, subject to allowNoDatafeeds.
ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(tokens, allowNoDatafeeds);

executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest,
ActionListener.<SearchResponse>wrap(
response -> {
Set<String> datafeedIds = new HashSet<>();
SearchHit[] hits = response.getHits().getHits();
for (SearchHit hit : hits) {
datafeedIds.add((String)hit.getSourceAsMap().get(DatafeedConfig.ID.getPreferredName()));
}

requiredMatches.filterMatchedIds(datafeedIds);
if (requiredMatches.hasUnmatchedIds()) {
// some required datafeeds were not found
listener.onFailure(ExceptionsHelper.missingDatafeedException(requiredMatches.unmatchedIdsString()));
return;
}

listener.onResponse(datafeedIds);
},
listener::onFailure)
, client::search);

}

/**
* The same logic as {@link #expandDatafeedIds(String, boolean, ActionListener)} but
* the full datafeed configuration is returned.
*
* See {@link #expandDatafeedIds(String, boolean, ActionListener)}
*
* @param expression the expression to resolve
* @param allowNoDatafeeds if {@code false}, an error is thrown when no name matches the {@code expression}.
* This only applies to wild card expressions, if {@code expression} is not a
* wildcard then setting this true will not suppress the exception
* @param listener The expanded datafeed config listener
*/
// NORELEASE datafeed configs should be paged or have a mechanism to return all jobs if there are many of them
// NORELEASE datafeed configs should be paged or have a mechanism to return all jobs if there are many of them
public void expandDatafeedConfigs(String expression, boolean allowNoDatafeeds, ActionListener<List<DatafeedConfig.Builder>> listener) {
String [] tokens = ExpandedIdsMatcher.tokenizeExpression(expression);
// Unlike expandDatafeedIds, the full document _source is fetched so the
// complete DatafeedConfig can be parsed from each hit.
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildQuery(tokens));
sourceBuilder.sort(DatafeedConfig.ID.getPreferredName());

SearchRequest searchRequest = client.prepareSearch(AnomalyDetectorsIndex.configIndexName())
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setSource(sourceBuilder).request();

// Tracks which requested (non-wildcard) IDs must be matched so a
// missing-datafeed error can be raised, subject to allowNoDatafeeds.
ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(tokens, allowNoDatafeeds);

executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest,
ActionListener.<SearchResponse>wrap(
response -> {
List<DatafeedConfig.Builder> datafeeds = new ArrayList<>();
Set<String> datafeedIds = new HashSet<>();
SearchHit[] hits = response.getHits().getHits();
for (SearchHit hit : hits) {
try {
BytesReference source = hit.getSourceRef();
DatafeedConfig.Builder datafeed = parseLenientlyFromSource(source);
datafeeds.add(datafeed);
datafeedIds.add(datafeed.getId());
} catch (IOException e) {
// Unparseable documents are logged and skipped rather than
// failing the whole request.
// TODO A better way to handle this rather than just ignoring the error?
logger.error("Error parsing datafeed configuration [" + hit.getId() + "]", e);
}
}

requiredMatches.filterMatchedIds(datafeedIds);
if (requiredMatches.hasUnmatchedIds()) {
// some required datafeeds were not found
listener.onFailure(ExceptionsHelper.missingDatafeedException(requiredMatches.unmatchedIdsString()));
return;
}

listener.onResponse(datafeeds);
},
listener::onFailure)
, client::search);

}

/**
 * Builds the search query for the given expression tokens.
 * Every query is restricted to datafeed config documents via a term query
 * on the config_type field; wildcard tokens become wildcard queries on the
 * datafeed ID and plain tokens are collected into a single terms query.
 */
private QueryBuilder buildQuery(String [] tokens) {
    QueryBuilder typeQuery = new TermQueryBuilder(DatafeedConfig.CONFIG_TYPE.getPreferredName(), DatafeedConfig.TYPE);

    // "_all", "*" or an empty expression matches every datafeed config,
    // so no ID-based filtering is required.
    if (ExpandedIdsMatcher.isWildcardAll(tokens)) {
        return typeQuery;
    }

    BoolQueryBuilder idQueries = new BoolQueryBuilder();
    List<String> exactIds = new ArrayList<>();
    for (String token : tokens) {
        if (Regex.isSimpleMatchPattern(token)) {
            idQueries.should(new WildcardQueryBuilder(DatafeedConfig.ID.getPreferredName(), token));
        } else {
            exactIds.add(token);
        }
    }
    if (exactIds.isEmpty() == false) {
        // All exact IDs go into one terms query rather than many term queries.
        idQueries.should(new TermsQueryBuilder(DatafeedConfig.ID.getPreferredName(), exactIds));
    }

    BoolQueryBuilder query = new BoolQueryBuilder();
    query.filter(typeQuery);
    if (idQueries.should().isEmpty() == false) {
        query.filter(idQueries);
    }
    return query;
}

private void parseLenientlyFromSource(BytesReference source, ActionListener<DatafeedConfig.Builder> datafeedConfigListener) {
try (InputStream stream = source.streamInput();
XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, stream)) {
Expand Down
Loading