From 5faf0cdd90852b36f8978b2e20a53c4f50caa94a Mon Sep 17 00:00:00 2001 From: Simon Cooper Date: Wed, 16 Oct 2024 13:30:18 +0100 Subject: [PATCH] Remove the min_compatible_shard_node option and associated classes (#114713) Any similar functionality in the future should use capabilities instead --- .../SearchWithMinCompatibleSearchNodeIT.java | 144 ----- .../resources/rest-api-spec/api/search.json | 4 - .../elasticsearch/ElasticsearchException.java | 6 - .../org/elasticsearch/TransportVersions.java | 1 + .../search/AbstractSearchAsyncAction.java | 32 +- .../search/CanMatchPreFilterSearchPhase.java | 32 +- .../action/search/SearchRequest.java | 41 +- .../search/VersionMismatchException.java | 27 - .../rest/action/search/RestSearchAction.java | 11 +- .../ExceptionSerializationTests.java | 3 +- .../SearchQueryThenFetchAsyncActionTests.java | 495 ------------------ .../action/search/SearchRequestTests.java | 28 - .../eql/plugin/TransportEqlSearchAction.java | 22 +- .../fleet/rest/RestFleetSearchAction.java | 12 +- .../xpack/ql/plugin/TransportActionUtils.java | 81 --- .../xpack/sql/execution/search/Querier.java | 4 +- .../sql/plugin/TransportSqlQueryAction.java | 23 +- 17 files changed, 29 insertions(+), 937 deletions(-) delete mode 100644 qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/SearchWithMinCompatibleSearchNodeIT.java delete mode 100644 server/src/main/java/org/elasticsearch/action/search/VersionMismatchException.java delete mode 100644 x-pack/plugin/ql/src/main/java/org/elasticsearch/xpack/ql/plugin/TransportActionUtils.java diff --git a/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/SearchWithMinCompatibleSearchNodeIT.java b/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/SearchWithMinCompatibleSearchNodeIT.java deleted file mode 100644 index a391ee5a3bd7b..0000000000000 --- a/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/SearchWithMinCompatibleSearchNodeIT.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the "Elastic License - * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side - * Public License v 1"; you may not use this file except in compliance with, at - * your election, the "Elastic License 2.0", the "GNU Affero General Public - * License v3.0 only", or the "Server Side Public License, v 1". - */ -package org.elasticsearch.backwards; - -import org.apache.http.HttpHost; -import org.elasticsearch.client.Request; -import org.elasticsearch.client.Response; -import org.elasticsearch.client.ResponseException; -import org.elasticsearch.client.RestClient; -import org.elasticsearch.core.Strings; -import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.test.rest.ESRestTestCase; -import org.elasticsearch.test.rest.ObjectPath; -import org.junit.Before; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.equalTo; - -public class SearchWithMinCompatibleSearchNodeIT extends ESRestTestCase { - - private static final String BWC_NODES_VERSION = System.getProperty("tests.bwc_nodes_version"); - private static final String NEW_NODES_VERSION = System.getProperty("tests.new_nodes_version"); - - private static String index = "test_min_version"; - private static int numShards; - private static int numReplicas = 1; - private static int numDocs; - private static MixedClusterTestNodes nodes; - private static List allNodes; - - @Before - public void prepareTestData() throws IOException { - nodes = MixedClusterTestNodes.buildNodes(client(), BWC_NODES_VERSION); - numShards = nodes.size(); - numDocs = randomIntBetween(numShards, 16); - allNodes = new ArrayList<>(); - allNodes.addAll(nodes.getBWCNodes()); - allNodes.addAll(nodes.getNewNodes()); - - if (client().performRequest(new Request("HEAD", "/" + index)).getStatusLine().getStatusCode() == 404) { - createIndex(index, indexSettings(numShards, numReplicas).build()); - for (int i = 0; i < numDocs; i++) { - Request request = new Request("PUT", index + "/_doc/" + i); - request.setJsonEntity("{\"test\": \"test_" + randomAlphaOfLength(2) + "\"}"); - assertOK(client().performRequest(request)); - } - ensureGreen(index); - } - } - - public void testMinVersionAsNewVersion() throws Exception { - try ( - RestClient client = buildClient( - restClientSettings(), - allNodes.stream().map(MixedClusterTestNode::publishAddress).toArray(HttpHost[]::new) - ) - ) { - Request newVersionRequest = new Request( - "POST", - index + "/_search?min_compatible_shard_node=" + NEW_NODES_VERSION + "&ccs_minimize_roundtrips=false" - ); - assertBusy(() -> { - ResponseException responseException = expectThrows(ResponseException.class, () -> client.performRequest(newVersionRequest)); - assertThat( - responseException.getResponse().getStatusLine().getStatusCode(), - equalTo(RestStatus.INTERNAL_SERVER_ERROR.getStatus()) - ); - assertThat(responseException.getMessage(), containsString(""" - {"error":{"root_cause":[],"type":"search_phase_execution_exception\"""")); - assertThat(responseException.getMessage(), containsString(Strings.format(""" - caused_by":{"type":"version_mismatch_exception",\ - "reason":"One of the shards is incompatible with the required minimum version [%s]\"""", NEW_NODES_VERSION))); - }); - } - } - - public void testMinVersionAsOldVersion() throws Exception { - try ( - RestClient client = buildClient( - restClientSettings(), - allNodes.stream().map(MixedClusterTestNode::publishAddress).toArray(HttpHost[]::new) - ) - ) { - Request oldVersionRequest = new Request( - "POST", - index + "/_search?min_compatible_shard_node=" + BWC_NODES_VERSION + "&ccs_minimize_roundtrips=false" - ); - oldVersionRequest.setJsonEntity(""" - {"query":{"match_all":{}},"_source":false}"""); - assertBusy(() -> { - Response response = client.performRequest(oldVersionRequest); - ObjectPath responseObject = ObjectPath.createFromResponse(response); - Map shardsResult = responseObject.evaluate("_shards"); - assertThat(shardsResult.get("total"), equalTo(numShards)); - assertThat(shardsResult.get("successful"), equalTo(numShards)); - assertThat(shardsResult.get("failed"), equalTo(0)); - Map hitsResult = responseObject.evaluate("hits.total"); - assertThat(hitsResult.get("value"), equalTo(numDocs)); - assertThat(hitsResult.get("relation"), equalTo("eq")); - }); - } - } - - public void testCcsMinimizeRoundtripsIsFalse() throws Exception { - try ( - RestClient client = buildClient( - restClientSettings(), - allNodes.stream().map(MixedClusterTestNode::publishAddress).toArray(HttpHost[]::new) - ) - ) { - String version = randomBoolean() ? NEW_NODES_VERSION : BWC_NODES_VERSION; - - Request request = new Request( - "POST", - index + "/_search?min_compatible_shard_node=" + version + "&ccs_minimize_roundtrips=true" - ); - assertBusy(() -> { - ResponseException responseException = expectThrows(ResponseException.class, () -> client.performRequest(request)); - assertThat(responseException.getResponse().getStatusLine().getStatusCode(), equalTo(RestStatus.BAD_REQUEST.getStatus())); - assertThat(responseException.getMessage(), containsString(""" - {"error":{"root_cause":[{"type":"action_request_validation_exception"\ - """)); - assertThat( - responseException.getMessage(), - containsString( - "\"reason\":\"Validation Failed: 1: " - + "[ccs_minimize_roundtrips] cannot be [true] when setting a minimum compatible shard version;\"" - ) - ); - }); - } - } -} diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/search.json b/rest-api-spec/src/main/resources/rest-api-spec/api/search.json index b5dc4d62a2f0f..25b4efd9c4c37 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/search.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/search.json @@ -237,10 +237,6 @@ "description":"Indicates whether hits.total should be rendered as an integer or an object in the rest search response", "default":false }, - "min_compatible_shard_node":{ - "type":"string", - "description":"The minimum compatible version that all shards involved in search should have for this request to be successful" - }, "include_named_queries_score":{ "type": "boolean", "description":"Indicates whether hit.matched_queries should be rendered as a map that includes the name of the matched query associated with its score (true) or as an array containing the name of the matched queries (false)", diff --git a/server/src/main/java/org/elasticsearch/ElasticsearchException.java b/server/src/main/java/org/elasticsearch/ElasticsearchException.java index 5d04e31069b1c..4119e12d45f6c 100644 --- a/server/src/main/java/org/elasticsearch/ElasticsearchException.java +++ b/server/src/main/java/org/elasticsearch/ElasticsearchException.java @@ -1819,12 +1819,6 @@ private enum ElasticsearchExceptionHandle { 160, TransportVersions.V_7_10_0 ), - VERSION_MISMATCH_EXCEPTION( - org.elasticsearch.action.search.VersionMismatchException.class, - org.elasticsearch.action.search.VersionMismatchException::new, - 161, - TransportVersions.V_7_12_0 - ), AUTHENTICATION_PROCESSING_ERROR( org.elasticsearch.ElasticsearchAuthenticationProcessingError.class, org.elasticsearch.ElasticsearchAuthenticationProcessingError::new, diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index f89c5a65693f2..d1d423dcc5405 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -246,6 +246,7 @@ static TransportVersion def(int id) { public static final TransportVersion ESQL_PER_AGGREGATE_FILTER = def(8_770_00_0); public static final TransportVersion ML_INFERENCE_ATTACH_TO_EXISTSING_DEPLOYMENT = def(8_771_00_0); public static final TransportVersion CONVERT_FAILURE_STORE_OPTIONS_TO_SELECTOR_OPTIONS_INTERNALLY = def(8_772_00_0); + public static final TransportVersion REMOVE_MIN_COMPATIBLE_SHARD_NODE = def(8_773_00_0); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index caa7453185575..0c585c705dcd0 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -15,7 +15,6 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.TransportVersion; -import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.action.OriginalIndices; @@ -234,15 +233,6 @@ public final void run() { } if (shardsIts.size() > 0) { doCheckNoMissingShards(getName(), request, shardsIts); - Version version = request.minCompatibleShardNode(); - if (version != null && Version.CURRENT.minimumCompatibilityVersion().equals(version) == false) { - if (checkMinimumVersion(shardsIts) == false) { - throw new VersionMismatchException( - "One of the shards is incompatible with the required minimum version [{}]", - request.minCompatibleShardNode() - ); - } - } for (int i = 0; i < shardsIts.size(); i++) { final SearchShardIterator shardRoutings = shardsIts.get(i); assert shardRoutings.skip() == false; @@ -260,21 +250,6 @@ void skipShard(SearchShardIterator iterator) { successfulShardExecution(iterator); } - private boolean checkMinimumVersion(GroupShardsIterator shardsIts) { - for (SearchShardIterator it : shardsIts) { - if (it.getTargetNodeIds().isEmpty() == false) { - boolean isCompatible = it.getTargetNodeIds().stream().anyMatch(nodeId -> { - Transport.Connection conn = getConnection(it.getClusterAlias(), nodeId); - return conn == null || conn.getNode().getVersion().onOrAfter(request.minCompatibleShardNode()); - }); - if (isCompatible == false) { - return false; - } - } - } - return true; - } - private static boolean assertExecuteOnStartThread() { // Ensure that the current code has the following stacktrace: // AbstractSearchAsyncAction#start -> AbstractSearchAsyncAction#executePhase -> AbstractSearchAsyncAction#performPhaseOnShard @@ -761,12 +736,7 @@ final void onPhaseDone() { // as a tribute to @kimchy aka. finishHim() @Override public final Transport.Connection getConnection(String clusterAlias, String nodeId) { - Transport.Connection conn = nodeIdToConnection.apply(clusterAlias, nodeId); - Version minVersion = request.minCompatibleShardNode(); - if (minVersion != null && conn != null && conn.getNode().getVersion().before(minVersion)) { - throw new VersionMismatchException("One of the shards is incompatible with the required minimum version [{}]", minVersion); - } - return conn; + return nodeIdToConnection.apply(clusterAlias, nodeId); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java b/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java index 8ce2cc7b6b19e..8dcfbf5f070a1 100644 --- a/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java @@ -11,7 +11,6 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.util.FixedBitSet; -import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.common.util.Maps; @@ -133,15 +132,6 @@ private static boolean assertSearchCoordinationThread() { public void run() { assert assertSearchCoordinationThread(); checkNoMissingShards(); - Version version = request.minCompatibleShardNode(); - if (version != null && Version.CURRENT.minimumCompatibilityVersion().equals(version) == false) { - if (checkMinimumVersion(shardsIts) == false) { - throw new VersionMismatchException( - "One of the shards is incompatible with the required minimum version [{}]", - request.minCompatibleShardNode() - ); - } - } runCoordinatorRewritePhase(); } @@ -378,21 +368,6 @@ public CanMatchNodeRequest.Shard buildShardLevelRequest(SearchShardIterator shar ); } - private boolean checkMinimumVersion(GroupShardsIterator shardsIts) { - for (SearchShardIterator it : shardsIts) { - if (it.getTargetNodeIds().isEmpty() == false) { - boolean isCompatible = it.getTargetNodeIds().stream().anyMatch(nodeId -> { - Transport.Connection conn = getConnection(new SendingTarget(it.getClusterAlias(), nodeId)); - return conn == null || conn.getNode().getVersion().onOrAfter(request.minCompatibleShardNode()); - }); - if (isCompatible == false) { - return false; - } - } - } - return true; - } - @Override public void start() { if (getNumShards() == 0) { @@ -421,12 +396,7 @@ public void onPhaseFailure(String msg, Exception cause) { } public Transport.Connection getConnection(SendingTarget sendingTarget) { - Transport.Connection conn = nodeIdToConnection.apply(sendingTarget.clusterAlias, sendingTarget.nodeId); - Version minVersion = request.minCompatibleShardNode(); - if (minVersion != null && conn != null && conn.getNode().getVersion().before(minVersion)) { - throw new VersionMismatchException("One of the shards is incompatible with the required minimum version [{}]", minVersion); - } - return conn; + return nodeIdToConnection.apply(sendingTarget.clusterAlias, sendingTarget.nodeId); } private int getNumShards() { diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java index 9961c3770fa86..5aec2bcd04b26 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java @@ -92,9 +92,6 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla private boolean ccsMinimizeRoundtrips; - @Nullable - private final Version minCompatibleShardNode; - public static final IndicesOptions DEFAULT_INDICES_OPTIONS = IndicesOptions.strictExpandOpenAndForbidClosedIgnoreThrottled(); private IndicesOptions indicesOptions = DEFAULT_INDICES_OPTIONS; @@ -112,15 +109,10 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla private boolean forceSyntheticSource = false; public SearchRequest() { - this((Version) null); - } - - public SearchRequest(Version minCompatibleShardNode) { this.localClusterAlias = null; this.absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS; this.finalReduce = true; - this.minCompatibleShardNode = minCompatibleShardNode; - this.ccsMinimizeRoundtrips = minCompatibleShardNode == null; + this.ccsMinimizeRoundtrips = true; } /** @@ -219,7 +211,6 @@ private SearchRequest( this.localClusterAlias = localClusterAlias; this.absoluteStartMillis = absoluteStartMillis; this.finalReduce = finalReduce; - this.minCompatibleShardNode = searchRequest.minCompatibleShardNode; this.waitForCheckpoints = searchRequest.waitForCheckpoints; this.waitForCheckpointsTimeout = searchRequest.waitForCheckpointsTimeout; this.forceSyntheticSource = searchRequest.forceSyntheticSource; @@ -263,10 +254,8 @@ public SearchRequest(StreamInput in) throws IOException { finalReduce = true; } ccsMinimizeRoundtrips = in.readBoolean(); - if (in.readBoolean()) { - minCompatibleShardNode = Version.readVersion(in); - } else { - minCompatibleShardNode = null; + if (in.getTransportVersion().before(TransportVersions.REMOVE_MIN_COMPATIBLE_SHARD_NODE) && in.readBoolean()) { + Version.readVersion(in); // and drop on the floor } waitForCheckpoints = in.readMap(StreamInput::readLongArray); waitForCheckpointsTimeout = in.readTimeValue(); @@ -302,9 +291,8 @@ public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(finalReduce); } out.writeBoolean(ccsMinimizeRoundtrips); - out.writeBoolean(minCompatibleShardNode != null); - if (minCompatibleShardNode != null) { - Version.writeVersion(minCompatibleShardNode, out); + if (out.getTransportVersion().before(TransportVersions.REMOVE_MIN_COMPATIBLE_SHARD_NODE)) { + out.writeBoolean(false); } out.writeMap(waitForCheckpoints, StreamOutput::writeLongArray); out.writeTimeValue(waitForCheckpointsTimeout); @@ -351,14 +339,6 @@ public ActionRequestValidationException validate() { validationException = addValidationError("[preference] cannot be used with point in time", validationException); } } - if (minCompatibleShardNode() != null) { - if (isCcsMinimizeRoundtrips()) { - validationException = addValidationError( - "[ccs_minimize_roundtrips] cannot be [true] when setting a minimum compatible " + "shard version", - validationException - ); - } - } if (pointInTimeBuilder() != null && waitForCheckpoints.isEmpty() == false) { validationException = addValidationError("using [point in time] is not allowed with wait_for_checkpoints", validationException); @@ -401,15 +381,6 @@ long getAbsoluteStartMillis() { return absoluteStartMillis; } - /** - * Returns the minimum compatible shard version the search request needs to run on. If the version is null, then there are no - * restrictions imposed on shards versions part of this search. - */ - @Nullable - public Version minCompatibleShardNode() { - return minCompatibleShardNode; - } - /** * Sets the indices the search will be executed on. */ @@ -818,7 +789,6 @@ public boolean equals(Object o) { && Objects.equals(localClusterAlias, that.localClusterAlias) && absoluteStartMillis == that.absoluteStartMillis && ccsMinimizeRoundtrips == that.ccsMinimizeRoundtrips - && Objects.equals(minCompatibleShardNode, that.minCompatibleShardNode) && forceSyntheticSource == that.forceSyntheticSource; } @@ -840,7 +810,6 @@ public int hashCode() { localClusterAlias, absoluteStartMillis, ccsMinimizeRoundtrips, - minCompatibleShardNode, forceSyntheticSource ); } diff --git a/server/src/main/java/org/elasticsearch/action/search/VersionMismatchException.java b/server/src/main/java/org/elasticsearch/action/search/VersionMismatchException.java deleted file mode 100644 index 69ea4484ae691..0000000000000 --- a/server/src/main/java/org/elasticsearch/action/search/VersionMismatchException.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the "Elastic License - * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side - * Public License v 1"; you may not use this file except in compliance with, at - * your election, the "Elastic License 2.0", the "GNU Affero General Public - * License v3.0 only", or the "Server Side Public License, v 1". - */ - -package org.elasticsearch.action.search; - -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.common.io.stream.StreamInput; - -import java.io.IOException; - -public class VersionMismatchException extends ElasticsearchException { - - public VersionMismatchException(String msg, Object... args) { - super(msg, args); - } - - public VersionMismatchException(StreamInput in) throws IOException { - super(in); - } - -} diff --git a/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java b/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java index af60979dfe169..80a85d3b9b748 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java @@ -10,7 +10,6 @@ package org.elasticsearch.rest.action.search; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.TransportSearchAction; @@ -100,12 +99,10 @@ public Set supportedCapabilities() { @Override public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { - SearchRequest searchRequest; - if (request.hasParam("min_compatible_shard_node")) { - searchRequest = new SearchRequest(Version.fromString(request.param("min_compatible_shard_node"))); - } else { - searchRequest = new SearchRequest(); - } + SearchRequest searchRequest = new SearchRequest(); + // access the BwC param, but just drop it + // this might be set by old clients + request.param("min_compatible_shard_node"); /* * We have to pull out the call to `source().size(size)` because * _update_by_query and _delete_by_query uses this same parsing diff --git a/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java b/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java index 31739850e2d35..2c6be01c851e4 100644 --- a/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java +++ b/server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java @@ -19,7 +19,6 @@ import org.elasticsearch.action.bulk.IndexDocFailureStoreStatus; import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.ShardSearchFailure; -import org.elasticsearch.action.search.VersionMismatchException; import org.elasticsearch.action.support.replication.ReplicationOperation; import org.elasticsearch.client.internal.AbstractClientHeadersTestCase; import org.elasticsearch.cluster.action.shard.ShardStateAction; @@ -816,7 +815,7 @@ public void testIds() { ids.put(158, PeerRecoveryNotFound.class); ids.put(159, NodeHealthCheckFailureException.class); ids.put(160, NoSeedNodeLeftException.class); - ids.put(161, VersionMismatchException.class); + ids.put(161, null); // was org.elasticsearch.action.search.VersionMismatchException.class ids.put(162, ElasticsearchAuthenticationProcessingError.class); ids.put(163, RepositoryConflictException.class); ids.put(164, VersionConflictException.class); diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java index b63c88f623e21..d279fa5030a8c 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java @@ -13,26 +13,18 @@ import org.apache.lucene.search.SortField; import org.apache.lucene.search.TopFieldDocs; import org.apache.lucene.search.TotalHits; -import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeUtils; -import org.elasticsearch.cluster.node.VersionInformation; import org.elasticsearch.cluster.routing.GroupShardsIterator; -import org.elasticsearch.cluster.routing.RecoverySource; -import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.NoopCircuitBreaker; import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.core.TimeValue; -import org.elasticsearch.index.Index; -import org.elasticsearch.index.IndexVersion; -import org.elasticsearch.index.IndexVersions; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.lucene.grouping.TopFieldGroups; import org.elasticsearch.search.DocValueFormat; @@ -47,24 +39,17 @@ import org.elasticsearch.search.sort.SortBuilders; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.InternalAggregationTestCase; -import org.elasticsearch.test.VersionUtils; -import org.elasticsearch.test.index.IndexVersionUtils; import org.elasticsearch.transport.Transport; -import java.util.ArrayList; import java.util.Collections; -import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import static java.util.Collections.singletonList; -import static org.elasticsearch.test.VersionUtils.allVersions; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; public class SearchQueryThenFetchAsyncActionTests extends ESTestCase { @@ -257,484 +242,4 @@ public void run() { assertThat(((FieldDoc) phase.sortedTopDocs().scoreDocs()[0]).fields[0], equalTo(0)); } } - - public void testMinimumVersionSameAsNewVersion() throws Exception { - var newVersion = VersionInformation.CURRENT; - var oldVersion = new VersionInformation( - VersionUtils.randomCompatibleVersion(random(), VersionUtils.getPreviousVersion()), - IndexVersions.MINIMUM_COMPATIBLE, - IndexVersionUtils.randomCompatibleVersion(random()) - ); - testMixedVersionsShardsSearch(newVersion, oldVersion, newVersion.nodeVersion()); - } - - public void testMinimumVersionBetweenNewAndOldVersion() throws Exception { - var oldVersion = new VersionInformation( - VersionUtils.getFirstVersion(), - IndexVersions.MINIMUM_COMPATIBLE, - IndexVersionUtils.randomCompatibleVersion(random()) - ); - - var newVersion = new VersionInformation( - VersionUtils.maxCompatibleVersion(VersionUtils.getFirstVersion()), - IndexVersions.MINIMUM_COMPATIBLE, - IndexVersion.current() - ); - - var minVersion = VersionUtils.randomVersionBetween( - random(), - allVersions().get(allVersions().indexOf(oldVersion.nodeVersion()) + 1), - newVersion.nodeVersion() - ); - - testMixedVersionsShardsSearch(newVersion, oldVersion, minVersion); - } - - private void testMixedVersionsShardsSearch(VersionInformation oldVersion, VersionInformation newVersion, Version minVersion) - throws Exception { - final TransportSearchAction.SearchTimeProvider timeProvider = new TransportSearchAction.SearchTimeProvider( - 0, - System.nanoTime(), - System::nanoTime - ); - int numConcurrent = randomIntBetween(1, 4); - - Map lookup = new ConcurrentHashMap<>(); - DiscoveryNode newVersionNode = DiscoveryNodeUtils.builder("node1").version(newVersion).build(); - DiscoveryNode oldVersionNode = DiscoveryNodeUtils.builder("node2").version(oldVersion).build(); - lookup.put("node1", new SearchAsyncActionTests.MockConnection(newVersionNode)); - lookup.put("node2", new SearchAsyncActionTests.MockConnection(oldVersionNode)); - - OriginalIndices idx = new OriginalIndices(new String[] { "idx" }, SearchRequest.DEFAULT_INDICES_OPTIONS); - ArrayList list = new ArrayList<>(); - ShardRouting routingNewVersionShard = ShardRouting.newUnassigned( - new ShardId(new Index("idx", "_na_"), 0), - true, - RecoverySource.EmptyStoreRecoverySource.INSTANCE, - new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foobar"), - ShardRouting.Role.DEFAULT - ); - routingNewVersionShard = routingNewVersionShard.initialize(newVersionNode.getId(), "p0", 0); - routingNewVersionShard.started(); - list.add(new SearchShardIterator(null, new ShardId(new Index("idx", "_na_"), 0), singletonList(routingNewVersionShard), idx)); - - ShardRouting routingOldVersionShard = ShardRouting.newUnassigned( - new ShardId(new Index("idx", "_na_"), 1), - true, - RecoverySource.EmptyStoreRecoverySource.INSTANCE, - new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foobar"), - ShardRouting.Role.DEFAULT - ); - routingOldVersionShard = routingOldVersionShard.initialize(oldVersionNode.getId(), "p1", 0); - routingOldVersionShard.started(); - list.add(new SearchShardIterator(null, new ShardId(new Index("idx", "_na_"), 1), singletonList(routingOldVersionShard), idx)); - - GroupShardsIterator shardsIter = new GroupShardsIterator<>(list); - final SearchRequest searchRequest = new SearchRequest(minVersion); - searchRequest.setMaxConcurrentShardRequests(numConcurrent); - searchRequest.setBatchedReduceSize(2); - searchRequest.source(new SearchSourceBuilder().size(1)); - searchRequest.allowPartialSearchResults(false); - - SearchTransportService searchTransportService = new SearchTransportService(null, null, null); - SearchPhaseController controller = new SearchPhaseController((t, r) -> InternalAggregationTestCase.emptyReduceContextBuilder()); - SearchTask task = new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.emptyMap()); - try ( - QueryPhaseResultConsumer resultConsumer = new QueryPhaseResultConsumer( - searchRequest, - EsExecutors.DIRECT_EXECUTOR_SERVICE, - new NoopCircuitBreaker(CircuitBreaker.REQUEST), - controller, - task::isCancelled, - task.getProgressListener(), - shardsIter.size(), - exc -> {} - ) - ) { - final List responses = new ArrayList<>(); - SearchQueryThenFetchAsyncAction newSearchAsyncAction = new SearchQueryThenFetchAsyncAction( - logger, - null, - searchTransportService, - (clusterAlias, node) -> lookup.get(node), - Collections.singletonMap("_na_", AliasFilter.EMPTY), - Collections.emptyMap(), - EsExecutors.DIRECT_EXECUTOR_SERVICE, - resultConsumer, - searchRequest, - new ActionListener<>() { - @Override - public void onFailure(Exception e) { - responses.add(e); - } - - public void onResponse(SearchResponse response) { - responses.add(response); - } - - ; - }, - shardsIter, - timeProvider, - new ClusterState.Builder(new ClusterName("test")).build(), - task, - SearchResponse.Clusters.EMPTY, - null - ); - - newSearchAsyncAction.start(); - assertThat(responses, hasSize(1)); - assertThat(responses.get(0), instanceOf(SearchPhaseExecutionException.class)); - SearchPhaseExecutionException e = (SearchPhaseExecutionException) responses.get(0); - assertThat(e.getCause(), instanceOf(VersionMismatchException.class)); - assertThat( - e.getCause().getMessage(), - equalTo("One of the shards is incompatible with the required minimum version [" + minVersion + "]") - ); - } - } - - public void testMinimumVersionSameAsOldVersion() throws Exception { - var newVersion = VersionInformation.CURRENT; - var oldVersion = new VersionInformation( - VersionUtils.randomCompatibleVersion(random(), VersionUtils.getPreviousVersion()), - IndexVersions.MINIMUM_COMPATIBLE, - IndexVersionUtils.randomCompatibleVersion(random()) - ); - Version minVersion = oldVersion.nodeVersion(); - - final TransportSearchAction.SearchTimeProvider timeProvider = new TransportSearchAction.SearchTimeProvider( - 0, - System.nanoTime(), - System::nanoTime - ); - AtomicInteger successfulOps = new AtomicInteger(); - - Map lookup = new ConcurrentHashMap<>(); - DiscoveryNode newVersionNode = DiscoveryNodeUtils.builder("node1").version(newVersion).build(); - DiscoveryNode oldVersionNode = DiscoveryNodeUtils.builder("node2").version(oldVersion).build(); - lookup.put("node1", new SearchAsyncActionTests.MockConnection(newVersionNode)); - lookup.put("node2", new SearchAsyncActionTests.MockConnection(oldVersionNode)); - - OriginalIndices idx = new OriginalIndices(new String[] { "idx" }, SearchRequest.DEFAULT_INDICES_OPTIONS); - ArrayList list = new ArrayList<>(); - ShardRouting routingNewVersionShard = ShardRouting.newUnassigned( - new ShardId(new Index("idx", "_na_"), 0), - true, - RecoverySource.EmptyStoreRecoverySource.INSTANCE, - new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foobar"), - ShardRouting.Role.DEFAULT - ); - routingNewVersionShard = routingNewVersionShard.initialize(newVersionNode.getId(), "p0", 0); - routingNewVersionShard.started(); - list.add(new SearchShardIterator(null, new ShardId(new Index("idx", "_na_"), 0), singletonList(routingNewVersionShard), idx)); - - ShardRouting routingOldVersionShard = ShardRouting.newUnassigned( - new ShardId(new Index("idx", "_na_"), 1), - true, - RecoverySource.EmptyStoreRecoverySource.INSTANCE, - new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foobar"), - ShardRouting.Role.DEFAULT - ); - routingOldVersionShard = routingOldVersionShard.initialize(oldVersionNode.getId(), "p1", 0); - routingOldVersionShard.started(); - list.add(new SearchShardIterator(null, new ShardId(new Index("idx", "_na_"), 1), singletonList(routingOldVersionShard), idx)); - - GroupShardsIterator shardsIter = new GroupShardsIterator<>(list); - final SearchRequest searchRequest = new SearchRequest(minVersion); - searchRequest.allowPartialSearchResults(false); - searchRequest.source(new SearchSourceBuilder().size(1).sort(SortBuilders.fieldSort("timestamp"))); - - SearchTransportService searchTransportService = new SearchTransportService(null, null, null) { - @Override - public void sendExecuteQuery( - Transport.Connection connection, - ShardSearchRequest request, - SearchTask task, - ActionListener listener - ) { - int shardId = request.shardId().id(); - QuerySearchResult queryResult = new QuerySearchResult( - new ShardSearchContextId("N/A", 123), - new SearchShardTarget("node1", new ShardId("idx", "na", shardId), null), - null - ); - try { - SortField sortField = new SortField("timestamp", SortField.Type.LONG); - if (shardId == 0) { - queryResult.topDocs( - new TopDocsAndMaxScore( - new TopFieldDocs( - new TotalHits(1, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO), - new FieldDoc[] { new FieldDoc(randomInt(1000), Float.NaN, new Object[] { shardId }) }, - new SortField[] { sortField } - ), - Float.NaN - ), - new DocValueFormat[] { DocValueFormat.RAW } - ); - } else if (shardId == 1) { - queryResult.topDocs( - new TopDocsAndMaxScore( - new TopFieldDocs( - new TotalHits(1, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO), - new FieldDoc[] { new FieldDoc(randomInt(1000), Float.NaN, new Object[] { shardId }) }, - new SortField[] { sortField } - ), - Float.NaN - ), - new DocValueFormat[] { DocValueFormat.RAW } - ); - } - queryResult.from(0); - queryResult.size(1); - successfulOps.incrementAndGet(); - queryResult.incRef(); - new Thread(() -> ActionListener.respondAndRelease(listener, queryResult)).start(); - } finally { - queryResult.decRef(); - } - } - }; - SearchPhaseController controller = new SearchPhaseController((t, r) -> InternalAggregationTestCase.emptyReduceContextBuilder()); - SearchTask task = new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.emptyMap()); - try ( - QueryPhaseResultConsumer resultConsumer = new QueryPhaseResultConsumer( - searchRequest, - EsExecutors.DIRECT_EXECUTOR_SERVICE, - new NoopCircuitBreaker(CircuitBreaker.REQUEST), - controller, - task::isCancelled, - task.getProgressListener(), - shardsIter.size(), - exc -> {} - ) - ) { - CountDownLatch latch = new CountDownLatch(1); - SearchQueryThenFetchAsyncAction action = new SearchQueryThenFetchAsyncAction( - logger, - null, - searchTransportService, - (clusterAlias, node) -> lookup.get(node), - Collections.singletonMap("_na_", AliasFilter.EMPTY), - Collections.emptyMap(), - EsExecutors.DIRECT_EXECUTOR_SERVICE, - resultConsumer, - searchRequest, - null, - shardsIter, - timeProvider, - new ClusterState.Builder(new ClusterName("test")).build(), - task, - SearchResponse.Clusters.EMPTY, - null - ) { - @Override - protected SearchPhase getNextPhase(SearchPhaseResults results, SearchPhaseContext context) { - return new SearchPhase("test") { - @Override - public void run() { - latch.countDown(); - } - }; - } - }; - - action.start(); - latch.await(); - assertThat(successfulOps.get(), equalTo(2)); - SearchPhaseController.ReducedQueryPhase phase = action.results.reduce(); - assertThat(phase.numReducePhases(), greaterThanOrEqualTo(1)); - assertThat(phase.totalHits().value, equalTo(2L)); - assertThat(phase.totalHits().relation, equalTo(TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO)); - } - } - - public void testMinimumVersionShardDuringPhaseExecution() throws Exception { - var newVersion = VersionInformation.CURRENT; - var oldVersion = new VersionInformation( - VersionUtils.randomCompatibleVersion(random(), VersionUtils.getPreviousVersion()), - IndexVersions.MINIMUM_COMPATIBLE, - IndexVersionUtils.randomCompatibleVersion(random()) - ); - - Version minVersion = newVersion.nodeVersion(); - - final TransportSearchAction.SearchTimeProvider timeProvider = new TransportSearchAction.SearchTimeProvider( - 0, - System.nanoTime(), - System::nanoTime - ); - AtomicInteger successfulOps = new AtomicInteger(); - - Map lookup = new ConcurrentHashMap<>(); - DiscoveryNode newVersionNode1 = DiscoveryNodeUtils.builder("node1").version(newVersion).build(); - DiscoveryNode newVersionNode2 = DiscoveryNodeUtils.builder("node2").version(newVersion).build(); - DiscoveryNode oldVersionNode = DiscoveryNodeUtils.builder("node3").version(oldVersion).build(); - lookup.put("node1", new SearchAsyncActionTests.MockConnection(newVersionNode1)); - lookup.put("node2", new SearchAsyncActionTests.MockConnection(newVersionNode2)); - lookup.put("node3", new SearchAsyncActionTests.MockConnection(oldVersionNode)); - - OriginalIndices idx = new OriginalIndices(new String[] { "idx" }, SearchRequest.DEFAULT_INDICES_OPTIONS); - ArrayList list = new ArrayList<>(); - ShardRouting routingNewVersionShard1 = ShardRouting.newUnassigned( - new ShardId(new Index("idx", "_na_"), 0), - true, - RecoverySource.EmptyStoreRecoverySource.INSTANCE, - new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foobar"), - ShardRouting.Role.DEFAULT - ); - routingNewVersionShard1 = routingNewVersionShard1.initialize(newVersionNode1.getId(), "p0", 0); - routingNewVersionShard1.started(); - list.add(new SearchShardIterator(null, new ShardId(new Index("idx", "_na_"), 0), singletonList(routingNewVersionShard1), idx)); - - ShardRouting routingNewVersionShard2 = ShardRouting.newUnassigned( - new ShardId(new Index("idx", "_na_"), 1), - true, - RecoverySource.EmptyStoreRecoverySource.INSTANCE, - new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foobar"), - ShardRouting.Role.DEFAULT - ); - routingNewVersionShard2 = routingNewVersionShard2.initialize(newVersionNode2.getId(), "p1", 0); - routingNewVersionShard2.started(); - list.add(new SearchShardIterator(null, new ShardId(new Index("idx", "_na_"), 1), singletonList(routingNewVersionShard2), idx)); - - GroupShardsIterator shardsIter = new GroupShardsIterator<>(list); - final SearchRequest searchRequest = new SearchRequest(minVersion); - searchRequest.allowPartialSearchResults(false); - searchRequest.source(new SearchSourceBuilder().size(1).sort(SortBuilders.fieldSort("timestamp"))); - - SearchTransportService searchTransportService = new SearchTransportService(null, null, null) { - @Override - public void sendExecuteQuery( - Transport.Connection connection, - ShardSearchRequest request, - SearchTask task, - ActionListener listener - ) { - int shardId = request.shardId().id(); - QuerySearchResult queryResult = new QuerySearchResult( - new ShardSearchContextId("N/A", 123), - new SearchShardTarget("node1", new ShardId("idx", "na", shardId), null), - null - ); - try { - SortField sortField = new SortField("timestamp", SortField.Type.LONG); - if (shardId == 0) { - queryResult.topDocs( - new TopDocsAndMaxScore( - new TopFieldDocs( - new TotalHits(1, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO), - new FieldDoc[] { new FieldDoc(randomInt(1000), Float.NaN, new Object[] { shardId }) }, - new SortField[] { sortField } - ), - Float.NaN - ), - new DocValueFormat[] { DocValueFormat.RAW } - ); - } else if (shardId == 1) { - queryResult.topDocs( - new TopDocsAndMaxScore( - new TopFieldDocs( - new TotalHits(1, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO), - new FieldDoc[] { new FieldDoc(randomInt(1000), Float.NaN, new Object[] { shardId }) }, - new SortField[] { sortField } - ), - Float.NaN - ), - new DocValueFormat[] { DocValueFormat.RAW } - ); - } - queryResult.from(0); - queryResult.size(1); - successfulOps.incrementAndGet(); - queryResult.incRef(); - new Thread(() -> ActionListener.respondAndRelease(listener, queryResult)).start(); - } finally { - queryResult.decRef(); - } - } - }; - SearchPhaseController controller = new SearchPhaseController((t, r) -> InternalAggregationTestCase.emptyReduceContextBuilder()); - SearchTask task = new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.emptyMap()); - - CountDownLatch latch = new CountDownLatch(1); - try ( - QueryPhaseResultConsumer resultConsumer = new QueryPhaseResultConsumer( - searchRequest, - EsExecutors.DIRECT_EXECUTOR_SERVICE, - new NoopCircuitBreaker(CircuitBreaker.REQUEST), - controller, - task::isCancelled, - task.getProgressListener(), - shardsIter.size(), - exc -> {} - ) - ) { - SearchQueryThenFetchAsyncAction action = new SearchQueryThenFetchAsyncAction( - logger, - null, - searchTransportService, - (clusterAlias, node) -> lookup.get(node), - Collections.singletonMap("_na_", AliasFilter.EMPTY), - Collections.emptyMap(), - EsExecutors.DIRECT_EXECUTOR_SERVICE, - resultConsumer, - searchRequest, - null, - shardsIter, - timeProvider, - new ClusterState.Builder(new ClusterName("test")).build(), - task, - SearchResponse.Clusters.EMPTY, - null - ) { - @Override - protected SearchPhase getNextPhase(SearchPhaseResults results, SearchPhaseContext context) { - return new SearchPhase("test") { - @Override - public void run() { - latch.countDown(); - } - }; - } - }; - ShardRouting routingOldVersionShard = ShardRouting.newUnassigned( - new ShardId(new Index("idx", "_na_"), 2), - true, - RecoverySource.EmptyStoreRecoverySource.INSTANCE, - new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foobar"), - ShardRouting.Role.DEFAULT - ); - SearchShardIterator shardIt = new SearchShardIterator( - null, - new ShardId(new Index("idx", "_na_"), 2), - singletonList(routingOldVersionShard), - idx - ); - routingOldVersionShard = routingOldVersionShard.initialize(oldVersionNode.getId(), "p2", 0); - routingOldVersionShard.started(); - action.start(); - latch.await(); - assertThat(successfulOps.get(), equalTo(2)); - SearchPhaseController.ReducedQueryPhase phase = action.results.reduce(); - assertThat(phase.numReducePhases(), greaterThanOrEqualTo(1)); - assertThat(phase.totalHits().value, equalTo(2L)); - assertThat(phase.totalHits().relation, equalTo(TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO)); - - SearchShardTarget searchShardTarget = new SearchShardTarget("node3", shardIt.shardId(), null); - SearchActionListener listener = new SearchActionListener(searchShardTarget, 0) { - @Override - public void onFailure(Exception e) {} - - @Override - protected void innerOnResponse(SearchPhaseResult response) {} - }; - Exception e = expectThrows( - VersionMismatchException.class, - () -> action.executePhaseOnShard(shardIt, searchShardTarget, listener) - ); - assertThat(e.getMessage(), equalTo("One of the shards is incompatible with the required minimum version [" + minVersion + "]")); - } - } } diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java index 23c956e6e52f2..3079b6d4b0371 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java @@ -39,7 +39,6 @@ import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.TransportVersionUtils; -import org.elasticsearch.test.VersionUtils; import org.elasticsearch.xcontent.XContentBuilder; import java.io.IOException; @@ -438,33 +437,6 @@ public QueryBuilder topDocsQuery() { assertEquals(1, validationErrors.validationErrors().size()); assertEquals("using [point in time] is not allowed in a scroll context", validationErrors.validationErrors().get(0)); } - { - // Minimum compatible shard node version with ccs_minimize_roundtrips - SearchRequest searchRequest; - boolean isMinCompatibleShardVersion = randomBoolean(); - if (isMinCompatibleShardVersion) { - searchRequest = new SearchRequest(VersionUtils.randomVersion(random())); - } else { - searchRequest = new SearchRequest(); - } - - boolean shouldSetCcsMinimizeRoundtrips = randomBoolean(); - if (shouldSetCcsMinimizeRoundtrips) { - searchRequest.setCcsMinimizeRoundtrips(true); - } - ActionRequestValidationException validationErrors = searchRequest.validate(); - - if (isMinCompatibleShardVersion && shouldSetCcsMinimizeRoundtrips) { - assertNotNull(validationErrors); - assertEquals(1, validationErrors.validationErrors().size()); - assertEquals( - "[ccs_minimize_roundtrips] cannot be [true] when setting a minimum compatible shard version", - validationErrors.validationErrors().get(0) - ); - } else { - assertNull(validationErrors); - } - } { SearchRequest searchRequest = new SearchRequest().source( new SearchSourceBuilder().rankBuilder(new TestRankBuilder(100)) diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchAction.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchAction.java index 51f92bcda7da4..c0141da2432ce 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchAction.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchAction.java @@ -59,7 +59,6 @@ import static org.elasticsearch.action.ActionListener.wrap; import static org.elasticsearch.transport.RemoteClusterAware.buildRemoteIndexName; import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN; -import static org.elasticsearch.xpack.ql.plugin.TransportActionUtils.executeRequestWithRetryAttempt; public final class TransportEqlSearchAction extends HandledTransportAction implements @@ -236,22 +235,11 @@ public static void operation( new TaskId(nodeId, task.getId()), task ); - executeRequestWithRetryAttempt( - clusterService, - listener::onFailure, - onFailure -> planExecutor.eql( - cfg, - request.query(), - params, - wrap(r -> listener.onResponse(createResponse(r, task.getExecutionId())), onFailure) - ), - node -> transportService.sendRequest( - node, - EqlSearchAction.NAME, - request, - new ActionListenerResponseHandler<>(listener, EqlSearchResponse::new, EsExecutors.DIRECT_EXECUTOR_SERVICE) - ), - log + planExecutor.eql( + cfg, + request.query(), + params, + wrap(r -> listener.onResponse(createResponse(r, task.getExecutionId())), listener::onFailure) ); } } diff --git a/x-pack/plugin/fleet/src/main/java/org/elasticsearch/xpack/fleet/rest/RestFleetSearchAction.java b/x-pack/plugin/fleet/src/main/java/org/elasticsearch/xpack/fleet/rest/RestFleetSearchAction.java index a6c369734f0e3..a79424b8b7d59 100644 --- a/x-pack/plugin/fleet/src/main/java/org/elasticsearch/xpack/fleet/rest/RestFleetSearchAction.java +++ b/x-pack/plugin/fleet/src/main/java/org/elasticsearch/xpack/fleet/rest/RestFleetSearchAction.java @@ -7,7 +7,6 @@ package org.elasticsearch.xpack.fleet.rest; -import org.elasticsearch.Version; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.TransportSearchAction; import org.elasticsearch.client.internal.node.NodeClient; @@ -57,12 +56,11 @@ public List routes() { @Override protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { - SearchRequest searchRequest; - if (request.hasParam("min_compatible_shard_node")) { - searchRequest = new SearchRequest(Version.fromString(request.param("min_compatible_shard_node"))); - } else { - searchRequest = new SearchRequest(); - } + SearchRequest searchRequest = new SearchRequest(); + // access the BwC param, but just drop it + // this might be set by old clients + request.param("min_compatible_shard_node"); + String[] indices = searchRequest.indices(); if (indices.length > 1) { throw new IllegalArgumentException( diff --git a/x-pack/plugin/ql/src/main/java/org/elasticsearch/xpack/ql/plugin/TransportActionUtils.java b/x-pack/plugin/ql/src/main/java/org/elasticsearch/xpack/ql/plugin/TransportActionUtils.java deleted file mode 100644 index 6431c83ee1c2e..0000000000000 --- a/x-pack/plugin/ql/src/main/java/org/elasticsearch/xpack/ql/plugin/TransportActionUtils.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ -package org.elasticsearch.xpack.ql.plugin; - -import org.apache.logging.log4j.Logger; -import org.elasticsearch.action.search.SearchPhaseExecutionException; -import org.elasticsearch.action.search.VersionMismatchException; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.xpack.ql.util.Holder; - -import java.util.function.Consumer; - -public final class TransportActionUtils { - - /** - * Execute a *QL request and re-try it in case the first request failed with a {@code VersionMismatchException} - * - * @param clusterService The cluster service instance - * @param onFailure On-failure handler in case the request doesn't fail with a {@code VersionMismatchException} - * @param queryRunner *QL query execution code, typically a Plan Executor running the query - * @param retryRequest Re-trial logic - * @param log Log4j logger - */ - public static void executeRequestWithRetryAttempt( - ClusterService clusterService, - Consumer onFailure, - Consumer> queryRunner, - Consumer retryRequest, - Logger log - ) { - - Holder retrySecondTime = new Holder(false); - queryRunner.accept(e -> { - // the search request likely ran on nodes with different versions of ES - // we will retry on a node with an older version that should generate a backwards compatible _search request - if (e instanceof SearchPhaseExecutionException - && ((SearchPhaseExecutionException) e).getCause() instanceof VersionMismatchException) { - if (log.isDebugEnabled()) { - log.debug("Caught exception type [{}] with cause [{}].", e.getClass().getName(), e.getCause()); - } - DiscoveryNode localNode = clusterService.state().nodes().getLocalNode(); - DiscoveryNode candidateNode = null; - for (DiscoveryNode node : clusterService.state().nodes()) { - // find the first node that's older than the current node - if (node != localNode && node.getVersion().before(localNode.getVersion())) { - candidateNode = node; - break; - } - } - if (candidateNode != null) { - if (log.isDebugEnabled()) { - log.debug( - "Candidate node to resend the request to: address [{}], id [{}], name [{}], version [{}]", - candidateNode.getAddress(), - candidateNode.getId(), - candidateNode.getName(), - candidateNode.getVersion() - ); - } - // re-send the request to the older node - retryRequest.accept(candidateNode); - } else { - retrySecondTime.set(true); - } - } else { - onFailure.accept(e); - } - }); - if (retrySecondTime.get()) { - if (log.isDebugEnabled()) { - log.debug("No candidate node found, likely all were upgraded in the meantime. Re-trying the original request."); - } - queryRunner.accept(onFailure); - } - } -} diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/Querier.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/Querier.java index b2ce91140de76..06293df4f4559 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/Querier.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/Querier.java @@ -10,7 +10,6 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.search.TotalHits; import org.apache.lucene.util.PriorityQueue; -import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DelegatingActionListener; import org.elasticsearch.action.search.ClosePointInTimeRequest; @@ -101,7 +100,6 @@ import static org.elasticsearch.action.ActionListener.wrap; import static org.elasticsearch.xpack.ql.execution.search.extractor.AbstractFieldHitExtractor.MultiValueSupport.LENIENT; import static org.elasticsearch.xpack.ql.execution.search.extractor.AbstractFieldHitExtractor.MultiValueSupport.NONE; -import static org.elasticsearch.xpack.sql.proto.VersionCompatibility.INTRODUCING_UNSIGNED_LONG; // TODO: add retry/back-off public class Querier { @@ -202,7 +200,7 @@ public static void closePointInTime(Client client, BytesReference pointInTimeId, public static SearchRequest prepareRequest(SearchSourceBuilder source, SqlConfiguration cfg, boolean includeFrozen, String... indices) { source.timeout(cfg.requestTimeout()); - SearchRequest searchRequest = new SearchRequest(Version.fromId(INTRODUCING_UNSIGNED_LONG.id)); + SearchRequest searchRequest = new SearchRequest(); if (source.pointInTimeBuilder() == null) { searchRequest.indices(indices); searchRequest.indicesOptions( diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TransportSqlQueryAction.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TransportSqlQueryAction.java index 7a76ffe8eb109..41fa66ae36aeb 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TransportSqlQueryAction.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TransportSqlQueryAction.java @@ -9,7 +9,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.cluster.service.ClusterService; @@ -57,7 +56,6 @@ import static java.util.Collections.unmodifiableList; import static org.elasticsearch.action.ActionListener.wrap; import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN; -import static org.elasticsearch.xpack.ql.plugin.TransportActionUtils.executeRequestWithRetryAttempt; import static org.elasticsearch.xpack.sql.plugin.Transports.clusterName; import static org.elasticsearch.xpack.sql.plugin.Transports.username; import static org.elasticsearch.xpack.sql.proto.Mode.CLI; @@ -161,22 +159,11 @@ public static void operation( ); if (Strings.hasText(request.cursor()) == false) { - executeRequestWithRetryAttempt( - clusterService, - listener::onFailure, - onFailure -> planExecutor.sql( - cfg, - request.query(), - request.params(), - wrap(p -> listener.onResponse(createResponseWithSchema(request, p, task)), onFailure) - ), - node -> transportService.sendRequest( - node, - SqlQueryAction.NAME, - request, - new ActionListenerResponseHandler<>(listener, SqlQueryResponse::new, EsExecutors.DIRECT_EXECUTOR_SERVICE) - ), - log + planExecutor.sql( + cfg, + request.query(), + request.params(), + wrap(p -> listener.onResponse(createResponseWithSchema(request, p, task)), listener::onFailure) ); } else { Tuple decoded = Cursors.decodeFromStringWithZone(request.cursor(), planExecutor.writeableRegistry());