Add a terminate ingest processor (elastic#114157) (elastic#114343)
This processor simply causes any remaining processors in the pipeline
to be skipped. It will normally be executed conditionally using the
`if` option. (If this pipeline is being called from another pipeline,
the calling pipeline is *not* terminated.)

For example, this:

```
POST /_ingest/pipeline/_simulate
{
  "pipeline":
  {
    "description": "Appends just 'before' to the steps field if the number field
 is present, or both 'before' and 'after' if not",
    "processors": [
      {
        "append": {
          "field": "steps",
          "value": "before"
        }
      },
      {
        "terminate": {
          "if": "ctx.error != null"
        }
      },
      {
        "append": {
          "field": "steps",
          "value": "after"
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_id": "doc1",
      "_source": {
        "name": "okay",
        "steps": []
      }
    },
    {
      "_index": "index",
      "_id": "doc2",
      "_source": {
        "name": "bad",
        "error": "oh no",
        "steps": []
      }
    }
  ]
}
```

returns something like this:

```
{
  "docs": [
    {
      "doc": {
        "_index": "index",
        "_version": "-3",
        "_id": "doc1",
        "_source": {
          "name": "okay",
          "steps": [
            "before",
            "after"
          ]
        },
        "_ingest": {
          "timestamp": "2024-10-04T16:25:20.448881Z"
        }
      }
    },
    {
      "doc": {
        "_index": "index",
        "_version": "-3",
        "_id": "doc2",
        "_source": {
          "name": "bad",
          "error": "oh no",
          "steps": [
            "before"
          ]
        },
        "_ingest": {
          "timestamp": "2024-10-04T16:25:20.448932Z"
        }
      }
    }
  ]
}
```
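
The nested-pipeline behaviour is easiest to see by calling a terminating pipeline from another pipeline via the `pipeline` processor. A minimal sketch (the pipeline names, index name, and fields here are hypothetical, mirroring the REST test added in this commit):

```
PUT /_ingest/pipeline/inner-pipeline
{
  "processors": [
    { "append": { "field": "steps", "value": "before" } },
    { "terminate": { "if": "ctx.error != null" } },
    { "append": { "field": "steps", "value": "after" } }
  ]
}

PUT /_ingest/pipeline/outer-pipeline
{
  "processors": [
    { "pipeline": { "name": "inner-pipeline" } },
    { "append": { "field": "steps", "value": "outer" } }
  ]
}

POST /my-index/_doc/1?pipeline=outer-pipeline
{
  "error": "oh no",
  "steps": []
}
```

For a document carrying an `error` field, `steps` ends up as `["before", "outer"]`: the inner pipeline stops at the `terminate` processor, but the outer pipeline continues with its remaining processors.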
PeteGillinElastic authored Oct 9, 2024
1 parent 6618c5d commit 6ec7a34
Showing 9 changed files with 327 additions and 3 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/114157.yaml
@@ -0,0 +1,6 @@
pr: 114157
summary: Add a `terminate` ingest processor
area: Ingest Node
type: feature
issues:
- 110218
30 changes: 30 additions & 0 deletions docs/reference/ingest/processors/terminate.asciidoc
@@ -0,0 +1,30 @@
[[terminate-processor]]
=== Terminate processor

++++
<titleabbrev>Terminate</titleabbrev>
++++

Terminates the current ingest pipeline, causing no further processors to be run.
This will normally be executed conditionally, using the `if` option.

If this pipeline is being called from another pipeline, the calling pipeline is *not* terminated.

[[terminate-options]]
.Terminate Options
[options="header"]
|======
| Name | Required | Default | Description
include::common-options.asciidoc[]
|======

[source,js]
--------------------------------------------------
{
  "description" : "terminates the current pipeline if the error field is present",
  "terminate": {
    "if": "ctx.error != null"
  }
}
--------------------------------------------------
// NOTCONSOLE
@@ -72,6 +72,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
             entry(SetProcessor.TYPE, new SetProcessor.Factory(parameters.scriptService)),
             entry(SortProcessor.TYPE, new SortProcessor.Factory()),
             entry(SplitProcessor.TYPE, new SplitProcessor.Factory()),
+            entry(TerminateProcessor.TYPE, new TerminateProcessor.Factory()),
             entry(TrimProcessor.TYPE, new TrimProcessor.Factory()),
             entry(URLDecodeProcessor.TYPE, new URLDecodeProcessor.Factory()),
             entry(UppercaseProcessor.TYPE, new UppercaseProcessor.Factory()),
@@ -0,0 +1,53 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.ingest.common;

import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;

import java.util.Map;

/**
 * A {@link Processor} which simply prevents subsequent processors in the pipeline from running (without failing, like {@link FailProcessor}
 * does). This will normally be run conditionally, using the {@code if} option.
 */
public class TerminateProcessor extends AbstractProcessor {

    static final String TYPE = "terminate";

    TerminateProcessor(String tag, String description) {
        super(tag, description);
    }

    @Override
    public IngestDocument execute(IngestDocument ingestDocument) {
        ingestDocument.terminate();
        return ingestDocument;
    }

    @Override
    public String getType() {
        return TYPE;
    }

    public static final class Factory implements Processor.Factory {

        @Override
        public Processor create(
            Map<String, Processor.Factory> processorFactories,
            String tag,
            String description,
            Map<String, Object> config
        ) {
            return new TerminateProcessor(tag, description);
        }
    }
}
@@ -0,0 +1,70 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.ingest.common;

import org.elasticsearch.ingest.CompoundProcessor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.TestTemplateService;
import org.elasticsearch.ingest.ValueSource;
import org.elasticsearch.test.ESTestCase;

import java.util.Map;

import static org.elasticsearch.ingest.RandomDocumentPicks.randomIngestDocument;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;

public class TerminateProcessorTests extends ESTestCase {

    public void testTerminateInPipeline() throws Exception {
        Pipeline pipeline = new Pipeline(
            "my-pipeline",
            null,
            null,
            null,
            new CompoundProcessor(
                new SetProcessor(
                    "before-set",
                    "Sets before field to true",
                    new TestTemplateService.MockTemplateScript.Factory("before"),
                    ValueSource.wrap(true, TestTemplateService.instance()),
                    null
                ),
                new TerminateProcessor("terminate", "terminates the pipeline"),
                new SetProcessor(
                    "after-set",
                    "Sets after field to true",
                    new TestTemplateService.MockTemplateScript.Factory("after"),
                    ValueSource.wrap(true, TestTemplateService.instance()),
                    null
                )
            )
        );
        IngestDocument input = randomIngestDocument(random(), Map.of("foo", "bar"));
        PipelineOutput output = new PipelineOutput();

        pipeline.execute(input, output::set);

        assertThat(output.exception, nullValue());
        // We expect the before-set processor to have run, but not the after-set one:
        assertThat(output.document.getSource(), is(Map.of("foo", "bar", "before", true)));
    }

    private static class PipelineOutput {
        IngestDocument document;
        Exception exception;

        void set(IngestDocument document, Exception exception) {
            this.document = document;
            this.exception = exception;
        }
    }
}
@@ -0,0 +1,138 @@
---
setup:
  - do:
      ingest.put_pipeline:
        id: "test-pipeline"
        body: >
          {
            "description": "Appends just 'before' to the steps field if the number field is less than 50, or both 'before' and 'after' if not",
            "processors": [
              {
                "append": {
                  "field": "steps",
                  "value": "before"
                }
              },
              {
                "terminate": {
                  "if": "ctx.number < 50"
                }
              },
              {
                "append": {
                  "field": "steps",
                  "value": "after"
                }
              }
            ]
          }
  - do:
      ingest.put_pipeline:
        id: "test-final-pipeline"
        body: >
          {
            "description": "Appends 'final' to the steps field",
            "processors": [
              {
                "append": {
                  "field": "steps",
                  "value": "final"
                }
              }
            ]
          }
  - do:
      ingest.put_pipeline:
        id: "test-outer-pipeline"
        body: >
          {
            "description": "Runs test-pipeline and then appends 'outer' to the steps field",
            "processors": [
              {
                "pipeline": {
                  "name": "test-pipeline"
                }
              },
              {
                "append": {
                  "field": "steps",
                  "value": "outer"
                }
              }
            ]
          }
  - do:
      indices.create:
        index: "test-index-with-default-and-final-pipelines"
        body:
          settings:
            index:
              default_pipeline: "test-pipeline"
              final_pipeline: "test-final-pipeline"
  - do:
      indices.create:
        index: "test-vanilla-index"

---
teardown:
  - do:
      indices.delete:
        index: "test-index-with-default-and-final-pipelines"
        ignore_unavailable: true
  - do:
      indices.delete:
        index: "test-vanilla-index"
        ignore_unavailable: true
  - do:
      ingest.delete_pipeline:
        id: "test-pipeline"
        ignore: 404
  - do:
      ingest.delete_pipeline:
        id: "test-outer-pipeline"
        ignore: 404

---
"Test pipeline including conditional terminate pipeline":

  - do:
      bulk:
        refresh: true
        body:
          - '{ "index": {"_index": "test-index-with-default-and-final-pipelines" } }'
          - '{ "comment": "should terminate", "number": 40, "steps": [] }'
          - '{ "index": {"_index": "test-index-with-default-and-final-pipelines" } }'
          - '{ "comment": "should continue to end", "number": 60, "steps": [] }'

  - do:
      search:
        rest_total_hits_as_int: true
        index: "test-index-with-default-and-final-pipelines"
        body:
          sort: "number"
  - match: { hits.total: 2 }
  - match: { hits.hits.0._source.number: 40 }
  - match: { hits.hits.1._source.number: 60 }
  - match: { hits.hits.0._source.steps: ["before", "final"] }
  - match: { hits.hits.1._source.steps: ["before", "after", "final"] }

---
"Test pipeline with terminate invoked from an outer pipeline":

  - do:
      bulk:
        refresh: true
        pipeline: "test-outer-pipeline"
        body:
          - '{ "index": {"_index": "test-vanilla-index" } }'
          - '{ "comment": "should terminate inner pipeline but not outer", "number": 40, "steps": [] }'

  - do:
      search:
        rest_total_hits_as_int: true
        index: "test-vanilla-index"
        body:
          sort: "number"
  - match: { hits.total: 1 }
  - match: { hits.hits.0._source.number: 40 }
  - match: { hits.hits.0._source.steps: ["before", "outer"] }
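
The first test above also pins down a related behaviour: terminating an index's default pipeline does not skip its final pipeline. A rough console equivalent of that test (index name hypothetical; pipeline names as registered in the setup section):

```
PUT /my-terminate-test-index
{
  "settings": {
    "index.default_pipeline": "test-pipeline",
    "index.final_pipeline": "test-final-pipeline"
  }
}

PUT /my-terminate-test-index/_doc/1?refresh=true
{ "comment": "should terminate", "number": 40, "steps": [] }

GET /my-terminate-test-index/_doc/1
```

The stored document's `steps` should come back as `["before", "final"]`: the `terminate` processor cut the default pipeline short, but the final pipeline still appended `"final"`.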
@@ -148,7 +148,7 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
 
     void innerExecute(int currentProcessor, IngestDocument ingestDocument, final BiConsumer<IngestDocument, Exception> handler) {
         assert currentProcessor <= processorsWithMetrics.size();
-        if (currentProcessor == processorsWithMetrics.size() || ingestDocument.isReroute()) {
+        if (currentProcessor == processorsWithMetrics.size() || ingestDocument.isReroute() || ingestDocument.isTerminate()) {
             handler.accept(ingestDocument, null);
             return;
         }
@@ -159,7 +159,8 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, final BiC
         // iteratively execute any sync processors
         while (currentProcessor < processorsWithMetrics.size()
             && processorsWithMetrics.get(currentProcessor).v1().isAsync() == false
-            && ingestDocument.isReroute() == false) {
+            && ingestDocument.isReroute() == false
+            && ingestDocument.isTerminate() == false) {
             processorWithMetric = processorsWithMetrics.get(currentProcessor);
             processor = processorWithMetric.v1();
             metric = processorWithMetric.v2();
@@ -185,7 +186,7 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, final BiC
         }
 
         assert currentProcessor <= processorsWithMetrics.size();
-        if (currentProcessor == processorsWithMetrics.size() || ingestDocument.isReroute()) {
+        if (currentProcessor == processorsWithMetrics.size() || ingestDocument.isReroute() || ingestDocument.isTerminate()) {
             handler.accept(ingestDocument, null);
             return;
         }
22 changes: 22 additions & 0 deletions server/src/main/java/org/elasticsearch/ingest/IngestDocument.java
@@ -82,6 +82,7 @@ public final class IngestDocument {
 
     private boolean doNoSelfReferencesCheck = false;
     private boolean reroute = false;
+    private boolean terminate = false;
 
     public IngestDocument(String index, String id, long version, String routing, VersionType versionType, Map<String, Object> source) {
         this.ctxMap = new IngestCtxMap(index, id, version, routing, versionType, ZonedDateTime.now(ZoneOffset.UTC), source);
@@ -935,6 +936,27 @@ void resetReroute() {
         reroute = false;
     }
 
+    /**
+     * Sets the terminate flag to true, to indicate that no further processors in the current pipeline should be run for this document.
+     */
+    public void terminate() {
+        terminate = true;
+    }
+
+    /**
+     * Returns whether the {@link #terminate()} flag was set.
+     */
+    boolean isTerminate() {
+        return terminate;
+    }
+
+    /**
+     * Resets the {@link #terminate()} flag.
+     */
+    void resetTerminate() {
+        terminate = false;
+    }
+
     public enum Metadata {
         INDEX(IndexFieldMapper.NAME),
         TYPE("_type"),
3 changes: 3 additions & 0 deletions server/src/main/java/org/elasticsearch/ingest/Pipeline.java
@@ -133,6 +133,9 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
             if (e != null) {
                 metrics.ingestFailed();
             }
+            // Reset the terminate status now that pipeline execution is complete (if this was executed as part of another pipeline, the
+            // outer pipeline should continue):
+            ingestDocument.resetTerminate();
             handler.accept(result, e);
         });
     }
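
Taken together, these changes implement a small flag lifecycle on the ingest document: `TerminateProcessor` sets the flag, `CompoundProcessor` checks it before running each remaining processor, and `Pipeline` clears it once execution completes so that a calling (outer) pipeline is unaffected. A minimal sketch of that lifecycle (a hypothetical snippet, not part of this commit; it assumes placement in the `org.elasticsearch.ingest` package so the package-private accessors are visible):

```java
package org.elasticsearch.ingest;

import org.elasticsearch.index.VersionType;

import java.util.Map;

// Hypothetical illustration of the terminate flag lifecycle added in this commit.
public class TerminateFlagLifecycleExample {
    public static void main(String[] args) {
        IngestDocument doc = new IngestDocument("index", "id", 1, null, VersionType.INTERNAL, Map.of("foo", "bar"));

        doc.terminate();                    // what TerminateProcessor.execute() does
        assert doc.isTerminate();           // CompoundProcessor.innerExecute() sees this and skips the remaining processors

        doc.resetTerminate();               // what Pipeline.execute() does once the pipeline has finished
        assert doc.isTerminate() == false;  // so an outer pipeline that called this one carries on as normal
    }
}
```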
