diff --git a/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java b/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java index 9ed745d3e0a79..e2d00834bd57d 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java +++ b/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java @@ -160,6 +160,8 @@ void transformRequest(SearchRequest request, ActionListener reque long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start); afterRequestProcessor(processor, took); onRequestProcessorFailed(processor); + // When verbosePipeline is enabled, all processor failures are ignored to ensure the execution chain continues without + // interruption.TrackingSearchResponseProcessorWrapper will log all errors in detail for debugging purposes if (processor.isIgnoreFailure() || r.source().verbosePipeline()) { logger.warn( "The exception from request processor [" @@ -239,7 +241,8 @@ ActionListener transformResponseListener( onResponseProcessorFailed(processor); long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start); afterResponseProcessor(processor, took); - // If an error occurs and the pipeline is in verbose mode, the processor will not terminate the execution chain. + // When verbosePipeline is enabled, all processor failures are ignored to ensure the execution chain continues without + // interruption.TrackingSearchResponseProcessorWrapper will log all errors in detail for debugging purposes if (processor.isIgnoreFailure() || request.source().verbosePipeline()) { logger.warn( "The exception from response processor [" diff --git a/server/src/main/java/org/opensearch/search/pipeline/TrackingSearchRequestProcessorWrapper.java b/server/src/main/java/org/opensearch/search/pipeline/TrackingSearchRequestProcessorWrapper.java index 91296df914410..459d25a02bc8e 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/TrackingSearchRequestProcessorWrapper.java +++ b/server/src/main/java/org/opensearch/search/pipeline/TrackingSearchRequestProcessorWrapper.java @@ -56,20 +56,9 @@ public SearchRequest processRequest(SearchRequest request) throws Exception { @Override public SearchRequest processRequest(SearchRequest request, PipelineProcessingContext requestContext) throws Exception { - ProcessorExecutionDetail detail = new ProcessorExecutionDetail(getType(), getTag()); - - long start = System.nanoTime(); try { - detail.addInput(request.source().toString()); - SearchRequest result = wrappedProcessor.processRequest(request, requestContext); - detail.addOutput(result.source().toString()); - long took = System.nanoTime() - start; - detail.addTook(took); - requestContext.addProcessorExecutionDetail(detail); - return result; - } catch (Exception e) { - detail.markProcessorAsFailed(ProcessorExecutionDetail.ProcessorStatus.FAIL, e.getMessage()); - requestContext.addProcessorExecutionDetail(detail); + return wrappedProcessor.processRequest(request, requestContext); + } catch (UnsupportedOperationException e) { throw e; } } @@ -81,26 +70,18 @@ public void processRequestAsync( ActionListener requestListener ) { ProcessorExecutionDetail detail = new ProcessorExecutionDetail(getType(), getTag()); - long start = System.nanoTime(); - try { - detail.addInput(request.source().toString()); - wrappedProcessor.processRequestAsync(request, requestContext, ActionListener.wrap(result -> { - detail.addOutput(result.source().toString()); - long took = System.nanoTime() - start; - detail.addTook(took); - requestContext.addProcessorExecutionDetail(detail); - requestListener.onResponse(result); - }, e -> { - detail.markProcessorAsFailed(ProcessorExecutionDetail.ProcessorStatus.FAIL, e.getMessage()); - requestContext.addProcessorExecutionDetail(detail); - requestListener.onFailure(e); - })); - } catch (Exception e) { + detail.addInput(request.source().toString()); + wrappedProcessor.processRequestAsync(request, requestContext, ActionListener.wrap(result -> { + detail.addOutput(result.source().toString()); + long took = System.nanoTime() - start; + detail.addTook(took); + requestContext.addProcessorExecutionDetail(detail); + requestListener.onResponse(result); + }, e -> { detail.markProcessorAsFailed(ProcessorExecutionDetail.ProcessorStatus.FAIL, e.getMessage()); requestContext.addProcessorExecutionDetail(detail); requestListener.onFailure(e); - } + })); } - } diff --git a/server/src/main/java/org/opensearch/search/pipeline/TrackingSearchResponseProcessorWrapper.java b/server/src/main/java/org/opensearch/search/pipeline/TrackingSearchResponseProcessorWrapper.java index bd7734f3497e4..93a17f4ba3f1d 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/TrackingSearchResponseProcessorWrapper.java +++ b/server/src/main/java/org/opensearch/search/pipeline/TrackingSearchResponseProcessorWrapper.java @@ -12,6 +12,7 @@ import org.opensearch.action.search.SearchResponse; import org.opensearch.core.action.ActionListener; +import java.io.IOException; import java.util.Arrays; /** @@ -63,20 +64,9 @@ public SearchResponse processResponse(SearchRequest request, SearchResponse resp @Override public SearchResponse processResponse(SearchRequest request, SearchResponse response, PipelineProcessingContext requestContext) throws Exception { - ProcessorExecutionDetail detail = new ProcessorExecutionDetail(getType(), getTag()); - - long start = System.nanoTime(); try { - detail.addInput(Arrays.asList(response.getHits().deepCopy().getHits())); - SearchResponse result = wrappedProcessor.processResponse(request, response, requestContext); - detail.addOutput(Arrays.asList(result.getHits().deepCopy().getHits())); - long took = System.nanoTime() - start; - detail.addTook(took); - requestContext.addProcessorExecutionDetail(detail); - return result; - } catch (Exception e) { - detail.markProcessorAsFailed(ProcessorExecutionDetail.ProcessorStatus.FAIL, e.getMessage()); - requestContext.addProcessorExecutionDetail(detail); + return wrappedProcessor.processResponse(request, response, requestContext); + } catch (UnsupportedOperationException e) { throw e; } } @@ -89,25 +79,23 @@ public void processResponseAsync( ActionListener responseListener ) { ProcessorExecutionDetail detail = new ProcessorExecutionDetail(getType(), getTag()); - long start = System.nanoTime(); try { detail.addInput(Arrays.asList(response.getHits().deepCopy().getHits())); - wrappedProcessor.processResponseAsync(request, response, requestContext, ActionListener.wrap(result -> { - detail.addOutput(Arrays.asList(result.getHits().deepCopy().getHits())); - long took = System.nanoTime() - start; - detail.addTook(took); - requestContext.addProcessorExecutionDetail(detail); - responseListener.onResponse(result); - }, e -> { - detail.markProcessorAsFailed(ProcessorExecutionDetail.ProcessorStatus.FAIL, e.getMessage()); - requestContext.addProcessorExecutionDetail(detail); - responseListener.onFailure(e); - })); - } catch (Exception e) { + } catch (IOException e) { + throw new RuntimeException(e); + } + wrappedProcessor.processResponseAsync(request, response, requestContext, ActionListener.wrap(result -> { + detail.addOutput(Arrays.asList(result.getHits().deepCopy().getHits())); + long took = System.nanoTime() - start; + detail.addTook(took); + requestContext.addProcessorExecutionDetail(detail); + responseListener.onResponse(result); + }, e -> { detail.markProcessorAsFailed(ProcessorExecutionDetail.ProcessorStatus.FAIL, e.getMessage()); requestContext.addProcessorExecutionDetail(detail); responseListener.onFailure(e); - } + })); } + } diff --git a/server/src/test/java/org/opensearch/search/pipeline/TrackingSearchRequestProcessorWrapperTests.java b/server/src/test/java/org/opensearch/search/pipeline/TrackingSearchRequestProcessorWrapperTests.java index c1a198a35f7a0..fcc595ad7f55d 100644 --- a/server/src/test/java/org/opensearch/search/pipeline/TrackingSearchRequestProcessorWrapperTests.java +++ b/server/src/test/java/org/opensearch/search/pipeline/TrackingSearchRequestProcessorWrapperTests.java @@ -36,30 +36,26 @@ public void setUp() throws Exception { context = new PipelineProcessingContext(); } - public void testProcessRequest() throws Exception { + public void testProcessRequestSuccess() throws Exception { SearchRequest inputRequest = new SearchRequest(); inputRequest.source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())); SearchRequest outputRequest = new SearchRequest(); outputRequest.source(new SearchSourceBuilder().query(QueryBuilders.termQuery("field", "value"))); + when(mockProcessor.processRequest(any(SearchRequest.class), eq(context))).thenReturn(outputRequest); SearchRequest result = wrapper.processRequest(inputRequest, context); assertEquals(outputRequest, result); - verify(mockProcessor).processRequest(inputRequest, context); - assertFalse(context.getProcessorExecutionDetails().isEmpty()); - ProcessorExecutionDetail detail = context.getProcessorExecutionDetails().get(0); - assertEquals(wrapper.getType(), detail.getProcessorName()); - assertNotNull(detail.getInputData()); - assertNotNull(detail.getOutputData()); - assertEquals(ProcessorExecutionDetail.ProcessorStatus.SUCCESS, detail.getStatus()); + verify(mockProcessor).processRequest(inputRequest, context); } public void testProcessRequestException() throws Exception { SearchRequest inputRequest = new SearchRequest(); inputRequest.source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())); - Exception processorException = new RuntimeException("Processor failed"); + + RuntimeException processorException = new UnsupportedOperationException("Processor failed"); when(mockProcessor.processRequest(any(SearchRequest.class), eq(context))).thenThrow(processorException); @@ -69,10 +65,6 @@ public void testProcessRequestException() throws Exception { } catch (Exception e) { assertEquals("Processor failed", e.getMessage()); } - - ProcessorExecutionDetail detail = context.getProcessorExecutionDetails().get(0); - assertEquals(wrapper.getType(), detail.getProcessorName()); - assertEquals(ProcessorExecutionDetail.ProcessorStatus.FAIL, detail.getStatus()); } public void testProcessRequestAsyncSuccess() { diff --git a/server/src/test/java/org/opensearch/search/pipeline/TrackingSearchResponseProcessorWrapperTests.java b/server/src/test/java/org/opensearch/search/pipeline/TrackingSearchResponseProcessorWrapperTests.java index 892b385f34c68..638327efadc80 100644 --- a/server/src/test/java/org/opensearch/search/pipeline/TrackingSearchResponseProcessorWrapperTests.java +++ b/server/src/test/java/org/opensearch/search/pipeline/TrackingSearchResponseProcessorWrapperTests.java @@ -15,6 +15,8 @@ import org.opensearch.test.OpenSearchTestCase; import org.junit.Before; +import java.io.IOException; + import org.mockito.Mockito; import static org.mockito.ArgumentMatchers.any; @@ -35,28 +37,6 @@ public void setUp() throws Exception { context = new PipelineProcessingContext(); } - public void testProcessResponse() throws Exception { - SearchRequest mockRequest = new SearchRequest(); - SearchResponse inputResponse = Mockito.mock(SearchResponse.class); - SearchResponse outputResponse = Mockito.mock(SearchResponse.class); - - when(inputResponse.getHits()).thenReturn(SearchHits.empty()); - when(outputResponse.getHits()).thenReturn(SearchHits.empty()); - when(mockProcessor.processResponse(eq(mockRequest), eq(inputResponse), eq(context))).thenReturn(outputResponse); - - SearchResponse result = wrapper.processResponse(mockRequest, inputResponse, context); - - assertEquals(outputResponse, result); - verify(mockProcessor).processResponse(mockRequest, inputResponse, context); - - assertFalse(context.getProcessorExecutionDetails().isEmpty()); - ProcessorExecutionDetail detail = context.getProcessorExecutionDetails().get(0); - assertEquals(wrapper.getType(), detail.getProcessorName()); - assertNotNull(detail.getInputData()); - assertNotNull(detail.getOutputData()); - assertEquals(ProcessorExecutionDetail.ProcessorStatus.SUCCESS, detail.getStatus()); - } - public void testConstructorThrowsExceptionWhenProcessorIsNull() { IllegalArgumentException exception = expectThrows( IllegalArgumentException.class, @@ -66,7 +46,7 @@ public void testConstructorThrowsExceptionWhenProcessorIsNull() { assertEquals("Wrapped processor cannot be null.", exception.getMessage()); } - public void testProcessResponseAsync() { + public void testProcessResponseAsync(){ SearchRequest mockRequest = new SearchRequest(); SearchResponse inputResponse = Mockito.mock(SearchResponse.class); SearchResponse outputResponse = Mockito.mock(SearchResponse.class);