From 21c8a56a6e132c883d44d7242daf3d0e71b4ee4e Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Mon, 23 Jan 2023 12:33:41 -0500 Subject: [PATCH] Add an IngestService stats test (#93120) --- .../IngestAsyncProcessorIT.java} | 8 +- .../ingest/IngestServiceTests.java | 108 ++++++++++++++++++ 2 files changed, 111 insertions(+), 5 deletions(-) rename server/src/internalClusterTest/java/org/elasticsearch/{action/ingest/AsyncIngestProcessorIT.java => ingest/IngestAsyncProcessorIT.java} (96%) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/ingest/AsyncIngestProcessorIT.java b/server/src/internalClusterTest/java/org/elasticsearch/ingest/IngestAsyncProcessorIT.java similarity index 96% rename from server/src/internalClusterTest/java/org/elasticsearch/action/ingest/AsyncIngestProcessorIT.java rename to server/src/internalClusterTest/java/org/elasticsearch/ingest/IngestAsyncProcessorIT.java index 767c9f18bc24b..eac5c092b7ef1 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/ingest/AsyncIngestProcessorIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/ingest/IngestAsyncProcessorIT.java @@ -5,13 +5,14 @@ * in compliance with, at your election, the Elastic License 2.0 or the Server * Side Public License, v 1. */ -package org.elasticsearch.action.ingest; +package org.elasticsearch.ingest; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.ingest.PutPipelineRequest; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.routing.allocation.AllocationService; @@ -21,9 +22,6 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; -import org.elasticsearch.ingest.AbstractProcessor; -import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.ingest.Processor; import org.elasticsearch.plugins.IngestPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.repositories.RepositoriesService; @@ -51,7 +49,7 @@ * executes asynchronously. The result of the operation should be the same and also the order in which the * bulk responses are returned should be the same as how the corresponding index requests were defined. */ -public class AsyncIngestProcessorIT extends ESSingleNodeTestCase { +public class IngestAsyncProcessorIT extends ESSingleNodeTestCase { @Override protected Collection> getPlugins() { diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index 59b957c8ab312..9a18c75a4bf52 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -56,6 +56,7 @@ import org.elasticsearch.script.ScriptModule; import org.elasticsearch.script.ScriptService; import org.elasticsearch.script.ScriptType; +import org.elasticsearch.script.TemplateScript; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.MockLogAppender; import org.elasticsearch.threadpool.ThreadPool; @@ -1389,6 +1390,113 @@ public void testBulkRequestExecution() throws Exception { } } + public void testIngestAndPipelineStats() throws Exception { + final Processor processor = mock(Processor.class); + when(processor.getType()).thenReturn("mock"); + when(processor.getTag()).thenReturn("mockTag"); + when(processor.isAsync()).thenReturn(true); + + // avoid returning null and dropping the document + doAnswer(args -> { + @SuppressWarnings("unchecked") + BiConsumer handler = (BiConsumer) args.getArguments()[1]; + handler.accept(RandomDocumentPicks.randomIngestDocument(random()), null); + return null; + }).when(processor).execute(any(IngestDocument.class), any()); + + // mock up an ingest service for returning a pipeline, this is used by the pipeline processor + final Pipeline[] pipelineToReturn = new Pipeline[1]; + final IngestService pipelineIngestService = mock(IngestService.class); + when(pipelineIngestService.getPipeline(anyString())).thenAnswer(inv -> pipelineToReturn[0]); + + IngestService ingestService = createWithProcessors( + Map.of( + "pipeline", + (factories, tag, description, config) -> new PipelineProcessor(tag, description, (params) -> new TemplateScript(params) { + @Override + public String execute() { + return "_id3"; + } // this pipeline processor will always execute the '_id3' processor + }, false, pipelineIngestService), + "mock", + (factories, tag, description, config) -> processor + ) + ); + + { + // all zeroes since nothing has executed + final IngestStats ingestStats = ingestService.stats(); + assertThat(ingestStats.getPipelineStats().size(), equalTo(0)); + assertStats(ingestStats.getTotalStats(), 0, 0, 0); + } + + // put some pipelines, and now there are pipeline and processor stats, too + PutPipelineRequest putRequest1 = new PutPipelineRequest( + "_id1", + new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), + XContentType.JSON + ); + // n.b. this 'pipeline' processor will always run the '_id3' pipeline, see the mocking/plumbing above and below + PutPipelineRequest putRequest2 = new PutPipelineRequest( + "_id2", + new BytesArray("{\"processors\": [{\"pipeline\" : {}}]}"), + XContentType.JSON + ); + PutPipelineRequest putRequest3 = new PutPipelineRequest( + "_id3", + new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), + XContentType.JSON + ); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty + ClusterState previousClusterState = clusterState; + clusterState = executePut(putRequest1, clusterState); + clusterState = executePut(putRequest2, clusterState); + clusterState = executePut(putRequest3, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + + // hook up the mock ingest service to return pipeline3 when asked by the pipeline processor + pipelineToReturn[0] = ingestService.getPipeline("_id3"); + + { + final IngestStats ingestStats = ingestService.stats(); + assertThat(ingestStats.getPipelineStats().size(), equalTo(3)); + + // total + assertStats(ingestStats.getTotalStats(), 0, 0, 0); + // pipeline + assertPipelineStats(ingestStats.getPipelineStats(), "_id1", 0, 0, 0); + assertPipelineStats(ingestStats.getPipelineStats(), "_id2", 0, 0, 0); + assertPipelineStats(ingestStats.getPipelineStats(), "_id3", 0, 0, 0); + // processor + assertProcessorStats(0, ingestStats, "_id1", 0, 0, 0); + assertProcessorStats(0, ingestStats, "_id2", 0, 0, 0); + assertProcessorStats(0, ingestStats, "_id3", 0, 0, 0); + } + + // put a single document through ingest processing + final IndexRequest indexRequest = new IndexRequest("_index"); + indexRequest.setPipeline("_id1").setFinalPipeline("_id2"); + indexRequest.source(randomAlphaOfLength(10), randomAlphaOfLength(10)); + ingestService.executeBulkRequest(1, List.of(indexRequest), indexReq -> {}, (integer, e) -> {}, (thread, e) -> {}, Names.WRITE); + + { + final IngestStats ingestStats = ingestService.stats(); + assertThat(ingestStats.getPipelineStats().size(), equalTo(3)); + + // total + // see https://github.com/elastic/elasticsearch/issues/92843 -- this should be 1, but it's actually 2 + // assertStats(ingestStats.getTotalStats(), 1, 0, 0); + // pipeline + assertPipelineStats(ingestStats.getPipelineStats(), "_id1", 1, 0, 0); + assertPipelineStats(ingestStats.getPipelineStats(), "_id2", 1, 0, 0); + assertPipelineStats(ingestStats.getPipelineStats(), "_id3", 1, 0, 0); + // processor + assertProcessorStats(0, ingestStats, "_id1", 1, 0, 0); + assertProcessorStats(0, ingestStats, "_id2", 1, 0, 0); + assertProcessorStats(0, ingestStats, "_id3", 1, 0, 0); + } + } + public void testStats() throws Exception { final Processor processor = mock(Processor.class); final Processor processorFailure = mock(Processor.class);