diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index 8e31aa23d88cf..796b48a021bd8 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -235,14 +235,7 @@ import org.opensearch.action.ingest.SimulatePipelineTransportAction; import org.opensearch.action.main.MainAction; import org.opensearch.action.main.TransportMainAction; -import org.opensearch.action.search.ClearScrollAction; -import org.opensearch.action.search.MultiSearchAction; -import org.opensearch.action.search.SearchAction; -import org.opensearch.action.search.SearchScrollAction; -import org.opensearch.action.search.TransportClearScrollAction; -import org.opensearch.action.search.TransportMultiSearchAction; -import org.opensearch.action.search.TransportSearchAction; -import org.opensearch.action.search.TransportSearchScrollAction; +import org.opensearch.action.search.*; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.AutoCreateIndex; import org.opensearch.action.support.DestructiveOperations; @@ -398,12 +391,7 @@ import org.opensearch.rest.action.ingest.RestGetPipelineAction; import org.opensearch.rest.action.ingest.RestPutPipelineAction; import org.opensearch.rest.action.ingest.RestSimulatePipelineAction; -import org.opensearch.rest.action.search.RestClearScrollAction; -import org.opensearch.rest.action.search.RestCountAction; -import org.opensearch.rest.action.search.RestExplainAction; -import org.opensearch.rest.action.search.RestMultiSearchAction; -import org.opensearch.rest.action.search.RestSearchAction; -import org.opensearch.rest.action.search.RestSearchScrollAction; +import org.opensearch.rest.action.search.*; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; import org.opensearch.usage.UsageService; @@ -661,6 +649,9 @@ public void reg actions.register(DeleteDanglingIndexAction.INSTANCE, TransportDeleteDanglingIndexAction.class); actions.register(FindDanglingIndexAction.INSTANCE, TransportFindDanglingIndexAction.class); + actions.register(CreatePITAction.INSTANCE, TransportCreatePITAction.class); + actions.register(GetAllPITsAction.INSTANCE, TransportGetAllPITsAction.class); + return unmodifiableMap(actions.getRegistry()); } @@ -832,6 +823,11 @@ public void initRestHandlers(Supplier nodesInCluster) { registerHandler.accept(new RestRepositoriesAction()); registerHandler.accept(new RestSnapshotAction()); registerHandler.accept(new RestTemplatesAction()); + + // point in time + registerHandler.accept(new RestCreatePITAction()); + registerHandler.accept(new RestGetAllPitAction()); + for (ActionPlugin plugin : actionPlugins) { for (RestHandler handler : plugin.getRestHandlers( settings, diff --git a/server/src/main/java/org/opensearch/action/search/GetAllPITNodeRequest.java b/server/src/main/java/org/opensearch/action/search/GetAllPITNodeRequest.java new file mode 100644 index 0000000000000..0239129640736 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/GetAllPITNodeRequest.java @@ -0,0 +1,44 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.apache.lucene.index.Fields; +import org.apache.lucene.util.CharsRefBuilder; +import org.opensearch.action.support.nodes.BaseNodeRequest; +import org.opensearch.action.termvectors.TermVectorsResponse; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.xcontent.ToXContent; +import org.opensearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Iterator; + +public class GetAllPITNodeRequest extends BaseNodeRequest { + + GetAllPITNodesRequest request; + + @Inject + public GetAllPITNodeRequest(GetAllPITNodesRequest request) { + this.request = request; + } + + public GetAllPITNodeRequest(StreamInput in) throws IOException { + super(in); + request = new GetAllPITNodesRequest(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + request.writeTo(out); + } + +} diff --git a/server/src/main/java/org/opensearch/action/search/GetAllPITNodeResponse.java b/server/src/main/java/org/opensearch/action/search/GetAllPITNodeResponse.java new file mode 100644 index 0000000000000..f48879261e0e5 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/GetAllPITNodeResponse.java @@ -0,0 +1,56 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.action.support.nodes.BaseNodeResponse; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.xcontent.ToXContentFragment; +import org.opensearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; + +public class GetAllPITNodeResponse extends BaseNodeResponse implements ToXContentFragment { + + private List pitIds; + + + @Inject + public GetAllPITNodeResponse(StreamInput in, List pitIds) throws IOException { + super(in); + this.pitIds = pitIds; + } + + public GetAllPITNodeResponse(DiscoveryNode node, List pitIds) { + super(node); + this.pitIds = pitIds; + } + public GetAllPITNodeResponse(StreamInput in) throws IOException { + super(in); + pitIds = in.readList(StreamInput::readString); + } + + public List getPitIds() { + return pitIds; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return null; + } +} diff --git a/server/src/main/java/org/opensearch/action/search/GetAllPITNodesRequest.java b/server/src/main/java/org/opensearch/action/search/GetAllPITNodesRequest.java new file mode 100644 index 0000000000000..85733fd03476c --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/GetAllPITNodesRequest.java @@ -0,0 +1,37 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.action.support.nodes.BaseNodesRequest; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +public class GetAllPITNodesRequest extends BaseNodesRequest { + + //DiscoveryNode[] concreteNodes; + @Inject + public GetAllPITNodesRequest(DiscoveryNode... concreteNodes) { + super(concreteNodes); + } + + public GetAllPITNodesRequest(StreamInput in) throws IOException { + super(in); + //this.concreteNodes = in.readOptionalArray(DiscoveryNode::new, DiscoveryNode[]::new); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + //out.writeOptionalArray(concreteNodes); + } +} diff --git a/server/src/main/java/org/opensearch/action/search/GetAllPITNodesResponse.java b/server/src/main/java/org/opensearch/action/search/GetAllPITNodesResponse.java new file mode 100644 index 0000000000000..62a2ecabc3eca --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/GetAllPITNodesResponse.java @@ -0,0 +1,66 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.action.FailedNodeException; +import org.opensearch.action.admin.cluster.node.info.NodeInfo; +import org.opensearch.action.support.nodes.BaseNodesResponse; +import org.opensearch.cluster.ClusterName; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.xcontent.StatusToXContentObject; +import org.opensearch.common.xcontent.ToXContentObject; +import org.opensearch.common.xcontent.XContentBuilder; +import org.opensearch.rest.RestStatus; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class GetAllPITNodesResponse extends BaseNodesResponse implements ToXContentObject, StatusToXContentObject { + + List pitIds = new ArrayList<>(); + + @Inject + public GetAllPITNodesResponse(StreamInput in) throws IOException { + super(in); + } + + public GetAllPITNodesResponse(ClusterName clusterName, List getAllPITNodeResponses, List failures) { + super(clusterName, getAllPITNodeResponses, failures); + for(GetAllPITNodeResponse response : getAllPITNodeResponses) { + pitIds.addAll(response.getPitIds()); + } + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return null; + } + + @Override + public List readNodesFrom(StreamInput in) throws IOException { + return in.readList(GetAllPITNodeResponse::new); + } + + @Override + public void writeNodesTo(StreamOutput out, List nodes) throws IOException { + out.writeList(nodes); + } + + @Override + public RestStatus status() { + return null; + } + + public List getPITIDs() { + return pitIds; + } +} diff --git a/server/src/main/java/org/opensearch/action/search/GetAllPITsAction.java b/server/src/main/java/org/opensearch/action/search/GetAllPITsAction.java new file mode 100644 index 0000000000000..64aa80b332832 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/GetAllPITsAction.java @@ -0,0 +1,23 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.action.ActionType; + +public class GetAllPITsAction extends ActionType { + + + public static final GetAllPITsAction INSTANCE = new GetAllPITsAction(); + public static final String NAME = "indices:data/readall/pit"; + + private GetAllPITsAction() { + super(NAME, GetAllPITNodesResponse::new); + } + +} diff --git a/server/src/main/java/org/opensearch/action/search/TransportGetAllPITsAction.java b/server/src/main/java/org/opensearch/action/search/TransportGetAllPITsAction.java new file mode 100644 index 0000000000000..d36e9384647bc --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/TransportGetAllPITsAction.java @@ -0,0 +1,81 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.action.FailedNodeException; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.nodes.*; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.search.SearchService; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +import java.io.IOException; +import java.util.List; + +public class TransportGetAllPITsAction + extends TransportNodesAction< + GetAllPITNodesRequest, + GetAllPITNodesResponse, + GetAllPITNodeRequest, + GetAllPITNodeResponse> { + + private final SearchService searchService; + public static final String CREATE_PIT = "delete_pit"; + @Inject + public TransportGetAllPITsAction( + ThreadPool threadPool, + ClusterService clusterService, + TransportService transportService, + ActionFilters actionFilters, + SearchService searchService) { + // + super( + GetAllPITsAction.NAME, + threadPool, + clusterService, + transportService, + actionFilters, + GetAllPITNodesRequest::new, + GetAllPITNodeRequest::new, + ThreadPool.Names.SAME, + GetAllPITNodeResponse.class); + this.searchService = searchService; + } + + @Override + protected GetAllPITNodesResponse newResponse( + GetAllPITNodesRequest request, + List getAllPITNodeResponses, + List failures) { + return new GetAllPITNodesResponse(clusterService.getClusterName(), getAllPITNodeResponses, failures); + } + + @Override + protected GetAllPITNodeRequest newNodeRequest(GetAllPITNodesRequest request) { + return new GetAllPITNodeRequest(request); + } + + @Override + protected GetAllPITNodeResponse newNodeResponse(StreamInput in) throws IOException { + return new GetAllPITNodeResponse(in); + } + + @Override + protected GetAllPITNodeResponse nodeOperation(GetAllPITNodeRequest request) { + + GetAllPITNodeResponse nodeResponse = + new GetAllPITNodeResponse( + transportService.getLocalNode(), searchService.getAllPITReaderContexts()); + return nodeResponse; + } +} diff --git a/server/src/main/java/org/opensearch/client/Client.java b/server/src/main/java/org/opensearch/client/Client.java index bca68834ca3cf..1ffba9cfa29c8 100644 --- a/server/src/main/java/org/opensearch/client/Client.java +++ b/server/src/main/java/org/opensearch/client/Client.java @@ -55,17 +55,7 @@ import org.opensearch.action.index.IndexRequest; import org.opensearch.action.index.IndexRequestBuilder; import org.opensearch.action.index.IndexResponse; -import org.opensearch.action.search.ClearScrollRequest; -import org.opensearch.action.search.ClearScrollRequestBuilder; -import org.opensearch.action.search.ClearScrollResponse; -import org.opensearch.action.search.MultiSearchRequest; -import org.opensearch.action.search.MultiSearchRequestBuilder; -import org.opensearch.action.search.MultiSearchResponse; -import org.opensearch.action.search.SearchRequest; -import org.opensearch.action.search.SearchRequestBuilder; -import org.opensearch.action.search.SearchResponse; -import org.opensearch.action.search.SearchScrollRequest; -import org.opensearch.action.search.SearchScrollRequestBuilder; +import org.opensearch.action.search.*; import org.opensearch.action.termvectors.MultiTermVectorsRequest; import org.opensearch.action.termvectors.MultiTermVectorsRequestBuilder; import org.opensearch.action.termvectors.MultiTermVectorsResponse; @@ -439,6 +429,8 @@ public interface Client extends OpenSearchClient, Releasable { */ Settings settings(); + void getAllPits(GetAllPITNodesRequest getAllPITRequest, ActionListener listener); + /** * Returns a new lightweight Client that applies all given headers to each of the requests * issued from it. diff --git a/server/src/main/java/org/opensearch/client/support/AbstractClient.java b/server/src/main/java/org/opensearch/client/support/AbstractClient.java index a37d293ee5dd2..db4c38bc1082f 100644 --- a/server/src/main/java/org/opensearch/client/support/AbstractClient.java +++ b/server/src/main/java/org/opensearch/client/support/AbstractClient.java @@ -327,21 +327,7 @@ import org.opensearch.action.ingest.SimulatePipelineRequest; import org.opensearch.action.ingest.SimulatePipelineRequestBuilder; import org.opensearch.action.ingest.SimulatePipelineResponse; -import org.opensearch.action.search.ClearScrollAction; -import org.opensearch.action.search.ClearScrollRequest; -import org.opensearch.action.search.ClearScrollRequestBuilder; -import org.opensearch.action.search.ClearScrollResponse; -import org.opensearch.action.search.MultiSearchAction; -import org.opensearch.action.search.MultiSearchRequest; -import org.opensearch.action.search.MultiSearchRequestBuilder; -import org.opensearch.action.search.MultiSearchResponse; -import org.opensearch.action.search.SearchAction; -import org.opensearch.action.search.SearchRequest; -import org.opensearch.action.search.SearchRequestBuilder; -import org.opensearch.action.search.SearchResponse; -import org.opensearch.action.search.SearchScrollAction; -import org.opensearch.action.search.SearchScrollRequest; -import org.opensearch.action.search.SearchScrollRequestBuilder; +import org.opensearch.action.search.*; import org.opensearch.action.support.PlainActionFuture; import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.action.termvectors.MultiTermVectorsAction; @@ -368,6 +354,8 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.common.xcontent.XContentType; +import org.opensearch.rest.action.search.GetAllPITRequest; +import org.opensearch.rest.action.search.GetAllPITResponse; import org.opensearch.tasks.TaskId; import org.opensearch.threadpool.ThreadPool; @@ -671,6 +659,11 @@ public FieldCapabilitiesRequestBuilder prepareFieldCaps(String... indices) { return new FieldCapabilitiesRequestBuilder(this, FieldCapabilitiesAction.INSTANCE, indices); } + @Override + public void getAllPits(GetAllPITNodesRequest request, ActionListener listener) { + execute(GetAllPITsAction.INSTANCE, request, listener); + } + static class Admin implements AdminClient { private final ClusterAdmin clusterAdmin; diff --git a/server/src/main/java/org/opensearch/rest/BaseRestHandler.java b/server/src/main/java/org/opensearch/rest/BaseRestHandler.java index f2e345314ee10..acf24edaa4dba 100644 --- a/server/src/main/java/org/opensearch/rest/BaseRestHandler.java +++ b/server/src/main/java/org/opensearch/rest/BaseRestHandler.java @@ -178,7 +178,7 @@ protected final String unrecognized( * the request against a channel. */ @FunctionalInterface - protected interface RestChannelConsumer extends CheckedConsumer {} + public interface RestChannelConsumer extends CheckedConsumer {} /** * Prepare the request for execution. Implementations should consume all request params before diff --git a/server/src/main/java/org/opensearch/rest/action/search/RestGetAllPitAction.java b/server/src/main/java/org/opensearch/rest/action/search/RestGetAllPitAction.java new file mode 100644 index 0000000000000..9925dd39ee386 --- /dev/null +++ b/server/src/main/java/org/opensearch/rest/action/search/RestGetAllPitAction.java @@ -0,0 +1,71 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.rest.action.search; + +import com.carrotsearch.hppc.cursors.ObjectCursor; +import org.opensearch.action.admin.cluster.state.ClusterStateRequest; +import org.opensearch.action.admin.cluster.state.ClusterStateResponse; +import org.opensearch.action.search.GetAllPITNodesRequest; +import org.opensearch.action.search.GetAllPITNodesResponse; +import org.opensearch.action.search.GetAllPITsAction; +import org.opensearch.client.node.NodeClient; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.RestResponse; +import org.opensearch.rest.action.RestActionListener; +import org.opensearch.rest.action.RestResponseListener; + +import java.io.IOException; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; + +import static java.util.Collections.unmodifiableList; +import static org.opensearch.rest.RestRequest.Method.GET; + +public class RestGetAllPitAction extends BaseRestHandler { + @Override + public String getName() { + return "get_all_pit_action"; + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + final ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); + clusterStateRequest.local(false); + clusterStateRequest.masterNodeTimeout(request.paramAsTime("master_timeout", clusterStateRequest.masterNodeTimeout())); + clusterStateRequest.clear().nodes(true).routingTable(true).indices("*"); + return channel -> client.admin().cluster().state(clusterStateRequest, new RestActionListener(channel) { + @Override + public void processResponse(final ClusterStateResponse clusterStateResponse) throws IOException { + final List nodes = new LinkedList<>(); + for (ObjectCursor cursor : clusterStateResponse.getState().nodes().getDataNodes().values()) { + DiscoveryNode node = cursor.value; + nodes.add(node); + } + DiscoveryNode[] disNodesArr = new DiscoveryNode[nodes.size()]; + nodes.toArray(disNodesArr); + GetAllPITNodesRequest getAllPITNodesRequest = new GetAllPITNodesRequest(disNodesArr); + client.execute(GetAllPITsAction.INSTANCE, getAllPITNodesRequest, new RestResponseListener(channel) { + @Override + public RestResponse buildResponse(final GetAllPITNodesResponse getAllPITNodesResponse) throws Exception { + return null; + } + }); + } + }); + } + + @Override + public List routes() { + return unmodifiableList(Collections.singletonList( + new Route(GET, "/_pit/all"))); + } +} diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index cc135e273e74d..4a16bd1d4f368 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -130,12 +130,7 @@ import org.opensearch.transport.TransportRequest; import java.io.IOException; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; +import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; @@ -385,6 +380,17 @@ protected ReaderContext removeReaderContext(long id) { return activeReaders.remove(id); } + public List getAllPITReaderContexts() { + final List pitContexts = new ArrayList<>(); + for (ReaderContext ctx : activeReaders.values()) { + if (ctx instanceof PitReaderContext) { + final PitReaderContext pitReaderContext = (PitReaderContext) ctx; + pitContexts.add(pitReaderContext.getPitId()); + } + } + return pitContexts; + } + @Override protected void doStart() {} @@ -815,8 +821,7 @@ public void openReaderContext(ShardId shardId, TimeValue keepAlive, ActionListen shard.routingEntry(); final ShardSearchContextId id = new ShardSearchContextId(sessionId, idGenerator.incrementAndGet()); readerContext = new PitReaderContext(id, indexService, shard, searcherSupplier, keepAlive.millis(), false, - shard.routingEntry(),nonVerboseSegments); - readerContext = new ReaderContext(id, indexService, shard, searcherSupplier, keepAlive.millis(), false); + shard.routingEntry(),nonVerboseSegments, null); final ReaderContext finalReaderContext = readerContext; searcherSupplier = null; // transfer ownership to reader context diff --git a/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java b/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java index 103d5104a84c0..69ad8652463f3 100644 --- a/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java +++ b/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java @@ -24,19 +24,26 @@ public ShardRouting getShardRouting() { private final ShardRouting shardRouting; private final List segments; + private final String pitId; public PitReaderContext(ShardSearchContextId id, IndexService indexService, IndexShard indexShard, Engine.SearcherSupplier searcherSupplier, long keepAliveInMillis, boolean singleSession, - ShardRouting shardRouting, List nonVerboseSegments) { + ShardRouting shardRouting, List nonVerboseSegments, String pitId) { super(id, indexService, indexShard, searcherSupplier, keepAliveInMillis, singleSession); this.shardRouting = shardRouting; segments = nonVerboseSegments; + this.pitId = pitId; } public List getSegments() { return segments; } + public String getPitId() { + return "pitId"; + } + + } diff --git a/server/src/test/java/org/opensearch/search/SearchServiceTests.java b/server/src/test/java/org/opensearch/search/SearchServiceTests.java index 4e342875e4599..f24072220fcae 100644 --- a/server/src/test/java/org/opensearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/opensearch/search/SearchServiceTests.java @@ -32,23 +32,26 @@ package org.opensearch.search; import com.carrotsearch.hppc.IntArrayList; +import com.carrotsearch.hppc.cursors.ObjectCursor; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.FilterDirectoryReader; import org.apache.lucene.index.LeafReader; import org.apache.lucene.search.Query; import org.apache.lucene.store.AlreadyClosedException; +import org.opensearch.action.ActionFuture; import org.opensearch.action.ActionListener; import org.opensearch.action.OriginalIndices; +import org.opensearch.action.admin.cluster.state.ClusterStateRequest; +import org.opensearch.action.admin.cluster.state.ClusterStateResponse; +import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse; +import org.opensearch.action.admin.indices.segments.IndicesSegmentsAction; +import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest; import org.opensearch.action.index.IndexResponse; -import org.opensearch.action.search.ClearScrollRequest; -import org.opensearch.action.search.SearchPhaseExecutionException; -import org.opensearch.action.search.SearchRequest; -import org.opensearch.action.search.SearchResponse; -import org.opensearch.action.search.SearchShardTask; -import org.opensearch.action.search.SearchType; +import org.opensearch.action.search.*; import org.opensearch.action.support.IndicesOptions; import org.opensearch.action.support.PlainActionFuture; import org.opensearch.action.support.WriteRequest; +import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.Strings; import org.opensearch.common.UUIDs; import org.opensearch.common.io.stream.StreamInput; @@ -78,7 +81,11 @@ import org.opensearch.indices.settings.InternalOrPrivateSettingsPlugin; import org.opensearch.plugins.Plugin; import org.opensearch.plugins.SearchPlugin; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.RestResponse; import org.opensearch.rest.RestStatus; +import org.opensearch.rest.action.RestActionListener; +import org.opensearch.rest.action.RestResponseListener; import org.opensearch.script.MockScriptEngine; import org.opensearch.script.MockScriptPlugin; import org.opensearch.script.Script; @@ -89,6 +96,7 @@ import org.opensearch.search.aggregations.bucket.global.GlobalAggregationBuilder; import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder; import org.opensearch.search.aggregations.support.ValueType; +import org.opensearch.search.builder.PointInTimeBuilder; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.search.fetch.FetchSearchResult; import org.opensearch.search.fetch.ShardFetchRequest; @@ -104,13 +112,7 @@ import org.junit.Before; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; -import java.util.Locale; -import java.util.Map; +import java.util.*; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Semaphore; @@ -1422,4 +1424,76 @@ private ReaderContext createReaderContext(IndexService indexService, IndexShard false ); } + + public void testPIT() throws ExecutionException, InterruptedException { + + createIndex("index", Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 0).build()); + client().prepareIndex("index").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + + PITRequest request = new PITRequest(TimeValue.timeValueDays(1)); + request.setIndices(new String[]{"index"}); + ActionFuture execute = client().execute(CreatePITAction.INSTANCE, request); + PITResponse pitResponse = execute.get(); + client().prepareIndex("index").setId("2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + SearchResponse searchResponse = client().prepareSearch("index") + .setSize(2).setPointInTime(new PointInTimeBuilder(pitResponse.getId()).setKeepAlive(TimeValue.timeValueDays(1))).get(); + assertHitCount(searchResponse, 1); + + final ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); + clusterStateRequest.local(false); + //clusterStateRequest.masterNodeTimeout(request.paramAsTime("master_timeout", clusterStateRequest.masterNodeTimeout())); + ActionFuture e; + clusterStateRequest.clear().nodes(true).routingTable(true).indices("*"); + ClusterStateResponse clusterStateResponse = client().admin().cluster().state(clusterStateRequest).get(); + final List nodes = new LinkedList<>(); + for (ObjectCursor cursor : clusterStateResponse.getState().nodes().getDataNodes().values()) { + DiscoveryNode node = cursor.value; + nodes.add(node); + } + DiscoveryNode[] disNodesArr = new DiscoveryNode[nodes.size()]; + nodes.toArray(disNodesArr); + GetAllPITNodesRequest getAllPITNodesRequest = new GetAllPITNodesRequest(disNodesArr); + + ActionFuture execute1 = client().execute(GetAllPITsAction.INSTANCE, getAllPITNodesRequest); + GetAllPITNodesResponse response = execute1.get(); + +// BaseRestHandler.RestChannelConsumer r = channel -> client().admin().cluster().state(clusterStateRequest, new RestActionListener(channel) { +// @Override +// public void processResponse(final ClusterStateResponse clusterStateResponse) throws IOException { +// final List nodes = new LinkedList<>(); +// for (ObjectCursor cursor : clusterStateResponse.getState().nodes().getDataNodes().values()) { +// DiscoveryNode node = cursor.value; +// nodes.add(node); +// } +// DiscoveryNode[] disNodesArr = new DiscoveryNode[nodes.size()]; +// nodes.toArray(disNodesArr); +// GetAllPITNodesRequest getAllPITNodesRequest = new GetAllPITNodesRequest(disNodesArr); +// client().execute(GetAllPITsAction.INSTANCE, getAllPITNodesRequest, new RestResponseListener(channel) { +// @Override +// public RestResponse buildResponse(final GetAllPITNodesResponse getAllPITNodesResponse) throws Exception { +// GetAllPITNodesResponse response = getAllPITNodesResponse; +// return null; +// } +// }); +// } +// }); + + + + //SearchService service = new SearchService(); +// ActionFuture execute1 = client().execute(GetAllPITsAction.INSTANCE, new GetAllPITNodesRequest()); +// GetAllPITNodesResponse response = execute1.get(); + SearchService service = getInstanceFromNode(SearchService.class); +// PitSegmentsRequest request1 = new PitSegmentsRequest(); +// request1.setPitIds(Arrays.asList(pitResponse.getId())); +// client().prepareIndex("index", "type", "2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); +// ActionFuture e = client().execute(PitSegmentsAction.INSTANCE, request1); +// IndicesSegmentResponse pitSegmentsResponse = e.get(); +// IndicesSegmentResponse indicesSegmentResponse = client().execute(IndicesSegmentsAction.INSTANCE, new IndicesSegmentsRequest("index")).get(); +// assertEquals(2, service.getActiveContexts()); +// client().execute(DeletePITAction.INSTANCE,new DeletePITRequest(pitResponse.getId())); +// +// assertEquals(0, service.getActiveContexts()); + service.doClose(); // this kills the keep-alive reaper we have to reset the node after this test + } }