Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move required routing validation to IndexRouting #79472

Merged
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
package org.elasticsearch.action.bulk;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
Expand All @@ -21,12 +20,12 @@
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.ingest.IngestTestPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentType;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
Expand All @@ -40,9 +39,9 @@

import static org.elasticsearch.action.DocWriteResponse.Result.CREATED;
import static org.elasticsearch.action.DocWriteResponse.Result.UPDATED;
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.StreamsUtils.copyToStringFromClasspath;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -104,7 +103,7 @@ public void testBulkWithWriteIndexAndRouting() {
// allowing the auto-generated timestamp to externally be set would allow making the index inconsistent with duplicate docs
public void testExternallySetAutoGeneratedTimestamp() {
IndexRequest indexRequest = new IndexRequest("index1").source(Collections.singletonMap("foo", "baz"));
indexRequest.process(Version.CURRENT, null, null); // sets the timestamp
indexRequest.process(); // sets the timestamp
if (randomBoolean()) {
indexRequest.id("test");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,12 @@
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.OperationRouting;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.routing.IndexRouting;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xcontent.XContentFactory;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
Expand All @@ -48,16 +46,17 @@ protected int minimumNumberOfShards() {
}

public String findNonMatchingRoutingValue(String index, String id) {
OperationRouting operationRouting = new OperationRouting(Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
ClusterState state = client().admin().cluster().prepareState().all().get().getState();
IndexMetadata metadata = state.metadata().index(index);
IndexMetadata withoutRoutingRequired = IndexMetadata.builder(metadata).putMapping("{}").build();
IndexRouting indexRouting = IndexRouting.fromIndexMetadata(withoutRoutingRequired);
int routing = -1;
ShardId idShard;
ShardId routingShard;
int idShard;
int routingShard;
do {
idShard = operationRouting.shardId(state, index, id, null);
routingShard = operationRouting.shardId(state, index, id, Integer.toString(++routing));
} while (idShard.getId() == routingShard.id());
idShard = indexRouting.getShard(id, null);
routingShard = indexRouting.getShard(id, Integer.toString(++routing));
} while (idShard == routingShard);

return Integer.toString(routing);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.elasticsearch.action.ingest.IngestActionForwarder;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.update.TransportUpdateAction;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.node.NodeClient;
Expand All @@ -40,7 +39,6 @@
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.routing.IndexRouting;
import org.elasticsearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -498,11 +496,8 @@ protected void doRun() {
prohibitAppendWritesInBackingIndices(docWriteRequest, metadata);
prohibitCustomRoutingOnDataStream(docWriteRequest, metadata);
IndexRequest indexRequest = (IndexRequest) docWriteRequest;
final IndexMetadata indexMetadata = metadata.index(concreteIndex);
MappingMetadata mappingMd = indexMetadata.mapping();
Version indexCreated = indexMetadata.getCreationVersion();
indexRequest.resolveRouting(metadata);
indexRequest.process(indexCreated, mappingMd, concreteIndex.getName());
indexRequest.process();
shardId = indexRouting.indexShard(
docWriteRequest.id(),
docWriteRequest.routing(),
Expand All @@ -511,16 +506,11 @@ protected void doRun() {
);
break;
case UPDATE:
TransportUpdateAction.resolveAndValidateRouting(metadata, concreteIndex.getName(),
(UpdateRequest) docWriteRequest);
docWriteRequest.routing(metadata.resolveWriteIndexRouting(docWriteRequest.routing(), docWriteRequest.index()));
shardId = indexRouting.updateShard(docWriteRequest.id(), docWriteRequest.routing());
break;
case DELETE:
docWriteRequest.routing(metadata.resolveWriteIndexRouting(docWriteRequest.routing(), docWriteRequest.index()));
// check if routing is required, if so, throw error if routing wasn't specified
if (docWriteRequest.routing() == null && metadata.routingRequired(concreteIndex.getName())) {
throw new RoutingMissingException(concreteIndex.getName(), docWriteRequest.id());
}
shardId = indexRouting.deleteShard(docWriteRequest.id(), docWriteRequest.routing());
break;
default: throw new AssertionError("request type not supported: [" + docWriteRequest.opType() + "]");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,15 @@
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.VersionConflictEngineException;
Expand All @@ -63,6 +59,8 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentType;

import java.io.IOException;
import java.util.Map;
Expand Down Expand Up @@ -242,9 +240,7 @@ static boolean executeBulkItemRequest(BulkPrimaryExecutionContext context, Updat
case CREATED:
case UPDATED:
IndexRequest indexRequest = updateResult.action();
IndexMetadata metadata = context.getPrimary().indexSettings().getIndexMetadata();
MappingMetadata mappingMd = metadata.mapping();
indexRequest.process(metadata.getCreationVersion(), mappingMd, updateRequest.concreteIndex());
indexRequest.process();
context.setRequestToExecute(indexRequest);
break;
case DELETED:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.apache.lucene.search.Explanation;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
import org.elasticsearch.cluster.ClusterState;
Expand Down Expand Up @@ -72,10 +71,6 @@ protected void resolveRequest(ClusterState state, InternalRequest request) {
final Set<String> indicesAndAliases = indexNameExpressionResolver.resolveExpressions(state, request.request().index());
final AliasFilter aliasFilter = searchService.buildAliasFilter(state, request.concreteIndex(), indicesAndAliases);
request.request().filteringAlias(aliasFilter);
// Fail fast on the node that received the request.
if (request.request().routing() == null && state.getMetadata().routingRequired(request.concreteIndex())) {
throw new RoutingMissingException(request.concreteIndex(), request.request().id());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package org.elasticsearch.action.get;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
import org.elasticsearch.cluster.ClusterState;
Expand Down Expand Up @@ -63,10 +62,6 @@ protected ShardIterator shards(ClusterState state, InternalRequest request) {
protected void resolveRequest(ClusterState state, InternalRequest request) {
// update the routing (request#index here is possibly an alias)
request.request().routing(state.metadata().resolveIndexRouting(request.request().routing(), request.request().index()));
// Fail fast on the node that received the request.
if (request.request().routing() == null && state.getMetadata().routingRequired(request.concreteIndex())) {
throw new RoutingMissingException(request.concreteIndex(), request.request().id());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package org.elasticsearch.action.get;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.node.NodeClient;
Expand Down Expand Up @@ -54,25 +53,18 @@ protected void doExecute(Task task, final MultiGetRequest request, final ActionL
for (int i = 0; i < request.items.size(); i++) {
MultiGetRequest.Item item = request.items.get(i);

String concreteSingleIndex;
ShardId shardId;
try {
concreteSingleIndex = indexNameExpressionResolver.concreteSingleIndex(clusterState, item).getName();

String concreteSingleIndex = indexNameExpressionResolver.concreteSingleIndex(clusterState, item).getName();
item.routing(clusterState.metadata().resolveIndexRouting(item.routing(), item.index()));
if ((item.routing() == null) && (clusterState.getMetadata().routingRequired(concreteSingleIndex))) {
responses.set(i, newItemFailure(concreteSingleIndex, item.id(),
new RoutingMissingException(concreteSingleIndex, item.id())));
continue;
}
shardId = clusterService.operationRouting()
.getShards(clusterState, concreteSingleIndex, item.id(), item.routing(), null)
.shardId();
} catch (Exception e) {
responses.set(i, newItemFailure(item.index(), item.id(), e));
continue;
}

ShardId shardId = clusterService.operationRouting()
.getShards(clusterState, concreteSingleIndex, item.id(), item.routing(), null)
.shardId();

MultiGetShardRequest shardRequest = shardRequests.get(shardId);
if (shardRequest == null) {
shardRequest = new MultiGetShardRequest(request, shardId.getIndexName(), shardId.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,28 +14,26 @@
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.CompositeIndicesRequest;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xcontent.XContentType;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -581,14 +579,7 @@ public VersionType versionType() {
}


public void process(Version indexCreatedVersion, @Nullable MappingMetadata mappingMd, String concreteIndex) {
if (mappingMd != null) {
// might as well check for routing here
if (mappingMd.routingRequired() && routing == null) {
throw new RoutingMissingException(concreteIndex, id);
}
}

public void process() {
if ("".equals(id)) {
throw new IllegalArgumentException("if _id is specified it must not be empty");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package org.elasticsearch.action.termvectors;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.node.NodeClient;
Expand Down Expand Up @@ -54,26 +53,19 @@ protected void doExecute(Task task, final MultiTermVectorsRequest request, final
Map<ShardId, MultiTermVectorsShardRequest> shardRequests = new HashMap<>();
for (int i = 0; i < request.requests.size(); i++) {
TermVectorsRequest termVectorsRequest = request.requests.get(i);
String concreteSingleIndex;
ShardId shardId;
try {
termVectorsRequest.routing(clusterState.metadata().resolveIndexRouting(termVectorsRequest.routing(),
termVectorsRequest.index()));
concreteSingleIndex = indexNameExpressionResolver.concreteSingleIndex(clusterState, termVectorsRequest).getName();
if (termVectorsRequest.routing() == null &&
clusterState.getMetadata().routingRequired(concreteSingleIndex)) {
responses.set(i, new MultiTermVectorsItemResponse(null,
new MultiTermVectorsResponse.Failure(concreteSingleIndex, termVectorsRequest.id(),
new RoutingMissingException(concreteSingleIndex, termVectorsRequest.id()))));
continue;
}
String concreteSingleIndex = indexNameExpressionResolver.concreteSingleIndex(clusterState, termVectorsRequest).getName();
shardId = clusterService.operationRouting().shardId(clusterState, concreteSingleIndex,
termVectorsRequest.id(), termVectorsRequest.routing());
} catch (Exception e) {
responses.set(i, new MultiTermVectorsItemResponse(null,
new MultiTermVectorsResponse.Failure(termVectorsRequest.index(), termVectorsRequest.id(), e)));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this changes the way we report a missing routing to list the original request index (which could be an alias) rather than the concrete index. I think we want to report the concrete index here?

Same in TransportMultiGetAction.

I suppose one could argue that reporting the original index-name is just as good since the info is baked into the exception message. But it does open a discussion on this being a breaking change (though a very small one)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But it does open a discussion on this being a breaking change (though a very small one)?

I don't think we consider error messages to be part of the semver contract. But changing it without thinking about it is not good. I'll have a closer look.

continue;
}

ShardId shardId = clusterService.operationRouting().shardId(clusterState, concreteSingleIndex,
termVectorsRequest.id(), termVectorsRequest.routing());
MultiTermVectorsShardRequest shardRequest = shardRequests.get(shardId);
if (shardRequest == null) {
shardRequest = new MultiTermVectorsShardRequest(shardId.getIndexName(), shardId.id());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package org.elasticsearch.action.termvectors;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
import org.elasticsearch.cluster.ClusterState;
Expand Down Expand Up @@ -68,10 +67,6 @@ protected boolean resolveIndex(TermVectorsRequest request) {
protected void resolveRequest(ClusterState state, InternalRequest request) {
// update the routing (request#index here is possibly an alias or a parent)
request.request().routing(state.metadata().resolveIndexRouting(request.request().routing(), request.request().index()));
// Fail fast on the node that received the request.
if (request.request().routing() == null && state.getMetadata().routingRequired(request.concreteIndex())) {
throw new RoutingMissingException(request.concreteIndex(), request.request().id());
}
}

@Override
Expand Down
Loading