Skip to content

Commit

Permalink
[7.x][ML] Add optional source filtering during data frame reindexing (#…
Browse files Browse the repository at this point in the history
…49690) (#49718)

This adds a `_source` setting under the `source` setting of a data
frame analytics config. The new `_source` is reusing the structure
of a `FetchSourceContext` like `analyzed_fields` does. Specifying
includes and excludes for source allows selecting which fields
will get reindexed and will be available in the destination index.

Closes #49531

Backport of #49690
  • Loading branch information
dimitris-athanasiou authored Nov 29, 2019
1 parent 813b49a commit 4edb2e7
Show file tree
Hide file tree
Showing 28 changed files with 525 additions and 123 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;

import java.io.IOException;
import java.util.Arrays;
Expand All @@ -44,20 +45,27 @@ public static Builder builder() {

private static final ParseField INDEX = new ParseField("index");
private static final ParseField QUERY = new ParseField("query");
public static final ParseField _SOURCE = new ParseField("_source");

private static ObjectParser<Builder, Void> PARSER = new ObjectParser<>("data_frame_analytics_source", true, Builder::new);

static {
PARSER.declareStringArray(Builder::setIndex, INDEX);
PARSER.declareObject(Builder::setQueryConfig, (p, c) -> QueryConfig.fromXContent(p), QUERY);
PARSER.declareField(Builder::setSourceFiltering,
(p, c) -> FetchSourceContext.fromXContent(p),
_SOURCE,
ObjectParser.ValueType.OBJECT_ARRAY_BOOLEAN_OR_STRING);
}

private final String[] index;
private final QueryConfig queryConfig;
private final FetchSourceContext sourceFiltering;

private DataFrameAnalyticsSource(String[] index, @Nullable QueryConfig queryConfig) {
private DataFrameAnalyticsSource(String[] index, @Nullable QueryConfig queryConfig, @Nullable FetchSourceContext sourceFiltering) {
this.index = Objects.requireNonNull(index);
this.queryConfig = queryConfig;
this.sourceFiltering = sourceFiltering;
}

public String[] getIndex() {
Expand All @@ -68,13 +76,20 @@ public QueryConfig getQueryConfig() {
return queryConfig;
}

public FetchSourceContext getSourceFiltering() {
return sourceFiltering;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(INDEX.getPreferredName(), index);
if (queryConfig != null) {
builder.field(QUERY.getPreferredName(), queryConfig.getQuery());
}
if (sourceFiltering != null) {
builder.field(_SOURCE.getPreferredName(), sourceFiltering);
}
builder.endObject();
return builder;
}
Expand All @@ -86,12 +101,13 @@ public boolean equals(Object o) {

DataFrameAnalyticsSource other = (DataFrameAnalyticsSource) o;
return Arrays.equals(index, other.index)
&& Objects.equals(queryConfig, other.queryConfig);
&& Objects.equals(queryConfig, other.queryConfig)
&& Objects.equals(sourceFiltering, other.sourceFiltering);
}

@Override
public int hashCode() {
return Objects.hash(Arrays.asList(index), queryConfig);
return Objects.hash(Arrays.asList(index), queryConfig, sourceFiltering);
}

@Override
Expand All @@ -103,6 +119,7 @@ public static class Builder {

private String[] index;
private QueryConfig queryConfig;
private FetchSourceContext sourceFiltering;

private Builder() {}

Expand All @@ -121,8 +138,13 @@ public Builder setQueryConfig(QueryConfig queryConfig) {
return this;
}

public Builder setSourceFiltering(FetchSourceContext sourceFiltering) {
this.sourceFiltering = sourceFiltering;
return this;
}

public DataFrameAnalyticsSource build() {
return new DataFrameAnalyticsSource(index, queryConfig);
return new DataFrameAnalyticsSource(index, queryConfig, sourceFiltering);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2939,6 +2939,9 @@ public void testPutDataFrameAnalytics() throws Exception {
DataFrameAnalyticsSource sourceConfig = DataFrameAnalyticsSource.builder() // <1>
.setIndex("put-test-source-index") // <2>
.setQueryConfig(queryConfig) // <3>
.setSourceFiltering(new FetchSourceContext(true,
new String[] { "included_field_1", "included_field_2" },
new String[] { "excluded_field" })) // <4>
.build();
// end::put-data-frame-analytics-source-config

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.test.AbstractXContentTestCase;

import java.io.IOException;
Expand All @@ -35,9 +36,17 @@
public class DataFrameAnalyticsSourceTests extends AbstractXContentTestCase<DataFrameAnalyticsSource> {

public static DataFrameAnalyticsSource randomSourceConfig() {
FetchSourceContext sourceFiltering = null;
if (randomBoolean()) {
sourceFiltering = new FetchSourceContext(true,
generateRandomStringArray(10, 10, false, false),
generateRandomStringArray(10, 10, false, false));
}

return DataFrameAnalyticsSource.builder()
.setIndex(generateRandomStringArray(10, 10, false, false))
.setQueryConfig(randomBoolean() ? null : randomQueryConfig())
.setSourceFiltering(sourceFiltering)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ include-tagged::{doc-tests-file}[{api}-source-config]
<1> Constructing a new DataFrameAnalyticsSource
<2> The source index
<3> The query from which to gather the data. If query is not set, a `match_all` query is used by default.
<4> Source filtering to select which fields will exist in the destination index.

===== QueryConfig

Expand Down
32 changes: 23 additions & 9 deletions docs/reference/ml/df-analytics/apis/dfanalyticsresources.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,18 @@
<<dfanalytics-types>>.

`analyzed_fields`::
(object) You can specify both `includes` and/or `excludes` patterns. If
`analyzed_fields` is not set, only the relevant fields will be included. For
example, all the numeric fields for {oldetection}. For the supported field
types, see <<ml-put-dfanalytics-supported-fields>>.
(Optional, object) Specify `includes` and/or `excludes` patterns to select
which fields will be included in the analysis. If `analyzed_fields` is not set,
only the relevant fields will be included. For example, all the numeric fields
for {oldetection}. For the supported field types, see <<ml-put-dfanalytics-supported-fields>>.
Also see the <<explain-dfanalytics>> which helps understand field selection.

`includes`:::
(array) An array of strings that defines the fields that will be included in
(Optional, array) An array of strings that defines the fields that will be included in
the analysis.

`excludes`:::
(array) An array of strings that defines the fields that will be excluded
(Optional, array) An array of strings that defines the fields that will be excluded
from the analysis.


Expand Down Expand Up @@ -81,8 +82,8 @@ PUT _ml/data_frame/analytics/loganalytics
that setting. For more information, see <<ml-settings>>.

`source`::
(object) The source configuration consisting an `index` and optionally a
`query` object.
(object) The configuration of how to source the analysis data. It requires an `index`.
Optionally, `query` and `_source` may be specified.

`index`:::
(Required, string or array) Index or indices on which to perform the
Expand All @@ -96,6 +97,19 @@ PUT _ml/data_frame/analytics/loganalytics
as this object is passed verbatim to {es}. By default, this property has
the following value: `{"match_all": {}}`.

`_source`:::
(Optional, object) Specify `includes` and/or `excludes` patterns to select
which fields will be present in the destination. Fields that are excluded
cannot be included in the analysis.

`includes`::::
(array) An array of strings that defines the fields that will be included in
the destination.

`excludes`::::
(array) An array of strings that defines the fields that will be excluded
from the destination.

[[dfanalytics-types]]
==== Analysis objects

Expand Down Expand Up @@ -277,4 +291,4 @@ improvement. If you override any parameters, then the optimization will
calculate the value of the remaining parameters accordingly and use the value
you provided for the overridden parameter. The number of rounds are reduced
respectively. The validation error is estimated in each round by using 4-fold
cross validation.
cross validation.
53 changes: 33 additions & 20 deletions docs/reference/ml/df-analytics/apis/put-dfanalytics.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,13 @@ single number. For example, in case of age ranges, you can model the values as
<<dfanalytics-types>>.

`analyzed_fields`::
(Optional, object) You can specify both `includes` and/or `excludes` patterns.
If `analyzed_fields` is not set, only the relevant fields will be included.
For example, all the numeric fields for {oldetection}. For the supported field
types, see <<ml-put-dfanalytics-supported-fields>>. If you specify fields –
either in `includes` or in `excludes` – that have a data type that is not
supported, an error occurs.
(Optional, object) Specify `includes` and/or `excludes` patterns to select
which fields will be included in the analysis. If `analyzed_fields` is not set,
only the relevant fields will be included. For example, all the numeric fields
for {oldetection}. For the supported field types, see <<ml-put-dfanalytics-supported-fields>>.
Also see the <<explain-dfanalytics>> which helps understand
field selection.

`includes`:::
(Optional, array) An array of strings that defines the fields that will be
included in the analysis.
Expand Down Expand Up @@ -142,20 +142,33 @@ single number. For example, in case of age ranges, you can model the values as
that setting. For more information, see <<ml-settings>>.

`source`::
(Required, object) The source configuration, consisting of `index` and
optionally a `query`.
(object) The configuration of how to source the analysis data. It requires an `index`.
Optionally, `query` and `_source` may be specified.

`index`:::
(Required, string or array) Index or indices on which to perform the
analysis. It can be a single index or index pattern as well as an array of
indices or patterns.

`query`:::
(Optional, object) The {es} query domain-specific language
(<<query-dsl,DSL>>). This value corresponds to the query object in an {es}
search POST body. All the options that are supported by {es} can be used,
as this object is passed verbatim to {es}. By default, this property has
the following value: `{"match_all": {}}`.
`index`:::
(Required, string or array) Index or indices on which to perform the
analysis. It can be a single index or index pattern as well as an array of
indices or patterns.

`query`:::
(Optional, object) The {es} query domain-specific language
(<<query-dsl,DSL>>). This value corresponds to the query object in an {es}
search POST body. All the options that are supported by {es} can be used,
as this object is passed verbatim to {es}. By default, this property has
the following value: `{"match_all": {}}`.

`_source`:::
(Optional, object) Specify `includes` and/or `excludes` patterns to select
which fields will be present in the destination. Fields that are excluded
cannot be included in the analysis.

`includes`::::
(array) An array of strings that defines the fields that will be included in
the destination.

`excludes`::::
(array) An array of strings that defines the fields that will be excluded
from the destination.

`allow_lazy_start`::
(Optional, boolean) Whether this job should be allowed to start when there
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.ValidateActions;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
Expand All @@ -18,6 +19,7 @@
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;

import java.io.IOException;
Expand Down Expand Up @@ -87,6 +89,24 @@ public DataFrameAnalyticsConfig getConfig() {

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException error = null;
error = checkNoIncludedAnalyzedFieldsAreExcludedBySourceFiltering(config, error);
return error;
}

private ActionRequestValidationException checkNoIncludedAnalyzedFieldsAreExcludedBySourceFiltering(
DataFrameAnalyticsConfig config, ActionRequestValidationException error) {
if (config.getAnalyzedFields() == null) {
return null;
}
for (String analyzedInclude : config.getAnalyzedFields().includes()) {
if (config.getSource().isFieldExcluded(analyzedInclude)) {
return ValidateActions.addValidationError("field [" + analyzedInclude + "] is included in ["
+ DataFrameAnalyticsConfig.ANALYZED_FIELDS.getPreferredName() + "] but not in ["
+ DataFrameAnalyticsConfig.SOURCE.getPreferredName() + "."
+ DataFrameAnalyticsSource._SOURCE.getPreferredName() + "]", error);
}
}
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ private static DataFrameAnalysis parseAnalysis(XContentParser parser, boolean ig
private final Version version;
private final boolean allowLazyStart;

public DataFrameAnalyticsConfig(String id, String description, DataFrameAnalyticsSource source, DataFrameAnalyticsDest dest,
private DataFrameAnalyticsConfig(String id, String description, DataFrameAnalyticsSource source, DataFrameAnalyticsDest dest,
DataFrameAnalysis analysis, Map<String, String> headers, ByteSizeValue modelMemoryLimit,
FetchSourceContext analyzedFields, Instant createTime, Version version, boolean allowLazyStart) {
this.id = ExceptionsHelper.requireNonNull(id, ID);
Expand Down
Loading

0 comments on commit 4edb2e7

Please sign in to comment.