Skip to content

Commit

Permalink
Add back primary shard preference for queries (#7375)
Browse files Browse the repository at this point in the history
Signed-off-by: Kunal Kotwani <kkotwani@amazon.com>
  • Loading branch information
kotwanikunal authored May 4, 2023
1 parent 3613881 commit 63fbd0b
Show file tree
Hide file tree
Showing 24 changed files with 257 additions and 29 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add connectToNodeAsExtension in TransportService ([#6866](https://github.com/opensearch-project/OpenSearch/pull/6866))
- Add descending order search optimization through reverse segment read. ([#7244](https://github.com/opensearch-project/OpenSearch/pull/7244))
- Add 'unsigned_long' numeric field type ([#6237](https://github.com/opensearch-project/OpenSearch/pull/6237))
- Add back primary shard preference for queries ([#7375](https://github.com/opensearch-project/OpenSearch/pull/7375))

### Dependencies
- Bump `com.netflix.nebula:gradle-info-plugin` from 12.0.0 to 12.1.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ public void testDelayedMappingPropagationOnReplica() throws Exception {
assertNotNull(mapper.mappers().getMapper("field2"));
});

assertBusy(() -> assertTrue(client().prepareGet("index", "2").get().isExists()));
assertBusy(() -> assertTrue(client().prepareGet("index", "2").setPreference("_primary").get().isExists()));

// The mappings have not been propagated to the replica yet as a consequence the document count not be indexed
// We wait on purpose to make sure that the document is not indexed because the shard operation is stalled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,13 @@ private void searchWhileCreatingIndex(boolean createIndex, int numberOfReplicas)
ClusterHealthStatus status = client().admin().cluster().prepareHealth("test").get().getStatus();
while (status != ClusterHealthStatus.GREEN) {
// first, verify that search normal search works
SearchResponse searchResponse = client().prepareSearch("test").setQuery(QueryBuilders.termQuery("field", "test")).get();
SearchResponse searchResponse = client().prepareSearch("test")
.setPreference("_primary")
.setQuery(QueryBuilders.termQuery("field", "test"))
.execute()
.actionGet();
assertHitCount(searchResponse, 1);

Client client = client();
searchResponse = client.prepareSearch("test")
.setPreference(preference + Integer.toString(counter++))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ public void testMatchedWithShould() throws Exception {
.should(queryStringQuery("dolor").queryName("dolor"))
.should(queryStringQuery("elit").queryName("elit"))
)
.setPreference("_primary")
.get();

assertHitCount(searchResponse, 2L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public void testConsistentHitsWithSameSeed() throws Exception {
for (int o = 0; o < outerIters; o++) {
final int seed = randomInt();
String preference = randomRealisticUnicodeOfLengthBetween(1, 10); // at least one char!!
// randomPreference should not start with '_' (reserved for known preference types (e.g. _shards)
// randomPreference should not start with '_' (reserved for known preference types (e.g. _shards, _primary)
while (preference.startsWith("_")) {
preference = randomRealisticUnicodeOfLengthBetween(1, 10);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,9 @@ public void testStopOneNodePreferenceWithRedState() throws IOException {
internalCluster().stopRandomDataNode();
client().admin().cluster().prepareHealth().setWaitForStatus(ClusterHealthStatus.RED).get();
String[] preferences = new String[] {
"_primary",
"_local",
"_primary_first",
"_prefer_nodes:somenode",
"_prefer_nodes:server2",
"_prefer_nodes:somenode,server2" };
Expand Down Expand Up @@ -140,13 +142,32 @@ public void testSimplePreference() {
client().prepareIndex("test").setSource("field1", "value1").get();
refresh();

SearchResponse searchResponse = client().prepareSearch().setQuery(matchAllQuery()).get();
SearchResponse searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_local").execute().actionGet();
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L));
searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_local").execute().actionGet();
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L));

searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_primary").execute().actionGet();
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L));
searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_primary").execute().actionGet();
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L));

searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_local").get();
searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_replica").execute().actionGet();
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L));
searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_replica").execute().actionGet();
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L));

searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_replica_first").execute().actionGet();
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L));
searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_replica_first").execute().actionGet();
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L));

searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("1234").execute().actionGet();
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L));
searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("1234").execute().actionGet();
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L));

searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("1234").get();
searchResponse = client().prepareSearch().setQuery(matchAllQuery()).get();
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,15 @@ public void testProfileMatchesRegular() throws Exception {
.setProfile(false)
.addSort("id.keyword", SortOrder.ASC)
.setSearchType(SearchType.QUERY_THEN_FETCH)
.setPreference("_primary")
.setRequestCache(false);

SearchRequestBuilder profile = client().prepareSearch("test")
.setQuery(q)
.setProfile(true)
.addSort("id.keyword", SortOrder.ASC)
.setSearchType(SearchType.QUERY_THEN_FETCH)
.setPreference("_primary")
.setRequestCache(false);

MultiSearchResponse.Item[] responses = client().prepareMultiSearch().add(vanilla).add(profile).get().getResponses();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public void testSearchRandomPreference() throws InterruptedException, ExecutionE
int iters = scaledRandomIntBetween(10, 20);
for (int i = 0; i < iters; i++) {
String randomPreference = randomUnicodeOfLengthBetween(0, 4);
// randomPreference should not start with '_' (reserved for known preference types (e.g. _shards)
// randomPreference should not start with '_' (reserved for known preference types (e.g. _shards, _primary)
while (randomPreference.startsWith("_")) {
randomPreference = randomUnicodeOfLengthBetween(0, 4);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ public ClusterSearchShardsRequest routing(String... routings) {

/**
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
* {@code _local} to prefer local shards or a custom value, which guarantees that the same order
* {@code _local} to prefer local shards, {@code _primary} to execute only on primary shards,
* or a custom value, which guarantees that the same order
* will be used across different requests.
*/
public ClusterSearchShardsRequest preference(String preference) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ public ClusterSearchShardsRequestBuilder setRouting(String... routing) {

/**
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
* {@code _local} to prefer local shards or a custom value, which guarantees that the same order
* {@code _local} to prefer local shards, {@code _primary} to execute only on primary shards,
* or a custom value, which guarantees that the same order
* will be used across different requests.
*/
public ClusterSearchShardsRequestBuilder setPreference(String preference) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ public GetRequest routing(String routing) {

/**
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
* {@code _local} to prefer local shards or a custom value, which guarantees that the same order
* {@code _local} to prefer local shards, {@code _primary} to execute only on primary shards,
* or a custom value, which guarantees that the same order
* will be used across different requests.
*/
public GetRequest preference(String preference) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ public GetRequestBuilder setRouting(String routing) {

/**
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
* {@code _local} to prefer local shards or a custom value, which guarantees that the same order
* {@code _local} to prefer local shards, {@code _primary} to execute only on primary shards,
* or a custom value, which guarantees that the same order
* will be used across different requests.
*/
public GetRequestBuilder setPreference(String preference) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,8 @@ public ActionRequestValidationException validate() {

/**
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
* {@code _local} to prefer local shards or a custom value, which guarantees that the same order
* {@code _local} to prefer local shards, {@code _primary} to execute only on primary shards,
* or a custom value, which guarantees that the same order
* will be used across different requests.
*/
public MultiGetRequest preference(String preference) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ public int shardId() {

/**
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
* {@code _local} to prefer local shards or a custom value, which guarantees that the same order
* {@code _local} to prefer local shards, {@code _primary} to execute only on primary shards,
* or a custom value, which guarantees that the same order
* will be used across different requests.
*/
public MultiGetShardRequest preference(String preference) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,8 @@ public SearchRequest routing(String... routings) {

/**
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
* {@code _local} to prefer local shards or a custom value, which guarantees that the same order
* {@code _local} to prefer local shards, {@code _primary} to execute only on primary shards,
* or a custom value, which guarantees that the same order
* will be used across different requests.
*/
public SearchRequest preference(String preference) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ public SearchRequestBuilder setRouting(String... routing) {

/**
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
* {@code _local} to prefer local shards or a custom value, which guarantees that the same order
* {@code _local} to prefer local shards, {@code _primary} to execute only on primary shards,
* or a custom value, which guarantees that the same order
* will be used across different requests.
*/
public SearchRequestBuilder setPreference(String preference) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ public int shardId() {

/**
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
* {@code _local} to prefer local shards or a custom value, which guarantees that the same order
* {@code _local} to prefer local shards, {@code _primary} to execute only on primary shards,
* or a custom value, which guarantees that the same order
* will be used across different requests.
*/
public MultiTermVectorsShardRequest preference(String preference) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,8 +336,8 @@ public String preference() {

/**
* Sets the preference to execute the search. Defaults to randomize across
* shards. Can be set to {@code _local} to prefer local shards or a custom value,
* which guarantees that the same order will be used across different
* shards. Can be set to {@code _local} to prefer local shards, {@code _primary} to execute only on primary shards,
* or a custom value, which guarantees that the same order will be used across different
* requests.
*/
public TermVectorsRequest preference(String preference) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ public TermVectorsRequestBuilder setRouting(String routing) {

/**
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
* {@code _local} to prefer local shards or a custom value, which guarantees that the same order
* {@code _local} to prefer local shards, {@code _primary} to execute only on primary shards,
* or a custom value, which guarantees that the same order
* will be used across different requests.
*/
public TermVectorsRequestBuilder setPreference(String preference) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -574,6 +575,96 @@ public ShardIterator primaryShardIt() {
return new PlainShardIterator(shardId, Collections.emptyList());
}

/**
* Returns true if no primaries are active or initializing for this shard
*/
private boolean noPrimariesActive() {
return this.primary != null && !this.primary.active() && !this.primary.initializing();
}

/**
* Returns an iterator only on the active primary shard.
*/
public ShardIterator primaryActiveInitializingShardIt() {
if (noPrimariesActive()) {
return new PlainShardIterator(shardId, Collections.emptyList());
}
return primaryShardIt();
}

/**
* Returns an ordered iterator on the active primary shard, followed by replica shards.
*/
public ShardIterator primaryFirstActiveInitializingShardsIt() {
ArrayList<ShardRouting> ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size());
// fill it in a randomized fashion
for (ShardRouting shardRouting : shuffler.shuffle(activeShards)) {
ordered.add(shardRouting);
if (shardRouting.primary()) {
// switch, its the matching node id
ordered.set(ordered.size() - 1, ordered.get(0));
ordered.set(0, shardRouting);
}
}
// no need to worry about primary first here..., its temporal
if (!allInitializingShards.isEmpty()) {
ordered.addAll(allInitializingShards);
}
return new PlainShardIterator(shardId, ordered);
}

/**
* Returns an iterator on replica shards.
*/
public ShardIterator replicaActiveInitializingShardIt() {
// If the primaries are unassigned, return an empty list (there aren't
// any replicas to query anyway)
if (noPrimariesActive()) {
return new PlainShardIterator(shardId, Collections.emptyList());
}

LinkedList<ShardRouting> ordered = new LinkedList<>();
for (ShardRouting replica : shuffler.shuffle(replicas)) {
if (replica.active()) {
ordered.addFirst(replica);
} else if (replica.initializing()) {
ordered.addLast(replica);
}
}
return new PlainShardIterator(shardId, ordered);
}

/**
* Returns an ordered iterator on active replica shards, followed by the primary shard.
*/
public ShardIterator replicaFirstActiveInitializingShardsIt() {
// If the primaries are unassigned, return an empty list (there aren't
// any replicas to query anyway)
if (noPrimariesActive()) {
return new PlainShardIterator(shardId, Collections.emptyList());
}

ArrayList<ShardRouting> ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size());
// fill it in a randomized fashion with the active replicas
for (ShardRouting replica : shuffler.shuffle(replicas)) {
if (replica.active()) {
ordered.add(replica);
}
}

// Add the primary shard
ordered.add(primary);

// Add initializing shards last
if (!allInitializingShards.isEmpty()) {
ordered.addAll(allInitializingShards);
}
return new PlainShardIterator(shardId, ordered);
}

/**
* Returns an iterator on active and initializing shards residing on the provided nodeId.
*/
public ShardIterator onlyNodeActiveInitializingShardsIt(String nodeId) {
ArrayList<ShardRouting> ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size());
int seed = shuffler.nextSeed();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,14 @@ private ShardIterator preferenceActiveShardIterator(
return indexShard.preferNodeActiveInitializingShardsIt(nodesIds);
case LOCAL:
return indexShard.preferNodeActiveInitializingShardsIt(Collections.singleton(localNodeId));
case PRIMARY:
return indexShard.primaryActiveInitializingShardIt();
case REPLICA:
return indexShard.replicaActiveInitializingShardIt();
case PRIMARY_FIRST:
return indexShard.primaryFirstActiveInitializingShardsIt();
case REPLICA_FIRST:
return indexShard.replicaFirstActiveInitializingShardsIt();
case ONLY_LOCAL:
return indexShard.onlyNodeActiveInitializingShardsIt(localNodeId);
case ONLY_NODES:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,26 @@ public enum Preference {
*/
LOCAL("_local"),

/**
* Route to primary shards
*/
PRIMARY("_primary"),

/**
* Route to replica shards
*/
REPLICA("_replica"),

/**
* Route to primary shards first
*/
PRIMARY_FIRST("_primary_first"),

/**
* Route to replica shards first
*/
REPLICA_FIRST("_replica_first"),

/**
* Route to the local shard only
*/
Expand Down Expand Up @@ -92,6 +112,16 @@ public static Preference parse(String preference) {
return PREFER_NODES;
case "_local":
return LOCAL;
case "_primary":
return PRIMARY;
case "_replica":
return REPLICA;
case "_primary_first":
case "_primaryFirst":
return PRIMARY_FIRST;
case "_replica_first":
case "_replicaFirst":
return REPLICA_FIRST;
case "_only_local":
case "_onlyLocal":
return ONLY_LOCAL;
Expand Down
Loading

0 comments on commit 63fbd0b

Please sign in to comment.