Skip to content

Commit

Permalink
Move required routing validation to IndexRouting (#79472)
Browse files Browse the repository at this point in the history
Now that we have a place responsible for picking shards we can move the
"routing is required" validation there, right next to the validation for
the `routing_path` stuff. This feels like a more convenient place to
have it. Certainly a nice place to unit test it.
  • Loading branch information
nik9000 authored Oct 27, 2021
1 parent 24ed875 commit 6741cbf
Show file tree
Hide file tree
Showing 20 changed files with 258 additions and 185 deletions.
Original file line number Diff line number Diff line change
@@ -1,29 +1,27 @@
---
"Routing":
routing:
- do:
indices.create:
index: test_1
body:
settings:
index:
number_of_shards: 5
number_of_routing_shards: 5
number_of_replicas: 0


- do:
indices.create:
index: test_1
body:
settings:
index:
number_of_shards: 5
number_of_routing_shards: 5
number_of_replicas: 0

- do:
- do:
cluster.health:
wait_for_status: green

- do:
- do:
index:
index: test_1
id: 1
routing: "5"
body: { foo: bar }

- do:
- do:
mget:
index: test_1
stored_fields: [_routing]
Expand All @@ -33,10 +31,59 @@
- { _id: 1, routing: "4" }
- { _id: 1, routing: "5" }

- is_false: docs.0.found
- is_false: docs.1.found
- is_false: docs.0.found
- is_false: docs.1.found
- is_true: docs.2.found
- match: { docs.2._index: test_1 }
- match: { docs.2._id: "1" }
- match: { docs.2._routing: "5" }

---
requires routing:
- skip:
version: " - 7.99.99"
reason: "fails with an unexpected message in 7.x"

- do:
indices.create:
index: test_1
body:
settings:
index:
number_of_shards: 5
number_of_replicas: 0
mappings:
_routing:
required: true

- do:
index:
index: test_1
id: 1
routing: "5"
body: { foo: bar }

- do:
indices.put_alias:
index: test_1
name: alias

- do:
mget:
stored_fields: [_routing]
body:
docs:
- { _id: 1, _index: test_1 }
- { _id: 1, _index: alias }
- { _id: 1, _index: test_1, routing: "5" }

- is_true: docs.2.found
- match: { docs.2._index: test_1 }
- match: { docs.2._id: "1" }
- match: { docs.2._routing: "5" }
- is_false: docs.0.found
- match: { docs.0.error.reason: "routing is required for [test_1]/[1]" }
- match: { docs.0._index: test_1 }
- is_false: docs.1.found
- match: { docs.1.error.reason: "routing is required for [test_1]/[1]" }
- match: { docs.1._index: test_1 }
- is_true: docs.2.found
- match: { docs.2._index: test_1 }
- match: { docs.2._id: "1" }
- match: { docs.2._routing: "5" }
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
---
routing:
- do:
indices.create:
index: test_1
body:
settings:
index:
number_of_shards: 5
number_of_replicas: 0

- do:
cluster.health:
wait_for_status: green

- do:
index:
index: test_1
id: 1
routing: "5"
body: { foo: bar baz }

- do:
mtermvectors:
index: test_1
fields: foo
body:
docs:
- { _id: 1 }
- { _id: 1, routing: "4" }
- { _id: 1, routing: "5" }

- is_false: docs.0.found
- is_false: docs.1.found
- is_true: docs.2.found
- match: { docs.2._index: test_1 }
- match: { docs.2._id: "1" }
- match: { docs.2.term_vectors.foo.terms.bar.term_freq: 1 }
- match: { docs.2.term_vectors.foo.terms.baz.term_freq: 1 }


---
requires routing:
- skip:
version: " - 7.99.99"
reason: "fails with an unexpected message in 7.x"

- do:
indices.create:
index: test_1
body:
settings:
index:
number_of_shards: 5
number_of_replicas: 0
mappings:
_routing:
required: true

- do:
index:
index: test_1
id: 1
routing: "5"
body: { foo: bar baz }

- do:
indices.put_alias:
index: test_1
name: alias

- do:
mtermvectors:
fields: foo
body:
docs:
- { _id: 1, _index: test_1 }
- { _id: 1, _index: alias }
- { _id: 1, _index: test_1, routing: "5" }

- is_false: docs.0.found
- match: { docs.0.error.reason: "routing is required for [test_1]/[1]" }
- match: { docs.0._index: test_1 }
- is_false: docs.1.found
- match: { docs.1.error.reason: "routing is required for [test_1]/[1]" }
- match: { docs.1._index: test_1 }
- is_true: docs.2.found
- match: { docs.2._index: test_1 }
- match: { docs.2._id: "1" }
- match: { docs.2.term_vectors.foo.terms.bar.term_freq: 1 }
- match: { docs.2.term_vectors.foo.terms.baz.term_freq: 1 }
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package org.elasticsearch.action.bulk;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
Expand Down Expand Up @@ -114,7 +113,7 @@ public void testBulkWithWriteIndexAndRouting() {
// allowing the auto-generated timestamp to externally be set would allow making the index inconsistent with duplicate docs
public void testExternallySetAutoGeneratedTimestamp() {
IndexRequest indexRequest = new IndexRequest("index1").source(Collections.singletonMap("foo", "baz"));
indexRequest.process(Version.CURRENT, null, null); // sets the timestamp
indexRequest.process(); // sets the timestamp
if (randomBoolean()) {
indexRequest.id("test");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,9 @@
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.OperationRouting;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.routing.IndexRouting;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xcontent.XContentFactory;
Expand All @@ -48,18 +46,17 @@ protected int minimumNumberOfShards() {
}

public String findNonMatchingRoutingValue(String index, String id) {
OperationRouting operationRouting = new OperationRouting(
Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
);
ClusterState state = client().admin().cluster().prepareState().all().get().getState();
IndexMetadata metadata = state.metadata().index(index);
IndexMetadata withoutRoutingRequired = IndexMetadata.builder(metadata).putMapping("{}").build();
IndexRouting indexRouting = IndexRouting.fromIndexMetadata(withoutRoutingRequired);
int routing = -1;
ShardId idShard;
ShardId routingShard;
int idShard;
int routingShard;
do {
idShard = operationRouting.shardId(state, index, id, null);
routingShard = operationRouting.shardId(state, index, id, Integer.toString(++routing));
} while (idShard.getId() == routingShard.id());
idShard = indexRouting.getShard(id, null);
routingShard = indexRouting.getShard(id, Integer.toString(++routing));
} while (idShard == routingShard);

return Integer.toString(routing);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.elasticsearch.action.ingest.IngestActionForwarder;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.update.TransportUpdateAction;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.node.NodeClient;
Expand All @@ -40,7 +39,6 @@
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.routing.IndexRouting;
import org.elasticsearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -534,11 +532,8 @@ protected void doRun() {
prohibitAppendWritesInBackingIndices(docWriteRequest, metadata);
prohibitCustomRoutingOnDataStream(docWriteRequest, metadata);
IndexRequest indexRequest = (IndexRequest) docWriteRequest;
final IndexMetadata indexMetadata = metadata.index(concreteIndex);
MappingMetadata mappingMd = indexMetadata.mapping();
Version indexCreated = indexMetadata.getCreationVersion();
indexRequest.resolveRouting(metadata);
indexRequest.process(indexCreated, mappingMd, concreteIndex.getName());
indexRequest.process();
shardId = indexRouting.indexShard(
docWriteRequest.id(),
docWriteRequest.routing(),
Expand All @@ -547,19 +542,11 @@ protected void doRun() {
);
break;
case UPDATE:
TransportUpdateAction.resolveAndValidateRouting(
metadata,
concreteIndex.getName(),
(UpdateRequest) docWriteRequest
);
docWriteRequest.routing(metadata.resolveWriteIndexRouting(docWriteRequest.routing(), docWriteRequest.index()));
shardId = indexRouting.updateShard(docWriteRequest.id(), docWriteRequest.routing());
break;
case DELETE:
docWriteRequest.routing(metadata.resolveWriteIndexRouting(docWriteRequest.routing(), docWriteRequest.index()));
// check if routing is required, if so, throw error if routing wasn't specified
if (docWriteRequest.routing() == null && metadata.routingRequired(concreteIndex.getName())) {
throw new RoutingMissingException(concreteIndex.getName(), docWriteRequest.id());
}
shardId = indexRouting.deleteShard(docWriteRequest.id(), docWriteRequest.routing());
break;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.CompressedXContent;
Expand Down Expand Up @@ -288,9 +286,7 @@ static boolean executeBulkItemRequest(
case CREATED:
case UPDATED:
IndexRequest indexRequest = updateResult.action();
IndexMetadata metadata = context.getPrimary().indexSettings().getIndexMetadata();
MappingMetadata mappingMd = metadata.mapping();
indexRequest.process(metadata.getCreationVersion(), mappingMd, updateRequest.concreteIndex());
indexRequest.process();
context.setRequestToExecute(indexRequest);
break;
case DELETED:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.apache.lucene.search.Explanation;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
import org.elasticsearch.cluster.ClusterState;
Expand Down Expand Up @@ -85,10 +84,6 @@ protected void resolveRequest(ClusterState state, InternalRequest request) {
final Set<String> indicesAndAliases = indexNameExpressionResolver.resolveExpressions(state, request.request().index());
final AliasFilter aliasFilter = searchService.buildAliasFilter(state, request.concreteIndex(), indicesAndAliases);
request.request().filteringAlias(aliasFilter);
// Fail fast on the node that received the request.
if (request.request().routing() == null && state.getMetadata().routingRequired(request.concreteIndex())) {
throw new RoutingMissingException(request.concreteIndex(), request.request().id());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package org.elasticsearch.action.get;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
import org.elasticsearch.cluster.ClusterState;
Expand Down Expand Up @@ -82,10 +81,6 @@ protected ShardIterator shards(ClusterState state, InternalRequest request) {
protected void resolveRequest(ClusterState state, InternalRequest request) {
// update the routing (request#index here is possibly an alias)
request.request().routing(state.metadata().resolveIndexRouting(request.request().routing(), request.request().index()));
// Fail fast on the node that received the request.
if (request.request().routing() == null && state.getMetadata().routingRequired(request.concreteIndex())) {
throw new RoutingMissingException(request.concreteIndex(), request.request().id());
}
}

@Override
Expand Down
Loading

0 comments on commit 6741cbf

Please sign in to comment.