From 539d6adfa3314f6ccbc52571001a1540a742f1d6 Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Tue, 21 Feb 2023 08:57:55 +0100 Subject: [PATCH 1/6] Invoke default pipeline of new index --- docs/changelog/85931.yaml | 5 + .../elasticsearch/index/FinalPipelineIT.java | 64 +++++++++-- .../elasticsearch/ingest/IngestService.java | 106 +++++++++++++----- .../ingest/IngestServiceTests.java | 2 + 4 files changed, 138 insertions(+), 39 deletions(-) create mode 100644 docs/changelog/85931.yaml diff --git a/docs/changelog/85931.yaml b/docs/changelog/85931.yaml new file mode 100644 index 0000000000000..5ad9617b21f8f --- /dev/null +++ b/docs/changelog/85931.yaml @@ -0,0 +1,5 @@ +pr: 85931 +summary: Invoke default pipeline of new index +area: Ingest Node +type: enhancement +issues: [] diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java index 98ce3933f9d3c..e9f56bd2ed599 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java @@ -29,6 +29,7 @@ import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.ingest.AbstractProcessor; +import org.elasticsearch.ingest.ConfigurationUtils; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.PipelineConfiguration; import org.elasticsearch.ingest.Processor; @@ -49,6 +50,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.function.BiConsumer; import java.util.function.Supplier; @@ -156,7 +158,7 @@ public void testFinalPipelineOfNewDestinationIsInvoked() { assertEquals(true, target.getHits().getAt(0).getSourceAsMap().get("final")); } - public void testDefaultPipelineOfNewDestinationIsNotInvoked() { + public void testDefaultPipelineOfNewDestinationIsInvoked() { Settings settings = Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline").build(); createIndex("index", settings); @@ -185,7 +187,39 @@ public void testDefaultPipelineOfNewDestinationIsNotInvoked() { assertEquals(RestStatus.CREATED, indexResponse.status()); SearchResponse target = client().prepareSearch("target").get(); assertEquals(1, target.getHits().getTotalHits().value); - assertFalse(target.getHits().getAt(0).getSourceAsMap().containsKey("final")); + assertTrue(target.getHits().getAt(0).getSourceAsMap().containsKey("final")); + } + + public void testAvoidIndexingLoop() { + Settings settings = Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline").build(); + createIndex("index", settings); + + settings = Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "target_default_pipeline").build(); + createIndex("target", settings); + + BytesReference defaultPipelineBody = new BytesArray(""" + {"processors": [{"changing_dest": {"dest": "target"}}]}"""); + client().admin() + .cluster() + .putPipeline(new PutPipelineRequest("default_pipeline", defaultPipelineBody, XContentType.JSON)) + .actionGet(); + + BytesReference targetPipeline = new BytesArray(""" + {"processors": [{"changing_dest": {"dest": "index"}}]}"""); + client().admin() + .cluster() + .putPipeline(new PutPipelineRequest("target_default_pipeline", targetPipeline, XContentType.JSON)) + .actionGet(); + + IllegalStateException exception = expectThrows( + IllegalStateException.class, + () -> client().prepareIndex("index") + .setId("1") + .setSource(Map.of("dest", "index")) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .get() + ); + assertThat(exception.getMessage(), containsString("index cycle detected while processing pipelines: [index, target, index]")); } public void testFinalPipeline() { @@ -382,18 +416,24 @@ public String getType() { } }, "changing_dest", - (processorFactories, tag, description, config) -> new AbstractProcessor(tag, description) { - @Override - public IngestDocument execute(final IngestDocument ingestDocument) throws Exception { - ingestDocument.setFieldValue(IngestDocument.Metadata.INDEX.getFieldName(), "target"); - return ingestDocument; - } + (processorFactories, tag, description, config) -> { + final String dest = Objects.requireNonNullElse( + ConfigurationUtils.readOptionalStringProperty(description, tag, config, "dest"), + "target" + ); + return new AbstractProcessor(tag, description) { + @Override + public IngestDocument execute(final IngestDocument ingestDocument) throws Exception { + ingestDocument.setFieldValue(IngestDocument.Metadata.INDEX.getFieldName(), dest); + return ingestDocument; + } - @Override - public String getType() { - return "changing_dest"; - } + @Override + public String getType() { + return "changing_dest"; + } + }; } ); } diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 0e53b6a39f0fd..3907b338e66c9 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -68,6 +68,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; import java.util.Locale; @@ -714,21 +715,8 @@ protected void doRun() { continue; } - final String pipelineId = indexRequest.getPipeline(); - indexRequest.setPipeline(NOOP_PIPELINE_NAME); - final String finalPipelineId = indexRequest.getFinalPipeline(); - indexRequest.setFinalPipeline(NOOP_PIPELINE_NAME); - boolean hasFinalPipeline = true; - final List pipelines; - if (IngestService.NOOP_PIPELINE_NAME.equals(pipelineId) == false - && IngestService.NOOP_PIPELINE_NAME.equals(finalPipelineId) == false) { - pipelines = List.of(pipelineId, finalPipelineId); - } else if (IngestService.NOOP_PIPELINE_NAME.equals(pipelineId) == false) { - pipelines = List.of(pipelineId); - hasFinalPipeline = false; - } else if (IngestService.NOOP_PIPELINE_NAME.equals(finalPipelineId) == false) { - pipelines = List.of(finalPipelineId); - } else { + Pipelines pipelines = getPipelines(indexRequest); + if (pipelines.isEmpty()) { i++; continue; } @@ -763,8 +751,16 @@ public void onFailure(Exception e) { }); IngestDocument ingestDocument = newIngestDocument(indexRequest); - executePipelines(pipelines.iterator(), hasFinalPipeline, indexRequest, ingestDocument, documentListener); - + LinkedHashSet indexRecursionDetection = new LinkedHashSet<>(); + indexRecursionDetection.add(indexRequest.index()); + executePipelines( + pipelines.iterator(), + pipelines.hasFinalPipeline(), + indexRequest, + ingestDocument, + documentListener, + indexRecursionDetection + ); i++; } } @@ -772,12 +768,63 @@ public void onFailure(Exception e) { }); } + private Pipelines getPipelines(IndexRequest indexRequest) { + indexRequest.isPipelineResolved(false); + resolvePipelines(null, indexRequest, state.metadata()); + final String pipelineId = indexRequest.getPipeline(); + indexRequest.setPipeline(NOOP_PIPELINE_NAME); + final String finalPipelineId = indexRequest.getFinalPipeline(); + indexRequest.setFinalPipeline(NOOP_PIPELINE_NAME); + return new Pipelines(pipelineId, finalPipelineId); + } + + private static class Pipelines implements Iterable { + private String defaultPipeline; + private String finalPipeline; + + private Pipelines(String defaultPipeline, String finalPipeline) { + if (NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) { + this.defaultPipeline = defaultPipeline; + } + if (NOOP_PIPELINE_NAME.equals(finalPipeline) == false) { + this.finalPipeline = finalPipeline; + } + } + + public boolean hasFinalPipeline() { + return finalPipeline != null; + } + + public boolean isEmpty() { + return defaultPipeline == null && finalPipeline == null; + } + + public void withoutDefaultPipeline() { + defaultPipeline = null; + } + + @Override + public Iterator iterator() { + if (defaultPipeline != null && finalPipeline != null) { + return List.of(defaultPipeline, finalPipeline).iterator(); + } + if (finalPipeline != null) { + return List.of(finalPipeline).iterator(); + } + if (defaultPipeline != null) { + return List.of(defaultPipeline).iterator(); + } + return Collections.emptyIterator(); + } + } + private void executePipelines( final Iterator pipelineIds, final boolean hasFinalPipeline, final IndexRequest indexRequest, final IngestDocument ingestDocument, - final ActionListener listener + final ActionListener listener, + final Set indexRecursionDetection ) { assert pipelineIds.hasNext(); final String pipelineId = pipelineIds.next(); @@ -840,6 +887,14 @@ private void executePipelines( final String newIndex = indexRequest.indices()[0]; if (Objects.equals(originalIndex, newIndex) == false) { + if (indexRecursionDetection.add(newIndex) == false) { + List indexRoute = new ArrayList<>(indexRecursionDetection); + indexRoute.add(newIndex); + listener.onFailure( + new IllegalStateException(format("index cycle detected while processing pipelines: %s", indexRoute)) + ); + return; // document failed! + } if (hasFinalPipeline && pipelineIds.hasNext() == false) { listener.onFailure( new IllegalStateException( @@ -854,19 +909,16 @@ private void executePipelines( ); return; // document failed! } else { - indexRequest.isPipelineResolved(false); - resolvePipelines(null, indexRequest, state.metadata()); - if (IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getFinalPipeline()) == false) { - newPipelineIds = Collections.singleton(indexRequest.getFinalPipeline()).iterator(); - newHasFinalPipeline = true; - } else { - newPipelineIds = Collections.emptyIterator(); - } + // reset request pipeline that is set to _none that would override the default pipeline + indexRequest.setPipeline(null); + Pipelines pipelines = getPipelines(indexRequest); + newHasFinalPipeline = pipelines.hasFinalPipeline(); + newPipelineIds = pipelines.iterator(); } } if (newPipelineIds.hasNext()) { - executePipelines(newPipelineIds, newHasFinalPipeline, indexRequest, ingestDocument, listener); + executePipelines(newPipelineIds, newHasFinalPipeline, indexRequest, ingestDocument, listener, indexRecursionDetection); } else { // update the index request's source and (potentially) cache the timestamp for TSDB updateIndexRequestSource(indexRequest, ingestDocument); diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index 64c90aa1dc6c6..8a5f86955930a 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -168,6 +168,8 @@ public void testExecuteIndexPipelineDoesNotExist() { List.of(DUMMY_PLUGIN), client ); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, clusterState)); final IndexRequest indexRequest = new IndexRequest("_index").id("_id") .source(Map.of()) .setPipeline("_id") From 4c65642b60382b7cba67a1572d313a0316d04485 Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Tue, 21 Feb 2023 10:46:52 +0100 Subject: [PATCH 2/6] Fix ingest yaml test --- .../main/java/org/elasticsearch/ingest/IngestService.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 3907b338e66c9..5d4c6b3235167 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -769,8 +769,6 @@ public void onFailure(Exception e) { } private Pipelines getPipelines(IndexRequest indexRequest) { - indexRequest.isPipelineResolved(false); - resolvePipelines(null, indexRequest, state.metadata()); final String pipelineId = indexRequest.getPipeline(); indexRequest.setPipeline(NOOP_PIPELINE_NAME); final String finalPipelineId = indexRequest.getFinalPipeline(); @@ -909,8 +907,10 @@ private void executePipelines( ); return; // document failed! } else { - // reset request pipeline that is set to _none that would override the default pipeline + // reset request pipeline that is set to _none which would take precedence over the default pipeline indexRequest.setPipeline(null); + indexRequest.isPipelineResolved(false); + resolvePipelines(null, indexRequest, state.metadata()); Pipelines pipelines = getPipelines(indexRequest); newHasFinalPipeline = pipelines.hasFinalPipeline(); newPipelineIds = pipelines.iterator(); From ed6cf03d134821f6c919e86361dae2bac3a2bda0 Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Tue, 21 Feb 2023 12:03:02 +0100 Subject: [PATCH 3/6] Preserve previous behavior of _index and introduce _redirect --- .../elasticsearch/index/FinalPipelineIT.java | 54 +++++++++++++++++-- .../ingest/IngestDocMetadata.java | 2 + .../elasticsearch/ingest/IngestDocument.java | 1 + .../elasticsearch/ingest/IngestService.java | 7 +++ .../org/elasticsearch/script/Metadata.java | 9 ++++ 5 files changed, 69 insertions(+), 4 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java index e9f56bd2ed599..f70acb6bef6b5 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java @@ -179,6 +179,38 @@ public void testDefaultPipelineOfNewDestinationIsInvoked() { .putPipeline(new PutPipelineRequest("target_default_pipeline", targetPipeline, XContentType.JSON)) .actionGet(); + IndexResponse indexResponse = client().prepareIndex("index") + .setId("1") + .setSource(Map.of("field", "value")) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .get(); + assertEquals(RestStatus.CREATED, indexResponse.status()); + SearchResponse target = client().prepareSearch("target").get(); + assertEquals(1, target.getHits().getTotalHits().value); + assertFalse(target.getHits().getAt(0).getSourceAsMap().containsKey("final")); + } + + public void testDefaultPipelineOfRedirectDestinationIsInvoked() { + Settings settings = Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline").build(); + createIndex("index", settings); + + settings = Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "target_default_pipeline").build(); + createIndex("target", settings); + + BytesReference defaultPipelineBody = new BytesArray(""" + {"processors": [{"redirect": {}}]}"""); + client().admin() + .cluster() + .putPipeline(new PutPipelineRequest("default_pipeline", defaultPipelineBody, XContentType.JSON)) + .actionGet(); + + BytesReference targetPipeline = new BytesArray(""" + {"processors": [{"final": {}}]}"""); + client().admin() + .cluster() + .putPipeline(new PutPipelineRequest("target_default_pipeline", targetPipeline, XContentType.JSON)) + .actionGet(); + IndexResponse indexResponse = client().prepareIndex("index") .setId("1") .setSource(Map.of("field", "value")) @@ -198,14 +230,14 @@ public void testAvoidIndexingLoop() { createIndex("target", settings); BytesReference defaultPipelineBody = new BytesArray(""" - {"processors": [{"changing_dest": {"dest": "target"}}]}"""); + {"processors": [{"redirect": {"dest": "target"}}]}"""); client().admin() .cluster() .putPipeline(new PutPipelineRequest("default_pipeline", defaultPipelineBody, XContentType.JSON)) .actionGet(); BytesReference targetPipeline = new BytesArray(""" - {"processors": [{"changing_dest": {"dest": "index"}}]}"""); + {"processors": [{"redirect": {"dest": "index"}}]}"""); client().admin() .cluster() .putPipeline(new PutPipelineRequest("target_default_pipeline", targetPipeline, XContentType.JSON)) @@ -416,6 +448,20 @@ public String getType() { } }, "changing_dest", + (processorFactories, tag, description, config) -> new AbstractProcessor(tag, description) { + @Override + public IngestDocument execute(final IngestDocument ingestDocument) throws Exception { + ingestDocument.setFieldValue(IngestDocument.Metadata.INDEX.getFieldName(), "target"); + return ingestDocument; + } + + @Override + public String getType() { + return "changing_dest"; + } + + }, + "redirect", (processorFactories, tag, description, config) -> { final String dest = Objects.requireNonNullElse( ConfigurationUtils.readOptionalStringProperty(description, tag, config, "dest"), @@ -424,13 +470,13 @@ public String getType() { return new AbstractProcessor(tag, description) { @Override public IngestDocument execute(final IngestDocument ingestDocument) throws Exception { - ingestDocument.setFieldValue(IngestDocument.Metadata.INDEX.getFieldName(), dest); + ingestDocument.setFieldValue(IngestDocument.Metadata.REDIRECT.getFieldName(), dest); return ingestDocument; } @Override public String getType() { - return "changing_dest"; + return "redirect"; } }; diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocMetadata.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocMetadata.java index dc78d58535e29..540715f293fd7 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestDocMetadata.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocMetadata.java @@ -22,6 +22,8 @@ class IngestDocMetadata extends Metadata { static final Map> PROPERTIES = Map.of( INDEX, StringField.withWritable().withNullable(), + REDIRECT, + StringField.withWritable().withNullable(), ID, StringField.withWritable().withNullable(), ROUTING, diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java index f471926087ae5..e3c4cf42c14cd 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -905,6 +905,7 @@ public String toString() { public enum Metadata { INDEX(IndexFieldMapper.NAME), + REDIRECT("_redirect"), TYPE("_type"), ID(IdFieldMapper.NAME), ROUTING(RoutingFieldMapper.NAME), diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 5d4c6b3235167..7c6df4ceb7a9c 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -912,6 +912,9 @@ private void executePipelines( indexRequest.isPipelineResolved(false); resolvePipelines(null, indexRequest, state.metadata()); Pipelines pipelines = getPipelines(indexRequest); + if (newIndex.equals(ingestDocument.getMetadata().getRedirect()) == false) { + pipelines.withoutDefaultPipeline(); + } newHasFinalPipeline = pipelines.hasFinalPipeline(); newPipelineIds = pipelines.iterator(); } @@ -1024,6 +1027,10 @@ private static void updateIndexRequestMetadata(final IndexRequest request, final // it's fine to set all metadata fields all the time, as ingest document holds their starting values // before ingestion, which might also get modified during ingestion. request.index(metadata.getIndex()); + String redirectIndex = metadata.getRedirect(); + if (redirectIndex != null) { + request.index(redirectIndex); + } request.id(metadata.getId()); request.routing(metadata.getRouting()); request.version(metadata.getVersion()); diff --git a/server/src/main/java/org/elasticsearch/script/Metadata.java b/server/src/main/java/org/elasticsearch/script/Metadata.java index 1afbdb63ca3a9..716dd54157655 100644 --- a/server/src/main/java/org/elasticsearch/script/Metadata.java +++ b/server/src/main/java/org/elasticsearch/script/Metadata.java @@ -39,6 +39,7 @@ */ public class Metadata { protected static final String INDEX = "_index"; + protected static final String REDIRECT = "_redirect"; protected static final String ID = "_id"; protected static final String ROUTING = "_routing"; protected static final String VERSION_TYPE = "_version_type"; @@ -118,6 +119,14 @@ public void setIndex(String index) { put(INDEX, index); } + public String getRedirect() { + return getString(REDIRECT); + } + + public void setRedirect(String redirect) { + put(REDIRECT, redirect); + } + public String getId() { return getString(ID); } From cfd5bd359eed5d43ad9dacb62d45e8092887bb31 Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Tue, 21 Feb 2023 19:46:40 +0100 Subject: [PATCH 4/6] Convert _redirect metadata to IngestDocument#redirect method --- .../org/elasticsearch/index/FinalPipelineIT.java | 2 +- .../org/elasticsearch/ingest/IngestDocMetadata.java | 2 -- .../org/elasticsearch/ingest/IngestDocument.java | 12 +++++++++++- .../java/org/elasticsearch/ingest/IngestService.java | 6 +----- .../main/java/org/elasticsearch/script/Metadata.java | 9 --------- .../org/elasticsearch/ingest/IngestServiceTests.java | 2 -- 6 files changed, 13 insertions(+), 20 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java index f70acb6bef6b5..0b26a6fd54c43 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java @@ -470,7 +470,7 @@ public String getType() { return new AbstractProcessor(tag, description) { @Override public IngestDocument execute(final IngestDocument ingestDocument) throws Exception { - ingestDocument.setFieldValue(IngestDocument.Metadata.REDIRECT.getFieldName(), dest); + ingestDocument.redirect(dest); return ingestDocument; } diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocMetadata.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocMetadata.java index 540715f293fd7..dc78d58535e29 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestDocMetadata.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocMetadata.java @@ -22,8 +22,6 @@ class IngestDocMetadata extends Metadata { static final Map> PROPERTIES = Map.of( INDEX, StringField.withWritable().withNullable(), - REDIRECT, - StringField.withWritable().withNullable(), ID, StringField.withWritable().withNullable(), ROUTING, diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java index e3c4cf42c14cd..7d8b70087ee1c 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -64,6 +64,7 @@ public final class IngestDocument { private final Set executedPipelines = new LinkedHashSet<>(); private boolean doNoSelfReferencesCheck = false; + private boolean invokeDefaultPipelineOfDestination = false; public IngestDocument(String index, String id, long version, String routing, VersionType versionType, Map source) { this.ctxMap = new IngestCtxMap(index, id, version, routing, versionType, ZonedDateTime.now(ZoneOffset.UTC), source); @@ -80,6 +81,7 @@ public IngestDocument(IngestDocument other) { new IngestCtxMap(deepCopyMap(other.ctxMap.getSource()), other.ctxMap.getMetadata().clone()), deepCopyMap(other.ingestMetadata) ); + this.invokeDefaultPipelineOfDestination = other.invokeDefaultPipelineOfDestination; } /** @@ -903,9 +905,17 @@ public String toString() { return "IngestDocument{" + " sourceAndMetadata=" + ctxMap + ", ingestMetadata=" + ingestMetadata + '}'; } + public void redirect(String destIndex) { + getMetadata().setIndex(destIndex); + this.invokeDefaultPipelineOfDestination = true; + } + + public boolean isInvokeDefaultPipelineOfDestination() { + return invokeDefaultPipelineOfDestination; + } + public enum Metadata { INDEX(IndexFieldMapper.NAME), - REDIRECT("_redirect"), TYPE("_type"), ID(IdFieldMapper.NAME), ROUTING(RoutingFieldMapper.NAME), diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 7c6df4ceb7a9c..9873fd997fd39 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -912,7 +912,7 @@ private void executePipelines( indexRequest.isPipelineResolved(false); resolvePipelines(null, indexRequest, state.metadata()); Pipelines pipelines = getPipelines(indexRequest); - if (newIndex.equals(ingestDocument.getMetadata().getRedirect()) == false) { + if (ingestDocument.isInvokeDefaultPipelineOfDestination() == false) { pipelines.withoutDefaultPipeline(); } newHasFinalPipeline = pipelines.hasFinalPipeline(); @@ -1027,10 +1027,6 @@ private static void updateIndexRequestMetadata(final IndexRequest request, final // it's fine to set all metadata fields all the time, as ingest document holds their starting values // before ingestion, which might also get modified during ingestion. request.index(metadata.getIndex()); - String redirectIndex = metadata.getRedirect(); - if (redirectIndex != null) { - request.index(redirectIndex); - } request.id(metadata.getId()); request.routing(metadata.getRouting()); request.version(metadata.getVersion()); diff --git a/server/src/main/java/org/elasticsearch/script/Metadata.java b/server/src/main/java/org/elasticsearch/script/Metadata.java index 716dd54157655..1afbdb63ca3a9 100644 --- a/server/src/main/java/org/elasticsearch/script/Metadata.java +++ b/server/src/main/java/org/elasticsearch/script/Metadata.java @@ -39,7 +39,6 @@ */ public class Metadata { protected static final String INDEX = "_index"; - protected static final String REDIRECT = "_redirect"; protected static final String ID = "_id"; protected static final String ROUTING = "_routing"; protected static final String VERSION_TYPE = "_version_type"; @@ -119,14 +118,6 @@ public void setIndex(String index) { put(INDEX, index); } - public String getRedirect() { - return getString(REDIRECT); - } - - public void setRedirect(String redirect) { - put(REDIRECT, redirect); - } - public String getId() { return getString(ID); } diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index 8a5f86955930a..64c90aa1dc6c6 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -168,8 +168,6 @@ public void testExecuteIndexPipelineDoesNotExist() { List.of(DUMMY_PLUGIN), client ); - ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); - ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, clusterState)); final IndexRequest indexRequest = new IndexRequest("_index").id("_id") .source(Map.of()) .setPipeline("_id") From ed8df5f0328f74154126d9c78eb95b71942580f6 Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Tue, 21 Feb 2023 19:48:55 +0100 Subject: [PATCH 5/6] Fix test name --- .../java/org/elasticsearch/index/FinalPipelineIT.java | 2 +- .../src/main/java/org/elasticsearch/ingest/IngestDocument.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java index 0b26a6fd54c43..504d78eaf47c8 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java @@ -158,7 +158,7 @@ public void testFinalPipelineOfNewDestinationIsInvoked() { assertEquals(true, target.getHits().getAt(0).getSourceAsMap().get("final")); } - public void testDefaultPipelineOfNewDestinationIsInvoked() { + public void testDefaultPipelineOfNewDestinationIsNotInvoked() { Settings settings = Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline").build(); createIndex("index", settings); diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java index 7d8b70087ee1c..16d0e2af9bb1f 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -907,7 +907,7 @@ public String toString() { public void redirect(String destIndex) { getMetadata().setIndex(destIndex); - this.invokeDefaultPipelineOfDestination = true; + invokeDefaultPipelineOfDestination = true; } public boolean isInvokeDefaultPipelineOfDestination() { From 25d82361a5d39bb5278c5acb15ce18661b2f3a3d Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Fri, 15 Apr 2022 15:04:59 +0200 Subject: [PATCH 6/6] Skip pipeline on redirect --- .../ingest/CompoundProcessor.java | 8 +- .../elasticsearch/ingest/IngestDocument.java | 7 ++ .../ingest/CompoundProcessorTests.java | 79 +++++++++++++++++++ 3 files changed, 91 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java index 2910ab11e8c94..459e6c084e798 100644 --- a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java @@ -141,7 +141,7 @@ public void execute(IngestDocument ingestDocument, BiConsumer handler) { assert currentProcessor <= processorsWithMetrics.size(); - if (currentProcessor == processorsWithMetrics.size()) { + if (currentProcessor == processorsWithMetrics.size() || ingestDocument.isSkipCurrentPipeline()) { handler.accept(ingestDocument, null); return; } @@ -150,7 +150,9 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, final BiC Processor processor; IngestMetric metric; // iteratively execute any sync processors - while (currentProcessor < processorsWithMetrics.size() && processorsWithMetrics.get(currentProcessor).v1().isAsync() == false) { + while (currentProcessor < processorsWithMetrics.size() + && processorsWithMetrics.get(currentProcessor).v1().isAsync() == false + && ingestDocument.isSkipCurrentPipeline() == false) { processorWithMetric = processorsWithMetrics.get(currentProcessor); processor = processorWithMetric.v1(); metric = processorWithMetric.v2(); @@ -176,7 +178,7 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, final BiC } assert currentProcessor <= processorsWithMetrics.size(); - if (currentProcessor == processorsWithMetrics.size()) { + if (currentProcessor == processorsWithMetrics.size() || ingestDocument.isSkipCurrentPipeline()) { handler.accept(ingestDocument, null); return; } diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java index 16d0e2af9bb1f..cdbcc4d5e412b 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -62,6 +62,7 @@ public final class IngestDocument { // Contains all pipelines that have been executed for this document private final Set executedPipelines = new LinkedHashSet<>(); + private boolean skipCurrentPipeline = false; private boolean doNoSelfReferencesCheck = false; private boolean invokeDefaultPipelineOfDestination = false; @@ -840,6 +841,7 @@ public void executePipeline(Pipeline pipeline, BiConsumer { + skipCurrentPipeline = false; executedPipelines.remove(pipeline.getId()); if (previousPipeline != null) { ingestMetadata.put("pipeline", previousPipeline); @@ -908,12 +910,17 @@ public String toString() { public void redirect(String destIndex) { getMetadata().setIndex(destIndex); invokeDefaultPipelineOfDestination = true; + skipCurrentPipeline = true; } public boolean isInvokeDefaultPipelineOfDestination() { return invokeDefaultPipelineOfDestination; } + public boolean isSkipCurrentPipeline() { + return skipCurrentPipeline; + } + public enum Metadata { INDEX(IndexFieldMapper.NAME), TYPE("_type"), diff --git a/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java index 4bc581594d8a4..09b8c206fc135 100644 --- a/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java @@ -532,6 +532,85 @@ public void testMultipleProcessorsDoNotIgnoreFailures() { } } + public void testSkipPipeline() { + TestProcessor processor1 = new TestProcessor(doc -> doc.redirect("foo")); + TestProcessor processor2 = new TestProcessor(new RuntimeException("this processor was expected to be skipped")); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L); + CompoundProcessor compoundProcessor = new CompoundProcessor( + false, + List.of(processor1, processor2), + List.of(), + relativeTimeProvider + ); + executeCompound(compoundProcessor, ingestDocument, (result, e) -> {}); + assertThat(processor1.getInvokedCounter(), equalTo(1)); + assertStats(0, compoundProcessor, 0, 1, 0, 0); + assertThat(processor2.getInvokedCounter(), equalTo(0)); + assertStats(1, compoundProcessor, 0, 0, 0, 0); + } + + public void testSkipAsyncProcessor() { + TestProcessor processor1 = new TestProcessor(doc -> doc.redirect("foo")) { + @Override + public boolean isAsync() { + return true; + } + }; + TestProcessor processor2 = new TestProcessor(new RuntimeException("this processor was expected to be skipped")); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L); + CompoundProcessor compoundProcessor = new CompoundProcessor( + false, + List.of(processor1, processor2), + List.of(), + relativeTimeProvider + ); + executeCompound(compoundProcessor, ingestDocument, (result, e) -> {}); + assertThat(processor1.getInvokedCounter(), equalTo(1)); + assertStats(0, compoundProcessor, 0, 1, 0, 0); + assertThat(processor2.getInvokedCounter(), equalTo(0)); + assertStats(1, compoundProcessor, 0, 0, 0, 0); + } + + public void testSkipProcessorIgnoreFailure() { + TestProcessor processor1 = new TestProcessor(doc -> { + doc.redirect("foo"); + throw new RuntimeException("simulate processor failure after calling skipCurrentPipeline()"); + }); + TestProcessor processor2 = new TestProcessor(doc -> {}); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L); + CompoundProcessor compoundProcessor = new CompoundProcessor(true, List.of(processor1, processor2), List.of(), relativeTimeProvider); + executeCompound(compoundProcessor, ingestDocument, (result, e) -> {}); + assertThat(processor1.getInvokedCounter(), equalTo(1)); + assertStats(0, compoundProcessor, 0, 1, 1, 0); + assertThat(processor2.getInvokedCounter(), equalTo(0)); + assertStats(1, compoundProcessor, 0, 0, 0, 0); + } + + public void testDontSkipFailureProcessor() { + TestProcessor processor = new TestProcessor(doc -> { + doc.redirect("foo"); + throw new RuntimeException("simulate processor failure after calling skipCurrentPipeline()"); + }); + TestProcessor failureProcessor1 = new TestProcessor(doc -> {}); + TestProcessor failureProcessor2 = new TestProcessor(doc -> {}); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L); + CompoundProcessor compoundProcessor = new CompoundProcessor( + false, + List.of(processor), + List.of(failureProcessor1, failureProcessor2), + relativeTimeProvider + ); + executeCompound(compoundProcessor, ingestDocument, (result, e) -> {}); + assertThat(processor.getInvokedCounter(), equalTo(1)); + assertStats(0, compoundProcessor, 0, 1, 1, 0); + assertThat(failureProcessor1.getInvokedCounter(), equalTo(1)); + assertThat(failureProcessor2.getInvokedCounter(), equalTo(1)); + } + private TestProcessor getTestProcessor(String tag, boolean isAsync, boolean shouldThrowException) { return new TestProcessor( tag,