Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better names and types for ingest stats #93533

Merged
merged 6 commits into from
Feb 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/93533.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 93533
summary: Better names and types for ingest stats
area: Ingest Node
type: bug
issues:
- 80763
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public void testIngestStatsNamesAndTypes() throws IOException {

IngestStats.ProcessorStat setB = processorStats.get(1);
assertThat(setB.getName(), equalTo("set:set-b"));
assertThat(setB.getType(), equalTo("conditional"));
assertThat(setB.getType(), equalTo("set"));
assertThat(setB.getStats().getIngestCount(), equalTo(0L)); // see false_script above

IngestStats.ProcessorStat setC = processorStats.get(2);
Expand All @@ -138,8 +138,8 @@ public void testIngestStatsNamesAndTypes() throws IOException {
assertThat(setC.getStats().getIngestCount(), equalTo(1L));

IngestStats.ProcessorStat setD = processorStats.get(3);
assertThat(setD.getName(), equalTo("compound:CompoundProcessor-set-d"));
assertThat(setD.getType(), equalTo("conditional"));
assertThat(setD.getName(), equalTo("set:set-d"));
assertThat(setD.getType(), equalTo("set"));
assertThat(setD.getStats().getIngestCount(), equalTo(1L));
}

Expand All @@ -154,9 +154,7 @@ public void testIngestStatsNamesAndTypes() throws IOException {
Map<String, Object> stats = createParser(JsonXContent.jsonXContent, Strings.toString(builder)).map();

int setProcessorCount = path(stats, "nodes.ingest.processor_stats.set.count");
assertThat(setProcessorCount, equalTo(2));
int conditionalProcessorCount = path(stats, "nodes.ingest.processor_stats.conditional.count");
assertThat(conditionalProcessorCount, equalTo(1));
assertThat(setProcessorCount, equalTo(3));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,7 @@ public static Processor readProcessor(
);
}
if (onFailureProcessors.size() > 0 || ignoreFailure) {
processor = new CompoundProcessor(ignoreFailure, List.of(processor), onFailureProcessors);
processor = new OnFailureProcessor(ignoreFailure, processor, onFailureProcessors);
}
if (conditionalScript != null) {
processor = new ConditionalProcessor(tag, description, conditionalScript, scriptService, processor);
Expand Down
30 changes: 24 additions & 6 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -536,8 +536,13 @@ Map<String, PipelineHolder> pipelines() {
}

/**
* Recursive method to obtain all of the non-failure processors for given compoundProcessor. Since conditionals are implemented as
* wrappers to the actual processor, always prefer the actual processor's metric over the conditional processor's metric.
* Recursive method to obtain all the non-failure processors for given compoundProcessor.
* <p>
* 'if' and 'ignore_failure'/'on_failure' are implemented as wrappers around the actual processor (via {@link ConditionalProcessor}
* and {@link OnFailureProcessor}, respectively), so we unwrap these processors internally in order to expose the underlying
* 'actual' processor via the metrics. This corresponds best to the customer intent -- e.g. they used a 'set' processor that has an
* 'on_failure', so we report metrics for the set processor, not an on_failure processor.
*
* @param compoundProcessor The compound processor to start walking the non-failure processors
* @param processorMetrics The list of {@link Processor} {@link IngestMetric} tuples.
* @return the processorMetrics for all non-failure processors that belong to the original compoundProcessor
Expand All @@ -550,13 +555,26 @@ private static List<Tuple<Processor, IngestMetric>> getProcessorMetrics(
for (Tuple<Processor, IngestMetric> processorWithMetric : compoundProcessor.getProcessorsWithMetrics()) {
Processor processor = processorWithMetric.v1();
IngestMetric metric = processorWithMetric.v2();

// unwrap 'if' and 'ignore_failure/on_failure' wrapping, so that we expose the underlying actual processor
boolean unwrapped;
do {
unwrapped = false;
if (processor instanceof ConditionalProcessor conditional) {
processor = conditional.getInnerProcessor();
metric = conditional.getMetric(); // prefer the conditional's metric, it only covers when the conditional was true
unwrapped = true;
}
if (processor instanceof OnFailureProcessor onFailure) {
processor = onFailure.getInnerProcessor();
metric = onFailure.getInnerMetric(); // the wrapped processor records the failure count
unwrapped = true;
}
} while (unwrapped);

if (processor instanceof CompoundProcessor cp) {
getProcessorMetrics(cp, processorMetrics);
} else {
// Prefer the conditional's metric since it only includes metrics when the conditional evaluated to true.
if (processor instanceof ConditionalProcessor cp) {
metric = (cp.getMetric());
}
processorMetrics.add(new Tuple<>(processor, metric));
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.ingest;

import org.elasticsearch.core.Tuple;

import java.util.List;

/**
* A wrapping processor that adds failure handling logic around the wrapped processor.
*/
public class OnFailureProcessor extends CompoundProcessor implements WrappingProcessor {

static final String TYPE = "on_failure";

public OnFailureProcessor(boolean ignoreFailure, Processor processor, List<Processor> onFailureProcessors) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we have any test coverage (in OnFailureProcessorTests or IngestStatsNamesAndTypesIT) for when onFailureProcessors is not empty, do we? Might also be worth documenting that the list is non-empty if there is an on_failure block, and empty if ignore_failure = true.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pulled this out into its own PR, see #93573.

super(ignoreFailure, List.of(processor), onFailureProcessors);
}

/**
 * Returns the single processor that this wrapper adds failure handling around.
 *
 * @return the only element of {@link #getProcessors()}
 */
@Override
public Processor getInnerProcessor() {
    final List<Processor> wrapped = getProcessors();
    // the constructor hands exactly one processor to the superclass
    assert wrapped.size() == 1;
    return wrapped.get(0);
}

/**
 * Returns the metric that is paired with the single wrapped processor.
 *
 * @return the {@link IngestMetric} of the only entry in {@link #getProcessorsWithMetrics()}
 */
IngestMetric getInnerMetric() {
    final List<Tuple<Processor, IngestMetric>> pairs = getProcessorsWithMetrics();
    // one wrapped processor means exactly one (processor, metric) pair
    assert pairs.size() == 1;
    final Tuple<Processor, IngestMetric> onlyPair = pairs.get(0);
    return onlyPair.v2();
}

/**
 * Returns the type name of this wrapping processor.
 *
 * @return the {@link #TYPE} constant, {@code "on_failure"}
 */
@Override
public String getType() {
return TYPE;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.ingest;

import org.elasticsearch.test.ESTestCase;

import java.util.List;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.sameInstance;

public class OnFailureProcessorTests extends ESTestCase {

public void testOnFailureProcessor() {
TestProcessor processor = new TestProcessor(new RuntimeException("error"));
OnFailureProcessor onFailureProcessor = new OnFailureProcessor(false, processor, List.of());

assertThat(onFailureProcessor.isIgnoreFailure(), is(false));

assertThat(onFailureProcessor.getProcessors().size(), equalTo(1));
assertThat(onFailureProcessor.getProcessors().get(0), sameInstance(processor));
assertThat(onFailureProcessor.getInnerProcessor(), sameInstance(processor));

assertThat(onFailureProcessor.getOnFailureProcessors().isEmpty(), is(true));

assertThat(onFailureProcessor.getInnerMetric(), sameInstance(onFailureProcessor.getProcessorsWithMetrics().get(0).v2()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ public void testCreateProcessorsWithOnFailureProperties() throws Exception {
assertThat(pipeline.getDescription(), equalTo("_description"));
assertThat(pipeline.getVersion(), equalTo(version));
assertThat(pipeline.getProcessors().size(), equalTo(1));
assertThat(pipeline.getProcessors().get(0).getType(), equalTo("compound"));
assertThat(pipeline.getProcessors().get(0).getType(), equalTo("on_failure"));
}

public void testFlattenProcessors() throws Exception {
Expand Down