Skip to content

Commit

Permalink
removed unnecessary try catch block. add more comment
Browse files Browse the repository at this point in the history
Signed-off-by: Junwei Dai <junweid@amazon.com>
  • Loading branch information
Junwei Dai committed Jan 22, 2025
1 parent 386d1f9 commit 1ccc9c6
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ void transformRequest(SearchRequest request, ActionListener<SearchRequest> reque
long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start);
afterRequestProcessor(processor, took);
onRequestProcessorFailed(processor);
// When verbosePipeline is enabled, all processor failures are ignored to ensure the execution chain continues without
// interruption.TrackingSearchResponseProcessorWrapper will log all errors in detail for debugging purposes
if (processor.isIgnoreFailure() || r.source().verbosePipeline()) {
logger.warn(
"The exception from request processor ["
Expand Down Expand Up @@ -239,7 +241,8 @@ ActionListener<SearchResponse> transformResponseListener(
onResponseProcessorFailed(processor);
long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start);
afterResponseProcessor(processor, took);
// If an error occurs and the pipeline is in verbose mode, the processor will not terminate the execution chain.
// When verbosePipeline is enabled, all processor failures are ignored to ensure the execution chain continues without
// interruption.TrackingSearchResponseProcessorWrapper will log all errors in detail for debugging purposes
if (processor.isIgnoreFailure() || request.source().verbosePipeline()) {
logger.warn(
"The exception from response processor ["
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,20 +56,9 @@ public SearchRequest processRequest(SearchRequest request) throws Exception {

@Override
public SearchRequest processRequest(SearchRequest request, PipelineProcessingContext requestContext) throws Exception {
ProcessorExecutionDetail detail = new ProcessorExecutionDetail(getType(), getTag());

long start = System.nanoTime();
try {
detail.addInput(request.source().toString());
SearchRequest result = wrappedProcessor.processRequest(request, requestContext);
detail.addOutput(result.source().toString());
long took = System.nanoTime() - start;
detail.addTook(took);
requestContext.addProcessorExecutionDetail(detail);
return result;
} catch (Exception e) {
detail.markProcessorAsFailed(ProcessorExecutionDetail.ProcessorStatus.FAIL, e.getMessage());
requestContext.addProcessorExecutionDetail(detail);
return wrappedProcessor.processRequest(request, requestContext);
} catch (UnsupportedOperationException e) {
throw e;
}
}
Expand All @@ -81,26 +70,18 @@ public void processRequestAsync(
ActionListener<SearchRequest> requestListener
) {
ProcessorExecutionDetail detail = new ProcessorExecutionDetail(getType(), getTag());

long start = System.nanoTime();
try {
detail.addInput(request.source().toString());
wrappedProcessor.processRequestAsync(request, requestContext, ActionListener.wrap(result -> {
detail.addOutput(result.source().toString());
long took = System.nanoTime() - start;
detail.addTook(took);
requestContext.addProcessorExecutionDetail(detail);
requestListener.onResponse(result);
}, e -> {
detail.markProcessorAsFailed(ProcessorExecutionDetail.ProcessorStatus.FAIL, e.getMessage());
requestContext.addProcessorExecutionDetail(detail);
requestListener.onFailure(e);
}));
} catch (Exception e) {
detail.addInput(request.source().toString());
wrappedProcessor.processRequestAsync(request, requestContext, ActionListener.wrap(result -> {
detail.addOutput(result.source().toString());
long took = System.nanoTime() - start;
detail.addTook(took);
requestContext.addProcessorExecutionDetail(detail);
requestListener.onResponse(result);
}, e -> {
detail.markProcessorAsFailed(ProcessorExecutionDetail.ProcessorStatus.FAIL, e.getMessage());
requestContext.addProcessorExecutionDetail(detail);
requestListener.onFailure(e);
}
}));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.core.action.ActionListener;

import java.io.IOException;
import java.util.Arrays;

/**
Expand Down Expand Up @@ -63,20 +64,9 @@ public SearchResponse processResponse(SearchRequest request, SearchResponse resp
@Override
public SearchResponse processResponse(SearchRequest request, SearchResponse response, PipelineProcessingContext requestContext)
throws Exception {
ProcessorExecutionDetail detail = new ProcessorExecutionDetail(getType(), getTag());

long start = System.nanoTime();
try {
detail.addInput(Arrays.asList(response.getHits().deepCopy().getHits()));
SearchResponse result = wrappedProcessor.processResponse(request, response, requestContext);
detail.addOutput(Arrays.asList(result.getHits().deepCopy().getHits()));
long took = System.nanoTime() - start;
detail.addTook(took);
requestContext.addProcessorExecutionDetail(detail);
return result;
} catch (Exception e) {
detail.markProcessorAsFailed(ProcessorExecutionDetail.ProcessorStatus.FAIL, e.getMessage());
requestContext.addProcessorExecutionDetail(detail);
return wrappedProcessor.processResponse(request, response, requestContext);
} catch (UnsupportedOperationException e) {
throw e;
}
}
Expand All @@ -89,25 +79,23 @@ public void processResponseAsync(
ActionListener<SearchResponse> responseListener
) {
ProcessorExecutionDetail detail = new ProcessorExecutionDetail(getType(), getTag());

long start = System.nanoTime();
try {
detail.addInput(Arrays.asList(response.getHits().deepCopy().getHits()));
wrappedProcessor.processResponseAsync(request, response, requestContext, ActionListener.wrap(result -> {
detail.addOutput(Arrays.asList(result.getHits().deepCopy().getHits()));
long took = System.nanoTime() - start;
detail.addTook(took);
requestContext.addProcessorExecutionDetail(detail);
responseListener.onResponse(result);
}, e -> {
detail.markProcessorAsFailed(ProcessorExecutionDetail.ProcessorStatus.FAIL, e.getMessage());
requestContext.addProcessorExecutionDetail(detail);
responseListener.onFailure(e);
}));
} catch (Exception e) {
} catch (IOException e) {
throw new RuntimeException(e);
}
wrappedProcessor.processResponseAsync(request, response, requestContext, ActionListener.wrap(result -> {
detail.addOutput(Arrays.asList(result.getHits().deepCopy().getHits()));
long took = System.nanoTime() - start;
detail.addTook(took);
requestContext.addProcessorExecutionDetail(detail);
responseListener.onResponse(result);
}, e -> {
detail.markProcessorAsFailed(ProcessorExecutionDetail.ProcessorStatus.FAIL, e.getMessage());
requestContext.addProcessorExecutionDetail(detail);
responseListener.onFailure(e);
}
}));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,30 +36,26 @@ public void setUp() throws Exception {
context = new PipelineProcessingContext();
}

public void testProcessRequest() throws Exception {
public void testProcessRequestSuccess() throws Exception {
SearchRequest inputRequest = new SearchRequest();
inputRequest.source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()));

SearchRequest outputRequest = new SearchRequest();
outputRequest.source(new SearchSourceBuilder().query(QueryBuilders.termQuery("field", "value")));

when(mockProcessor.processRequest(any(SearchRequest.class), eq(context))).thenReturn(outputRequest);

SearchRequest result = wrapper.processRequest(inputRequest, context);
assertEquals(outputRequest, result);
verify(mockProcessor).processRequest(inputRequest, context);

assertFalse(context.getProcessorExecutionDetails().isEmpty());
ProcessorExecutionDetail detail = context.getProcessorExecutionDetails().get(0);
assertEquals(wrapper.getType(), detail.getProcessorName());
assertNotNull(detail.getInputData());
assertNotNull(detail.getOutputData());
assertEquals(ProcessorExecutionDetail.ProcessorStatus.SUCCESS, detail.getStatus());
verify(mockProcessor).processRequest(inputRequest, context);
}

public void testProcessRequestException() throws Exception {
SearchRequest inputRequest = new SearchRequest();
inputRequest.source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()));
Exception processorException = new RuntimeException("Processor failed");

RuntimeException processorException = new UnsupportedOperationException("Processor failed");

when(mockProcessor.processRequest(any(SearchRequest.class), eq(context))).thenThrow(processorException);

Expand All @@ -69,10 +65,6 @@ public void testProcessRequestException() throws Exception {
} catch (Exception e) {
assertEquals("Processor failed", e.getMessage());
}

ProcessorExecutionDetail detail = context.getProcessorExecutionDetails().get(0);
assertEquals(wrapper.getType(), detail.getProcessorName());
assertEquals(ProcessorExecutionDetail.ProcessorStatus.FAIL, detail.getStatus());
}

public void testProcessRequestAsyncSuccess() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import org.opensearch.test.OpenSearchTestCase;
import org.junit.Before;

import java.io.IOException;

import org.mockito.Mockito;

import static org.mockito.ArgumentMatchers.any;
Expand All @@ -35,28 +37,6 @@ public void setUp() throws Exception {
context = new PipelineProcessingContext();
}

public void testProcessResponse() throws Exception {
SearchRequest mockRequest = new SearchRequest();
SearchResponse inputResponse = Mockito.mock(SearchResponse.class);
SearchResponse outputResponse = Mockito.mock(SearchResponse.class);

when(inputResponse.getHits()).thenReturn(SearchHits.empty());
when(outputResponse.getHits()).thenReturn(SearchHits.empty());
when(mockProcessor.processResponse(eq(mockRequest), eq(inputResponse), eq(context))).thenReturn(outputResponse);

SearchResponse result = wrapper.processResponse(mockRequest, inputResponse, context);

assertEquals(outputResponse, result);
verify(mockProcessor).processResponse(mockRequest, inputResponse, context);

assertFalse(context.getProcessorExecutionDetails().isEmpty());
ProcessorExecutionDetail detail = context.getProcessorExecutionDetails().get(0);
assertEquals(wrapper.getType(), detail.getProcessorName());
assertNotNull(detail.getInputData());
assertNotNull(detail.getOutputData());
assertEquals(ProcessorExecutionDetail.ProcessorStatus.SUCCESS, detail.getStatus());
}

public void testConstructorThrowsExceptionWhenProcessorIsNull() {
IllegalArgumentException exception = expectThrows(
IllegalArgumentException.class,
Expand All @@ -66,7 +46,7 @@ public void testConstructorThrowsExceptionWhenProcessorIsNull() {
assertEquals("Wrapped processor cannot be null.", exception.getMessage());
}

public void testProcessResponseAsync() {
public void testProcessResponseAsync(){
SearchRequest mockRequest = new SearchRequest();
SearchResponse inputResponse = Mockito.mock(SearchResponse.class);
SearchResponse outputResponse = Mockito.mock(SearchResponse.class);
Expand Down

0 comments on commit 1ccc9c6

Please sign in to comment.