Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add templating support to enrich processor #49093

Merged
merged 3 commits into from
Nov 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/reference/ingest/processors/enrich.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ See <<ingest-enriching-data,enrich data>> section for more information about how
|======
| Name | Required | Default | Description
| `policy_name` | yes | - | The name of the enrich policy to use.
| `field` | yes | - | The field in the input document that matches the policies match_field used to retrieve the enrichment data.
| `target_field` | yes | - | The field that will be used for the enrichment data.
| `field` | yes | - | The field in the input document that matches the policies match_field used to retrieve the enrichment data. Supports <<accessing-template-fields,template snippets>>.
| `target_field` | yes | - | The field that will be used for the enrichment data. Supports <<accessing-template-fields,template snippets>>.
| `ignore_missing` | no | false | If `true` and `field` does not exist, the processor quietly exits without modifying the document
| `override` | no | true | If processor will update fields with pre-existing non-null-valued field. When set to `false`, such fields will not be touched.
| `max_matches` | no | 1 | The maximum number of matched documents to include under the configured target field. The `target_field` will be turned into a json array if `max_matches` is higher than 1, otherwise `target_field` will become a json object. In order to avoid documents getting too large, the maximum allowed value is 128.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,12 @@ public static List<Processor> readProcessorConfigs(List<Map<String, Object>> pro
return processors;
}

public static TemplateScript.Factory readTemplateProperty(String processorType, String processorTag, Map<String, Object> configuration,
String propertyName, ScriptService scriptService) {
String value = readStringProperty(processorType, processorTag, configuration, propertyName, null);
return compileTemplate(processorType, processorTag, propertyName, value, scriptService);
}

public static TemplateScript.Factory compileTemplate(String processorType, String processorTag, String propertyName,
String propertyValue, ScriptService scriptService) {
try {
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugin/enrich/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ dependencies {
compileOnly project(path: xpackModule('core'), configuration: 'default')
testCompile project(path: xpackModule('core'), configuration: 'testArtifacts')
testCompile project(path: ':modules:ingest-common')
testCompile project(path: ':modules:lang-mustache')
testCompile project(path: xpackModule('monitoring'), configuration: 'testArtifacts')
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.script.TemplateScript;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
Expand All @@ -28,8 +29,8 @@ public abstract class AbstractEnrichProcessor extends AbstractProcessor {

private final String policyName;
private final BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner;
private final String field;
private final String targetField;
private final TemplateScript.Factory field;
private final TemplateScript.Factory targetField;
private final boolean ignoreMissing;
private final boolean overrideEnabled;
protected final String matchField;
Expand All @@ -39,8 +40,8 @@ protected AbstractEnrichProcessor(
String tag,
Client client,
String policyName,
String field,
String targetField,
TemplateScript.Factory field,
TemplateScript.Factory targetField,
boolean ignoreMissing,
boolean overrideEnabled,
String matchField,
Expand All @@ -53,8 +54,8 @@ protected AbstractEnrichProcessor(
String tag,
BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner,
String policyName,
String field,
String targetField,
TemplateScript.Factory field,
TemplateScript.Factory targetField,
boolean ignoreMissing,
boolean overrideEnabled,
String matchField,
Expand All @@ -77,6 +78,7 @@ protected AbstractEnrichProcessor(
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
try {
// If a document does not have the enrich key, return the unchanged document
String field = ingestDocument.renderTemplate(this.field);
final Object value = ingestDocument.getFieldValue(field, Object.class, ignoreMissing);
if (value == null) {
handler.accept(ingestDocument, null);
Expand Down Expand Up @@ -111,6 +113,7 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
return;
}

String targetField = ingestDocument.renderTemplate(this.targetField);
if (overrideEnabled || ingestDocument.hasField(targetField) == false) {
if (maxMatches == 1) {
Map<String, Object> firstDocument = searchHits[0].getSourceAsMap();
Expand Down Expand Up @@ -146,11 +149,13 @@ public String getType() {
}

String getField() {
return field;
// used for testing only:
return field.newInstance(Map.of()).execute();
}

public String getTargetField() {
return targetField;
String getTargetField() {
// used for testing only:
return targetField.newInstance(Map.of()).execute();
}

boolean isIgnoreMissing() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
return Map.of();
}

EnrichProcessorFactory factory = new EnrichProcessorFactory(parameters.client);
EnrichProcessorFactory factory = new EnrichProcessorFactory(parameters.client, parameters.scriptService);
parameters.ingestService.addIngestClusterStateListener(factory);
return Map.of(EnrichProcessorFactory.TYPE, factory);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.script.TemplateScript;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;

import java.util.Map;
Expand All @@ -23,11 +25,13 @@ final class EnrichProcessorFactory implements Processor.Factory, Consumer<Cluste

static final String TYPE = "enrich";
private final Client client;
private final ScriptService scriptService;

volatile MetaData metaData;

EnrichProcessorFactory(Client client) {
EnrichProcessorFactory(Client client, ScriptService scriptService) {
this.client = client;
this.scriptService = scriptService;
}

@Override
Expand All @@ -42,17 +46,17 @@ public Processor create(Map<String, Processor.Factory> processorFactories, Strin
assert aliasOrIndex.getIndices().size() == 1;
IndexMetaData imd = aliasOrIndex.getIndices().get(0);

String field = ConfigurationUtils.readStringProperty(TYPE, tag, config, "field");
Map<String, Object> mappingAsMap = imd.mapping().sourceAsMap();
String policyType = (String) XContentMapValues.extractValue(
"_meta." + EnrichPolicyRunner.ENRICH_POLICY_TYPE_FIELD_NAME,
mappingAsMap
);
String matchField = (String) XContentMapValues.extractValue("_meta." + EnrichPolicyRunner.ENRICH_MATCH_FIELD_NAME, mappingAsMap);

TemplateScript.Factory field = ConfigurationUtils.readTemplateProperty(TYPE, tag, config, "field", scriptService);
boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "ignore_missing", false);
boolean overrideEnabled = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "override", true);
String targetField = ConfigurationUtils.readStringProperty(TYPE, tag, config, "target_field");
TemplateScript.Factory targetField = ConfigurationUtils.readTemplateProperty(TYPE, tag, config, "target_field", scriptService);
int maxMatches = ConfigurationUtils.readIntProperty(TYPE, tag, config, "max_matches", 1);
if (maxMatches < 1 || maxMatches > 128) {
throw ConfigurationUtils.newConfigurationException(TYPE, tag, "max_matches", "should be between 1 and 128");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.geometry.Point;
import org.elasticsearch.index.query.GeoShapeQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.script.TemplateScript;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -29,8 +30,8 @@ public final class GeoMatchProcessor extends AbstractEnrichProcessor {
String tag,
Client client,
String policyName,
String field,
String targetField,
TemplateScript.Factory field,
TemplateScript.Factory targetField,
boolean overrideEnabled,
boolean ignoreMissing,
String matchField,
Expand All @@ -46,8 +47,8 @@ public final class GeoMatchProcessor extends AbstractEnrichProcessor {
String tag,
BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner,
String policyName,
String field,
String targetField,
TemplateScript.Factory field,
TemplateScript.Factory targetField,
boolean overrideEnabled,
boolean ignoreMissing,
String matchField,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.query.TermsQueryBuilder;
import org.elasticsearch.script.TemplateScript;

import java.util.List;
import java.util.function.BiConsumer;
Expand All @@ -21,8 +22,8 @@ public class MatchProcessor extends AbstractEnrichProcessor {
String tag,
Client client,
String policyName,
String field,
String targetField,
TemplateScript.Factory field,
TemplateScript.Factory targetField,
boolean overrideEnabled,
boolean ignoreMissing,
String matchField,
Expand All @@ -36,8 +37,8 @@ public class MatchProcessor extends AbstractEnrichProcessor {
String tag,
BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner,
String policyName,
String field,
String targetField,
TemplateScript.Factory field,
TemplateScript.Factory targetField,
boolean overrideEnabled,
boolean ignoreMissing,
String matchField,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.index.reindex.ReindexPlugin;
import org.elasticsearch.ingest.common.IngestCommonPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.mustache.MustachePlugin;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction;
Expand Down Expand Up @@ -49,7 +50,7 @@ public class BasicEnrichTests extends ESSingleNodeTestCase {

@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return List.of(LocalStateEnrich.class, ReindexPlugin.class, IngestCommonPlugin.class);
return List.of(LocalStateEnrich.class, ReindexPlugin.class, IngestCommonPlugin.class, MustachePlugin.class);
}

@Override
Expand Down Expand Up @@ -297,6 +298,43 @@ public void testAsyncTaskExecute() throws Exception {
}
}

public void testTemplating() throws Exception {
List<String> keys = createSourceMatchIndex(1, 1);
String policyName = "my-policy";
EnrichPolicy enrichPolicy = new EnrichPolicy(
EnrichPolicy.MATCH_TYPE,
null,
List.of(SOURCE_INDEX_NAME),
MATCH_FIELD,
List.of(DECORATE_FIELDS)
);
PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy);
client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet();
client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(policyName)).actionGet();

String pipelineName = "my-pipeline";
String pipelineBody = "{\"processors\": [{\"enrich\": {\"policy_name\":\""
+ policyName
+ "\", \"field\": \"{{indirection1}}\", \"target_field\": \"{{indirection2}}\""
+ "}}]}";
PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineName, new BytesArray(pipelineBody), XContentType.JSON);
client().admin().cluster().putPipeline(putPipelineRequest).actionGet();

IndexRequest indexRequest = new IndexRequest("my-index").id("1")
.setPipeline(pipelineName)
.source(Map.of("indirection1", MATCH_FIELD, "indirection2", "users", MATCH_FIELD, keys.get(0)));
client().index(indexRequest).get();
GetResponse getResponse = client().get(new GetRequest("my-index", "1")).actionGet();
Map<String, Object> source = getResponse.getSourceAsMap();
Map<?, ?> userEntry = (Map<?, ?>) source.get("users");
assertThat(userEntry.size(), equalTo(DECORATE_FIELDS.length + 1));
for (int j = 0; j < 3; j++) {
String field = DECORATE_FIELDS[j];
assertThat(userEntry.get(field), equalTo(keys.get(0) + j));
}
assertThat(keys.contains(userEntry.get(MATCH_FIELD)), is(true));
}

private List<String> createSourceMatchIndex(int numKeys, int numDocsPerKey) {
Set<String> keys = new HashSet<>();
for (int id = 0; id < numKeys; id++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.junit.Before;

import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -25,13 +27,21 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.mockito.Mockito.mock;

public class EnrichProcessorFactoryTests extends ESTestCase {

private ScriptService scriptService;

@Before
public void initializeScriptService() {
scriptService = mock(ScriptService.class);
}

public void testCreateProcessorInstance() throws Exception {
List<String> enrichValues = List.of("globalRank", "tldRank", "tld");
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of("source_index"), "my_key", enrichValues);
EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
EnrichProcessorFactory factory = new EnrichProcessorFactory(null, scriptService);
factory.metaData = createMetaData("majestic", policy);

Map<String, Object> config = new HashMap<>();
Expand Down Expand Up @@ -81,7 +91,7 @@ public void testCreateProcessorInstance() throws Exception {

public void testPolicyDoesNotExist() {
List<String> enrichValues = List.of("globalRank", "tldRank", "tld");
EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
EnrichProcessorFactory factory = new EnrichProcessorFactory(null, scriptService);
factory.metaData = MetaData.builder().build();

Map<String, Object> config = new HashMap<>();
Expand Down Expand Up @@ -110,7 +120,7 @@ public void testPolicyDoesNotExist() {

public void testPolicyNameMissing() {
List<String> enrichValues = List.of("globalRank", "tldRank", "tld");
EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
EnrichProcessorFactory factory = new EnrichProcessorFactory(null, scriptService);

Map<String, Object> config = new HashMap<>();
config.put("enrich_key", "host");
Expand Down Expand Up @@ -138,7 +148,7 @@ public void testPolicyNameMissing() {
public void testUnsupportedPolicy() throws Exception {
List<String> enrichValues = List.of("globalRank", "tldRank", "tld");
EnrichPolicy policy = new EnrichPolicy("unsupported", null, List.of("source_index"), "my_key", enrichValues);
EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
EnrichProcessorFactory factory = new EnrichProcessorFactory(null, scriptService);
factory.metaData = createMetaData("majestic", policy);

Map<String, Object> config = new HashMap<>();
Expand All @@ -157,7 +167,7 @@ public void testUnsupportedPolicy() throws Exception {
public void testCompactEnrichValuesFormat() throws Exception {
List<String> enrichValues = List.of("globalRank", "tldRank", "tld");
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of("source_index"), "host", enrichValues);
EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
EnrichProcessorFactory factory = new EnrichProcessorFactory(null, scriptService);
factory.metaData = createMetaData("majestic", policy);

Map<String, Object> config = new HashMap<>();
Expand All @@ -175,7 +185,7 @@ public void testCompactEnrichValuesFormat() throws Exception {
public void testNoTargetField() throws Exception {
List<String> enrichValues = List.of("globalRank", "tldRank", "tld");
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of("source_index"), "host", enrichValues);
EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
EnrichProcessorFactory factory = new EnrichProcessorFactory(null, scriptService);
factory.metaData = createMetaData("majestic", policy);

Map<String, Object> config1 = new HashMap<>();
Expand All @@ -189,7 +199,7 @@ public void testNoTargetField() throws Exception {
public void testIllegalMaxMatches() throws Exception {
List<String> enrichValues = List.of("globalRank", "tldRank", "tld");
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of("source_index"), "my_key", enrichValues);
EnrichProcessorFactory factory = new EnrichProcessorFactory(null);
EnrichProcessorFactory factory = new EnrichProcessorFactory(null, scriptService);
factory.metaData = createMetaData("majestic", policy);

Map<String, Object> config = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.Map;
import java.util.function.BiConsumer;

import static org.elasticsearch.xpack.enrich.MatchProcessorTests.str;
import static org.hamcrest.Matchers.emptyArray;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
Expand Down Expand Up @@ -66,8 +67,8 @@ private void testBasicsForFieldValue(Object fieldValue, Geometry expectedGeometr
"_tag",
mockSearch,
"_name",
"location",
"entry",
str("location"),
str("entry"),
false,
false,
"shape",
Expand Down
Loading