Add an IngestService stats test (elastic#93120)
joegallo authored Jan 23, 2023
1 parent 69914bf commit 21c8a56
Showing 2 changed files with 111 additions and 5 deletions.
@@ -5,13 +5,14 @@
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
-package org.elasticsearch.action.ingest;
+package org.elasticsearch.ingest;

import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
@@ -21,9 +22,6 @@
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
-import org.elasticsearch.ingest.AbstractProcessor;
-import org.elasticsearch.ingest.IngestDocument;
-import org.elasticsearch.ingest.Processor;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.RepositoriesService;
@@ -51,7 +49,7 @@
* executes asynchronously. The result of the operation should be the same and also the order in which the
* bulk responses are returned should be the same as how the corresponding index requests were defined.
*/
-public class AsyncIngestProcessorIT extends ESSingleNodeTestCase {
+public class IngestAsyncProcessorIT extends ESSingleNodeTestCase {

@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
108 changes: 108 additions & 0 deletions server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java
@@ -56,6 +56,7 @@
import org.elasticsearch.script.ScriptModule;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.script.TemplateScript;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MockLogAppender;
import org.elasticsearch.threadpool.ThreadPool;
@@ -1389,6 +1390,113 @@ public void testBulkRequestExecution() throws Exception {
}
}

public void testIngestAndPipelineStats() throws Exception {
final Processor processor = mock(Processor.class);
when(processor.getType()).thenReturn("mock");
when(processor.getTag()).thenReturn("mockTag");
when(processor.isAsync()).thenReturn(true);

// avoid returning null and dropping the document
doAnswer(args -> {
@SuppressWarnings("unchecked")
BiConsumer<IngestDocument, Exception> handler = (BiConsumer) args.getArguments()[1];
handler.accept(RandomDocumentPicks.randomIngestDocument(random()), null);
return null;
}).when(processor).execute(any(IngestDocument.class), any());
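// n.b. the async Processor contract (roughly): execute(document, handler) must eventually invoke the
// handler with either a resulting document or an exception; handing back a non-null document counts as
// success, whereas a null document would signal that the document was dropped.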

// mock up an ingest service for returning a pipeline, this is used by the pipeline processor
final Pipeline[] pipelineToReturn = new Pipeline[1];
final IngestService pipelineIngestService = mock(IngestService.class);
when(pipelineIngestService.getPipeline(anyString())).thenAnswer(inv -> pipelineToReturn[0]);

IngestService ingestService = createWithProcessors(
Map.of(
"pipeline",
(factories, tag, description, config) -> new PipelineProcessor(tag, description, (params) -> new TemplateScript(params) {
@Override
public String execute() {
return "_id3";
} // this pipeline processor will always execute the '_id3' pipeline
}, false, pipelineIngestService),
"mock",
(factories, tag, description, config) -> processor
)
);
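// Conceptually (details approximate): a PipelineProcessor resolves its target pipeline at execution time by
// rendering its name template and asking its IngestService for that pipeline, roughly
// getPipeline(template.newInstance(params).execute()). The factory above hard-codes the rendered name to
// "_id3" and passes in the mocked pipelineIngestService, so whatever is placed into pipelineToReturn[0]
// below is the pipeline that will actually run.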

{
// all zeroes since nothing has executed
final IngestStats ingestStats = ingestService.stats();
assertThat(ingestStats.getPipelineStats().size(), equalTo(0));
assertStats(ingestStats.getTotalStats(), 0, 0, 0);
}

// put some pipelines, and now there are pipeline and processor stats, too
PutPipelineRequest putRequest1 = new PutPipelineRequest(
"_id1",
new BytesArray("{\"processors\": [{\"mock\" : {}}]}"),
XContentType.JSON
);
// n.b. this 'pipeline' processor will always run the '_id3' pipeline, see the mocking/plumbing above and below
PutPipelineRequest putRequest2 = new PutPipelineRequest(
"_id2",
new BytesArray("{\"processors\": [{\"pipeline\" : {}}]}"),
XContentType.JSON
);
PutPipelineRequest putRequest3 = new PutPipelineRequest(
"_id3",
new BytesArray("{\"processors\": [{\"mock\" : {}}]}"),
XContentType.JSON
);
ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty
ClusterState previousClusterState = clusterState;
clusterState = executePut(putRequest1, clusterState);
clusterState = executePut(putRequest2, clusterState);
clusterState = executePut(putRequest3, clusterState);
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
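// pipelines only become visible to the IngestService (and show up in its stats) once the corresponding
// cluster state change has been applied, hence the applyClusterState call above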

// hook up the mock ingest service to return pipeline3 when asked by the pipeline processor
pipelineToReturn[0] = ingestService.getPipeline("_id3");

{
final IngestStats ingestStats = ingestService.stats();
assertThat(ingestStats.getPipelineStats().size(), equalTo(3));

// total
assertStats(ingestStats.getTotalStats(), 0, 0, 0);
// pipeline
assertPipelineStats(ingestStats.getPipelineStats(), "_id1", 0, 0, 0);
assertPipelineStats(ingestStats.getPipelineStats(), "_id2", 0, 0, 0);
assertPipelineStats(ingestStats.getPipelineStats(), "_id3", 0, 0, 0);
// processor
assertProcessorStats(0, ingestStats, "_id1", 0, 0, 0);
assertProcessorStats(0, ingestStats, "_id2", 0, 0, 0);
assertProcessorStats(0, ingestStats, "_id3", 0, 0, 0);
}

// put a single document through ingest processing
final IndexRequest indexRequest = new IndexRequest("_index");
indexRequest.setPipeline("_id1").setFinalPipeline("_id2");
indexRequest.source(randomAlphaOfLength(10), randomAlphaOfLength(10));
ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, (integer, e) -> {}, (thread, e) -> {}, Names.WRITE);
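// with this wiring, the document is expected to flow through all three pipelines: "_id1" (the default
// pipeline, running the 'mock' processor), then "_id2" (the final pipeline, whose 'pipeline' processor
// delegates to "_id3"), which runs the 'mock' processor again -- hence each pipeline and each first
// processor below should report exactly one invocation and no failures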

{
final IngestStats ingestStats = ingestService.stats();
assertThat(ingestStats.getPipelineStats().size(), equalTo(3));

// total
// see https://github.com/elastic/elasticsearch/issues/92843 -- this should be 1, but it's actually 2
// assertStats(ingestStats.getTotalStats(), 1, 0, 0);
// pipeline
assertPipelineStats(ingestStats.getPipelineStats(), "_id1", 1, 0, 0);
assertPipelineStats(ingestStats.getPipelineStats(), "_id2", 1, 0, 0);
assertPipelineStats(ingestStats.getPipelineStats(), "_id3", 1, 0, 0);
// processor
assertProcessorStats(0, ingestStats, "_id1", 1, 0, 0);
assertProcessorStats(0, ingestStats, "_id2", 1, 0, 0);
assertProcessorStats(0, ingestStats, "_id3", 1, 0, 0);
}
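
// n.b. the assertStats, assertPipelineStats, and assertProcessorStats calls above are pre-existing private
// helpers defined elsewhere in this test class (not shown in this diff); roughly, their three trailing
// numbers are the expected ingest count, failure count, and ingest time for the given
// total/pipeline/processor stats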
}

public void testStats() throws Exception {
final Processor processor = mock(Processor.class);
final Processor processorFailure = mock(Processor.class);