diff --git a/client/rest-high-level/src/test/java/org/opensearch/client/RestHighLevelClientTests.java b/client/rest-high-level/src/test/java/org/opensearch/client/RestHighLevelClientTests.java index efcc13921c398..8b3ef713964f7 100644 --- a/client/rest-high-level/src/test/java/org/opensearch/client/RestHighLevelClientTests.java +++ b/client/rest-high-level/src/test/java/org/opensearch/client/RestHighLevelClientTests.java @@ -885,7 +885,8 @@ public void testApiNamingConventions() throws Exception { "nodes.hot_threads", "nodes.usage", "nodes.reload_secure_settings", - "search_shards", }; + "search_shards", + "get_all_pits" }; List booleanReturnMethods = Arrays.asList("security.enable_user", "security.disable_user", "security.change_password"); Set deprecatedMethods = new HashSet<>(); deprecatedMethods.add("indices.force_merge"); diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/create_pit.json b/rest-api-spec/src/main/resources/rest-api-spec/api/create_pit.json index 1e49f99ab20c5..9ab9b8d08d1b5 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/create_pit.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/create_pit.json @@ -2,7 +2,7 @@ "create_pit":{ "documentation":{ "url":"https://opensearch.org/docs/latest/opensearch/rest-api/point_in_time/", - "description":"Creates point in time context." + "description":"Creates point in time search context." }, "stability":"experimental", "url":{ diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/get_all_pits.json b/rest-api-spec/src/main/resources/rest-api-spec/api/get_all_pits.json new file mode 100644 index 0000000000000..067c9394fc87e --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/get_all_pits.json @@ -0,0 +1,19 @@ +{ + "get_all_pits":{ + "documentation":{ + "url":"https://opensearch.org/docs/latest/opensearch/rest-api/point_in_time/", + "description":"Get all active point in time searches." + }, + "stability":"experimental", + "url":{ + "paths":[ + { + "path":"/_search/point_in_time/all", + "methods":[ + "GET" + ] + } + ] + } + } +} diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/pit/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/pit/10_basic.yml index 58f019e788968..75ffd90e2b778 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/pit/10_basic.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/pit/10_basic.yml @@ -27,9 +27,18 @@ keep_alive: 23h - set: {id: pit_id} + - set: {creation_time: create_time} - match: { _shards.total: 1} - match: { _shards.successful: 1} - match: { _shards.failed: 0} + + - do: + get_all_pits: {} + + - match: {pits.0.pitId: $pit_id} + - match: {pits.0.creationTime: $create_time} + - match: {pits.0.keepAlive: 82800000} + - do: search: rest_total_hits_as_int: true @@ -87,7 +96,7 @@ "pit_id": [$pit_id] - match: {pits.0.pitId: $pit_id} - - match: {pits.0.succeeded: true } + - match: {pits.0.successful: true } --- "Delete all": @@ -127,7 +136,7 @@ delete_all_pits: {} - match: {pits.0.pitId: $pit_id} - - match: {pits.0.succeeded: true } + - match: {pits.0.successful: true } - do: catch: missing diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index b453320ba55be..0de277d89ef1b 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -231,7 +231,20 @@ import org.opensearch.action.ingest.SimulatePipelineTransportAction; import org.opensearch.action.main.MainAction; import org.opensearch.action.main.TransportMainAction; -import org.opensearch.action.search.*; +import org.opensearch.action.search.ClearScrollAction; +import org.opensearch.action.search.CreatePitAction; +import org.opensearch.action.search.DeletePitAction; +import org.opensearch.action.search.GetAllPitsAction; +import org.opensearch.action.search.MultiSearchAction; +import org.opensearch.action.search.SearchAction; +import org.opensearch.action.search.SearchScrollAction; +import org.opensearch.action.search.TransportClearScrollAction; +import org.opensearch.action.search.TransportCreatePitAction; +import org.opensearch.action.search.TransportDeletePitAction; +import org.opensearch.action.search.TransportGetAllPitsAction; +import org.opensearch.action.search.TransportMultiSearchAction; +import org.opensearch.action.search.TransportSearchAction; +import org.opensearch.action.search.TransportSearchScrollAction; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.AutoCreateIndex; import org.opensearch.action.support.DestructiveOperations; @@ -387,7 +400,15 @@ import org.opensearch.rest.action.ingest.RestGetPipelineAction; import org.opensearch.rest.action.ingest.RestPutPipelineAction; import org.opensearch.rest.action.ingest.RestSimulatePipelineAction; -import org.opensearch.rest.action.search.*; +import org.opensearch.rest.action.search.RestClearScrollAction; +import org.opensearch.rest.action.search.RestCountAction; +import org.opensearch.rest.action.search.RestCreatePitAction; +import org.opensearch.rest.action.search.RestDeletePitAction; +import org.opensearch.rest.action.search.RestExplainAction; +import org.opensearch.rest.action.search.RestGetAllPitsAction; +import org.opensearch.rest.action.search.RestMultiSearchAction; +import org.opensearch.rest.action.search.RestSearchAction; +import org.opensearch.rest.action.search.RestSearchScrollAction; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; import org.opensearch.usage.UsageService; diff --git a/server/src/main/java/org/opensearch/action/search/CreatePitAction.java b/server/src/main/java/org/opensearch/action/search/CreatePitAction.java index 1af56a044205b..7ffa30a182458 100644 --- a/server/src/main/java/org/opensearch/action/search/CreatePitAction.java +++ b/server/src/main/java/org/opensearch/action/search/CreatePitAction.java @@ -15,7 +15,7 @@ */ public class CreatePitAction extends ActionType { public static final CreatePitAction INSTANCE = new CreatePitAction(); - public static final String NAME = "indices:data/read/point_in_time"; + public static final String NAME = "indices:data/read/point_in_time/create"; private CreatePitAction() { super(NAME, CreatePitResponse::new); diff --git a/server/src/main/java/org/opensearch/action/search/DeletePitAction.java b/server/src/main/java/org/opensearch/action/search/DeletePitAction.java index f807a82301aba..717017de4a62c 100644 --- a/server/src/main/java/org/opensearch/action/search/DeletePitAction.java +++ b/server/src/main/java/org/opensearch/action/search/DeletePitAction.java @@ -17,7 +17,7 @@ public class DeletePitAction extends ActionType { public static final DeletePitAction INSTANCE = new DeletePitAction(); - public static final String NAME = "indices:admin/read/pit/delete"; + public static final String NAME = "indices:data/read/point_in_time/delete"; private DeletePitAction() { super(NAME, DeletePitResponse::new); diff --git a/server/src/main/java/org/opensearch/action/search/GetAllPitNodeRequest.java b/server/src/main/java/org/opensearch/action/search/GetAllPitNodeRequest.java index 4409ff80ae74e..e576b98890508 100644 --- a/server/src/main/java/org/opensearch/action/search/GetAllPitNodeRequest.java +++ b/server/src/main/java/org/opensearch/action/search/GetAllPitNodeRequest.java @@ -15,6 +15,9 @@ import java.io.IOException; +/** + * Inner node get all pits request + */ public class GetAllPitNodeRequest extends BaseNodeRequest { GetAllPitNodesRequest request; diff --git a/server/src/main/java/org/opensearch/action/search/GetAllPitNodeResponse.java b/server/src/main/java/org/opensearch/action/search/GetAllPitNodeResponse.java index 4ae1a3ee43018..035a652f2f172 100644 --- a/server/src/main/java/org/opensearch/action/search/GetAllPitNodeResponse.java +++ b/server/src/main/java/org/opensearch/action/search/GetAllPitNodeResponse.java @@ -21,7 +21,7 @@ import java.util.List; /** - * Response which holds information about all PIT contexts in a node + * Inner node get all pits response */ public class GetAllPitNodeResponse extends BaseNodeResponse implements ToXContentFragment { diff --git a/server/src/main/java/org/opensearch/action/search/GetAllPitNodesRequest.java b/server/src/main/java/org/opensearch/action/search/GetAllPitNodesRequest.java index 0073d4308c174..d33f95cb9a340 100644 --- a/server/src/main/java/org/opensearch/action/search/GetAllPitNodesRequest.java +++ b/server/src/main/java/org/opensearch/action/search/GetAllPitNodesRequest.java @@ -17,7 +17,7 @@ import java.io.IOException; /** - * Request to get all active PIT IDs in set of nodes + * Request to get all active PIT IDs from all nodes of cluster */ public class GetAllPitNodesRequest extends BaseNodesRequest { @Inject diff --git a/server/src/main/java/org/opensearch/action/search/GetAllPitNodesResponse.java b/server/src/main/java/org/opensearch/action/search/GetAllPitNodesResponse.java index c80de6103d0d8..62dd0a34f709a 100644 --- a/server/src/main/java/org/opensearch/action/search/GetAllPitNodesResponse.java +++ b/server/src/main/java/org/opensearch/action/search/GetAllPitNodesResponse.java @@ -24,6 +24,9 @@ import java.util.Set; import java.util.stream.Collectors; +/** + * This class transforms active PIT objects from all nodes to unique PIT objects + */ public class GetAllPitNodesResponse extends BaseNodesResponse implements ToXContentObject { List pitsInfo = new ArrayList<>(); diff --git a/server/src/main/java/org/opensearch/action/search/GetAllPitsAction.java b/server/src/main/java/org/opensearch/action/search/GetAllPitsAction.java index 4589172ef492d..16e65cb785a7d 100644 --- a/server/src/main/java/org/opensearch/action/search/GetAllPitsAction.java +++ b/server/src/main/java/org/opensearch/action/search/GetAllPitsAction.java @@ -15,7 +15,7 @@ */ public class GetAllPitsAction extends ActionType { public static final GetAllPitsAction INSTANCE = new GetAllPitsAction(); - public static final String NAME = "indices:data/readall/pit"; + public static final String NAME = "indices:data/read/point_in_time/readall"; private GetAllPitsAction() { super(NAME, GetAllPitNodesResponse::new); diff --git a/server/src/main/java/org/opensearch/action/search/ListPitInfo.java b/server/src/main/java/org/opensearch/action/search/ListPitInfo.java index 59e87cc4fe9da..4499e7d6e8ef5 100644 --- a/server/src/main/java/org/opensearch/action/search/ListPitInfo.java +++ b/server/src/main/java/org/opensearch/action/search/ListPitInfo.java @@ -22,15 +22,18 @@ public class ListPitInfo implements ToXContentFragment, Writeable { private final String pitId; private final long creationTime; + private final long keepAlive; - public ListPitInfo(String pitId, long creationTime) { + public ListPitInfo(String pitId, long creationTime, long keepAlive) { this.pitId = pitId; this.creationTime = creationTime; + this.keepAlive = keepAlive; } public ListPitInfo(StreamInput in) throws IOException { this.pitId = in.readString(); this.creationTime = in.readLong(); + this.keepAlive = in.readLong(); } @Override @@ -38,6 +41,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.startObject(); builder.field("pitId", pitId); builder.field("creationTime", creationTime); + builder.field("keepAlive", keepAlive); builder.endObject(); return builder; } @@ -54,5 +58,6 @@ public long getCreationTime() { public void writeTo(StreamOutput out) throws IOException { out.writeString(pitId); out.writeLong(creationTime); + out.writeLong(keepAlive); } } diff --git a/server/src/main/java/org/opensearch/action/search/TransportGetAllPitsAction.java b/server/src/main/java/org/opensearch/action/search/TransportGetAllPitsAction.java index c272e66aed4bd..b5593e772f428 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportGetAllPitsAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportGetAllPitsAction.java @@ -22,7 +22,7 @@ import java.util.List; /** - * Transport action to get all PIT contexts + * Transport action to get all active PIT contexts across all nodes */ public class TransportGetAllPitsAction extends TransportNodesAction< GetAllPitNodesRequest, @@ -73,7 +73,7 @@ protected GetAllPitNodeResponse newNodeResponse(StreamInput in) throws IOExcepti } /** - * This node specific operation retrieves all node specific information + * This node specific operation retrieves all active PIT IDs in a node */ @Override protected GetAllPitNodeResponse nodeOperation(GetAllPitNodeRequest request) { diff --git a/server/src/main/java/org/opensearch/rest/action/search/RestGetAllPitsAction.java b/server/src/main/java/org/opensearch/rest/action/search/RestGetAllPitsAction.java index a0b13c4efee61..44f61b55a9199 100644 --- a/server/src/main/java/org/opensearch/rest/action/search/RestGetAllPitsAction.java +++ b/server/src/main/java/org/opensearch/rest/action/search/RestGetAllPitsAction.java @@ -46,11 +46,13 @@ public String getName() { protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { final ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); clusterStateRequest.local(false); - clusterStateRequest.masterNodeTimeout(request.paramAsTime("master_timeout", clusterStateRequest.masterNodeTimeout())); + clusterStateRequest.clusterManagerNodeTimeout( + request.paramAsTime("cluster_manager_timeout", clusterStateRequest.clusterManagerNodeTimeout()) + ); clusterStateRequest.clear().nodes(true).routingTable(true).indices("*"); return channel -> client.admin().cluster().state(clusterStateRequest, new RestActionListener(channel) { @Override - public void processResponse(final ClusterStateResponse clusterStateResponse) throws IOException { + public void processResponse(final ClusterStateResponse clusterStateResponse) { final List nodes = new LinkedList<>(); for (ObjectCursor cursor : clusterStateResponse.getState().nodes().getDataNodes().values()) { DiscoveryNode node = cursor.value; @@ -64,7 +66,7 @@ public void processResponse(final ClusterStateResponse clusterStateResponse) thr public RestResponse buildResponse(final GetAllPitNodesResponse getAllPITNodesResponse) throws Exception { try (XContentBuilder builder = channel.newBuilder()) { builder.startObject(); - builder.field("pitIds", getAllPITNodesResponse.getPITIDs()); + builder.field("pits", getAllPITNodesResponse.getPITIDs()); builder.endObject(); return new BytesRestResponse(RestStatus.OK, builder); } diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index fb505574bd4bc..c220018421b07 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -41,7 +41,15 @@ import org.opensearch.action.ActionListener; import org.opensearch.action.ActionRunnable; import org.opensearch.action.OriginalIndices; -import org.opensearch.action.search.*; +import org.opensearch.action.search.DeletePitInfo; +import org.opensearch.action.search.DeletePitResponse; +import org.opensearch.action.search.ListPitInfo; +import org.opensearch.action.search.PitSearchContextIdForNode; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchShardTask; +import org.opensearch.action.search.SearchType; +import org.opensearch.action.search.UpdatePitContextRequest; +import org.opensearch.action.search.UpdatePitContextResponse; import org.opensearch.action.support.TransportActions; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.service.ClusterService; @@ -1454,7 +1462,7 @@ public List getAllPITReaderContexts() { for (ReaderContext ctx : activeReaders.values()) { if (ctx instanceof PitReaderContext) { final PitReaderContext context = (PitReaderContext) ctx; - ListPitInfo pitInfo = new ListPitInfo(context.getPitId(), context.getCreationTime()); + ListPitInfo pitInfo = new ListPitInfo(context.getPitId(), context.getCreationTime(), context.getKeepAlive()); pitContextsInfo.add(pitInfo); } } diff --git a/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java b/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java index 43ca7e0ebd823..135d8b1d173b0 100644 --- a/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java +++ b/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java @@ -61,7 +61,7 @@ public Releasable updatePitIdAndKeepAlive(long keepAliveInMillis, String pitId, } public long getCreationTime() { - return this.creationTime.get(); + return this.creationTime.get() == null ? 0 : this.creationTime.get(); } public void setCreationTime(final long creationTime) { diff --git a/server/src/main/java/org/opensearch/search/internal/ReaderContext.java b/server/src/main/java/org/opensearch/search/internal/ReaderContext.java index 04791e05f603c..7db9032e8ff9c 100644 --- a/server/src/main/java/org/opensearch/search/internal/ReaderContext.java +++ b/server/src/main/java/org/opensearch/search/internal/ReaderContext.java @@ -117,6 +117,10 @@ protected AtomicLong getLastAccessTime() { return lastAccessTime; } + public long getKeepAlive() { + return keepAlive.get(); + } + @Override public final void close() { if (closed.compareAndSet(false, true)) { diff --git a/server/src/test/java/org/opensearch/action/search/PitTestsUtil.java b/server/src/test/java/org/opensearch/action/search/PitTestsUtil.java index ec83cb45697d9..2571131915300 100644 --- a/server/src/test/java/org/opensearch/action/search/PitTestsUtil.java +++ b/server/src/test/java/org/opensearch/action/search/PitTestsUtil.java @@ -8,7 +8,14 @@ package org.opensearch.action.search; +import com.carrotsearch.hppc.cursors.ObjectCursor; +import org.junit.Assert; import org.opensearch.Version; +import org.opensearch.action.ActionFuture; +import org.opensearch.action.admin.cluster.state.ClusterStateRequest; +import org.opensearch.action.admin.cluster.state.ClusterStateResponse; +import org.opensearch.client.Client; +import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.util.concurrent.AtomicArray; import org.opensearch.index.query.IdsQueryBuilder; import org.opensearch.index.query.MatchAllQueryBuilder; @@ -21,8 +28,12 @@ import org.opensearch.search.internal.ShardSearchContextId; import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; +import static org.opensearch.test.OpenSearchIntegTestCase.client; import static org.opensearch.test.OpenSearchTestCase.between; import static org.opensearch.test.OpenSearchTestCase.randomAlphaOfLength; import static org.opensearch.test.OpenSearchTestCase.randomBoolean; @@ -81,4 +92,41 @@ public static String getPitId() { } return SearchContextId.encode(array.asList(), aliasFilters, version); } + + public static void assertUsingGetAllPits(Client client, String id, long creationTime) throws ExecutionException, InterruptedException { + final ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); + clusterStateRequest.local(false); + clusterStateRequest.clear().nodes(true).routingTable(true).indices("*"); + ClusterStateResponse clusterStateResponse = client.admin().cluster().state(clusterStateRequest).get(); + final List nodes = new LinkedList<>(); + for (ObjectCursor cursor : clusterStateResponse.getState().nodes().getDataNodes().values()) { + DiscoveryNode node = cursor.value; + nodes.add(node); + } + DiscoveryNode[] disNodesArr = new DiscoveryNode[nodes.size()]; + nodes.toArray(disNodesArr); + GetAllPitNodesRequest getAllPITNodesRequest = new GetAllPitNodesRequest(disNodesArr); + ActionFuture execute1 = client.execute(GetAllPitsAction.INSTANCE, getAllPITNodesRequest); + GetAllPitNodesResponse getPitResponse = execute1.get(); + Assert.assertTrue(getPitResponse.getPITIDs().get(0).getPitId().contains(id)); + Assert.assertEquals(getPitResponse.getPITIDs().get(0).getCreationTime(), creationTime); + } + + public static void assertGetAllPitsEmpty(Client client) throws ExecutionException, InterruptedException { + final ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); + clusterStateRequest.local(false); + clusterStateRequest.clear().nodes(true).routingTable(true).indices("*"); + ClusterStateResponse clusterStateResponse = client.admin().cluster().state(clusterStateRequest).get(); + final List nodes = new LinkedList<>(); + for (ObjectCursor cursor : clusterStateResponse.getState().nodes().getDataNodes().values()) { + DiscoveryNode node = cursor.value; + nodes.add(node); + } + DiscoveryNode[] disNodesArr = new DiscoveryNode[nodes.size()]; + nodes.toArray(disNodesArr); + GetAllPitNodesRequest getAllPITNodesRequest = new GetAllPitNodesRequest(disNodesArr); + ActionFuture execute1 = client.execute(GetAllPitsAction.INSTANCE, getAllPITNodesRequest); + GetAllPitNodesResponse getPitResponse = execute1.get(); + Assert.assertEquals(0, getPitResponse.getPITIDs().size()); + } } diff --git a/server/src/test/java/org/opensearch/search/CreatePitMultiNodeTests.java b/server/src/test/java/org/opensearch/search/CreatePitMultiNodeTests.java index eff051bb5b502..fbeb26dcb9383 100644 --- a/server/src/test/java/org/opensearch/search/CreatePitMultiNodeTests.java +++ b/server/src/test/java/org/opensearch/search/CreatePitMultiNodeTests.java @@ -15,7 +15,14 @@ import org.opensearch.action.ActionFuture; import org.opensearch.action.admin.cluster.state.ClusterStateRequest; import org.opensearch.action.admin.cluster.state.ClusterStateResponse; -import org.opensearch.action.search.*; +import org.opensearch.action.search.CreatePitAction; +import org.opensearch.action.search.CreatePitRequest; +import org.opensearch.action.search.CreatePitResponse; +import org.opensearch.action.search.GetAllPitNodesRequest; +import org.opensearch.action.search.GetAllPitNodesResponse; +import org.opensearch.action.search.GetAllPitsAction; +import org.opensearch.action.search.PitTestsUtil; +import org.opensearch.action.search.SearchResponse; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; @@ -55,6 +62,7 @@ public void testPit() throws Exception { request.setIndices(new String[] { "index" }); ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); CreatePitResponse pitResponse = execute.get(); + PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime()); SearchResponse searchResponse = client().prepareSearch("index") .setSize(2) .setPointInTime(new PointInTimeBuilder(pitResponse.getId()).setKeepAlive(TimeValue.timeValueDays(1))) @@ -86,6 +94,7 @@ public void testCreatePitWhileNodeDropWithAllowPartialCreationTrue() throws Exce public Settings onNodeStopped(String nodeName) throws Exception { ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); CreatePitResponse pitResponse = execute.get(); + PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime()); assertEquals(1, pitResponse.getSuccessfulShards()); assertEquals(2, pitResponse.getTotalShards()); SearchResponse searchResponse = client().prepareSearch("index") @@ -115,6 +124,7 @@ public Settings onNodeStopped(String nodeName) throws Exception { assertEquals(1, searchResponse.getFailedShards()); assertEquals(0, searchResponse.getSkippedShards()); assertEquals(2, searchResponse.getTotalShards()); + PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime()); return super.onNodeStopped(nodeName); } }); diff --git a/server/src/test/java/org/opensearch/search/CreatePitSingleNodeTests.java b/server/src/test/java/org/opensearch/search/CreatePitSingleNodeTests.java index 5c3c43af9cb66..4ec6f468174c8 100644 --- a/server/src/test/java/org/opensearch/search/CreatePitSingleNodeTests.java +++ b/server/src/test/java/org/opensearch/search/CreatePitSingleNodeTests.java @@ -14,6 +14,7 @@ import org.opensearch.action.search.CreatePitController; import org.opensearch.action.search.CreatePitRequest; import org.opensearch.action.search.CreatePitResponse; +import org.opensearch.action.search.PitTestsUtil; import org.opensearch.action.search.SearchPhaseExecutionException; import org.opensearch.action.search.SearchResponse; import org.opensearch.common.Priority; @@ -63,6 +64,9 @@ public void testCreatePITSuccess() throws ExecutionException, InterruptedExcepti request.setIndices(new String[] { "index" }); ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); CreatePitResponse pitResponse = execute.get(); + + PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime()); + client().prepareIndex("index").setId("2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); SearchResponse searchResponse = client().prepareSearch("index") .setSize(2) @@ -86,6 +90,7 @@ public void testCreatePITWithMultipleIndicesSuccess() throws ExecutionException, ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); CreatePitResponse response = execute.get(); + PitTestsUtil.assertUsingGetAllPits(client(), response.getId(), response.getCreationTime()); assertEquals(4, response.getSuccessfulShards()); assertEquals(4, service.getActiveContexts()); service.doClose(); @@ -99,6 +104,7 @@ public void testCreatePITWithShardReplicasSuccess() throws ExecutionException, I request.setIndices(new String[] { "index" }); ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); CreatePitResponse pitResponse = execute.get(); + PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime()); client().prepareIndex("index").setId("2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); SearchResponse searchResponse = client().prepareSearch("index") @@ -128,7 +134,7 @@ public void testCreatePITWithNonExistentIndex() { service.doClose(); } - public void testCreatePITOnCloseIndex() { + public void testCreatePITOnCloseIndex() throws ExecutionException, InterruptedException { createIndex("index", Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 0).build()); client().prepareIndex("index").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); client().prepareIndex("index").setId("2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); @@ -144,6 +150,7 @@ public void testCreatePITOnCloseIndex() { SearchService service = getInstanceFromNode(SearchService.class); assertEquals(0, service.getActiveContexts()); + PitTestsUtil.assertGetAllPitsEmpty(client()); service.doClose(); } @@ -165,6 +172,7 @@ public void testPitSearchOnDeletedIndex() throws ExecutionException, Interrupted }); assertTrue(ex.getMessage().contains("no such index [index]")); SearchService service = getInstanceFromNode(SearchService.class); + PitTestsUtil.assertGetAllPitsEmpty(client()); assertEquals(0, service.getActiveContexts()); service.doClose(); } @@ -190,6 +198,8 @@ public void testPitSearchOnCloseIndex() throws ExecutionException, InterruptedEx request.setIndices(new String[] { "index" }); ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); CreatePitResponse pitResponse = execute.get(); + PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime()); + SearchService service = getInstanceFromNode(SearchService.class); assertEquals(2, service.getActiveContexts()); client().admin().indices().prepareClose("index").get(); @@ -201,6 +211,7 @@ public void testPitSearchOnCloseIndex() throws ExecutionException, InterruptedEx }); assertTrue(ex.shardFailures()[0].reason().contains("SearchContextMissingException")); assertEquals(0, service.getActiveContexts()); + PitTestsUtil.assertGetAllPitsEmpty(client()); // PIT reader contexts are lost after close, verifying it with open index api client().admin().indices().prepareOpen("index").get(); @@ -314,6 +325,8 @@ public void testPitAfterUpdateIndex() throws Exception { request.setIndices(new String[] { "test" }); ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); CreatePitResponse pitResponse = execute.get(); + PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime()); + SearchService service = getInstanceFromNode(SearchService.class); assertThat( @@ -456,6 +469,7 @@ public void testPitAfterUpdateIndex() throws Exception { } finally { service.doClose(); assertEquals(0, service.getActiveContexts()); + PitTestsUtil.assertGetAllPitsEmpty(client()); } } @@ -467,6 +481,8 @@ public void testConcurrentSearches() throws Exception { request.setIndices(new String[] { "index" }); ActionFuture execute = client().execute(CreatePitAction.INSTANCE, request); CreatePitResponse pitResponse = execute.get(); + PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime()); + Thread[] threads = new Thread[5]; CountDownLatch latch = new CountDownLatch(threads.length); @@ -497,5 +513,6 @@ public void testConcurrentSearches() throws Exception { assertEquals(2, service.getActiveContexts()); service.doClose(); assertEquals(0, service.getActiveContexts()); + PitTestsUtil.assertGetAllPitsEmpty(client()); } } diff --git a/server/src/test/java/org/opensearch/search/SearchServiceTests.java b/server/src/test/java/org/opensearch/search/SearchServiceTests.java index cbfcb43c22a21..7596edaac2c70 100644 --- a/server/src/test/java/org/opensearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/opensearch/search/SearchServiceTests.java @@ -1414,6 +1414,7 @@ public void testOpenReaderContext() { searchService.createPitReaderContext(new ShardId(resolveIndex("index"), 0), TimeValue.timeValueMinutes(between(1, 10)), future); future.actionGet(); assertThat(searchService.getActiveContexts(), equalTo(1)); + assertThat(searchService.getAllPITReaderContexts().size(), equalTo(1)); assertTrue(searchService.freeReaderContext(future.actionGet())); } @@ -1431,6 +1432,7 @@ public void testDeletePitReaderContext() { contextIds.add(pitSearchContextIdForNode); assertThat(searchService.getActiveContexts(), equalTo(1)); + assertThat(searchService.getAllPITReaderContexts().size(), equalTo(1)); DeletePitResponse deletePitResponse = searchService.freeReaderContextsIfFound(contextIds); assertTrue(deletePitResponse.getDeletePitResults().get(0).isSuccessful()); // assert true for reader context not found @@ -1449,8 +1451,10 @@ public void testDeleteAllPitReaderContexts() { searchService.createPitReaderContext(new ShardId(resolveIndex("index"), 0), TimeValue.timeValueMinutes(between(1, 10)), future); future.actionGet(); assertThat(searchService.getActiveContexts(), equalTo(2)); + assertThat(searchService.getAllPITReaderContexts().size(), equalTo(2)); searchService.freeAllPitContexts(); assertThat(searchService.getActiveContexts(), equalTo(0)); + assertThat(searchService.getAllPITReaderContexts().size(), equalTo(0)); } public void testPitContextMaxKeepAlive() { @@ -1473,6 +1477,7 @@ public void testPitContextMaxKeepAlive() { ex.getMessage() ); assertThat(searchService.getActiveContexts(), equalTo(0)); + assertThat(searchService.getAllPITReaderContexts().size(), equalTo(0)); } public void testUpdatePitId() { @@ -1495,6 +1500,7 @@ public void testUpdatePitId() { assertTrue(updateResponse.getKeepAlive() == updateRequest.getKeepAlive()); assertTrue(updateResponse.getPitId().equalsIgnoreCase("pitId")); assertThat(searchService.getActiveContexts(), equalTo(1)); + assertThat(searchService.getAllPITReaderContexts().size(), equalTo(1)); assertTrue(searchService.freeReaderContext(future.actionGet())); } @@ -1527,6 +1533,7 @@ public void testUpdatePitIdMaxKeepAlive() { ex.getMessage() ); assertThat(searchService.getActiveContexts(), equalTo(1)); + assertThat(searchService.getAllPITReaderContexts().size(), equalTo(1)); assertTrue(searchService.freeReaderContext(future.actionGet())); } @@ -1547,6 +1554,7 @@ public void testUpdatePitIdWithInvalidReaderId() { assertEquals("No search context found for id [" + id.getId() + "]", ex.getMessage()); assertThat(searchService.getActiveContexts(), equalTo(0)); + assertThat(searchService.getAllPITReaderContexts().size(), equalTo(0)); } private ReaderContext createReaderContext(IndexService indexService, IndexShard indexShard) {